This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new c206c0748a [#9074] improve(catalog-lakehouse-generic): Refactor the
code structure of the generic lakehouse catalog (#9160)
c206c0748a is described below
commit c206c0748a9e6b0a96b9eb68e593ef87df3d8e23
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Nov 26 11:22:05 2025 +0800
[#9074] improve(catalog-lakehouse-generic): Refactor the code structure of
the generic lakehouse catalog (#9160)
### What changes were proposed in this pull request?
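Refactor the code structure of the lakehouse-generic catalog: rename the module from `catalog-generic-lakehouse` to `catalog-lakehouse-generic`, move its classes into the `org.apache.gravitino.catalog.lakehouse.generic` package, and replace the per-format `LakehouseCatalogOperations` implementations (e.g. `LanceCatalogOperations`) with the `LakehouseTableDelegator` classes (e.g. `LanceTableDelegator` and `LanceTableOperations`). It also adds a `PROPERTY_LOCATION` constant to the `Catalog` and `Schema` APIs.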
### Why are the changes needed?
Fix: #9074
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UTs).
---
.../main/java/org/apache/gravitino/Catalog.java | 14 +
api/src/main/java/org/apache/gravitino/Schema.java | 13 +
build.gradle.kts | 2 +-
.../GenericLakehouseCatalogOperations.java | 487 ---------------------
.../GenericLakehouseTablePropertiesMetadata.java | 81 ----
.../lakehouse/LakehouseCatalogOperations.java | 39 --
.../catalog/lakehouse/LakehouseTableFormat.java | 41 --
.../lakehouse/lance/LanceCatalogOperations.java | 299 -------------
.../catalog/lakehouse/utils/EntityConverter.java | 42 --
.../lakehouse/utils/TestEntityConverter.java | 78 ----
.../build.gradle.kts | 7 +-
.../catalog/lakehouse/generic/GenericCatalog.java} | 27 +-
.../generic/GenericCatalogCapability.java} | 13 +-
.../generic/GenericCatalogOperations.java | 363 +++++++++++++++
.../generic/GenericCatalogPropertiesMetadata.java} | 21 +-
.../generic/GenericSchemaPropertiesMetadata.java} | 21 +-
.../generic/GenericTablePropertiesMetadata.java} | 39 +-
.../lakehouse/generic/LakehouseTableDelegator.java | 60 +++
.../generic/LakehouseTableDelegatorFactory.java | 65 +++
.../lakehouse/lance/LanceTableDelegator.java | 68 +++
.../lakehouse/lance/LanceTableOperations.java | 282 ++++++++++++
.../services/org.apache.gravitino.CatalogProvider | 2 +-
...alog.lakehouse.generic.LakehouseTableDelegator} | 2 +-
.../src/main/resources/lakehouse-generic.conf} | 0
.../generic/TestGenericCatalogOperations.java} | 8 +-
.../lakehouse/generic}/TestPropertiesMetadata.java | 52 +--
.../test/CatalogGenericCatalogLanceIT.java} | 14 +-
.../catalog/TableOperationDispatcher.java | 123 ++----
.../connector/GenericLakehouseColumn.java | 57 ---
.../gravitino/connector/GenericLakehouseTable.java | 69 ---
lance/lance-common/build.gradle.kts | 10 +
.../GravitinoLanceNameSpaceOperations.java | 2 +-
.../gravitino/GravitinoLanceNamespaceWrapper.java | 2 +-
.../gravitino/GravitinoLanceTableOperations.java | 3 +-
lance/lance-rest-server/build.gradle.kts | 12 +-
.../lance/integration/test/LanceRESTServiceIT.java | 14 +-
settings.gradle.kts | 2 +-
37 files changed, 1027 insertions(+), 1407 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/Catalog.java
b/api/src/main/java/org/apache/gravitino/Catalog.java
index 9fbf9f3ac5..680c4c6b1a 100644
--- a/api/src/main/java/org/apache/gravitino/Catalog.java
+++ b/api/src/main/java/org/apache/gravitino/Catalog.java
@@ -132,6 +132,20 @@ public interface Catalog extends Auditable {
/** The property indicates the catalog is in use. */
String PROPERTY_IN_USE = "in-use";
+ /**
+ * The property name for the catalog location. This property indicates the
physical location of
+ * the catalog's data, such as a file path or a URI.
+ *
+ * <p>The location property is optional, it can be specified when creating
the catalog.
+ *
+ * <p>It depends on the catalog implementation to decide whether to leverage
this property. It
+ * also depends on the catalog implementation to decide whether to allow
altering this property
+ * after catalog creation.
+ *
+ * <p>The behavior of altering this property (moving the catalog data) is
also catalog specific.
+ */
+ String PROPERTY_LOCATION = "location";
+
/**
* The property to specify the cloud that the catalog is running on. The
value should be one of
* the {@link CloudName}.
diff --git a/api/src/main/java/org/apache/gravitino/Schema.java
b/api/src/main/java/org/apache/gravitino/Schema.java
index 292ed59e52..c42d760011 100644
--- a/api/src/main/java/org/apache/gravitino/Schema.java
+++ b/api/src/main/java/org/apache/gravitino/Schema.java
@@ -37,6 +37,19 @@ import org.apache.gravitino.tag.SupportsTags;
@Evolving
public interface Schema extends Auditable {
+ /**
+ * The property name for the location of the schema. This property indicates
the physical storage
+ * location of the schema, such as a directory path in a file system or a
URI.
+ *
+ * <p>This property is optional, and it can be specified during schema
creation.
+ *
+ * <p>It depends on the catalog implementation to decide whether to use this
property or not. It
+ * also depends on the catalog implementation to decide whether this
property can be altered after
+ * schema creation. Besides, the behavior of altering this property (moving
data or not) is also
+ * determined by the catalog implementation.
+ */
+ String PROPERTY_LOCATION = "location";
+
/**
* @return The name of the Schema.
*/
diff --git a/build.gradle.kts b/build.gradle.kts
index 4ad385c5cb..d805b00002 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -952,7 +952,7 @@ tasks {
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-lakehouse-paimon:copyLibAndConfig",
":catalogs:catalog-model:copyLibAndConfig",
- ":catalogs:catalog-generic-lakehouse:copyLibAndConfig"
+ ":catalogs:catalog-lakehouse-generic:copyLibAndConfig"
)
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
deleted file mode 100644
index d65107fe7e..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog.lakehouse;
-
-import static org.apache.gravitino.Entity.EntityType.TABLE;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.time.Instant;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.Catalog;
-import org.apache.gravitino.Entity;
-import org.apache.gravitino.EntityAlreadyExistsException;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.Schema;
-import org.apache.gravitino.SchemaChange;
-import org.apache.gravitino.catalog.ManagedSchemaOperations;
-import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
-import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
-import org.apache.gravitino.connector.CatalogInfo;
-import org.apache.gravitino.connector.CatalogOperations;
-import org.apache.gravitino.connector.GenericLakehouseTable;
-import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.connector.SupportsSchemas;
-import org.apache.gravitino.exceptions.NoSuchCatalogException;
-import org.apache.gravitino.exceptions.NoSuchEntityException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NoSuchTableException;
-import org.apache.gravitino.exceptions.NonEmptySchemaException;
-import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
-import org.apache.gravitino.exceptions.TableAlreadyExistsException;
-import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.meta.SchemaEntity;
-import org.apache.gravitino.meta.TableEntity;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
-import org.apache.gravitino.rel.TableCatalog;
-import org.apache.gravitino.rel.TableChange;
-import org.apache.gravitino.rel.expressions.distributions.Distribution;
-import org.apache.gravitino.rel.expressions.sorts.SortOrder;
-import org.apache.gravitino.rel.expressions.transforms.Transform;
-import org.apache.gravitino.rel.indexes.Index;
-import org.apache.gravitino.storage.IdGenerator;
-import org.apache.gravitino.utils.PrincipalUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Operations for interacting with a generic lakehouse catalog in Apache
Gravitino.
- *
- * <p>This catalog provides a unified interface for managing lakehouse table
formats. It is designed
- * to be extensible and can support various table formats through a common
interface.
- */
-public class GenericLakehouseCatalogOperations
- implements CatalogOperations, SupportsSchemas, TableCatalog {
- private static final Logger LOG =
- LoggerFactory.getLogger(GenericLakehouseCatalogOperations.class);
-
- private static final String SLASH = "/";
-
- private final ManagedSchemaOperations managedSchemaOps;
-
- private Optional<Path> catalogLakehouseLocation;
-
- private static final Map<LakehouseTableFormat, LakehouseCatalogOperations>
SUPPORTED_FORMATS =
- Maps.newConcurrentMap();
-
- private Map<String, String> catalogConfig;
- private CatalogInfo catalogInfo;
- private HasPropertyMetadata propertiesMetadata;
- private EntityStore store;
- /**
- * Initializes the generic lakehouse catalog operations with the provided
configuration.
- *
- * @param conf The configuration map for the generic catalog operations.
- * @param info The catalog info associated with this operation instance.
- * @param propertiesMetadata The properties metadata of generic lakehouse
catalog.
- * @throws RuntimeException if initialization fails.
- */
- @Override
- public void initialize(
- Map<String, String> conf, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
- throws RuntimeException {
- String catalogLocation =
- (String)
- propertiesMetadata
- .catalogPropertiesMetadata()
- .getOrDefault(conf,
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
- this.catalogLakehouseLocation =
- StringUtils.isNotBlank(catalogLocation)
- ?
Optional.of(catalogLocation).map(this::ensureTrailingSlash).map(Path::new)
- : Optional.empty();
- this.store = GravitinoEnv.getInstance().entityStore();
- this.catalogConfig = conf;
- this.catalogInfo = info;
- this.propertiesMetadata = propertiesMetadata;
- }
-
- public GenericLakehouseCatalogOperations() {
- this(GravitinoEnv.getInstance().entityStore());
- }
-
- @VisibleForTesting
- GenericLakehouseCatalogOperations(EntityStore store) {
- this.managedSchemaOps =
- new ManagedSchemaOperations() {
- @Override
- protected EntityStore store() {
- return store;
- }
- };
- }
-
- @Override
- public void close() {}
-
- @Override
- public void testConnection(
- NameIdentifier catalogIdent,
- Catalog.Type type,
- String provider,
- String comment,
- Map<String, String> properties) {
- // No-op for generic lakehouse catalog.
- }
-
- @Override
- public NameIdentifier[] listSchemas(Namespace namespace) throws
NoSuchCatalogException {
- return managedSchemaOps.listSchemas(namespace);
- }
-
- @Override
- public Schema createSchema(NameIdentifier ident, String comment, Map<String,
String> properties)
- throws NoSuchCatalogException, SchemaAlreadyExistsException {
- return managedSchemaOps.createSchema(ident, comment, properties);
- }
-
- @Override
- public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
- return managedSchemaOps.loadSchema(ident);
- }
-
- @Override
- public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
- throws NoSuchSchemaException {
- return managedSchemaOps.alterSchema(ident, changes);
- }
-
- @Override
- public boolean dropSchema(NameIdentifier ident, boolean cascade) throws
NonEmptySchemaException {
- return managedSchemaOps.dropSchema(ident, cascade);
- }
-
- // ==================== Table Operations (In Development)
====================
- // TODO: Implement table operations in subsequent releases
- // See: https://github.com/apache/gravitino/issues/8838
- // The table operations will delegate to format-specific implementations
- // (e.g., LanceCatalogOperations for Lance tables)
-
- @Override
- public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
- NameIdentifier identifier = NameIdentifier.of(namespace.levels());
- try {
- store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
- } catch (NoSuchEntityException e) {
- throw new NoSuchSchemaException(e, "Schema %s does not exist",
namespace);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to get schema " + identifier);
- }
-
- try {
- List<TableEntity> tableEntityList = store.list(namespace,
TableEntity.class, TABLE);
- return tableEntityList.stream()
- .map(e -> NameIdentifier.of(namespace, e.name()))
- .toArray(NameIdentifier[]::new);
- } catch (IOException e) {
- throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
- }
- }
-
- @Override
- public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
- try {
- TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
- return GenericLakehouseTable.builder()
- .withProperties(tableEntity.properties())
- .withAuditInfo(tableEntity.auditInfo())
- .withSortOrders(tableEntity.sortOrders())
- .withPartitioning(tableEntity.partitioning())
- .withDistribution(tableEntity.distribution())
- .withColumns(EntityConverter.toColumns(tableEntity.columns()))
- .withIndexes(tableEntity.indexes())
- .withName(tableEntity.name())
- .withComment(tableEntity.comment())
- .build();
- } catch (NoSuchEntityException e) {
- throw new NoSuchTableException(e, "Table %s does not exist", ident);
- } catch (IOException e) {
- throw new RuntimeException("Failed to list tables under schema " +
ident.namespace(), e);
- }
- }
-
- @Override
- public Table createTable(
- NameIdentifier ident,
- Column[] columns,
- String comment,
- Map<String, String> properties,
- Transform[] partitions,
- Distribution distribution,
- SortOrder[] sortOrders,
- Index[] indexes)
- throws NoSuchSchemaException, TableAlreadyExistsException {
- Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
-
- String tableLocation = calculateTableLocation(schema, ident, properties);
- Map<String, String> tableStorageProps = calculateTableStorageProps(schema,
properties);
-
- Map<String, String> newProperties = Maps.newHashMap(properties);
-
newProperties.put(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION,
tableLocation);
- newProperties.putAll(tableStorageProps);
-
- AuditInfo auditInfo =
- AuditInfo.builder()
- .withCreator(PrincipalUtils.getCurrentUserName())
- .withCreateTime(Instant.now())
- .build();
- IdGenerator idGenerator = GravitinoEnv.getInstance().idGenerator();
- List<ColumnEntity> columnEntityList =
- IntStream.range(0, columns.length)
- .mapToObj(
- i -> ColumnEntity.toColumnEntity(columns[i], i,
idGenerator.nextId(), auditInfo))
- .collect(Collectors.toList());
-
- try {
- TableEntity entityToStore =
- TableEntity.builder()
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withColumns(columnEntityList)
- .withProperties(newProperties)
- .withComment(comment)
- .withPartitioning(partitions)
- .withSortOrders(sortOrders)
- .withDistribution(distribution)
- .withIndexes(indexes)
- .withId(idGenerator.nextId())
- .withAuditInfo(auditInfo)
- .build();
- store.put(entityToStore);
- } catch (EntityAlreadyExistsException e) {
- throw new TableAlreadyExistsException(e, "Table %s already exists in the
metadata", ident);
- } catch (IOException e) {
- throw new RuntimeException("Failed to create table metadata for " +
ident, e);
- }
-
- // Get the value of register in table properties
- boolean register =
- Boolean.parseBoolean(
- properties.getOrDefault(
- GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_REGISTER,
"false"));
- if (register) {
- // Do not need to create the physical table if this is a registration
operation.
- // Whether we need to check the existence of the physical table?
- GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
- return builder
- .withName(ident.name())
- .withColumns(columns)
- .withComment(comment)
- .withProperties(properties)
- .withDistribution(distribution)
- .withIndexes(indexes)
- .withAuditInfo(
- AuditInfo.builder()
- .withCreator(PrincipalUtils.getCurrentUserName())
- .withCreateTime(Instant.now())
- .build())
- .withPartitioning(partitions)
- .withSortOrders(sortOrders)
- .withFormat(LakehouseTableFormat.LANCE.lowerName())
- .build();
- }
-
- try {
- LakehouseCatalogOperations lanceCatalogOperations =
- getLakehouseCatalogOperations(newProperties);
- return lanceCatalogOperations.createTable(
- ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
- } catch (Exception e) {
- // Try to roll back the metadata entry in Gravitino store
- try {
- store.delete(ident, Entity.EntityType.TABLE);
- } catch (IOException ioException) {
- LOG.error(
- "Failed to roll back the metadata entry for table {} after
physical table creation failure.",
- ident,
- ioException);
- }
- if (e.getClass().isAssignableFrom(RuntimeException.class)) {
- throw e;
- }
-
- throw new RuntimeException("Failed to create table " + ident, e);
- }
- }
-
- private String calculateTableLocation(
- Schema schema, NameIdentifier tableIdent, Map<String, String>
tableProperties) {
- String tableLocation =
- (String)
- propertiesMetadata
- .tablePropertiesMetadata()
- .getOrDefault(
- tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
- if (StringUtils.isNotBlank(tableLocation)) {
- return ensureTrailingSlash(tableLocation);
- }
-
- String schemaLocation =
- schema.properties() == null
- ? null
- :
schema.properties().get(GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
-
- // If we do not set location in table properties, and schema location is
set, use schema
- // location as the base path.
- if (StringUtils.isNotBlank(schemaLocation)) {
- return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
- }
-
- // If the schema location is not set, use catalog lakehouse dir as the
base path. Or else, throw
- // an exception.
- if (catalogLakehouseLocation.isEmpty()) {
- throw new RuntimeException(
- String.format(
- "No location specified for table %s, you need to set location
either in catalog, schema, or table properties",
- tableIdent));
- }
-
- String catalogLakehousePath = catalogLakehouseLocation.get().toString();
- String[] nsLevels = tableIdent.namespace().levels();
- String schemaName = nsLevels[nsLevels.length - 1];
- return ensureTrailingSlash(catalogLakehousePath)
- + schemaName
- + SLASH
- + tableIdent.name()
- + SLASH;
- }
-
- @Override
- public Table alterTable(NameIdentifier ident, TableChange... changes)
- throws NoSuchTableException, IllegalArgumentException {
- try {
- TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
- Map<String, String> tableProperties = tableEntity.properties();
- LakehouseCatalogOperations lakehouseCatalogOperations =
- getLakehouseCatalogOperations(tableProperties);
- return lakehouseCatalogOperations.alterTable(ident, changes);
- } catch (IOException e) {
- throw new RuntimeException("Failed to alter table " + ident, e);
- }
- }
-
- @Override
- public boolean purgeTable(NameIdentifier ident) {
- try {
- TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
- LakehouseCatalogOperations lakehouseCatalogOperations =
- getLakehouseCatalogOperations(tableEntity.properties());
- return lakehouseCatalogOperations.purgeTable(ident);
- } catch (NoSuchTableException e) {
- LOG.warn("Table {} does not exist, skip purging it.", ident);
- return false;
- } catch (IOException e) {
- throw new RuntimeException("Failed to purge table: " + ident, e);
- }
- }
-
- @Override
- public boolean dropTable(NameIdentifier ident) throws
UnsupportedOperationException {
- try {
- // Only delete the metadata entry here. The physical data will not be
deleted.
- if (!tableExists(ident)) {
- return false;
- }
- return store.delete(ident, Entity.EntityType.TABLE);
- } catch (IOException e) {
- throw new RuntimeException("Failed to drop table " + ident, e);
- }
- }
-
- private LakehouseCatalogOperations getLakehouseCatalogOperations(
- Map<String, String> tableProperties) {
- LakehouseTableFormat format =
- (LakehouseTableFormat)
- propertiesMetadata
- .tablePropertiesMetadata()
- .getOrDefault(
- tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
-
- return SUPPORTED_FORMATS.compute(
- format,
- (k, v) ->
- v == null
- ? createLakehouseCatalogOperations(
- format, tableProperties, catalogInfo, propertiesMetadata)
- : v);
- }
-
- private String ensureTrailingSlash(String path) {
- return path.endsWith(SLASH) ? path : path + SLASH;
- }
-
- private LakehouseCatalogOperations createLakehouseCatalogOperations(
- LakehouseTableFormat format,
- Map<String, String> properties,
- CatalogInfo catalogInfo,
- HasPropertyMetadata propertiesMetadata) {
- LakehouseCatalogOperations operations;
- switch (format) {
- case LANCE:
- operations = new LanceCatalogOperations();
- break;
- default:
- throw new UnsupportedOperationException("Unsupported lakehouse format:
" + format);
- }
-
- operations.initialize(properties, catalogInfo, propertiesMetadata);
- return operations;
- }
-
- /**
- * Calculate the table storage properties by merging catalog config, schema
properties and table
- * properties. The precedence is: table properties > schema properties >
catalog config.
- *
- * @param schema The schema of the table.
- * @param tableProps The table properties.
- * @return The merged table storage properties.
- */
- private Map<String, String> calculateTableStorageProps(
- Schema schema, Map<String, String> tableProps) {
- Map<String, String> storageProps =
getLanceTableStorageOptions(catalogConfig);
- storageProps.putAll(getLanceTableStorageOptions(schema.properties()));
- storageProps.putAll(getLanceTableStorageOptions(tableProps));
- return storageProps;
- }
-
- private Map<String, String> getLanceTableStorageOptions(Map<String, String>
properties) {
- if (MapUtils.isEmpty(properties)) {
- return Maps.newHashMap();
- }
- return properties.entrySet().stream()
- .filter(
- e ->
- e.getKey()
- .startsWith(
-
GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
deleted file mode 100644
index 72c1e5bc57..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog.lakehouse;
-
-import static
org.apache.gravitino.connector.PropertyEntry.booleanPropertyEntry;
-import static org.apache.gravitino.connector.PropertyEntry.enumPropertyEntry;
-import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import org.apache.gravitino.connector.BasePropertiesMetadata;
-import org.apache.gravitino.connector.PropertyEntry;
-
-public class GenericLakehouseTablePropertiesMetadata extends
BasePropertiesMetadata {
- public static final String LAKEHOUSE_LOCATION = "location";
- public static final String LAKEHOUSE_FORMAT = "format";
- public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX =
"lance.storage.";
- public static final String LAKEHOUSE_REGISTER = "register";
-
- private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
-
- static {
- List<PropertyEntry<?>> propertyEntries =
- ImmutableList.of(
- stringOptionalPropertyEntry(
- LAKEHOUSE_LOCATION,
- "The root directory of the lakehouse catalog.",
- false /* immutable */,
- null, /* defaultValue */
- false /* hidden */),
- enumPropertyEntry(
- LAKEHOUSE_FORMAT,
- "The table format of the lakehouse table (e.g., iceberg,
delta, lance)",
- true /* required */,
- true /* immutable */,
- LakehouseTableFormat.class /* enumClass */,
- null /* defaultValue */,
- false /* hidden */,
- false /* reserved */),
- PropertyEntry.stringOptionalPropertyPrefixEntry(
- LANCE_TABLE_STORAGE_OPTION_PREFIX,
- "The storage options passed to Lance table.",
- false /* immutable */,
- null /* default value*/,
- false /* hidden */,
- false /* reserved */),
- booleanPropertyEntry(
- LAKEHOUSE_REGISTER,
- "Whether this is a table registration operation.",
- false,
- true /* immutable */,
- false /* defaultValue */,
- false /* hidden */,
- false));
-
- PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
- }
-
- @Override
- protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
- return PROPERTIES_METADATA;
- }
-}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
deleted file mode 100644
index d5b95845db..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse;
-
-import org.apache.gravitino.connector.CatalogOperations;
-import org.apache.gravitino.rel.TableCatalog;
-
-/**
- * Interface for detailed lakehouse catalog operations, combining catalog
operations and table
- * catalog. {@link GenericLakehouseCatalog} will try to use this interface to
provide detailed
- * lakehouse catalog operations.
- *
- * <pre>
- * GenericLakehouseCatalog.createTable()
- * -> LakehouseCatalogOperations.createTable()
- * -> LanceTableOperations.createTable()
- * -> IcebergTableOperations.createTable()
- * -> DeltaTableOperations.createTable()
- * ...
- * </pre>
- */
-public interface LakehouseCatalogOperations extends CatalogOperations,
TableCatalog {}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
deleted file mode 100644
index 57d0230f48..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse;
-
-public enum LakehouseTableFormat {
- LANCE,
-
- DELTA,
-
- ICEBERG;
-
- public String lowerName() {
- return this.name().toLowerCase();
- }
-
- public static LakehouseTableFormat fromFormatName(String type) {
- for (LakehouseTableFormat tableType : LakehouseTableFormat.values()) {
- if (tableType.name().equalsIgnoreCase(type)) {
- return tableType;
- }
- }
- throw new IllegalArgumentException("Unknown LakehouseTableFormat: " +
type);
- }
-}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
deleted file mode 100644
index 133644dccc..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse.lance;
-
-import static org.apache.gravitino.Entity.EntityType.TABLE;
-
-import com.google.common.collect.Lists;
-import com.lancedb.lance.Dataset;
-import com.lancedb.lance.WriteParams;
-import com.lancedb.lance.index.DistanceType;
-import com.lancedb.lance.index.IndexParams;
-import com.lancedb.lance.index.IndexType;
-import com.lancedb.lance.index.vector.VectorIndexParams;
-import java.io.IOException;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.gravitino.Catalog;
-import org.apache.gravitino.Entity;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.GravitinoEnv;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata;
-import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
-import org.apache.gravitino.catalog.lakehouse.LakehouseTableFormat;
-import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
-import org.apache.gravitino.connector.CatalogInfo;
-import org.apache.gravitino.connector.GenericLakehouseTable;
-import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.exceptions.NoSuchEntityException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NoSuchTableException;
-import org.apache.gravitino.exceptions.TableAlreadyExistsException;
-import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
-import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
-import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.TableEntity;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
-import org.apache.gravitino.rel.TableChange;
-import org.apache.gravitino.rel.expressions.distributions.Distribution;
-import org.apache.gravitino.rel.expressions.sorts.SortOrder;
-import org.apache.gravitino.rel.expressions.transforms.Transform;
-import org.apache.gravitino.rel.indexes.Index;
-import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
-import org.apache.gravitino.utils.PrincipalUtils;
-
-public class LanceCatalogOperations implements LakehouseCatalogOperations {
-
- private EntityStore store;
-
- @Override
- public void initialize(
- Map<String, String> config, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
- throws RuntimeException {
- store = GravitinoEnv.getInstance().entityStore();
- }
-
- @Override
- public void testConnection(
- NameIdentifier catalogIdent,
- Catalog.Type type,
- String provider,
- String comment,
- Map<String, String> properties)
- throws Exception {}
-
- @Override
- public void close() throws IOException {}
-
- @Override
- public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
- // No need to do nothing here as GenericLakehouseCatalogOperations will do
the work.
- throw new UnsupportedOperationException(
- "We should not reach here as we could get table info" + "from
metastore directly.");
- }
-
- @Override
- public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
- // No need to do nothing here as GenericLakehouseCatalogOperations will do
the work.
- throw new UnsupportedOperationException(
- "We should not reach here as we could get table info" + "from
metastore directly.");
- }
-
- @Override
- public Table createTable(
- NameIdentifier ident,
- Column[] columns,
- String comment,
- Map<String, String> properties,
- Transform[] partitions,
- Distribution distribution,
- SortOrder[] sortOrders,
- Index[] indexes)
- throws NoSuchSchemaException, TableAlreadyExistsException {
- // Ignore partitions, distributions, sortOrders, and indexes for Lance
tables;
- String location =
properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
- Map<String, String> storageProps =
LancePropertiesUtils.getLanceStorageOptions(properties);
-
- try (Dataset ignored =
- Dataset.create(
- new RootAllocator(),
- location,
- convertColumnsToArrowSchema(columns),
- new
WriteParams.Builder().withStorageOptions(storageProps).build())) {
- GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
- return builder
- .withName(ident.name())
- .withColumns(columns)
- .withComment(comment)
- .withProperties(properties)
- .withDistribution(distribution)
- .withIndexes(indexes)
- .withAuditInfo(
- AuditInfo.builder()
- .withCreator(PrincipalUtils.getCurrentUserName())
- .withCreateTime(Instant.now())
- .build())
- .withPartitioning(partitions)
- .withSortOrders(sortOrders)
- .withFormat(LakehouseTableFormat.LANCE.lowerName())
- .build();
- }
- }
-
- private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Column[] columns) {
- List<Field> fields =
- Arrays.stream(columns)
- .map(
- col ->
- LanceDataTypeConverter.CONVERTER.toArrowField(
- col.name(), col.dataType(), col.nullable()))
- .collect(Collectors.toList());
- return new org.apache.arrow.vector.types.pojo.Schema(fields);
- }
-
- @Override
- public Table alterTable(NameIdentifier ident, TableChange... changes)
- throws NoSuchTableException, IllegalArgumentException {
- // Lance only supports adding indexes for now.
- List<Index> addedIndexes = Lists.newArrayList();
-
- // Only support for adding index for now.
- for (TableChange change : changes) {
- if (change instanceof TableChange.AddIndex addIndexChange) {
- Index index =
- IndexImpl.builder()
- .withIndexType(addIndexChange.getType())
- .withName(addIndexChange.getName())
- .withFieldNames(addIndexChange.getFieldNames())
- .build();
- addedIndexes.add(index);
- }
- }
-
- TableEntity updatedEntity;
- try {
- TableEntity entity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
- updatedEntity =
- store.update(
- ident,
- TableEntity.class,
- TABLE,
- tableEntity ->
- TableEntity.builder()
- .withId(tableEntity.id())
- .withName(tableEntity.name())
- .withNamespace(tableEntity.namespace())
- .withAuditInfo(
- AuditInfo.builder()
- .withCreator(tableEntity.auditInfo().creator())
-
.withCreateTime(tableEntity.auditInfo().createTime())
-
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
- .withLastModifiedTime(Instant.now())
- .build())
- .withColumns(tableEntity.columns())
- .withIndexes(
- ArrayUtils.addAll(entity.indexes(),
addedIndexes.toArray(new Index[0])))
- .withDistribution(tableEntity.distribution())
- .withPartitioning(tableEntity.partitioning())
- .withSortOrders(tableEntity.sortOrders())
- .withProperties(tableEntity.properties())
- .withComment(tableEntity.comment())
- .build());
-
- // Add indexes to Lance dataset
- addLanceIndex(updatedEntity, addedIndexes);
-
- // return the updated table
- return GenericLakehouseTable.builder()
- .withProperties(updatedEntity.properties())
- .withAuditInfo(updatedEntity.auditInfo())
- .withSortOrders(updatedEntity.sortOrders())
- .withPartitioning(updatedEntity.partitioning())
- .withDistribution(updatedEntity.distribution())
- .withColumns(EntityConverter.toColumns(updatedEntity.columns()))
- .withIndexes(updatedEntity.indexes())
- .withName(updatedEntity.name())
- .withComment(updatedEntity.comment())
- .build();
- } catch (NoSuchEntityException e) {
- throw new NoSuchTableException("No such table: %s", ident);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to load table entity for: " + ident,
ioe);
- }
- }
-
- private void addLanceIndex(TableEntity updatedEntity, List<Index>
addedIndexes) {
- String location =
-
updatedEntity.properties().get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
- try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
- for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
- dataset.createIndex(
- Arrays.stream(index.fieldNames())
- .map(fieldPath -> String.join(".", fieldPath))
- .collect(Collectors.toList()),
- indexType,
- Optional.of(index.name()),
- indexParams,
- true);
- }
- }
- }
-
- private IndexParams getIndexParamsByIndexType(IndexType indexType) {
- switch (indexType) {
- case SCALAR:
- return IndexParams.builder().build();
- case VECTOR:
- // TODO make these parameters configurable
- int numberOfDimensions = 3; // this value should be determined
dynamically based on the data
- // Add properties to Index to set this value.
- return IndexParams.builder()
- .setVectorIndexParams(
- VectorIndexParams.ivfPq(2, 8, numberOfDimensions,
DistanceType.L2, 2))
- .build();
- default:
- throw new IllegalArgumentException("Unsupported index type: " +
indexType);
- }
- }
-
- @Override
- public boolean purgeTable(NameIdentifier ident) {
- try {
- TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
- Map<String, String> lancePropertiesMap = tableEntity.properties();
- String location =
-
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
-
- if (!store.delete(ident, Entity.EntityType.TABLE)) {
- throw new RuntimeException("Failed to purge Lance table: " +
ident.name());
- }
-
- // Drop the Lance dataset from cloud storage.
- Dataset.drop(location,
LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap));
- return true;
- } catch (IOException e) {
- throw new RuntimeException("Failed to purge Lance table: " +
ident.name(), e);
- }
- }
-
- @Override
- public boolean dropTable(NameIdentifier ident) {
- // TODO We will handle it in GenericLakehouseCatalogOperations. However,
we need
- // to figure out it's a external table or not first. we will introduce a
property
- // to indicate that. Currently, all Lance tables are external tables.
`drop` will
- // just remove the metadata in metastore and will not delete data in
storage.
- throw new UnsupportedOperationException(
- "LanceCatalogOperations does not support dropTable operation.");
- }
-}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
deleted file mode 100644
index 734309a444..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse.utils;
-
-import java.util.List;
-import org.apache.gravitino.connector.GenericLakehouseColumn;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.rel.Column;
-
-public class EntityConverter {
- public static Column[] toColumns(List<ColumnEntity> columnEntities) {
- return
columnEntities.stream().map(EntityConverter::toColumn).toArray(Column[]::new);
- }
-
- private static Column toColumn(ColumnEntity columnEntity) {
- return GenericLakehouseColumn.builder()
- .withName(columnEntity.name())
- .withComment(columnEntity.comment())
- .withAutoIncrement(columnEntity.autoIncrement())
- .withNullable(columnEntity.nullable())
- .withType(columnEntity.dataType())
- .withDefaultValue(columnEntity.defaultValue())
- .build();
- }
-}
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
deleted file mode 100644
index 9da5ed530e..0000000000
---
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.catalog.lakehouse.utils;
-
-import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
-
-import java.util.List;
-import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.rel.types.Types;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestEntityConverter {
-
- @Test
- void testToColumns() {
- AuditInfo auditInfo = AuditInfo.builder().build();
- List<ColumnEntity> columnEntities =
- List.of(
- ColumnEntity.builder()
- .withName("id")
- .withId(1L)
- .withDataType(Types.IntegerType.get())
- .withComment("Identifier")
- .withAutoIncrement(true)
- .withNullable(false)
- .withDefaultValue(DEFAULT_VALUE_NOT_SET)
- .withAuditInfo(auditInfo)
- .withPosition(1)
- .build(),
- ColumnEntity.builder()
- .withName("name")
- .withId(2L)
- .withDataType(Types.StringType.get())
- .withComment("Name of the entity")
- .withAutoIncrement(false)
- .withNullable(true)
- .withDefaultValue(DEFAULT_VALUE_NOT_SET)
- .withPosition(2)
- .withAuditInfo(auditInfo)
- .build());
- var columns = EntityConverter.toColumns(columnEntities);
- Assertions.assertEquals(2, columns.length);
- for (var column : columns) {
- if (column.name().equals("id")) {
- Assertions.assertEquals(Types.IntegerType.get(), column.dataType());
- Assertions.assertEquals("Identifier", column.comment());
- Assertions.assertTrue(column.autoIncrement());
- Assertions.assertFalse(column.nullable());
- Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, column.defaultValue());
- } else if (column.name().equals("name")) {
- Assertions.assertEquals(Types.StringType.get(), column.dataType());
- Assertions.assertEquals("Name of the entity", column.comment());
- Assertions.assertFalse(column.autoIncrement());
- Assertions.assertTrue(column.nullable());
- Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, column.defaultValue());
- }
- }
- }
-}
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts
b/catalogs/catalog-lakehouse-generic/build.gradle.kts
similarity index 93%
rename from catalogs/catalog-generic-lakehouse/build.gradle.kts
rename to catalogs/catalog-lakehouse-generic/build.gradle.kts
index 7dd0a4bc16..484514bede 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-generic/build.gradle.kts
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-description = "catalog-generic-lakehouse"
+description = "catalog-lakehouse-generic"
plugins {
`maven-publish`
@@ -43,7 +43,6 @@ dependencies {
implementation(libs.commons.io)
implementation(libs.commons.lang3)
implementation(libs.guava)
- implementation(libs.hadoop3.client.api)
implementation(libs.lance)
annotationProcessor(libs.lombok)
@@ -86,12 +85,12 @@ tasks {
exclude("log4j-*.jar")
exclude("slf4j-*.jar")
}
- into("$rootDir/distribution/package/catalogs/generic-lakehouse/libs")
+ into("$rootDir/distribution/package/catalogs/lakehouse-generic/libs")
}
val copyCatalogConfig by registering(Copy::class) {
from("src/main/resources")
- into("$rootDir/distribution/package/catalogs/generic-lakehouse/conf")
+ into("$rootDir/distribution/package/catalogs/lakehouse-generic/conf")
rename { original ->
if (original.endsWith(".template")) {
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalog.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalog.java
similarity index 69%
rename from
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalog.java
rename to
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalog.java
index 68072f55ba..c8000989a4 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalog.java
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalog.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
import java.util.Map;
import org.apache.gravitino.connector.BaseCatalog;
@@ -25,16 +25,16 @@ import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.capability.Capability;
/** Implementation of a generic lakehouse catalog in Apache Gravitino. */
-public class GenericLakehouseCatalog extends
BaseCatalog<GenericLakehouseCatalog> {
+public class GenericCatalog extends BaseCatalog<GenericCatalog> {
- static final GenericLakehouseCatalogPropertiesMetadata
CATALOG_PROPERTIES_METADATA =
- new GenericLakehouseCatalogPropertiesMetadata();
+ private static final GenericCatalogPropertiesMetadata
CATALOG_PROPERTIES_METADATA =
+ new GenericCatalogPropertiesMetadata();
- static final GenericLakehouseSchemaPropertiesMetadata
SCHEMA_PROPERTIES_METADATA =
- new GenericLakehouseSchemaPropertiesMetadata();
+ private static final GenericSchemaPropertiesMetadata
SCHEMA_PROPERTIES_METADATA =
+ new GenericSchemaPropertiesMetadata();
- static final GenericLakehouseTablePropertiesMetadata
TABLE_PROPERTIES_METADATA =
- new GenericLakehouseTablePropertiesMetadata();
+ private static final GenericTablePropertiesMetadata
TABLE_PROPERTIES_METADATA =
+ new GenericTablePropertiesMetadata();
/**
* Returns the short name of the generic lakehouse catalog.
@@ -43,24 +43,23 @@ public class GenericLakehouseCatalog extends
BaseCatalog<GenericLakehouseCatalog
*/
@Override
public String shortName() {
- return "generic-lakehouse";
+ return "lakehouse-generic";
}
/**
- * Creates a new instance of {@link GenericLakehouseCatalogOperations} with
the provided
- * configuration.
+ * Creates a new instance of {@link GenericCatalogOperations} with the
provided configuration.
*
* @param config The configuration map for the generic catalog operations.
- * @return A new instance of {@link GenericLakehouseCatalogOperations}.
+ * @return A new instance of {@link GenericCatalogOperations}.
*/
@Override
protected CatalogOperations newOps(Map<String, String> config) {
- return new GenericLakehouseCatalogOperations();
+ return new GenericCatalogOperations();
}
@Override
public Capability newCapability() {
- return new GenericLakehouseCatalogCapability();
+ return new GenericCatalogCapability();
}
@Override
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogCapability.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
similarity index 69%
rename from
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogCapability.java
rename to
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
index 412b82d6a7..5900d0b51e 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogCapability.java
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
@@ -16,15 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
+import java.util.Objects;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.connector.capability.CapabilityResult;
-public class GenericLakehouseCatalogCapability implements Capability {
+public class GenericCatalogCapability implements Capability {
@Override
public CapabilityResult managedStorage(Scope scope) {
- return CapabilityResult.SUPPORTED;
+ if (Objects.requireNonNull(scope) == Scope.TABLE
+ || Objects.requireNonNull(scope) == Scope.SCHEMA) {
+ return CapabilityResult.SUPPORTED;
+ }
+
+ return CapabilityResult.unsupported(
+ String.format("Generic catalog does not support managed storage for
%s.", scope));
}
}
diff --git
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogOperations.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogOperations.java
new file mode 100644
index 0000000000..e31263fadc
--- /dev/null
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogOperations.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.generic;
+
+import static org.apache.gravitino.Entity.EntityType.TABLE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.storage.IdGenerator;
+
+/** Operations for interacting with a generic lakehouse catalog in Apache
Gravitino. */
+public class GenericCatalogOperations implements CatalogOperations,
SupportsSchemas, TableCatalog {
+
+ private static final String SLASH = "/";
+
+ private final ManagedSchemaOperations schemaOps;
+
+ private final Map<String, Supplier<ManagedTableOperations>> tableOpsCache;
+
+ private Optional<String> catalogLocation;
+
+ private HasPropertyMetadata propertiesMetadata;
+
+ private final Cache<NameIdentifier, String> tableFormatCache;
+
+ private final EntityStore store;
+
+ public GenericCatalogOperations() {
+ this(GravitinoEnv.getInstance().entityStore(),
GravitinoEnv.getInstance().idGenerator());
+ }
+
+ @VisibleForTesting
+ GenericCatalogOperations(EntityStore store, IdGenerator idGenerator) {
+ this.store = store;
+
+ this.schemaOps =
+ new ManagedSchemaOperations() {
+ @Override
+ protected EntityStore store() {
+ return store;
+ }
+ };
+
+ this.tableFormatCache =
CacheBuilder.newBuilder().maximumSize(1000).build();
+
+ // Initialize all the table operations for different table formats.
+ Map<String, LakehouseTableDelegator> tableDelegators =
+ LakehouseTableDelegatorFactory.tableDelegators();
+ tableOpsCache =
+ Collections.unmodifiableMap(
+ tableDelegators.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ // Lazy initialize the table operations when needed.
+ e -> {
+ LakehouseTableDelegator delegator = e.getValue();
+ return Suppliers.memoize(
+ () -> delegator.createTableOps(store, schemaOps,
idGenerator));
+ })));
+ if (tableOpsCache.isEmpty()) {
+ throw new IllegalArgumentException("No table delegators found, this is
unexpected.");
+ }
+ }
+
+ @Override
+ public void initialize(
+ Map<String, String> conf, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
+ throws RuntimeException {
+ String location =
+ (String)
+ propertiesMetadata
+ .catalogPropertiesMetadata()
+ .getOrDefault(conf, Catalog.PROPERTY_LOCATION);
+ this.catalogLocation =
+ StringUtils.isNotBlank(location)
+ ? Optional.of(location).map(this::ensureTrailingSlash)
+ : Optional.empty();
+ this.propertiesMetadata = propertiesMetadata;
+ }
+
+ @Override
+ public void close() {
+ tableFormatCache.cleanUp();
+ }
+
+ @Override
+ public void testConnection(
+ NameIdentifier catalogIdent,
+ Catalog.Type type,
+ String provider,
+ String comment,
+ Map<String, String> properties) {
+ // No-op for generic catalog.
+ }
+
+ @Override
+ public NameIdentifier[] listSchemas(Namespace namespace) throws
NoSuchCatalogException {
+ return schemaOps.listSchemas(namespace);
+ }
+
+ @Override
+ public Schema createSchema(NameIdentifier ident, String comment, Map<String,
String> properties)
+ throws NoSuchCatalogException, SchemaAlreadyExistsException {
+ return schemaOps.createSchema(ident, comment, properties);
+ }
+
+ @Override
+ public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
+ return schemaOps.loadSchema(ident);
+ }
+
+ @Override
+ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
+ throws NoSuchSchemaException {
+ return schemaOps.alterSchema(ident, changes);
+ }
+
+ @Override
+ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws
NonEmptySchemaException {
+ Namespace tableNs =
+ Namespace.of(ident.namespace().level(0), ident.namespace().level(1),
ident.name());
+ NameIdentifier[] tableIdents;
+ try {
+ tableIdents = listTables(tableNs);
+ } catch (NoSuchSchemaException e) {
+ // If schema does not exist, return false.
+ return false;
+ }
+
+ if (!cascade && tableIdents.length > 0) {
+ throw new NonEmptySchemaException(
+ "Schema %s is not empty, cannot drop it without cascade", ident);
+ }
+
+ // Drop all tables under the schema first if cascade is true.
+ for (NameIdentifier tableIdent : tableIdents) {
+ tableOps(tableIdent).dropTable(tableIdent);
+ }
+
+ return schemaOps.dropSchema(ident, cascade);
+ }
+
+ @Override
+ public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
+ // We get the table operations from any cached table ops, since listing
tables is not
+ // format-specific.
+ ManagedTableOperations tableOps =
tableOpsCache.values().iterator().next().get();
+ return tableOps.listTables(namespace);
+ }
+
+ @Override
+ public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+ Table loadedTable = tableOps(ident).loadTable(ident);
+
+ Optional<String> tableFormat =
+ Optional.ofNullable(
+
loadedTable.properties().getOrDefault(Table.PROPERTY_TABLE_FORMAT, null))
+ .map(s -> s.toLowerCase(Locale.ROOT));
+ tableFormat.ifPresent(s -> tableFormatCache.put(ident, s));
+
+ return loadedTable;
+ }
+
+ @Override
+ public Table createTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+ Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
+ String tableLocation = calculateTableLocation(schema, ident, properties);
+
+ String format = properties.getOrDefault(Table.PROPERTY_TABLE_FORMAT, null);
+ Preconditions.checkArgument(
+ format != null, "Table format must be specified in table properties");
+ format = format.toLowerCase(Locale.ROOT);
+
+ Map<String, String> newProperties = Maps.newHashMap(properties);
+ newProperties.put(Table.PROPERTY_LOCATION, tableLocation);
+ newProperties.put(Table.PROPERTY_TABLE_FORMAT, format);
+
+ // Get the table operations for the specified table format.
+ Supplier<ManagedTableOperations> tableOpsSupplier =
tableOpsCache.get(format);
+ Preconditions.checkArgument(tableOpsSupplier != null, "Unsupported table
format: %s", format);
+ ManagedTableOperations tableOps = tableOpsSupplier.get();
+
+ Table createdTable =
+ tableOps.createTable(
+ ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
+ // Cache the table format for future use.
+ tableFormatCache.put(ident, format);
+ return createdTable;
+ }
+
+ @Override
+ public Table alterTable(NameIdentifier ident, TableChange... changes)
+ throws NoSuchTableException, IllegalArgumentException {
+ Table alteredTable = tableOps(ident).alterTable(ident, changes);
+
+ boolean isRenameChange =
+ Arrays.stream(changes).anyMatch(c -> c instanceof
TableChange.RenameTable);
+ if (isRenameChange) {
+ tableFormatCache.invalidate(ident);
+ }
+
+ return alteredTable;
+ }
+
+ @Override
+ public boolean purgeTable(NameIdentifier ident) {
+ boolean purged = tableOps(ident).purgeTable(ident);
+ tableFormatCache.invalidate(ident);
+ return purged;
+ }
+
+ @Override
+ public boolean dropTable(NameIdentifier ident) throws
UnsupportedOperationException {
+ boolean dropped = tableOps(ident).dropTable(ident);
+ tableFormatCache.invalidate(ident);
+ return dropped;
+ }
+
+ private String calculateTableLocation(
+ Schema schema, NameIdentifier tableIdent, Map<String, String>
tableProperties) {
+ String tableLocation =
+ (String)
+ propertiesMetadata
+ .tablePropertiesMetadata()
+ .getOrDefault(tableProperties, Table.PROPERTY_LOCATION);
+ if (StringUtils.isNotBlank(tableLocation)) {
+ return ensureTrailingSlash(tableLocation);
+ }
+
+ String schemaLocation =
+ schema.properties() == null ? null :
schema.properties().get(Schema.PROPERTY_LOCATION);
+
+ // If we do not set location in table properties, and schema location is
set, use schema
+ // location as the base path.
+ if (StringUtils.isNotBlank(schemaLocation)) {
+ return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
+ }
+
+ // If the schema location is not set, use catalog lakehouse dir as the
base path. Or else, throw
+ // an exception.
+ if (catalogLocation.isEmpty()) {
+ throw new IllegalArgumentException(
+ "'location' property is neither set in table properties "
+ + "nor in schema properties, and no location is set in catalog
properties either. "
+ + "Please set the 'location' in either of them to create the
table "
+ + tableIdent);
+ }
+
+ return ensureTrailingSlash(catalogLocation.get())
+ + tableIdent.namespace().level(2)
+ + SLASH
+ + tableIdent.name()
+ + SLASH;
+ }
+
+ private String ensureTrailingSlash(String path) {
+ return path.endsWith(SLASH) ? path : path + SLASH;
+ }
+
+ private ManagedTableOperations tableOps(NameIdentifier tableIdent) {
+ try {
+ String tableFormat =
+ tableFormatCache.get(
+ tableIdent,
+ () -> {
+ TableEntity table = store.get(tableIdent, TABLE,
TableEntity.class);
+ String format =
table.properties().getOrDefault(Table.PROPERTY_TABLE_FORMAT, null);
+ Preconditions.checkArgument(
+ format != null, "Table format for %s is null, this is
unexpected", tableIdent);
+
+ return format.toLowerCase(Locale.ROOT);
+ });
+
+ ManagedTableOperations ops = tableOpsCache.get(tableFormat).get();
+ Preconditions.checkArgument(
+ ops != null, "No table operations found for table format %s",
tableFormat);
+ return ops;
+
+ } catch (Exception e) {
+ Throwable t = e.getCause();
+
+ if (t instanceof NoSuchEntityException) {
+ throw new NoSuchTableException("Table %s does not exist", tableIdent);
+ } else if (t instanceof IllegalArgumentException) {
+ throw (IllegalArgumentException) t;
+ } else if (t instanceof IOException) {
+ throw new RuntimeException(
+ String.format("Failed to load table %s: %s", tableIdent,
t.getMessage()), t);
+ } else {
+ throw new RuntimeException(
+ String.format("Unexpected exception when loading table %s",
tableIdent), t);
+ }
+ }
+ }
+}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogPropertiesMetadata.java
similarity index 67%
rename from
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
rename to
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogPropertiesMetadata.java
index b8c3958e9a..71daef161b 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogPropertiesMetadata.java
@@ -17,21 +17,19 @@
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
-import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
-public class GenericLakehouseCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata {
-
- public static final String LAKEHOUSE_LOCATION = "location";
+public class GenericCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata {
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -39,18 +37,11 @@ public class GenericLakehouseCatalogPropertiesMetadata
extends BaseCatalogProper
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringOptionalPropertyEntry(
- LAKEHOUSE_LOCATION,
- "The root directory of the lakehouse catalog.",
+ Catalog.PROPERTY_LOCATION,
+ "The root directory of the generic catalog.",
false /* immutable */,
null, /* defaultValue */
- false /* hidden */),
- PropertyEntry.stringOptionalPropertyPrefixEntry(
- LANCE_TABLE_STORAGE_OPTION_PREFIX,
- "The storage options passed to Lance table.",
- false /* immutable */,
- null /* default value*/,
- false /* hidden */,
- false /* reserved */));
+ false /* hidden */));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericSchemaPropertiesMetadata.java
similarity index 65%
copy from
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
copy to
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericSchemaPropertiesMetadata.java
index 3dd0abf81d..585cf8babc 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericSchemaPropertiesMetadata.java
@@ -16,21 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
-import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+import org.apache.gravitino.Schema;
import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
-public class GenericLakehouseSchemaPropertiesMetadata extends
BasePropertiesMetadata {
- public static final String LAKEHOUSE_LOCATION =
- GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION;
+public class GenericSchemaPropertiesMetadata extends BasePropertiesMetadata {
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -38,18 +36,11 @@ public class GenericLakehouseSchemaPropertiesMetadata
extends BasePropertiesMeta
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringOptionalPropertyEntry(
- LAKEHOUSE_LOCATION,
- "The root directory of the lakehouse schema.",
+ Schema.PROPERTY_LOCATION,
+ "The root directory of the schema.",
false /* immutable */,
null, /* defaultValue */
- false /* hidden */),
- PropertyEntry.stringOptionalPropertyPrefixEntry(
- LANCE_TABLE_STORAGE_OPTION_PREFIX,
- "The storage options passed to Lance table.",
- false /* immutable */,
- null /* default value*/,
- false /* hidden */,
- false /* reserved */));
+ false /* hidden */));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
similarity index 56%
rename from
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
rename to
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
index 3dd0abf81d..14cd263ecd 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericTablePropertiesMetadata.java
@@ -16,21 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
-import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+import static
org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.rel.Table;
-public class GenericLakehouseSchemaPropertiesMetadata extends
BasePropertiesMetadata {
- public static final String LAKEHOUSE_LOCATION =
- GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION;
+public class GenericTablePropertiesMetadata extends BasePropertiesMetadata {
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -38,24 +39,32 @@ public class GenericLakehouseSchemaPropertiesMetadata
extends BasePropertiesMeta
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringOptionalPropertyEntry(
- LAKEHOUSE_LOCATION,
- "The root directory of the lakehouse schema.",
+ Table.PROPERTY_LOCATION,
+ "The root directory of the generic table.",
false /* immutable */,
null, /* defaultValue */
false /* hidden */),
- PropertyEntry.stringOptionalPropertyPrefixEntry(
- LANCE_TABLE_STORAGE_OPTION_PREFIX,
- "The storage options passed to Lance table.",
- false /* immutable */,
- null /* default value*/,
- false /* hidden */,
- false /* reserved */));
+ stringRequiredPropertyEntry(
+ Table.PROPERTY_TABLE_FORMAT,
+ "The format of the table",
+ true /* immutable */,
+ false /* hidden */));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
- return PROPERTIES_METADATA;
+ // Get all the table specific property entries from the registered table
delegators, and merge
+ // them with the generic table properties.
+ Map<String, PropertyEntry<?>> tableSpecificPropertyEntries =
+ LakehouseTableDelegatorFactory.tableDelegators().entrySet().stream()
+ .flatMap(kv -> kv.getValue().tablePropertyEntries().stream())
+ .collect(Collectors.toMap(PropertyEntry::getName, entry -> entry));
+
+ return ImmutableMap.<String, PropertyEntry<?>>builder()
+ .putAll(PROPERTIES_METADATA)
+ .putAll(tableSpecificPropertyEntries)
+ .build();
}
}
diff --git
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegator.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegator.java
new file mode 100644
index 0000000000..0e1d1fbec4
--- /dev/null
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.generic;
+
+import java.util.List;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * A delegator interface for different lakehouse table formats to provide
their specific table
+ * operations and property definitions.
+ */
+public interface LakehouseTableDelegator {
+
+ /**
+ * Returns the table format name handled by this delegator.
+ *
+ * @return the table format name
+ */
+ String tableFormat();
+
+ /**
+ * Returns the list of property entries specific to the table format.
+ *
+ * @return the list of property entries
+ */
+ List<PropertyEntry<?>> tablePropertyEntries();
+
+ /**
+ * Create the managed table operations for the specific table format. This
method should return a
+ * new instance of {@link ManagedTableOperations} each time it is called.
The returned instance
+ * will be used in {@link GenericCatalogOperations} to perform the specific
table operation.
+ *
+ * @param store the entity store
+ * @param schemaOps the managed schema operations
+ * @param idGenerator the ID generator
+ * @return the managed table operations
+ */
+ ManagedTableOperations createTableOps(
+ EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator
idGenerator);
+}
diff --git
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegatorFactory.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegatorFactory.java
new file mode 100644
index 0000000000..3ec700e374
--- /dev/null
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/LakehouseTableDelegatorFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.generic;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LakehouseTableDelegatorFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(LakehouseTableDelegatorFactory.class);
+
+ private static ImmutableMap<String, LakehouseTableDelegator> delegators;
+
+ private LakehouseTableDelegatorFactory() {}
+
+ private static synchronized void createTableDelegators() {
+ if (delegators != null) {
+ return;
+ }
+
+ ClassLoader cl =
+ Optional.ofNullable(Thread.currentThread().getContextClassLoader())
+ .orElse(LakehouseTableDelegator.class.getClassLoader());
+ ServiceLoader<LakehouseTableDelegator> loader =
+ ServiceLoader.load(LakehouseTableDelegator.class, cl);
+ delegators =
+ ImmutableMap.copyOf(
+ loader.stream()
+ .collect(
+ Collectors.toMap(
+ provider -> provider.get().tableFormat(),
ServiceLoader.Provider::get)));
+ Preconditions.checkArgument(
+ !delegators.isEmpty(),
+ "No LakehouseTableDelegator implementation found via ServiceLoader,
this is unexpected, "
+ + "please check the code to see what is going on.");
+
+ LOG.info("Loaded LakehouseTableDelegators: {}", delegators.keySet());
+ }
+
+ public static ImmutableMap<String, LakehouseTableDelegator>
tableDelegators() {
+ // Initialize delegators if not yet done
+ createTableDelegators();
+ return delegators;
+ }
+}
diff --git
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java
new file mode 100644
index 0000000000..d23322eb17
--- /dev/null
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.storage.IdGenerator;
+
+public class LanceTableDelegator implements LakehouseTableDelegator {
+
+ public static final String LANCE_TABLE_FORMAT = "lance";
+
+ public static final String PROPERTY_LANCE_TABLE_REGISTER = "lance.register";
+
+ public static final String PROPERTY_LANCE_STORAGE_OPTIONS_PREFIX =
"lance.storage.";
+
+ @Override
+ public String tableFormat() {
+ return LANCE_TABLE_FORMAT;
+ }
+
+ @Override
+ public List<PropertyEntry<?>> tablePropertyEntries() {
+ return ImmutableList.of(
+ PropertyEntry.stringOptionalPropertyPrefixEntry(
+ PROPERTY_LANCE_STORAGE_OPTIONS_PREFIX,
+ "The storage options passed to Lance table.",
+ false /* immutable */,
+ null /* default value*/,
+ false /* hidden */,
+ false /* reserved */),
+ PropertyEntry.booleanPropertyEntry(
+ PROPERTY_LANCE_TABLE_REGISTER,
+ "Whether this is a table registration operation.",
+ false,
+ true /* immutable */,
+ false /* defaultValue */,
+ false /* hidden */,
+ false));
+ }
+
+ @Override
+ public ManagedTableOperations createTableOps(
+ EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator
idGenerator) {
+ return new LanceTableOperations(store, schemaOps, idGenerator);
+ }
+}
diff --git
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java
new file mode 100644
index 0000000000..2eed5288be
--- /dev/null
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.base.Preconditions;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.index.DistanceType;
+import com.lancedb.lance.index.IndexParams;
+import com.lancedb.lance.index.IndexType;
+import com.lancedb.lance.index.vector.VectorIndexParams;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.ManagedTableOperations;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
+import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.storage.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LanceTableOperations extends ManagedTableOperations {
+ private static final Logger LOG =
LoggerFactory.getLogger(LanceTableOperations.class);
+
+ private final EntityStore store;
+
+ private final ManagedSchemaOperations schemaOps;
+
+ private final IdGenerator idGenerator;
+
+ public LanceTableOperations(
+ EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator
idGenerator) {
+ this.store = store;
+ this.schemaOps = schemaOps;
+ this.idGenerator = idGenerator;
+ }
+
+ @Override
+ protected EntityStore store() {
+ return store;
+ }
+
+ @Override
+ protected SupportsSchemas schemas() {
+ return schemaOps;
+ }
+
+ @Override
+ protected IdGenerator idGenerator() {
+ return idGenerator;
+ }
+
+ @Override
+ public Table createTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+ String location = properties.get(Table.PROPERTY_LOCATION);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(location), "Table location must be specified");
+ Map<String, String> storageProps =
LancePropertiesUtils.getLanceStorageOptions(properties);
+
+ boolean register =
+
Optional.ofNullable(properties.get(LanceTableDelegator.PROPERTY_LANCE_TABLE_REGISTER))
+ .map(Boolean::parseBoolean)
+ .orElse(false);
+ if (register) {
+ // If this is a registration operation, just create the table metadata
without creating a new
+ // dataset
+ return super.createTable(
+ ident, columns, comment, properties, partitions, distribution,
sortOrders, indexes);
+ }
+
+ try (Dataset ignored =
+ Dataset.create(
+ new RootAllocator(),
+ location,
+ convertColumnsToArrowSchema(columns),
+ new
WriteParams.Builder().withStorageOptions(storageProps).build())) {
+ // Only create the table metadata in Gravitino after the Lance dataset
is successfully
+ // created.
+ return super.createTable(
+ ident, columns, comment, properties, partitions, distribution,
sortOrders, indexes);
+ } catch (NoSuchSchemaException e) {
+ throw e;
+ } catch (TableAlreadyExistsException e) {
+ // If the table metadata already exists, but the underlying lance table
was just created
+ // successfully, we need to clean up the created lance table to avoid
orphaned datasets.
+ Dataset.drop(location,
LancePropertiesUtils.getLanceStorageOptions(properties));
+ throw e;
+ } catch (IllegalArgumentException e) {
+ if (e.getMessage().contains("Dataset already exists")) {
+ throw new TableAlreadyExistsException(
+ e, "Lance dataset already exists at location %s", location);
+ }
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create Lance dataset at location "
+ location, e);
+ }
+ }
+
+ @Override
+ public Table alterTable(NameIdentifier ident, TableChange... changes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+ // Lance only supports adding indexes for now.
+ boolean onlyAddIndex =
+ Arrays.stream(changes).allMatch(change -> change instanceof
TableChange.AddIndex);
+ Preconditions.checkArgument(onlyAddIndex, "Only adding indexes is
supported for Lance tables");
+
+ List<Index> addedIndexes =
+ Arrays.stream(changes)
+ .filter(change -> change instanceof TableChange.AddIndex)
+ .map(
+ change -> {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ return Indexes.IndexImpl.builder()
+ .withIndexType(addIndexChange.getType())
+ .withName(addIndexChange.getName())
+ .withFieldNames(addIndexChange.getFieldNames())
+ .build();
+ })
+ .collect(Collectors.toList());
+
+ Table loadedTable = super.loadTable(ident);
+ addLanceIndex(loadedTable, addedIndexes);
+ // After adding the index to the Lance dataset, we need to update the
table metadata in
+ // Gravitino. If there's any failure during this process, the code will
throw an exception
+ // and the update won't be applied in Gravitino.
+ return super.alterTable(ident, changes);
+ }
+
+ @Override
+ public boolean purgeTable(NameIdentifier ident) {
+ try {
+ Table table = loadTable(ident);
+ String location = table.properties().get(Table.PROPERTY_LOCATION);
+
+ boolean purged = super.purgeTable(ident);
+ // If the table metadata is purged successfully, we can delete the Lance
dataset.
+ // Otherwise, we should not delete the dataset.
+ if (purged) {
+ // Delete the Lance dataset at the location
+ Dataset.drop(location,
LancePropertiesUtils.getLanceStorageOptions(table.properties()));
+ LOG.info("Deleted Lance dataset at location {}", location);
+ }
+
+ return purged;
+
+ } catch (NoSuchTableException e) {
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to purge Lance dataset for table " +
ident, e);
+ }
+ }
+
+ @Override
+ public boolean dropTable(NameIdentifier ident) {
+ try {
+ Table table = loadTable(ident);
+ boolean external =
+ Optional.ofNullable(table.properties().get(Table.PROPERTY_EXTERNAL))
+ .map(Boolean::parseBoolean)
+ .orElse(false);
+
+ boolean dropped = super.dropTable(ident);
+ if (external) {
+ return dropped;
+ }
+
+ // If the table metadata is dropped successfully, and the table is not
external, we can delete
+ // the
+ // Lance dataset. Otherwise, we should not delete the dataset.
+ if (dropped) {
+ String location = table.properties().get(Table.PROPERTY_LOCATION);
+
+ // Delete the Lance dataset at the location
+ Dataset.drop(location,
LancePropertiesUtils.getLanceStorageOptions(table.properties()));
+ LOG.info("Deleted Lance dataset at location {}", location);
+ }
+
+ return dropped;
+
+ } catch (NoSuchTableException e) {
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to drop Lance dataset for table " +
ident, e);
+ }
+ }
+
+ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Column[] columns) {
+ List<Field> fields =
+ Arrays.stream(columns)
+ .map(
+ col ->
+ LanceDataTypeConverter.CONVERTER.toArrowField(
+ col.name(), col.dataType(), col.nullable()))
+ .collect(Collectors.toList());
+ return new org.apache.arrow.vector.types.pojo.Schema(fields);
+ }
+
+ private void addLanceIndex(Table table, List<Index> addedIndexes) {
+ String location = table.properties().get(Table.PROPERTY_LOCATION);
+ try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
+ // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
+ for (Index index : addedIndexes) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ IndexParams indexParams = getIndexParamsByIndexType(indexType);
+
+ dataset.createIndex(
+ Arrays.stream(index.fieldNames())
+ .map(field -> String.join(".", field))
+ .collect(Collectors.toList()),
+ indexType,
+ Optional.of(index.name()),
+ indexParams,
+ true);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to add indexes to Lance dataset at location " + location, e);
+ }
+ }
+
+ private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+ switch (indexType) {
+ case SCALAR:
+ return IndexParams.builder().build();
+ case VECTOR:
+ // TODO make these parameters configurable
+ int numberOfDimensions = 3; // this value should be determined
dynamically based on the data
+ // Add properties to Index to set this value.
+ return IndexParams.builder()
+ .setVectorIndexParams(
+ VectorIndexParams.ivfPq(2, 8, numberOfDimensions,
DistanceType.L2, 2))
+ .build();
+ default:
+ throw new IllegalArgumentException("Unsupported index type: " +
indexType);
+ }
+ }
+}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
similarity index 92%
copy from
catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
copy to
catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
index 927e28b4fd..f0aeb0cb23 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
+++
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
@@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
#
-org.apache.gravitino.catalog.lakehouse.GenericLakehouseCatalog
+org.apache.gravitino.catalog.lakehouse.generic.GenericCatalog
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
similarity index 92%
rename from
catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
rename to
catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
index 927e28b4fd..352813f4b1 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/resources/META-INF/services/org.apache.gravitino.CatalogProvider
+++
b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator
@@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
#
-org.apache.gravitino.catalog.lakehouse.GenericLakehouseCatalog
+org.apache.gravitino.catalog.lakehouse.lance.LanceTableDelegator
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/resources/generic-lakehouse.conf
b/catalogs/catalog-lakehouse-generic/src/main/resources/lakehouse-generic.conf
similarity index 100%
rename from
catalogs/catalog-generic-lakehouse/src/main/resources/generic-lakehouse.conf
rename to
catalogs/catalog-lakehouse-generic/src/main/resources/lakehouse-generic.conf
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestGenericLakehouseCatalogOperations.java
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
similarity index 97%
rename from
catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestGenericLakehouseCatalogOperations.java
rename to
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
index 67887c2f7a..4c71f4adb1 100644
---
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestGenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestGenericCatalogOperations.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
@@ -68,7 +68,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-public class TestGenericLakehouseCatalogOperations {
+public class TestGenericCatalogOperations {
private static final String STORE_PATH =
"/tmp/gravitino_test_entityStore_" +
UUID.randomUUID().toString().replace("-", "");
private static final String METALAKE_NAME = "metalake_for_lakehouse_test";
@@ -76,7 +76,7 @@ public class TestGenericLakehouseCatalogOperations {
private static EntityStore store;
private static IdGenerator idGenerator;
- private static GenericLakehouseCatalogOperations ops;
+ private static GenericCatalogOperations ops;
@BeforeAll
public static void setUp() throws IOException {
@@ -132,7 +132,7 @@ public class TestGenericLakehouseCatalogOperations {
.build();
store.put(catalog, false);
- ops = new GenericLakehouseCatalogOperations(store);
+ ops = new GenericCatalogOperations(store, idGenerator);
}
@AfterAll
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestPropertiesMetadata.java
similarity index 52%
rename from
catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
rename to
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestPropertiesMetadata.java
index 8dfd3b5ce0..75eb613dc5 100644
---
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/generic/TestPropertiesMetadata.java
@@ -17,86 +17,66 @@
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse;
+package org.apache.gravitino.catalog.lakehouse.generic;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import org.apache.gravitino.Schema;
import org.apache.gravitino.connector.PropertiesMetadata;
+import org.apache.gravitino.rel.Table;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
public class TestPropertiesMetadata {
- public static GenericLakehouseCatalog genericLakehouseCatalog;
+ public static GenericCatalog genericCatalog;
@BeforeAll
static void init() {
- genericLakehouseCatalog = new GenericLakehouseCatalog();
+ genericCatalog = new GenericCatalog();
}
@Test
void testCatalogPropertiesMetadata() {
- PropertiesMetadata catalogPropertiesMetadata =
- genericLakehouseCatalog.catalogPropertiesMetadata();
+ PropertiesMetadata catalogPropertiesMetadata =
genericCatalog.catalogPropertiesMetadata();
Assertions.assertNotNull(catalogPropertiesMetadata);
- Map<String, String> catalogProperties =
- ImmutableMap.of(
- "storage.type", "s3",
- "storage.s3.bucket", "my-bucket",
- "storage.s3.region", "us-west-2",
- "location", "/tmp/test1");
+ Map<String, String> catalogProperties = ImmutableMap.of("location",
"/tmp/test1");
String catalogLocation =
(String)
catalogPropertiesMetadata.getOrDefault(
- catalogProperties,
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
+ catalogProperties, GenericCatalog.PROPERTY_LOCATION);
Assertions.assertEquals("/tmp/test1", catalogLocation);
}
@Test
void testSchemaPropertiesMetadata() {
- PropertiesMetadata schemaPropertiesMetadata =
- genericLakehouseCatalog.schemaPropertiesMetadata();
+ PropertiesMetadata schemaPropertiesMetadata =
genericCatalog.schemaPropertiesMetadata();
Assertions.assertNotNull(schemaPropertiesMetadata);
- Map<String, String> schemaProperties =
- ImmutableMap.of(
- "storage.type", "s3",
- "storage.s3.bucket", "my-bucket",
- "storage.s3.region", "us-west-2",
- "location", "/tmp/test_schema");
+ Map<String, String> schemaProperties = ImmutableMap.of("location",
"/tmp/test_schema");
String schemaLocation =
- (String)
- schemaPropertiesMetadata.getOrDefault(
- schemaProperties,
GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
+ (String) schemaPropertiesMetadata.getOrDefault(schemaProperties,
Schema.PROPERTY_LOCATION);
Assertions.assertEquals("/tmp/test_schema", schemaLocation);
}
@Test
void testTablePropertiesMetadata() {
- PropertiesMetadata tablePropertiesMetadata =
genericLakehouseCatalog.tablePropertiesMetadata();
+ PropertiesMetadata tablePropertiesMetadata =
genericCatalog.tablePropertiesMetadata();
Assertions.assertNotNull(tablePropertiesMetadata);
Map<String, String> tableProperties =
ImmutableMap.of(
- "storage.type", "s3",
- "storage.s3.bucket", "my-bucket",
- "storage.s3.region", "us-west-2",
+ "lance.storage.type", "s3",
+ "lance.storage.s3.bucket", "my-bucket",
+ "lance.storage.s3.region", "us-west-2",
"location", "/tmp/test_table",
"format", "iceberg");
String tableLocation =
- (String)
- tablePropertiesMetadata.getOrDefault(
- tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
+ (String) tablePropertiesMetadata.getOrDefault(tableProperties,
Table.PROPERTY_LOCATION);
Assertions.assertEquals("/tmp/test_table", tableLocation);
-
- LakehouseTableFormat tableFormat =
- (LakehouseTableFormat)
- tablePropertiesMetadata.getOrDefault(
- tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
- Assertions.assertEquals(LakehouseTableFormat.ICEBERG, tableFormat);
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
similarity index 97%
rename from
catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
rename to
catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
index 0c92b43eef..01b0ce2aed 100644
---
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/integration/test/CatalogGenericLakehouseLanceIT.java
+++
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.lakehouse.integration.test;
+package org.apache.gravitino.catalog.lakehouse.lance.integration.test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
@@ -53,7 +53,6 @@ import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.Column;
@@ -83,10 +82,11 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CatalogGenericLakehouseLanceIT extends BaseIT {
- private static final Logger LOG =
LoggerFactory.getLogger(CatalogGenericLakehouseLanceIT.class);
+public class CatalogGenericCatalogLanceIT extends BaseIT {
+ private static final Logger LOG =
LoggerFactory.getLogger(CatalogGenericCatalogLanceIT.class);
public static final String metalakeName =
GravitinoITUtils.genRandomName("CatalogGenericLakeLanceIT_metalake");
+
public String catalogName =
GravitinoITUtils.genRandomName("CatalogGenericLakeLanceI_catalog");
public String SCHEMA_PREFIX = "CatalogGenericLakeLance_schema";
public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
@@ -96,7 +96,7 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT {
public static final String LANCE_COL_NAME1 = "lance_col_name1";
public static final String LANCE_COL_NAME2 = "lance_col_name2";
public static final String LANCE_COL_NAME3 = "lance_col_name3";
- protected final String provider = "generic-lakehouse";
+ protected final String provider = "lakehouse-generic";
protected GravitinoMetalake metalake;
protected Catalog catalog;
protected String tempDirectory;
@@ -146,7 +146,7 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT {
}
@Test
- public void testCreateLanceTable() throws InterruptedException {
+ public void testCreateLanceTable() {
// Create a table from Gravitino API
Column[] columns = createColumns();
NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
@@ -284,7 +284,7 @@ public class CatalogGenericLakehouseLanceIT extends BaseIT {
Assertions.assertTrue(e.getMessage().contains("Invalid user input"));
Assertions.assertThrows(
- NoSuchTableException.class, () ->
catalog.asTableCatalog().loadTable(newNameIdentifier));
+ RuntimeException.class, () ->
catalog.asTableCatalog().loadTable(newNameIdentifier));
}
@Test
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 6289bd3404..7446135834 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -42,10 +42,8 @@ import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.StringIdentifier;
-import org.apache.gravitino.catalog.CatalogManager.CatalogWrapper;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.connector.capability.CapabilityResult;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -115,18 +113,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
*/
@Override
public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
- NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- if (isManagedTable(catalogIdent)) {
- return TreeLockUtils.doWithTreeLock(
- ident,
- LockType.READ,
- () ->
- doWithCatalog(
- catalogIdent,
- c -> c.doWithTableOps(t -> t.loadTable(ident)),
- NoSuchTableException.class));
- }
-
EntityCombinedTable entityCombinedTable =
TreeLockUtils.doWithTreeLock(ident, LockType.READ, () ->
internalLoadTable(ident));
@@ -250,9 +236,8 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
NoSuchTableException.class,
IllegalArgumentException.class);
- if (isManagedTable(catalogIdent)) {
- // For generic lakehouse catalog, all operations will be
dispatched to the underlying
- // catalog.
+ boolean isManagedTable = isManagedEntity(catalogIdent,
Capability.Scope.TABLE);
+ if (isManagedTable) {
return EntityCombinedTable.of(alteredTable)
.withHiddenProperties(
getHiddenPropertyNames(
@@ -341,19 +326,17 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
LockType.WRITE,
() -> {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- if (isManagedTable(catalogIdent)) {
- return doWithCatalog(
- catalogIdent,
- c -> c.doWithTableOps(t -> t.dropTable(ident)),
- RuntimeException.class);
- }
-
boolean droppedFromCatalog =
doWithCatalog(
catalogIdent,
c -> c.doWithTableOps(t -> t.dropTable(ident)),
RuntimeException.class);
+ boolean isManagedTable = isManagedEntity(catalogIdent,
Capability.Scope.TABLE);
+ if (isManagedTable) {
+ return droppedFromCatalog;
+ }
+
// For unmanaged table, it could happen that the table:
// 1. Is not found in the catalog (dropped directly from underlying
sources)
// 2. Is found in the catalog but not in the store (not managed by
Gravitino)
@@ -362,20 +345,14 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
// In all situations, we try to delete the schema from the store,
but we don't take the
// return value of the store operation into account. We only take
the return value of the
// catalog into account.
- //
- // For managed table, we should take the return value of the store
operation into account.
- boolean droppedFromStore = false;
try {
- droppedFromStore = store.delete(ident, TABLE);
+ store.delete(ident, TABLE);
} catch (NoSuchEntityException e) {
LOG.warn("The table to be dropped does not exist in the store:
{}", ident, e);
} catch (Exception e) {
throw new RuntimeException(e);
}
-
- return isManagedEntity(catalogIdent, Capability.Scope.TABLE)
- ? droppedFromStore
- : droppedFromCatalog;
+ return droppedFromCatalog;
});
}
@@ -408,7 +385,8 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
RuntimeException.class,
UnsupportedOperationException.class);
- if (isManagedTable(catalogIdent)) {
+ boolean isManagedTable = isManagedEntity(catalogIdent,
Capability.Scope.TABLE);
+ if (isManagedTable) {
return droppedFromCatalog;
}
@@ -420,21 +398,15 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
// In all situations, we try to delete the schema from the store,
but we don't take the
// return value of the store operation into account. We only take
the return value of the
// catalog into account.
- //
- // For managed table, we should take the return value of the store
operation into account.
- boolean droppedFromStore;
try {
- droppedFromStore = store.delete(ident, TABLE);
+ store.delete(ident, TABLE);
} catch (NoSuchEntityException e) {
LOG.warn("The table to be purged does not exist in the store: {}",
ident, e);
return false;
} catch (Exception e) {
throw new RuntimeException(e);
}
-
- return isManagedEntity(catalogIdent, Capability.Scope.TABLE)
- ? droppedFromStore
- : droppedFromCatalog;
+ return droppedFromCatalog;
});
}
@@ -530,6 +502,18 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
c -> c.doWithTableOps(t -> t.loadTable(ident)),
NoSuchTableException.class);
+ boolean isManagedTable = isManagedEntity(catalogIdentifier,
Capability.Scope.TABLE);
+ if (isManagedTable) {
+ return EntityCombinedTable.of(table)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ catalogIdentifier,
+ HasPropertyMetadata::tablePropertiesMetadata,
+ table.properties()))
+ // The metadata of managed table is stored by Gravitino, so it is
always imported.
+ .withImported(true /* imported */);
+ }
+
StringIdentifier stringId = getStringIdFromProperties(table.properties());
// Case 1: The table is not created by Gravitino or the external system
does not support storing
// string identifier.
@@ -554,9 +538,9 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
catalogIdentifier,
HasPropertyMetadata::tablePropertiesMetadata,
table.properties()))
- // Some tables don't have properties or are not created by Gravitino,
- // we can't use stringIdentifier to judge whether schema is ever
imported or not.
- // We need to check whether the entity exists.
+ // For some catalogs like PG, the identifier information is not
stored in the table's
+ // metadata, we need to check if this table exists in the store, if
so we don't
+ // need to import.
.withImported(true);
}
@@ -596,31 +580,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
}),
IllegalArgumentException.class);
- if (isManagedTable(catalogIdent)) {
- // For generic lakehouse catalog, all operations will be dispatched to
the underlying catalog.
- Table table =
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTableOps(
- t ->
- t.createTable(
- ident,
- columns,
- comment,
- properties,
- partitions == null ? EMPTY_TRANSFORM :
partitions,
- distribution == null ? Distributions.NONE :
distribution,
- sortOrders == null ? new SortOrder[0] :
sortOrders,
- indexes == null ? Indexes.EMPTY_INDEXES :
indexes)),
- NoSuchSchemaException.class,
- TableAlreadyExistsException.class);
- return EntityCombinedTable.of(table)
- .withHiddenProperties(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
- }
-
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will
handle this
// StringIdentifier to make sure only when the operation is successful,
the related
@@ -649,6 +608,15 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
NoSuchSchemaException.class,
TableAlreadyExistsException.class);
+ // If the table is managed by Gravitino, we don't need to create
TableEntity and store it again.
+ boolean isManagedTable = isManagedEntity(catalogIdent,
Capability.Scope.TABLE);
+ if (isManagedTable) {
+ return EntityCombinedTable.of(table)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ }
+
AuditInfo audit =
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
@@ -671,12 +639,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
- if (isManagedTable(catalogIdent)) {
- // Drop table
- doWithCatalog(
- catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)),
RuntimeException.class);
- }
-
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenProperties(
@@ -684,7 +646,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
}
- // For managed table, we can use table entity to indicate the table is
created successfully.
+ // Merge both the metadata from catalog operation and the metadata from
entity store.
return EntityCombinedTable.of(table, tableEntity)
.withHiddenProperties(
getHiddenPropertyNames(
@@ -699,15 +661,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.collect(Collectors.toList());
}
- private boolean isManagedTable(NameIdentifier catalogIdent) {
- CatalogManager catalogManager =
GravitinoEnv.getInstance().catalogManager();
- CatalogWrapper wrapper = catalogManager.loadCatalogAndWrap(catalogIdent);
- Capability capability = wrapper.catalog().capability();
-
- CapabilityResult result =
capability.managedStorage(Capability.Scope.TABLE);
- return result == CapabilityResult.SUPPORTED;
- }
-
private boolean isSameColumn(Column left, int columnPosition, ColumnEntity
right) {
return Objects.equal(left.name(), right.name())
&& columnPosition == right.position()
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
deleted file mode 100644
index 4fb7b5d12d..0000000000
---
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.connector;
-
-import org.apache.gravitino.tag.SupportsTags;
-
-public class GenericLakehouseColumn extends BaseColumn {
- @Override
- public SupportsTags supportsTags() {
- return super.supportsTags();
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder extends BaseColumnBuilder<Builder,
GenericLakehouseColumn> {
-
- /** Creates a new instance of {@link Builder}. */
- private Builder() {}
-
- /**
- * Internal method to build a HiveColumn instance using the provided
values.
- *
- * @return A new HiveColumn instance with the configured values.
- */
- @Override
- protected GenericLakehouseColumn internalBuild() {
- GenericLakehouseColumn hiveColumn = new GenericLakehouseColumn();
-
- hiveColumn.name = name;
- hiveColumn.comment = comment;
- hiveColumn.dataType = dataType;
- hiveColumn.nullable = nullable;
- hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET :
defaultValue;
- hiveColumn.autoIncrement = autoIncrement;
- return hiveColumn;
- }
- }
-}
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
deleted file mode 100644
index 2d14a539e2..0000000000
---
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.connector;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.gravitino.rel.Table;
-
-public class GenericLakehouseTable extends BaseTable {
-
- public static Builder builder() {
- return new Builder();
- }
-
- @Override
- protected TableOperations newOps() throws UnsupportedOperationException {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- public static class Builder extends BaseTableBuilder<Builder,
GenericLakehouseTable> {
-
- private String format;
-
- public Builder withFormat(String format) {
- this.format = format;
- return this;
- }
-
- @Override
- protected GenericLakehouseTable internalBuild() {
- GenericLakehouseTable genericLakehouseTable = new
GenericLakehouseTable();
- genericLakehouseTable.columns = this.columns;
- genericLakehouseTable.comment = this.comment;
-
- if (format != null) {
- genericLakehouseTable.properties =
- ImmutableMap.<String, String>builder()
- .putAll(this.properties)
- .put(Table.PROPERTY_TABLE_FORMAT, this.format)
- .buildKeepingLast();
- } else {
- genericLakehouseTable.properties = this.properties;
- }
- genericLakehouseTable.auditInfo = this.auditInfo;
- genericLakehouseTable.distribution = this.distribution;
- genericLakehouseTable.indexes = this.indexes;
- genericLakehouseTable.name = this.name;
- genericLakehouseTable.partitioning = this.partitioning;
- genericLakehouseTable.sortOrders = this.sortOrders;
- return genericLakehouseTable;
- }
- }
-}
diff --git a/lance/lance-common/build.gradle.kts
b/lance/lance-common/build.gradle.kts
index 8bb80c1934..e3a669212c 100644
--- a/lance/lance-common/build.gradle.kts
+++ b/lance/lance-common/build.gradle.kts
@@ -46,3 +46,13 @@ dependencies {
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
}
+
+tasks.test {
+  // Passing -PskipITs excludes everything under integration/test; otherwise the test task
+  // waits for this module's jar to be built first.
+  if (!project.hasProperty("skipITs")) {
+    dependsOn(tasks.jar)
+  } else {
+    exclude("**/integration/test/**")
+  }
+}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
index 961134ec66..ee21c8cb1b 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
@@ -222,7 +222,7 @@ public class GravitinoLanceNameSpaceOperations implements
LanceNamespaceOperatio
client.createCatalog(
catalogName,
Catalog.Type.RELATIONAL,
- "generic-lakehouse",
+ "lakehouse-generic",
"created by Lance REST server",
properties);
response.setProperties(
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index a1fee0a73e..d90bf08196 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -93,7 +93,7 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper {
public boolean isLakehouseCatalog(Catalog catalog) {
return catalog.type().equals(Catalog.Type.RELATIONAL)
- && "generic-lakehouse".equals(catalog.provider());
+ && "lakehouse-generic".equals(catalog.provider());
}
public Catalog loadAndValidateLakehouseCatalog(String catalogName) {
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
index d298dbe5e5..8bb6bf37ea 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
@@ -127,7 +127,8 @@ public class GravitinoLanceTableOperations implements
LanceTableOperations {
createTableProperties.put(LANCE_LOCATION, tableLocation);
}
// The format is defined in GenericLakehouseCatalog
- createTableProperties.put("format", "lance");
+ createTableProperties.put(Table.PROPERTY_TABLE_FORMAT, "lance");
+ createTableProperties.put(Table.PROPERTY_EXTERNAL, "true");
Table t;
try {
diff --git a/lance/lance-rest-server/build.gradle.kts
b/lance/lance-rest-server/build.gradle.kts
index 6a825360f3..a74fe4819d 100644
--- a/lance/lance-rest-server/build.gradle.kts
+++ b/lance/lance-rest-server/build.gradle.kts
@@ -114,7 +114,17 @@ tasks {
test {
val testMode = project.properties["testMode"] as? String ?: "embedded"
if (testMode == "embedded") {
- dependsOn(":catalogs:catalog-generic-lakehouse:build")
+ dependsOn(":catalogs:catalog-lakehouse-generic:build")
}
}
}
+
+tasks.test {
+  // Passing -PskipITs excludes everything under integration/test; otherwise the test task
+  // waits for this module's jar to be built first.
+  if (!project.hasProperty("skipITs")) {
+    dependsOn(tasks.jar)
+  } else {
+    exclude("**/integration/test/**")
+  }
+}
diff --git
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
index a63bb39307..51826c3add 100644
---
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
+++
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
@@ -65,6 +65,7 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.io.FileUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
@@ -111,9 +112,9 @@ public class LanceRESTServiceIT extends BaseIT {
}
@AfterAll
- public void clean() {
+ public void clean() throws IOException {
client.dropMetalake(getLanceRESTServerMetalakeName(), true);
- tempDir.toFile().deleteOnExit();
+ FileUtils.deleteDirectory(tempDir.toFile());
}
@AfterEach
@@ -557,11 +558,8 @@ public class LanceRESTServiceIT extends BaseIT {
// Create a table without location should fail
CreateTableRequest noLocationRequest = new CreateTableRequest();
noLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME,
"no_location_table"));
- LanceNamespaceException noLocationException =
- Assertions.assertThrows(
- LanceNamespaceException.class, () ->
ns.createTable(noLocationRequest, body));
- Assertions.assertTrue(
- noLocationException.getMessage().contains("No location specified for
table"));
+ Assertions.assertThrows(
+ LanceNamespaceException.class, () -> ns.createTable(noLocationRequest,
body));
// Create table with invalid schema should fail
byte[] invalidBody = "".getBytes(Charset.defaultCharset());
@@ -696,7 +694,7 @@ public class LanceRESTServiceIT extends BaseIT {
return metalake.createCatalog(
catalogName,
Catalog.Type.RELATIONAL,
- "generic-lakehouse",
+ "lakehouse-generic",
"catalog for lance rest service tests",
properties);
}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4a3a5d468a..98f6df0361 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -32,6 +32,7 @@ include("catalogs:hive-metastore-common")
include("catalogs:catalog-lakehouse-iceberg")
include("catalogs:catalog-lakehouse-paimon")
include("catalogs:catalog-lakehouse-hudi")
+include("catalogs:catalog-lakehouse-generic")
include(
"catalogs:catalog-jdbc-common",
"catalogs:catalog-jdbc-doris",
@@ -51,7 +52,6 @@ include(
"clients:client-python",
"clients:cli"
)
-include("catalogs:catalog-generic-lakehouse")
if (gradle.startParameter.projectProperties["enableFuse"]?.toBoolean() ==
true) {
include("clients:filesystem-fuse")
} else {