This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by
this push:
new f313b70f9c [#8915] improvment(catalogs): Polish code for PR #8879
(#8922)
f313b70f9c is described below
commit f313b70f9c59969a7d95d99b72a02d23c17bee05
Author: Mini Yu <[email protected]>
AuthorDate: Thu Oct 30 15:01:12 2025 +0800
[#8915] improvment(catalogs): Polish code for PR #8879 (#8922)
### What changes were proposed in this pull request?
This PR tries to resolve the comments that have not been addressed in
#8879
### Why are the changes needed?
It's an improvement.
Fix: #8915
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Test locally, and we will add ITs in
https://github.com/apache/gravitino/issues/8921
---
.../main/java/org/apache/gravitino/rel/Table.java | 31 ++++
.../catalog-generic-lakehouse/build.gradle.kts | 1 -
.../GenericLakehouseCatalogOperations.java | 197 ++++++++++++++-------
.../GenericLakehouseCatalogPropertiesMetadata.java | 4 +-
.../GenericLakehouseSchemaPropertiesMetadata.java | 6 +-
.../GenericLakehouseTablePropertiesMetadata.java | 19 +-
.../lakehouse/LakehouseCatalogOperations.java | 14 ++
.../catalog/lakehouse/LakehouseTableFormat.java | 38 ++--
.../lakehouse/lance/LanceCatalogOperations.java | 126 ++++++++-----
.../catalog/lakehouse/utils/EntityConverter.java | 41 ++---
.../catalog/lakehouse/TestPropertiesMetadata.java | 102 +++++++++++
.../lakehouse/utils/TestEntityConverter.java | 78 ++++++++
.../gravitino/catalog/EntityCombinedTable.java | 10 ++
.../catalog/TableOperationDispatcher.java | 181 ++++++++-----------
.../connector/GenericLakehouseColumn.java | 1 +
.../gravitino/connector/GenericLakehouseTable.java | 14 +-
.../apache/gravitino/meta/GenericTableEntity.java | 186 -------------------
.../org/apache/gravitino/meta/TableEntity.java | 112 +++++++++++-
.../storage/relational/mapper/TableMetaMapper.java | 3 -
.../relational/mapper/TableVersionMapper.java | 14 ++
.../mapper/TableVersionSQLProviderFactory.java | 10 ++
.../provider/base/TableMetaBaseSQLProvider.java | 43 +++--
.../provider/base/TableVersionBaseSQLProvider.java | 16 ++
.../postgresql/TableVersionPostgreSQLProvider.java | 11 +-
.../relational/service/TableMetaService.java | 46 +++--
.../storage/relational/utils/POConverters.java | 119 ++++---------
.../gravitino/catalog/TestOperationDispatcher.java | 1 +
.../gravitino/hook/TestTableHookDispatcher.java | 7 +
.../java/org/apache/gravitino/meta/TestEntity.java | 35 +++-
.../gravitino/storage/TestEntityStorage.java | 27 +--
.../storage/relational/TestJDBCBackend.java | 123 +++++++++++++
.../storage/relational/utils/TestPOConverters.java | 3 +-
.../service/rest/LanceNamespaceOperations.java | 2 +
.../lance/service/rest/LanceTableOperations.java | 9 +
scripts/h2/schema-1.1.0-h2.sql | 5 +-
scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql | 5 +-
scripts/mysql/schema-1.1.0-mysql.sql | 5 +-
scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql | 5 +-
scripts/postgresql/schema-1.1.0-postgresql.sql | 8 +-
.../upgrade-1.0.0-to-1.1.0-postgresql.sql | 8 +-
40 files changed, 1038 insertions(+), 628 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/rel/Table.java
b/api/src/main/java/org/apache/gravitino/rel/Table.java
index 619694b1a2..316ba18839 100644
--- a/api/src/main/java/org/apache/gravitino/rel/Table.java
+++ b/api/src/main/java/org/apache/gravitino/rel/Table.java
@@ -99,6 +99,37 @@ public interface Table extends Auditable {
return Collections.emptyMap();
}
+ /**
+ * Table format of the table. For example, in a file-based table, it could
be "parquet", "Lance",
+ * "Iceberg", etc.
+ *
+ * @return the table format name, for more information: LakehouseTableFormat
+ */
+ default String format() {
+ throw new UnsupportedOperationException("Table format is not supported.");
+ }
+
+ /**
+ * Gets the location of the table if the table has a location. For example,
in a file-based table,
+ * it could be the root path where the table data is stored.
+ *
+ * @return the location of the table as a string.
+ */
+ default String location() {
+ throw new UnsupportedOperationException("Table location is not
supported.");
+ }
+
+ /**
+ * Indicates whether the table is external. An external table is a table
that is not managed by
+ * the catalog and the drop operation will not delete the underlying data.
If it's a managed
+ * table, dropping the table will delete the underlying data.
+ *
+ * @return true if the table is external, false otherwise
+ */
+ default boolean external() {
+ return false;
+ }
+
/**
* Table method for working with partitions. If the table does not support
partition operations,
* an {@link UnsupportedOperationException} is thrown.
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index df401dcde4..704dbda7e3 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,7 +43,6 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.hadoop3.client.api)
- implementation(libs.hadoop3.client.runtime)
implementation(libs.lance)
annotationProcessor(libs.lombok)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 358c2dcab5..89a0ef58ef 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -19,15 +19,16 @@
package org.apache.gravitino.catalog.lakehouse;
import static org.apache.gravitino.Entity.EntityType.TABLE;
-import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
@@ -40,8 +41,10 @@ import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.catalog.ManagedSchemaOperations;
import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
+import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.connector.GenericLakehouseTable;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -51,8 +54,10 @@ import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
-import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
@@ -61,6 +66,8 @@ import
org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,14 +81,16 @@ public class GenericLakehouseCatalogOperations
private static final String SLASH = "/";
private final ManagedSchemaOperations managedSchemaOps;
- private static final Map<String, LakehouseCatalogOperations>
SUPPORTED_FORMATS =
- Maps.newHashMap();
- private Optional<Path> catalogLakehouseDir;
+ private Optional<Path> catalogLakehouseLocation;
+
+ private static final Map<LakehouseTableFormat, LakehouseCatalogOperations>
SUPPORTED_FORMATS =
+ Maps.newConcurrentMap();
+
private Map<String, String> catalogConfig;
private CatalogInfo catalogInfo;
private HasPropertyMetadata propertiesMetadata;
-
+ private EntityStore store;
/**
* Initializes the generic lakehouse catalog operations with the provided
configuration.
*
@@ -94,15 +103,16 @@ public class GenericLakehouseCatalogOperations
public void initialize(
Map<String, String> conf, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
throws RuntimeException {
- String catalogDir =
+ String catalogLocation =
(String)
propertiesMetadata
.catalogPropertiesMetadata()
- .getOrDefault(conf,
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_DIR);
- this.catalogLakehouseDir =
- StringUtils.isNotBlank(catalogDir)
- ?
Optional.of(catalogDir).map(this::ensureTrailingSlash).map(Path::new)
+ .getOrDefault(conf,
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
+ this.catalogLakehouseLocation =
+ StringUtils.isNotBlank(catalogLocation)
+ ?
Optional.of(catalogLocation).map(this::ensureTrailingSlash).map(Path::new)
: Optional.empty();
+ this.store = GravitinoEnv.getInstance().entityStore();
this.catalogConfig = conf;
this.catalogInfo = info;
this.propertiesMetadata = propertiesMetadata;
@@ -165,19 +175,17 @@ public class GenericLakehouseCatalogOperations
@Override
public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
- EntityStore store = GravitinoEnv.getInstance().entityStore();
NameIdentifier identifier = NameIdentifier.of(namespace.levels());
try {
store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
- } catch (NoSuchTableException e) {
- throw new NoSuchEntityException(e, "Schema %s does not exist",
namespace);
+ } catch (NoSuchEntityException e) {
+ throw new NoSuchSchemaException(e, "Schema %s does not exist",
namespace);
} catch (IOException ioe) {
throw new RuntimeException("Failed to get schema " + identifier);
}
try {
- List<GenericTableEntity> tableEntityList =
- store.list(namespace, GenericTableEntity.class, TABLE);
+ List<TableEntity> tableEntityList = store.list(namespace,
TableEntity.class, TABLE);
return tableEntityList.stream()
.map(e -> NameIdentifier.of(namespace, e.name()))
.toArray(NameIdentifier[]::new);
@@ -188,7 +196,23 @@ public class GenericLakehouseCatalogOperations
@Override
public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
- throw new UnsupportedOperationException("Not implemented yet.");
+ try {
+ TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
+ return GenericLakehouseTable.builder()
+ .withFormat(tableEntity.getFormat())
+ .withProperties(tableEntity.getProperties())
+ .withAuditInfo(tableEntity.auditInfo())
+ .withSortOrders(tableEntity.getSortOrder())
+ .withPartitioning(tableEntity.getPartitions())
+ .withDistribution(tableEntity.getDistribution())
+ .withColumns(EntityConverter.toColumns(tableEntity.columns()))
+ .withIndexes(tableEntity.getIndexes())
+ .withName(tableEntity.name())
+ .withComment(tableEntity.getComment())
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to load table " + ident, e);
+ }
}
@Override
@@ -202,54 +226,92 @@ public class GenericLakehouseCatalogOperations
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
+ LakehouseTableFormat format =
+ (LakehouseTableFormat)
+ propertiesMetadata
+ .tablePropertiesMetadata()
+ .getOrDefault(properties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
+
String tableLocation = calculateTableLocation(schema, ident, properties);
Map<String, String> tableStorageProps = calculateTableStorageProps(schema,
properties);
Map<String, String> newProperties = Maps.newHashMap(properties);
- newProperties.put(LOCATION, tableLocation);
+
newProperties.put(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION,
tableLocation);
newProperties.putAll(tableStorageProps);
- String format = properties.getOrDefault("format", "lance");
- LakehouseCatalogOperations lakehouseCatalogOperations =
- SUPPORTED_FORMATS.compute(
- format,
- (k, v) ->
- v == null
- ? createLakehouseCatalogOperations(
- format, properties, catalogInfo, propertiesMetadata)
- : v);
-
- return lakehouseCatalogOperations.createTable(
- ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build();
+ IdGenerator idGenerator = GravitinoEnv.getInstance().idGenerator();
+ List<ColumnEntity> columnEntityList =
+ IntStream.range(0, columns.length)
+ .mapToObj(
+ i -> ColumnEntity.toColumnEntity(columns[i], i,
idGenerator.nextId(), auditInfo))
+ .collect(Collectors.toList());
+
+ TableEntity entityToStore;
+ try {
+ entityToStore =
+ TableEntity.builder()
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withColumns(columnEntityList)
+ .withFormat(format.lowerName())
+ .withProperties(newProperties)
+ .withComment(comment)
+ .withPartitions(partitions)
+ .withSortOrder(sortOrders)
+ .withDistribution(distribution)
+ .withIndexes(indexes)
+ .withId(idGenerator.nextId())
+ .withAuditInfo(auditInfo)
+ .build();
+ store.put(entityToStore);
+ LakehouseCatalogOperations lanceCatalogOperations =
+ getLakehouseCatalogOperations(newProperties);
+ return lanceCatalogOperations.createTable(
+ ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create table " + ident, e);
+ }
}
private String calculateTableLocation(
Schema schema, NameIdentifier tableIdent, Map<String, String>
tableProperties) {
- String tableLocation = tableProperties.get(LOCATION);
+ String tableLocation =
+ (String)
+ propertiesMetadata
+ .tablePropertiesMetadata()
+ .getOrDefault(
+ tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
if (StringUtils.isNotBlank(tableLocation)) {
return ensureTrailingSlash(tableLocation);
}
- String schemaLocation = schema.properties() == null ? null :
schema.properties().get(LOCATION);
+ String schemaLocation =
+ schema.properties() == null
+ ? null
+ :
schema.properties().get(GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
// If we do not set location in table properties, and schema location is
set, use schema
- // location
- // as the base path.
+ // location as the base path.
if (StringUtils.isNotBlank(schemaLocation)) {
return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
}
// If the schema location is not set, use catalog lakehouse dir as the
base path. Or else, throw
// an exception.
- if (catalogLakehouseDir.isEmpty()) {
+ if (catalogLakehouseLocation.isEmpty()) {
throw new RuntimeException(
String.format(
"No location specified for table %s, you need to set location
either in catalog, schema, or table properties",
tableIdent));
}
- String catalogLakehousePath = catalogLakehouseDir.get().toString();
+ String catalogLakehousePath = catalogLakehouseLocation.get().toString();
String[] nsLevels = tableIdent.namespace().levels();
String schemaName = nsLevels[nsLevels.length - 1];
return ensureTrailingSlash(catalogLakehousePath)
@@ -262,21 +324,12 @@ public class GenericLakehouseCatalogOperations
@Override
public Table alterTable(NameIdentifier ident, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
- EntityStore store = GravitinoEnv.getInstance().entityStore();
Namespace namespace = ident.namespace();
try {
- GenericTableEntity tableEntity =
- store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+ TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
Map<String, String> tableProperties = tableEntity.getProperties();
- String format = tableProperties.getOrDefault("format", "lance");
LakehouseCatalogOperations lakehouseCatalogOperations =
- SUPPORTED_FORMATS.compute(
- format,
- (k, v) ->
- v == null
- ? createLakehouseCatalogOperations(
- format, tableProperties, catalogInfo,
propertiesMetadata)
- : v);
+ getLakehouseCatalogOperations(tableProperties);
return lakehouseCatalogOperations.alterTable(ident, changes);
} catch (IOException e) {
throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
@@ -285,28 +338,36 @@ public class GenericLakehouseCatalogOperations
@Override
public boolean dropTable(NameIdentifier ident) {
- EntityStore store = GravitinoEnv.getInstance().entityStore();
- GenericTableEntity tableEntity;
+ Namespace namespace = ident.namespace();
try {
- tableEntity = store.get(ident, Entity.EntityType.TABLE,
GenericTableEntity.class);
- } catch (NoSuchEntityException e) {
- LOG.warn("Table {} does not exist, skip dropping.", ident);
+ TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ getLakehouseCatalogOperations(tableEntity.getProperties());
+ return lakehouseCatalogOperations.dropTable(ident);
+ } catch (NoSuchEntityException e) {
+ LOG.warn("Table {} does not exist, skip dropping it.", ident);
return false;
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to get table " + ident);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to drop table " + ident, e);
}
+ }
- Map<String, String> tableProperties = tableEntity.getProperties();
- String format = tableProperties.getOrDefault("format", "lance");
- LakehouseCatalogOperations lakehouseCatalogOperations =
- SUPPORTED_FORMATS.compute(
- format,
- (k, v) ->
- v == null
- ? createLakehouseCatalogOperations(
- format, tableProperties, catalogInfo,
propertiesMetadata)
- : v);
- return lakehouseCatalogOperations.dropTable(ident);
+ private LakehouseCatalogOperations getLakehouseCatalogOperations(
+ Map<String, String> tableProperties) {
+ LakehouseTableFormat format =
+ (LakehouseTableFormat)
+ propertiesMetadata
+ .tablePropertiesMetadata()
+ .getOrDefault(
+ tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
+
+ return SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, tableProperties, catalogInfo, propertiesMetadata)
+ : v);
}
private String ensureTrailingSlash(String path) {
@@ -314,13 +375,13 @@ public class GenericLakehouseCatalogOperations
}
private LakehouseCatalogOperations createLakehouseCatalogOperations(
- String format,
+ LakehouseTableFormat format,
Map<String, String> properties,
CatalogInfo catalogInfo,
HasPropertyMetadata propertiesMetadata) {
LakehouseCatalogOperations operations;
- switch (format.toLowerCase()) {
- case "lance":
+ switch (format) {
+ case LANCE:
operations = new LanceCatalogOperations();
break;
default:
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
index e381558c32..b8c3958e9a 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
@@ -31,7 +31,7 @@ import org.apache.gravitino.connector.PropertyEntry;
public class GenericLakehouseCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata {
- public static final String LAKEHOUSE_DIR = "lakehouse-dir";
+ public static final String LAKEHOUSE_LOCATION = "location";
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -39,7 +39,7 @@ public class GenericLakehouseCatalogPropertiesMetadata
extends BaseCatalogProper
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringOptionalPropertyEntry(
- LAKEHOUSE_DIR,
+ LAKEHOUSE_LOCATION,
"The root directory of the lakehouse catalog.",
false /* immutable */,
null, /* defaultValue */
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
index a6da0ac2de..3dd0abf81d 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
@@ -29,8 +29,8 @@ import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
public class GenericLakehouseSchemaPropertiesMetadata extends
BasePropertiesMetadata {
- public static final String LAKEHOUSE_DIR =
- GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_DIR;
+ public static final String LAKEHOUSE_LOCATION =
+ GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION;
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -38,7 +38,7 @@ public class GenericLakehouseSchemaPropertiesMetadata extends
BasePropertiesMeta
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringOptionalPropertyEntry(
- LAKEHOUSE_DIR,
+ LAKEHOUSE_LOCATION,
"The root directory of the lakehouse schema.",
false /* immutable */,
null, /* defaultValue */
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
index e9a61a6b0f..f8ca11b0a0 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
@@ -18,6 +18,7 @@
*/
package org.apache.gravitino.catalog.lakehouse;
+import static org.apache.gravitino.connector.PropertyEntry.enumPropertyEntry;
import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import com.google.common.collect.ImmutableList;
@@ -28,7 +29,8 @@ import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
public class GenericLakehouseTablePropertiesMetadata extends
BasePropertiesMetadata {
- public static final String LOCATION = "location";
+ public static final String LAKEHOUSE_LOCATION = "location";
+ public static final String LAKEHOUSE_FORMAT = "format";
public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX =
"lance.storage.";
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -37,11 +39,20 @@ public class GenericLakehouseTablePropertiesMetadata
extends BasePropertiesMetad
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringOptionalPropertyEntry(
- LOCATION,
- "The root directory of the lakehouse table.",
- true /* immutable */,
+ LAKEHOUSE_LOCATION,
+ "The root directory of the lakehouse table.",
+ false /* immutable */,
null, /* defaultValue */
false /* hidden */),
+ enumPropertyEntry(
+ LAKEHOUSE_FORMAT,
+ "The table format of the lakehouse table (e.g., iceberg,
delta, lance)",
+ true /* required */,
+ true /* immutable */,
+ LakehouseTableFormat.class /* enumClass */,
+ null /* defaultValue */,
+ false /* hidden */,
+ false /* reserved */),
PropertyEntry.stringOptionalPropertyPrefixEntry(
LANCE_TABLE_STORAGE_OPTION_PREFIX,
"The storage options passed to Lance table.",
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
index 66c7147626..d5b95845db 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
@@ -22,4 +22,18 @@ package org.apache.gravitino.catalog.lakehouse;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.rel.TableCatalog;
+/**
+ * Interface for detailed lakehouse catalog operations, combining catalog
operations and table
+ * catalog. {@link GenericLakehouseCatalog} will try to use this interface to
provide detailed
+ * lakehouse catalog operations.
+ *
+ * <pre>
+ * GenericLakehouseCatalog.createTable()
+ * -> LakehouseCatalogOperations.createTable()
+ * -> LanceTableOperations.createTable()
+ * -> IcebergTableOperations.createTable()
+ * -> DeltaTableOperations.createTable()
+ * ...
+ * </pre>
+ */
public interface LakehouseCatalogOperations extends CatalogOperations,
TableCatalog {}
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
similarity index 58%
copy from api/src/main/java/org/apache/gravitino/rel/GenericTable.java
copy to
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
index 4796421c53..57d0230f48 100644
--- a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseTableFormat.java
@@ -17,31 +17,25 @@
* under the License.
*/
-package org.apache.gravitino.rel;
+package org.apache.gravitino.catalog.lakehouse;
-/** A generic table interface that extends the Table interface. */
-public interface GenericTable extends Table {
+public enum LakehouseTableFormat {
+ LANCE,
- /**
- * Formats the table as a string representation.
- *
- * @return the formatted string representation of the table
- */
- String format();
+ DELTA,
- /**
- * Gets the location of the table.
- *
- * @return the location of the table
- */
- String location();
+ ICEBERG;
- /**
- * Indicates whether the table is external.
- *
- * @return true if the table is external, false otherwise
- */
- default boolean external() {
- return false;
+ public String lowerName() {
+ return this.name().toLowerCase();
+ }
+
+ public static LakehouseTableFormat fromFormatName(String type) {
+ for (LakehouseTableFormat tableType : LakehouseTableFormat.values()) {
+ if (tableType.name().equalsIgnoreCase(type)) {
+ return tableType;
+ }
+ }
+ throw new IllegalArgumentException("Unknown LakehouseTableFormat: " +
type);
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 9572c656d2..e27f8032ab 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -19,8 +19,8 @@
package org.apache.gravitino.catalog.lakehouse.lance;
+import static org.apache.gravitino.Entity.EntityType.TABLE;
import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
-import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -46,7 +46,10 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata;
import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
+import org.apache.gravitino.catalog.lakehouse.LakehouseTableFormat;
+import org.apache.gravitino.catalog.lakehouse.utils.EntityConverter;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.GenericLakehouseTable;
import org.apache.gravitino.connector.HasPropertyMetadata;
@@ -55,9 +58,8 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.GenericTable;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -66,19 +68,16 @@ import
org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
import org.apache.gravitino.utils.PrincipalUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
public class LanceCatalogOperations implements LakehouseCatalogOperations {
- private Map<String, String> lancePropertiesMap;
+ private EntityStore store;
@Override
public void initialize(
Map<String, String> config, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
throws RuntimeException {
- lancePropertiesMap = ImmutableMap.copyOf(config);
+ store = GravitinoEnv.getInstance().entityStore();
}
@Override
@@ -95,13 +94,16 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
@Override
public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
- return new NameIdentifier[0];
+ // Nothing needs to be done here as GenericLakehouseCatalogOperations will
do the work.
+ throw new UnsupportedOperationException(
+ "We should not reach here as we could get table info " + "from
metastore directly.");
}
@Override
public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
- // Should not come here.
- return null;
+ // Nothing needs to be done here as GenericLakehouseCatalogOperations will
do the work.
+ throw new UnsupportedOperationException(
+ "We should not reach here as we could get table info " + "from
metastore directly.");
}
@Override
@@ -116,7 +118,7 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
// Ignore partitions, distributions, sortOrders, and indexes for Lance
tables;
- String location = properties.get(LOCATION);
+ String location =
properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
Map<String, String> storageProps =
properties.entrySet().stream()
.filter(e ->
e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
@@ -124,7 +126,8 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
Collectors.toMap(
e ->
e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
Map.Entry::getValue));
- try (Dataset dataset =
+
+ try (Dataset ignored =
Dataset.create(
new RootAllocator(),
location,
@@ -145,7 +148,7 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
.build())
.withPartitioning(partitions)
.withSortOrders(sortOrders)
- .withFormat("lance")
+ .withFormat(LakehouseTableFormat.LANCE.lowerName())
.build();
}
}
@@ -167,6 +170,7 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
// Lance only supports adding indexes for now.
List<Index> addedIndexes = Lists.newArrayList();
+ // Only adding indexes is supported for now.
for (TableChange change : changes) {
if (change instanceof TableChange.AddIndex addIndexChange) {
Index index =
@@ -179,17 +183,66 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
}
}
- EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
- GenericTableEntity entity;
+ TableEntity updatedEntity;
try {
- entity = entityStore.get(ident, Entity.EntityType.TABLE,
GenericTableEntity.class);
+ TableEntity entity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
+ updatedEntity =
+ store.update(
+ ident,
+ TableEntity.class,
+ TABLE,
+ tableEntity ->
+ TableEntity.builder()
+ .withId(tableEntity.id())
+ .withName(tableEntity.name())
+ .withNamespace(tableEntity.namespace())
+ .withFormat(entity.getFormat())
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(tableEntity.auditInfo().creator())
+
.withCreateTime(tableEntity.auditInfo().createTime())
+
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+ .withLastModifiedTime(Instant.now())
+ .build())
+ .withColumns(tableEntity.columns())
+ .withIndexes(
+ ArrayUtils.addAll(
+ entity.getIndexes(), addedIndexes.toArray(new
Index[0])))
+ .withDistribution(tableEntity.getDistribution())
+ .withPartitions(tableEntity.getPartitions())
+ .withSortOrder(tableEntity.getSortOrder())
+ .withProperties(tableEntity.getProperties())
+ .withComment(tableEntity.getComment())
+ .build());
+
+ // Add indexes to Lance dataset
+ addLanceIndex(updatedEntity, addedIndexes);
+
+ // return the updated table
+ return GenericLakehouseTable.builder()
+ .withFormat(updatedEntity.getFormat())
+ .withProperties(updatedEntity.getProperties())
+ .withAuditInfo(updatedEntity.auditInfo())
+ .withSortOrders(updatedEntity.getSortOrder())
+ .withPartitioning(updatedEntity.getPartitions())
+ .withDistribution(updatedEntity.getDistribution())
+ .withColumns(EntityConverter.toColumns(updatedEntity.columns()))
+ .withIndexes(updatedEntity.getIndexes())
+ .withName(updatedEntity.name())
+ .withComment(updatedEntity.getComment())
+ .build();
} catch (NoSuchEntityException e) {
throw new NoSuchTableException("No such table: %s", ident);
} catch (IOException ioe) {
throw new RuntimeException("Failed to load table entity for: " + ident,
ioe);
}
+ }
- String location = entity.getProperties().get("location");
+ private void addLanceIndex(TableEntity updatedEntity, List<Index>
addedIndexes) {
+ String location =
+ updatedEntity
+ .getProperties()
+ .get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
// For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
@@ -205,28 +258,7 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
indexParams,
true);
}
- } catch (Exception e) {
- throw new RuntimeException("Failed to alter Lance table: " + ident, e);
}
-
- GenericTable oldTable = entity.toGenericTable();
- Index[] newIndexes = oldTable.index();
- for (Index index : addedIndexes) {
- newIndexes = ArrayUtils.add(newIndexes, index);
- }
-
- return GenericLakehouseTable.builder()
- .withFormat(oldTable.format())
- .withProperties(oldTable.properties())
- .withAuditInfo((AuditInfo) oldTable.auditInfo())
- .withSortOrders(oldTable.sortOrder())
- .withPartitioning(oldTable.partitioning())
- .withDistribution(oldTable.distribution())
- .withColumns(oldTable.columns())
- .withIndexes(newIndexes)
- .withName(oldTable.name())
- .withComment(oldTable.comment())
- .build();
}
private IndexParams getIndexParamsByIndexType(IndexType indexType) {
@@ -249,10 +281,18 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
@Override
public boolean dropTable(NameIdentifier ident) {
try {
- String location = lancePropertiesMap.get("location");
- // Remove the directory on storage
- FileSystem fs = FileSystem.get(new Configuration());
- return fs.delete(new Path(location), true);
+ TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
+ Map<String, String> lancePropertiesMap = tableEntity.getProperties();
+ String location =
+
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
+
+ if (!store.delete(ident, Entity.EntityType.TABLE)) {
+ throw new RuntimeException("Failed to drop Lance table: " +
ident.name());
+ }
+
+ // Drop the Lance dataset from cloud storage.
+ Dataset.drop(location, ImmutableMap.of());
+ return true;
} catch (IOException e) {
throw new RuntimeException("Failed to drop Lance table: " +
ident.name(), e);
}
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
similarity index 50%
rename from api/src/main/java/org/apache/gravitino/rel/GenericTable.java
rename to
catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
index 4796421c53..734309a444 100644
--- a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/utils/EntityConverter.java
@@ -17,31 +17,26 @@
* under the License.
*/
-package org.apache.gravitino.rel;
+package org.apache.gravitino.catalog.lakehouse.utils;
-/** A generic table interface that extends the Table interface. */
-public interface GenericTable extends Table {
+import java.util.List;
+import org.apache.gravitino.connector.GenericLakehouseColumn;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.rel.Column;
- /**
- * Formats the table as a string representation.
- *
- * @return the formatted string representation of the table
- */
- String format();
-
- /**
- * Gets the location of the table.
- *
- * @return the location of the table
- */
- String location();
+public class EntityConverter {
+ public static Column[] toColumns(List<ColumnEntity> columnEntities) {
+ return
columnEntities.stream().map(EntityConverter::toColumn).toArray(Column[]::new);
+ }
- /**
- * Indicates whether the table is external.
- *
- * @return true if the table is external, false otherwise
- */
- default boolean external() {
- return false;
+ private static Column toColumn(ColumnEntity columnEntity) {
+ return GenericLakehouseColumn.builder()
+ .withName(columnEntity.name())
+ .withComment(columnEntity.comment())
+ .withAutoIncrement(columnEntity.autoIncrement())
+ .withNullable(columnEntity.nullable())
+ .withType(columnEntity.dataType())
+ .withDefaultValue(columnEntity.defaultValue())
+ .build();
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
new file mode 100644
index 0000000000..8dfd3b5ce0
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/TestPropertiesMetadata.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertiesMetadata;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestPropertiesMetadata {
+ public static GenericLakehouseCatalog genericLakehouseCatalog;
+
+ @BeforeAll
+ static void init() {
+ genericLakehouseCatalog = new GenericLakehouseCatalog();
+ }
+
+ @Test
+ void testCatalogPropertiesMetadata() {
+ PropertiesMetadata catalogPropertiesMetadata =
+ genericLakehouseCatalog.catalogPropertiesMetadata();
+ Assertions.assertNotNull(catalogPropertiesMetadata);
+
+ Map<String, String> catalogProperties =
+ ImmutableMap.of(
+ "storage.type", "s3",
+ "storage.s3.bucket", "my-bucket",
+ "storage.s3.region", "us-west-2",
+ "location", "/tmp/test1");
+
+ String catalogLocation =
+ (String)
+ catalogPropertiesMetadata.getOrDefault(
+ catalogProperties,
GenericLakehouseCatalogPropertiesMetadata.LAKEHOUSE_LOCATION);
+ Assertions.assertEquals("/tmp/test1", catalogLocation);
+ }
+
+ @Test
+ void testSchemaPropertiesMetadata() {
+ PropertiesMetadata schemaPropertiesMetadata =
+ genericLakehouseCatalog.schemaPropertiesMetadata();
+ Assertions.assertNotNull(schemaPropertiesMetadata);
+
+ Map<String, String> schemaProperties =
+ ImmutableMap.of(
+ "storage.type", "s3",
+ "storage.s3.bucket", "my-bucket",
+ "storage.s3.region", "us-west-2",
+ "location", "/tmp/test_schema");
+
+ String schemaLocation =
+ (String)
+ schemaPropertiesMetadata.getOrDefault(
+ schemaProperties,
GenericLakehouseSchemaPropertiesMetadata.LAKEHOUSE_LOCATION);
+ Assertions.assertEquals("/tmp/test_schema", schemaLocation);
+ }
+
+ @Test
+ void testTablePropertiesMetadata() {
+ PropertiesMetadata tablePropertiesMetadata =
genericLakehouseCatalog.tablePropertiesMetadata();
+ Assertions.assertNotNull(tablePropertiesMetadata);
+
+ Map<String, String> tableProperties =
+ ImmutableMap.of(
+ "storage.type", "s3",
+ "storage.s3.bucket", "my-bucket",
+ "storage.s3.region", "us-west-2",
+ "location", "/tmp/test_table",
+ "format", "iceberg");
+
+ String tableLocation =
+ (String)
+ tablePropertiesMetadata.getOrDefault(
+ tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
+ Assertions.assertEquals("/tmp/test_table", tableLocation);
+
+ LakehouseTableFormat tableFormat =
+ (LakehouseTableFormat)
+ tablePropertiesMetadata.getOrDefault(
+ tableProperties,
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
+ Assertions.assertEquals(LakehouseTableFormat.ICEBERG, tableFormat);
+ }
+}
diff --git
a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
new file mode 100644
index 0000000000..9da5ed530e
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/utils/TestEntityConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.utils;
+
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.util.List;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestEntityConverter {
+
+ @Test
+ void testToColumns() {
+ AuditInfo auditInfo = AuditInfo.builder().build();
+ List<ColumnEntity> columnEntities =
+ List.of(
+ ColumnEntity.builder()
+ .withName("id")
+ .withId(1L)
+ .withDataType(Types.IntegerType.get())
+ .withComment("Identifier")
+ .withAutoIncrement(true)
+ .withNullable(false)
+ .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+ .withAuditInfo(auditInfo)
+ .withPosition(1)
+ .build(),
+ ColumnEntity.builder()
+ .withName("name")
+ .withId(2L)
+ .withDataType(Types.StringType.get())
+ .withComment("Name of the entity")
+ .withAutoIncrement(false)
+ .withNullable(true)
+ .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+ .withPosition(2)
+ .withAuditInfo(auditInfo)
+ .build());
+ var columns = EntityConverter.toColumns(columnEntities);
+ Assertions.assertEquals(2, columns.length);
+ for (var column : columns) {
+ if (column.name().equals("id")) {
+ Assertions.assertEquals(Types.IntegerType.get(), column.dataType());
+ Assertions.assertEquals("Identifier", column.comment());
+ Assertions.assertTrue(column.autoIncrement());
+ Assertions.assertFalse(column.nullable());
+ Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, column.defaultValue());
+ } else if (column.name().equals("name")) {
+ Assertions.assertEquals(Types.StringType.get(), column.dataType());
+ Assertions.assertEquals("Name of the entity", column.comment());
+ Assertions.assertFalse(column.autoIncrement());
+ Assertions.assertTrue(column.nullable());
+ Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, column.defaultValue());
+ }
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
index 921b14dcdf..7a0d90ead7 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
@@ -129,6 +129,16 @@ public final class EntityCombinedTable implements Table {
return table.index();
}
+ @Override
+ public String format() {
+ return table.format();
+ }
+
+ @Override
+ public String location() {
+ return table.location();
+ }
+
public boolean imported() {
return imported;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 27119f0c99..e549b806d8 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -27,7 +27,6 @@ import static
org.apache.gravitino.utils.NameIdentifierUtil.getSchemaIdentifier;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
@@ -37,15 +36,16 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.gravitino.Catalog;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.CatalogManager.CatalogWrapper;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -54,10 +54,8 @@ import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.GenericTable;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -117,6 +115,18 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
*/
@Override
public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ if (isManagedTable(catalogIdent)) {
+ return TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ catalogIdent,
+ c -> c.doWithTableOps(t -> t.loadTable(ident)),
+ NoSuchTableException.class));
+ }
+
EntityCombinedTable entityCombinedTable =
TreeLockUtils.doWithTreeLock(ident, LockType.READ, () ->
internalLoadTable(ident));
@@ -240,6 +250,17 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
NoSuchTableException.class,
IllegalArgumentException.class);
+ if (isManagedTable(catalogIdent)) {
+ // For generic lakehouse catalog, all operations will be
dispatched to the underlying
+ // catalog.
+ return EntityCombinedTable.of(alteredTable)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ getCatalogIdentifier(ident),
+ HasPropertyMetadata::tablePropertiesMetadata,
+ alteredTable.properties()));
+ }
+
StringIdentifier stringId =
getStringIdFromProperties(alteredTable.properties());
// Case 1: The table is not created by Gravitino and this table is
never imported.
TableEntity te = null;
@@ -262,57 +283,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
tableId = te.id();
}
- if (isGenericLakehouseCatalog(catalogIdent)) {
- // For generic lakehouse catalog, we only update the table entity
with basic info.
- GenericTableEntity genericTableEntity =
- operateOnEntity(
- ident, id -> store.get(id, TABLE,
GenericTableEntity.class), "GET", tableId);
- if (genericTableEntity == null) {
- throw new NoSuchTableException("No such table: %s", ident);
- }
-
- GenericTable genericTable = (GenericTable) alteredTable;
- GenericTableEntity updatedGenericTableEntity =
- operateOnEntity(
- ident,
- id ->
- store.update(
- id,
- GenericTableEntity.class,
- TABLE,
- tableEntity ->
- GenericTableEntity.getBuilder()
- .withId(tableEntity.id())
- .withName(alteredTable.name())
- .withNamespace(getNewNamespace(ident,
changes))
- .withFormat(genericTable.format())
- .withAuditInfo(
- AuditInfo.builder()
-
.withCreator(tableEntity.auditInfo().creator())
-
.withCreateTime(tableEntity.auditInfo().createTime())
- .withLastModifier(
-
PrincipalUtils.getCurrentPrincipal().getName())
-
.withLastModifiedTime(Instant.now())
- .build())
- .withColumns(tableEntity.columns())
- .withIndexes(genericTable.index())
-
.withDistribution(genericTable.distribution())
-
.withPartitions(genericTable.partitioning())
- .withSortOrder(genericTable.sortOrder())
- .withProperties(genericTable.properties())
- .withComment(genericTable.comment())
- .build()),
- "UPDATE",
- tableId);
-
- return EntityCombinedTable.of(alteredTable,
updatedGenericTableEntity)
- .withHiddenProperties(
- getHiddenPropertyNames(
- getCatalogIdentifier(ident),
- HasPropertyMetadata::tablePropertiesMetadata,
- alteredTable.properties()));
- }
-
TableEntity updatedTableEntity =
operateOnEntity(
ident,
@@ -371,6 +341,13 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
LockType.WRITE,
() -> {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ if (isManagedTable(catalogIdent)) {
+ return doWithCatalog(
+ catalogIdent,
+ c -> c.doWithTableOps(t -> t.dropTable(ident)),
+ RuntimeException.class);
+ }
+
boolean droppedFromCatalog =
doWithCatalog(
catalogIdent,
@@ -542,19 +519,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
}
private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
- NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- if (isGenericLakehouseCatalog(catalogIdent)) {
- try {
- GenericTableEntity tableEntity = store.get(ident, TABLE,
GenericTableEntity.class);
- if (tableEntity != null) {
- GenericTable genericTable = tableEntity.toGenericTable();
- return EntityCombinedTable.of(genericTable).withImported(true);
- }
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to load table entity " + ident,
ioe);
- }
- }
-
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Table table =
doWithCatalog(
@@ -627,6 +591,32 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
return null;
}),
IllegalArgumentException.class);
+
+ if (isManagedTable(catalogIdent)) {
+ // For generic lakehouse catalog, all operations will be dispatched to
the underlying catalog.
+ Table table =
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithTableOps(
+ t ->
+ t.createTable(
+ ident,
+ columns,
+ comment,
+ properties,
+ partitions == null ? EMPTY_TRANSFORM :
partitions,
+ distribution == null ? Distributions.NONE :
distribution,
+ sortOrders == null ? new SortOrder[0] :
sortOrders,
+ indexes == null ? Indexes.EMPTY_INDEXES :
indexes)),
+ NoSuchSchemaException.class,
+ TableAlreadyExistsException.class);
+ return EntityCombinedTable.of(table)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ }
+
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will
handle this
// StringIdentifier to make sure only when the operation is successful,
the related
@@ -665,41 +655,19 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i,
idGenerator.nextId(), audit))
.collect(Collectors.toList());
- TableEntity tableEntity;
- if (isGenericLakehouseCatalog(catalogIdent)) {
- // For generic lakehouse catalog, we only create the table entity with
basic info.
- GenericTable genericTable = (GenericTable) table;
- tableEntity =
- GenericTableEntity.getBuilder()
- .withId(uid)
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withFormat(genericTable.format())
- .withAuditInfo(audit)
- .withColumns(columnEntityList)
- .withIndexes(table.index())
- .withDistribution(table.distribution())
- .withFormat(genericTable.format())
- .withPartitions(table.partitioning())
- .withSortOrder(table.sortOrder())
- .withProperties(genericTable.properties())
- .withComment(genericTable.comment())
- .build();
- } else {
- tableEntity =
- TableEntity.builder()
- .withId(uid)
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withColumns(columnEntityList)
- .withAuditInfo(audit)
- .build();
- }
+ TableEntity tableEntity =
+ TableEntity.builder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withColumns(columnEntityList)
+ .withAuditInfo(audit)
+ .build();
try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
- if (isGenericLakehouseCatalog(catalogIdent)) {
+ if (isManagedTable(catalogIdent)) {
// Drop table
doWithCatalog(
catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)),
RuntimeException.class);
@@ -727,16 +695,13 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.collect(Collectors.toList());
}
- private boolean isGenericLakehouseCatalog(NameIdentifier catalogIdent) {
+ private boolean isManagedTable(NameIdentifier catalogIdent) {
CatalogManager catalogManager =
GravitinoEnv.getInstance().catalogManager();
- try {
- Catalog catalog = catalogManager.loadCatalog(catalogIdent);
- return catalog.type() == Catalog.Type.RELATIONAL
- && catalog.provider().equals("generic-lakehouse");
- } catch (NoSuchEntityException e) {
- LOG.warn("Catalog not found: {}", catalogIdent, e);
- return false;
- }
+ CatalogWrapper wrapper = catalogManager.loadCatalogAndWrap(catalogIdent);
+ Capability capability = wrapper.catalog().capability();
+
+ CapabilityResult result =
capability.managedStorage(Capability.Scope.TABLE);
+ return result == CapabilityResult.SUPPORTED;
}
private boolean isSameColumn(Column left, int columnPosition, ColumnEntity
right) {
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
index b84b265256..4fb7b5d12d 100644
---
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
+++
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
@@ -50,6 +50,7 @@ public class GenericLakehouseColumn extends BaseColumn {
hiveColumn.dataType = dataType;
hiveColumn.nullable = nullable;
hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET :
defaultValue;
+ hiveColumn.autoIncrement = autoIncrement;
return hiveColumn;
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
index a9379a5b31..7206125386 100644
---
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
+++
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
@@ -19,12 +19,7 @@
package org.apache.gravitino.connector;
-import org.apache.gravitino.rel.GenericTable;
-
-public class GenericLakehouseTable extends BaseTable implements GenericTable {
- @SuppressWarnings("unused")
- private String schemaName;
-
+public class GenericLakehouseTable extends BaseTable {
private String format;
public static Builder builder() {
@@ -53,14 +48,8 @@ public class GenericLakehouseTable extends BaseTable
implements GenericTable {
public static class Builder extends BaseTableBuilder<Builder,
GenericLakehouseTable> {
- private String schemaName;
private String format;
- public Builder withSchemaName(String schemaName) {
- this.schemaName = schemaName;
- return this;
- }
-
public Builder withFormat(String format) {
this.format = format;
return this;
@@ -69,7 +58,6 @@ public class GenericLakehouseTable extends BaseTable
implements GenericTable {
@Override
protected GenericLakehouseTable internalBuild() {
GenericLakehouseTable genericLakehouseTable = new
GenericLakehouseTable();
- genericLakehouseTable.schemaName = this.schemaName;
genericLakehouseTable.format = this.format;
genericLakehouseTable.columns = this.columns;
genericLakehouseTable.comment = this.comment;
diff --git
a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
deleted file mode 100644
index 4b2dd9ad03..0000000000
--- a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.gravitino.meta;
-
-import com.google.common.collect.Maps;
-import java.util.Map;
-import lombok.Getter;
-import org.apache.gravitino.Field;
-import org.apache.gravitino.connector.GenericLakehouseColumn;
-import org.apache.gravitino.connector.GenericLakehouseTable;
-import org.apache.gravitino.rel.GenericTable;
-import org.apache.gravitino.rel.expressions.distributions.Distribution;
-import org.apache.gravitino.rel.expressions.sorts.SortOrder;
-import org.apache.gravitino.rel.expressions.transforms.Transform;
-import org.apache.gravitino.rel.indexes.Index;
-
-@Getter
-public class GenericTableEntity extends TableEntity {
- public static final Field FORMAT = Field.required("format", Long.class, "The
table's format");
- public static final Field PROPERTIES =
- Field.optional("properties", Map.class, "The table's properties");
-
- public static final Field PARTITIONS =
- Field.optional("partitions", Transform[].class, "The table's partition");
-
- public static final Field SORT_ORDER =
- Field.optional("sortOrders", SortOrder[].class, "The table's sort
order");
-
- public static final Field DISTRIBUTION =
- Field.optional("distribution", Distribution.class, "The table's
distribution");
-
- public static final Field INDEXES =
- Field.optional("indexes", Index[].class, "The table's indexes");
-
- public static final Field COMMENT =
- Field.optional("comment", String.class, "The table's comment");
-
- public GenericTableEntity() {
- super();
- }
-
- @Override
- public Map<Field, Object> fields() {
- Map<Field, Object> superFields = super.fields();
- Map<Field, Object> result = Maps.newHashMap(superFields);
- result.put(FORMAT, format);
- result.put(PROPERTIES, properties);
- result.put(PARTITIONS, partitions);
- result.put(SORT_ORDER, sortOrder);
- result.put(DISTRIBUTION, distribution);
- result.put(INDEXES, indexes);
- result.put(COMMENT, comment);
-
- return result;
- }
-
- private String format;
- @Getter private Map<String, String> properties;
- private Transform[] partitions;
- private SortOrder[] sortOrder;
- private Distribution distribution;
- private Index[] indexes;
- private String comment;
-
- public static class Builder {
- private final GenericTableEntity tableEntity;
-
- public Builder() {
- this.tableEntity = new GenericTableEntity();
- }
-
- public Builder withId(Long id) {
- tableEntity.id = id;
- return this;
- }
-
- public Builder withName(String name) {
- tableEntity.name = name;
- return this;
- }
-
- public Builder withAuditInfo(AuditInfo auditInfo) {
- tableEntity.auditInfo = auditInfo;
- return this;
- }
-
- public Builder withColumns(java.util.List<ColumnEntity> columns) {
- tableEntity.columns = columns;
- return this;
- }
-
- public Builder withNamespace(org.apache.gravitino.Namespace namespace) {
- tableEntity.namespace = namespace;
- return this;
- }
-
- public Builder withFormat(String format) {
- tableEntity.format = format;
- return this;
- }
-
- public Builder withProperties(Map<String, String> properties) {
- tableEntity.properties = properties;
- return this;
- }
-
- public Builder withPartitions(Transform[] partitions) {
- tableEntity.partitions = partitions;
- return this;
- }
-
- public Builder withSortOrder(SortOrder[] sortOrder) {
- tableEntity.sortOrder = sortOrder;
- return this;
- }
-
- public Builder withDistribution(Distribution distribution) {
- tableEntity.distribution = distribution;
- return this;
- }
-
- public Builder withIndexes(Index[] indexes) {
- tableEntity.indexes = indexes;
- return this;
- }
-
- public Builder withComment(String comment) {
- tableEntity.comment = comment;
- return this;
- }
-
- public GenericTableEntity build() {
- return tableEntity;
- }
- }
-
- public static GenericTableEntity.Builder getBuilder() {
- return new GenericTableEntity.Builder();
- }
-
- public GenericTable toGenericTable() {
- return GenericLakehouseTable.builder()
- .withFormat(format)
- .withProperties(properties)
- .withAuditInfo(auditInfo)
- .withSortOrders(sortOrder)
- .withPartitioning(partitions)
- .withDistribution(distribution)
- .withColumns(
- columns.stream()
- .map(this::toGenericLakehouseColumn)
- .toArray(GenericLakehouseColumn[]::new))
- .withIndexes(indexes)
- .withName(name)
- .withComment(comment)
- .build();
- }
-
- private GenericLakehouseColumn toGenericLakehouseColumn(ColumnEntity
columnEntity) {
- return GenericLakehouseColumn.builder()
- .withName(columnEntity.name())
- .withComment(columnEntity.comment())
- .withAutoIncrement(columnEntity.autoIncrement())
- .withNullable(columnEntity.nullable())
- .withType(columnEntity.dataType())
- .withDefaultValue(columnEntity.defaultValue())
- .build();
- }
-}
diff --git a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
index 595defed08..795db870d9 100644
--- a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
@@ -20,15 +20,21 @@ package org.apache.gravitino.meta;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import lombok.Getter;
import lombok.ToString;
import org.apache.gravitino.Auditable;
import org.apache.gravitino.Entity;
import org.apache.gravitino.Field;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.utils.CollectionUtils;
/** A class representing a table entity in Apache Gravitino. */
@@ -42,15 +48,42 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
public static final Field COLUMNS =
Field.optional("columns", List.class, "The columns of the table");
- protected Long id;
+ public static final Field FORMAT = Field.optional("format", String.class,
"The table's format");
+ public static final Field PROPERTIES =
+ Field.optional("properties", Map.class, "The table's properties");
- protected String name;
+ public static final Field PARTITIONS =
+ Field.optional("partitions", Transform[].class, "The table's partition");
- protected AuditInfo auditInfo;
+ public static final Field SORT_ORDER =
+ Field.optional("sortOrders", SortOrder[].class, "The table's sort
order");
- protected Namespace namespace;
+ public static final Field DISTRIBUTION =
+ Field.optional("distribution", Distribution.class, "The table's
distribution");
- protected List<ColumnEntity> columns;
+ public static final Field INDEXES =
+ Field.optional("indexes", Index[].class, "The table's indexes");
+
+ public static final Field COMMENT =
+ Field.optional("comment", String.class, "The table's comment");
+
+ private Long id;
+
+ private String name;
+
+ private AuditInfo auditInfo;
+
+ private Namespace namespace;
+
+ private List<ColumnEntity> columns;
+
+ @Getter private String format;
+ @Getter private Map<String, String> properties;
+ @Getter private Transform[] partitions;
+ @Getter private SortOrder[] sortOrder;
+ @Getter private Distribution distribution;
+ @Getter private Index[] indexes;
+ @Getter private String comment;
/**
* Returns a map of the fields and their corresponding values for this table.
@@ -65,6 +98,14 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
fields.put(AUDIT_INFO, auditInfo);
fields.put(COLUMNS, columns);
+ fields.put(FORMAT, format);
+ fields.put(PROPERTIES, properties);
+ fields.put(PARTITIONS, partitions);
+ fields.put(SORT_ORDER, sortOrder);
+ fields.put(DISTRIBUTION, distribution);
+ fields.put(INDEXES, indexes);
+ fields.put(COMMENT, comment);
+
return fields;
}
@@ -136,7 +177,15 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
&& Objects.equal(name, baseTable.name)
&& Objects.equal(namespace, baseTable.namespace)
&& Objects.equal(auditInfo, baseTable.auditInfo)
- && CollectionUtils.isEqualCollection(columns, baseTable.columns);
+ && CollectionUtils.isEqualCollection(columns, baseTable.columns)
+ && Objects.equal(format, baseTable.format)
+ // NOTE(review): verify this equality check — properties is compared via
+ // Objects.equal (shallow map equality); confirm that matches the intended semantics.
+ && Objects.equal(properties, baseTable.properties)
+ && Arrays.equals(partitions, baseTable.partitions)
+ && Arrays.equals(sortOrder, baseTable.sortOrder)
+ && Objects.equal(distribution, baseTable.distribution)
+ && Arrays.equals(indexes, baseTable.indexes)
+ && Objects.equal(comment, baseTable.comment);
}
@Override
@@ -177,6 +226,41 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
return this;
}
+ public Builder withFormat(String format) {
+ tableEntity.format = format;
+ return this;
+ }
+
+ public Builder withProperties(Map<String, String> properties) {
+ tableEntity.properties = properties;
+ return this;
+ }
+
+ public Builder withPartitions(Transform[] partitions) {
+ tableEntity.partitions = partitions;
+ return this;
+ }
+
+ public Builder withSortOrder(SortOrder[] sortOrder) {
+ tableEntity.sortOrder = sortOrder;
+ return this;
+ }
+
+ public Builder withDistribution(Distribution distribution) {
+ tableEntity.distribution = distribution;
+ return this;
+ }
+
+ public Builder withIndexes(Index[] indexes) {
+ tableEntity.indexes = indexes;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ tableEntity.comment = comment;
+ return this;
+ }
+
public TableEntity build() {
tableEntity.validate();
@@ -184,6 +268,22 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
tableEntity.columns = Collections.emptyList();
}
+ if (tableEntity.properties == null) {
+ tableEntity.properties = Collections.emptyMap();
+ }
+
+ if (tableEntity.indexes == null) {
+ tableEntity.indexes = new Index[0];
+ }
+
+ if (tableEntity.partitions == null) {
+ tableEntity.partitions = new Transform[0];
+ }
+
+ if (tableEntity.sortOrder == null) {
+ tableEntity.sortOrder = new SortOrder[0];
+ }
+
return tableEntity;
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
index e919e539e9..0eb13e4a1e 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
@@ -56,9 +56,6 @@ public interface TableMetaMapper {
TablePO selectTableMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name);
- @SelectProvider(type = TableMetaSQLProviderFactory.class, method =
"selectTableMetaById")
- TablePO selectTableMetaById(@Param("tableId") Long tableId);
-
@InsertProvider(type = TableMetaSQLProviderFactory.class, method =
"insertTableMeta")
void insertTableMeta(@Param("tableMeta") TablePO tablePO);
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
index a723c3db4a..16f1ad7a34 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
@@ -20,8 +20,10 @@
package org.apache.gravitino.storage.relational.mapper;
import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.DeleteProvider;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.UpdateProvider;
public interface TableVersionMapper {
String TABLE_NAME = "table_version_info";
@@ -33,4 +35,16 @@ public interface TableVersionMapper {
type = TableVersionSQLProviderFactory.class,
method = "insertTableVersionOnDuplicateKeyUpdate")
void insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO
tablePO);
+
+ @UpdateProvider(
+ type = TableVersionSQLProviderFactory.class,
+ method = "softDeleteTableVersionByTableIdAndVersion")
+ void softDeleteTableVersionByTableIdAndVersion(
+ @Param("tableId") Long tableId, @Param("version") Long version);
+
+ @DeleteProvider(
+ type = TableVersionSQLProviderFactory.class,
+ method = "deleteTableVersionByLegacyTimeline")
+ Integer deleteTableVersionByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
index ab27353c00..4c518ef4bd 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
@@ -59,4 +59,14 @@ public class TableVersionSQLProviderFactory {
public static String
insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO tablePO) {
return getProvider().insertTableVersionOnDuplicateKeyUpdate(tablePO);
}
+
+ public static String softDeleteTableVersionByTableIdAndVersion(
+ @Param("tableId") Long tableId, @Param("version") Long version) {
+ return getProvider().softDeleteTableVersionByTableIdAndVersion(tableId,
version);
+ }
+
+ public static String deleteTableVersionByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return getProvider().deleteTableVersionByLegacyTimeline(legacyTimeline,
limit);
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 8065476a61..f8f116c5b3 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -48,15 +48,23 @@ public class TableMetaBaseSQLProvider {
public String listTablePOsByTableIds(List<Long> tableIds) {
return "<script>"
- + " SELECT table_id as tableId, table_name as tableName,"
- + " metalake_id as metalakeId, catalog_id as catalogId,"
- + " schema_id as schemaId, audit_info as auditInfo,"
- + " current_version as currentVersion, last_version as lastVersion,"
- + " deleted_at as deletedAt"
+ + "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+ + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+ + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+ + " tm.current_version as currentVersion, tm.last_version as
lastVersion,"
+ + " tm.deleted_at as deletedAt,"
+ + " tv.format as format, "
+ + " tv.properties as properties,"
+ + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+ + " tv.distribution as distribution, tv.indexes as indexes,"
+ + " tv.comment as comment"
+ " FROM "
+ TABLE_NAME
- + " WHERE deleted_at = 0"
- + " AND table_id IN ("
+ + " tm LEFT JOIN "
+ + TableVersionMapper.TABLE_NAME
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " WHERE tm.deleted_at = 0"
+ + " AND tm.table_id IN ("
+ "<foreach collection='tableIds' item='tableId' separator=','>"
+ "#{tableId}"
+ "</foreach>"
@@ -93,14 +101,23 @@ public class TableMetaBaseSQLProvider {
}
public String selectTableMetaById(@Param("tableId") Long tableId) {
- return "SELECT table_id as tableId, table_name as tableName,"
- + " metalake_id as metalakeId, catalog_id as catalogId,"
- + " schema_id as schemaId, audit_info as auditInfo,"
- + " current_version as currentVersion, last_version as lastVersion,"
- + " deleted_at as deletedAt"
+ return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+ + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+ + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+ + " tm.current_version as currentVersion, tm.last_version as
lastVersion,"
+ + " tm.deleted_at as deletedAt,"
+ + " tv.format as format, "
+ + " tv.properties as properties,"
+ + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+ + " tv.distribution as distribution, tv.indexes as indexes,"
+ + " tv.comment as comment"
+ " FROM "
+ TABLE_NAME
- + " WHERE table_id = #{tableId} AND deleted_at = 0";
+ + " tm LEFT JOIN "
+ + TableVersionMapper.TABLE_NAME
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version"
+ + " AND tv.deleted_at = 0"
+ + " WHERE tm.table_id = #{tableId} AND tm.deleted_at = 0";
}
public String insertTableMeta(@Param("tableMeta") TablePO tablePO) {
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
index c39b8cbabb..68bd28a63d 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -75,4 +75,20 @@ public class TableVersionBaseSQLProvider {
+ " version = #{tablePO.currentVersion},"
+ " deleted_at = #{tablePO.deletedAt}";
}
+
+ public String softDeleteTableVersionByTableIdAndVersion(
+ @Param("tableId") Long tableId, @Param("version") Long version) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE table_id = #{tableId} AND version = #{version} AND
deleted_at = 0";
+ }
+
+ public String deleteTableVersionByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + TABLE_NAME
+ + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT
#{limit}";
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
index 13eebeaa2c..11be90f345 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -45,7 +45,7 @@ public class TableVersionPostgreSQLProvider extends
TableVersionBaseSQLProvider
+ " #{tablePO.currentVersion},"
+ " #{tablePO.deletedAt}"
+ " )"
- + " ON CONFLICT (table_id, deleted_at) DO UPDATE SET"
+ + " ON CONFLICT (table_id, version, deleted_at) DO UPDATE SET"
+ " format = #{tablePO.format},"
+ " properties = #{tablePO.properties},"
+ " partitioning = #{tablePO.partitions},"
@@ -56,4 +56,13 @@ public class TableVersionPostgreSQLProvider extends
TableVersionBaseSQLProvider
+ " version = #{tablePO.currentVersion},"
+ " deleted_at = #{tablePO.deletedAt}";
}
+
+ @Override
+ public String softDeleteTableVersionByTableIdAndVersion(Long tableId, Long
version) {
+ return "UPDATE "
+ + TABLE_NAME
+ + " SET deleted_at = round(extract(epoch from(current_timestamp -"
+ + " timestamp '1970-01-01 00:00:00')) * 1000)"
+ + " WHERE table_id = #{tableId} AND version = #{version}";
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 7a42d95db6..bfbbdaf39f 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -136,12 +136,10 @@ public class TableMetaService {
SessionUtils.doWithCommit(
TableVersionMapper.class,
mapper -> {
- if (po.getFormat() != null) {
- if (overwrite) {
- mapper.insertTableVersionOnDuplicateKeyUpdate(po);
- } else {
- mapper.insertTableVersion(po);
- }
+ if (overwrite) {
+ mapper.insertTableVersionOnDuplicateKeyUpdate(po);
+ } else {
+ mapper.insertTableVersion(po);
}
}),
() -> {
@@ -196,11 +194,8 @@ public class TableMetaService {
.getParentEntityIdByNamespace(newTableEntity.namespace())
: schemaId;
- boolean isColumnChanged =
- TableColumnMetaService.getInstance().isColumnUpdated(oldTableEntity,
newTableEntity);
TablePO newTablePO =
- POConverters.updateTablePOWithVersionAndSchemaId(
- oldTablePO, newTableEntity, isColumnChanged, newSchemaId);
+ POConverters.updateTablePOWithVersionAndSchemaId(oldTablePO,
newTableEntity, newSchemaId);
final AtomicInteger updateResult = new AtomicInteger(0);
try {
@@ -214,12 +209,12 @@ public class TableMetaService {
SessionUtils.doWithCommit(
TableVersionMapper.class,
mapper -> {
- if (newTablePO.getFormat() != null) {
-
mapper.insertTableVersionOnDuplicateKeyUpdate(newTablePO);
- }
+ mapper.softDeleteTableVersionByTableIdAndVersion(
+ oldTablePO.getTableId(),
oldTablePO.getCurrentVersion());
+ mapper.insertTableVersionOnDuplicateKeyUpdate(newTablePO);
}),
() -> {
- if (updateResult.get() > 0 && (isColumnChanged ||
isSchemaChanged)) {
+ if (updateResult.get() > 0) {
TableColumnMetaService.getInstance()
.updateColumnPOsFromTableDiff(oldTableEntity,
newTableEntity, newTablePO);
}
@@ -250,7 +245,11 @@ public class TableMetaService {
Long tableId = getTableIdBySchemaIdAndName(schemaId, tableName);
AtomicInteger deleteResult = new AtomicInteger(0);
+ TablePO[] tablePOHolder = new TablePO[1];
SessionUtils.doMultipleWithCommit(
+ () -> {
+ tablePOHolder[0] = getTablePOBySchemaIdAndName(schemaId, tableName);
+ },
() ->
deleteResult.set(
SessionUtils.getWithoutCommit(
@@ -284,6 +283,11 @@ public class TableMetaService {
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
mapper ->
mapper.softDeletePolicyMetadataObjectRelsByTableId(tableId));
+ SessionUtils.doWithCommit(
+ TableVersionMapper.class,
+ mapper ->
+ mapper.softDeleteTableVersionByTableIdAndVersion(
+ tableId, tablePOHolder[0].getCurrentVersion()));
}
});
@@ -295,8 +299,18 @@ public class TableMetaService {
baseMetricName = "deleteTableMetasByLegacyTimeline")
public int deleteTableMetasByLegacyTimeline(Long legacyTimeline, int limit) {
return SessionUtils.doWithCommitAndFetchResult(
- TableMetaMapper.class,
- mapper -> mapper.deleteTableMetasByLegacyTimeline(legacyTimeline,
limit));
+ TableMetaMapper.class,
+ mapper -> mapper.deleteTableMetasByLegacyTimeline(legacyTimeline,
limit))
+ + deleteTableVersionByLegacyTimeline(legacyTimeline, limit);
+ }
+
+ @Monitored(
+ metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+ baseMetricName = "deleteTableVersionByLegacyTimeline")
+ public int deleteTableVersionByLegacyTimeline(Long legacyTimeline, int
limit) {
+ return SessionUtils.doWithCommitAndFetchResult(
+ TableVersionMapper.class,
+ mapper -> mapper.deleteTableVersionByLegacyTimeline(legacyTimeline,
limit));
}
private void fillTablePOBuilderParentEntityId(TablePO.Builder builder,
Namespace namespace) {
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index fa8f06a9f8..48cf8cc00e 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -45,7 +45,6 @@ import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.FilesetEntity;
-import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
@@ -398,37 +397,19 @@ public class POConverters {
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.auditInfo()))
.withCurrentVersion(INIT_VERSION)
.withLastVersion(INIT_VERSION)
- .withDeletedAt(DEFAULT_DELETED_AT);
-
- if (tableEntity instanceof GenericTableEntity genericTable) {
- builder.withFormat(genericTable.getFormat());
- builder.withComment(genericTable.getComment());
- builder.withProperties(
- genericTable.getProperties() == null
- ? null
- :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
-
- // TODO store the following information to databases;
- /**
- * builder.withDistribution( genericTable.getDistribution() == null ?
null :
- *
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getDistribution()));
- * builder.withPartitions( genericTable.getPartitions() == null ? null
:
- *
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getPartitions()));
- */
- builder.withIndexes(
- genericTable.getIndexes() == null
- ? null
- :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
- builder.withProperties(
- genericTable.getProperties() == null
- ? null
- :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
- builder.withSortOrders(
- genericTable.getSortOrder() == null
- ? null
- :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getSortOrder()));
- }
+ .withDeletedAt(DEFAULT_DELETED_AT)
+ .withFormat(tableEntity.getFormat())
+ .withComment(tableEntity.getComment())
+ .withProperties(
+ tableEntity.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.getProperties()))
+ .withIndexes(
+ tableEntity.getIndexes() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.getIndexes()));
+ // TODO: handle these fields (distribution, sort order, partition) later.
return builder.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize json object:", e);
@@ -440,21 +421,15 @@ public class POConverters {
*
* @param oldTablePO the old TablePO object
* @param newTable the new TableEntity object
- * @param needUpdateVersion whether need to update the version
* @param newSchemaId the new schema id
* @return TablePO object with updated version
*/
public static TablePO updateTablePOWithVersionAndSchemaId(
- TablePO oldTablePO, TableEntity newTable, boolean needUpdateVersion,
Long newSchemaId) {
+ TablePO oldTablePO, TableEntity newTable, Long newSchemaId) {
Long lastVersion;
Long currentVersion;
- if (needUpdateVersion) {
- lastVersion = oldTablePO.getLastVersion() + 1;
- currentVersion = lastVersion;
- } else {
- lastVersion = oldTablePO.getLastVersion();
- currentVersion = oldTablePO.getCurrentVersion();
- }
+ lastVersion = oldTablePO.getLastVersion() + 1;
+ currentVersion = lastVersion;
try {
TablePO.Builder builder =
@@ -467,23 +442,18 @@ public class POConverters {
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo()))
.withCurrentVersion(currentVersion)
.withLastVersion(lastVersion)
- .withDeletedAt(DEFAULT_DELETED_AT);
-
- // Note: GenericTableEntity will be removed in the refactor PR, so here
just keep the old
- // logic to make the UT pass.
- if (newTable instanceof GenericTableEntity genericTable) {
- builder.withFormat(genericTable.getFormat());
- builder.withComment(genericTable.getComment());
- builder.withProperties(
- genericTable.getProperties() == null
- ? null
- :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
- builder.withIndexes(
- genericTable.getIndexes() == null
- ? null
- :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
- // TODO other fields in the refactor PRs.
- }
+ .withDeletedAt(DEFAULT_DELETED_AT)
+ .withComment(newTable.getComment())
+ .withProperties(
+ newTable.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(newTable.getProperties()))
+ .withIndexes(
+ newTable.getIndexes() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(newTable.getIndexes()))
+ .withFormat(newTable.getFormat());
+ // TODO: handle the other fields (partitioning, distribution, sort order) in the
refactor PRs.
return builder.build();
} catch (JsonProcessingException e) {
@@ -505,29 +475,6 @@ public class POConverters {
public static TableEntity fromTableAndColumnPOs(
TablePO tablePO, List<ColumnPO> columnPOs, Namespace namespace) {
try {
- if (tablePO.getFormat() != null) {
- return GenericTableEntity.getBuilder()
- .withId(tablePO.getTableId())
- .withName(tablePO.getTableName())
- .withNamespace(namespace)
- .withColumns(fromColumnPOs(columnPOs))
- .withAuditInfo(
- JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(),
AuditInfo.class))
- // TODO add field partition, distribution and sort order;
- .withIndexes(
- StringUtils.isBlank(tablePO.getIndexes())
- ? null
- :
JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(), IndexImpl[].class))
- .withFormat(tablePO.getFormat())
- .withComment(tablePO.getComment())
- .withProperties(
- StringUtils.isBlank(tablePO.getProperties())
- ? null
- :
JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
- .withColumns(fromColumnPOs(columnPOs))
- .build();
- }
-
return TableEntity.builder()
.withId(tablePO.getTableId())
.withName(tablePO.getTableName())
@@ -535,6 +482,18 @@ public class POConverters {
.withColumns(fromColumnPOs(columnPOs))
.withAuditInfo(
JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(),
AuditInfo.class))
+ // TODO: add fields partition, distribution, and sort order.
+ .withIndexes(
+ StringUtils.isBlank(tablePO.getIndexes())
+ ? null
+ : JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(),
IndexImpl[].class))
+ .withFormat(tablePO.getFormat())
+ .withComment(tablePO.getComment())
+ .withProperties(
+ StringUtils.isBlank(tablePO.getProperties())
+ ? null
+ :
JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
+ .withColumns(fromColumnPOs(columnPOs))
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to deserialize json object:", e);
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
index 5b83a0e22b..73f8a146af 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
@@ -98,6 +98,7 @@ public abstract class TestOperationDispatcher {
entityStore.put(metalakeEntity, true);
catalogManager = new CatalogManager(config, entityStore, idGenerator);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager",
catalogManager, true);
Config config = mock(Config.class);
doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
diff --git
a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
index c67dae6dd1..92c0e16131 100644
--- a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
@@ -52,6 +52,7 @@ import org.apache.gravitino.catalog.TestOperationDispatcher;
import org.apache.gravitino.catalog.TestTableOperationDispatcher;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableChange;
@@ -95,7 +96,13 @@ public class TestTableHookDispatcher extends
TestOperationDispatcher {
catalogManager = Mockito.mock(CatalogManager.class);
FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager",
catalogManager, true);
BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+ Mockito.when(catalog.capability()).thenReturn(Capability.DEFAULT);
+ CatalogManager.CatalogWrapper catalogWrapper =
+ Mockito.mock(CatalogManager.CatalogWrapper.class);
+ Mockito.when(catalogWrapper.catalog()).thenReturn(catalog);
+
Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+
Mockito.when(catalogManager.loadCatalogAndWrap(any())).thenReturn(catalogWrapper);
authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
}
diff --git a/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
b/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
index 8bff5ce966..9ac22212be 100644
--- a/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
+++ b/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
@@ -31,6 +31,14 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObjects;
import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -152,13 +160,38 @@ public class TestEntity {
@Test
public void testTable() {
+ String format = "parquet";
+ String comment = "test table comment";
+ Map<String, String> tableProperties = ImmutableMap.of("tableKey1",
"tableValue1");
+ SortOrder[] sortOrders =
+ new SortOrder[] {SortOrders.of(NamedReference.field("col1"),
SortDirection.ASCENDING)};
+ Index[] indexes =
+ new Index[] {Indexes.of(Index.IndexType.BTREE, "idx1", new String[][]
{{"col1"}})};
+ Distribution distribution = Distributions.hash(4,
NamedReference.field("col1"));
+
TableEntity testTable =
-
TableEntity.builder().withId(tableId).withName(tableName).withAuditInfo(auditInfo).build();
+ TableEntity.builder()
+ .withId(tableId)
+ .withName(tableName)
+ .withAuditInfo(auditInfo)
+ .withFormat(format)
+ .withSortOrder(sortOrders)
+ .withProperties(tableProperties)
+ .withComment(comment)
+ .withIndexes(indexes)
+ .withDistribution(distribution)
+ .build();
Map<Field, Object> fields = testTable.fields();
Assertions.assertEquals(tableId, fields.get(TableEntity.ID));
Assertions.assertEquals(tableName, fields.get(TableEntity.NAME));
Assertions.assertEquals(auditInfo, fields.get(TableEntity.AUDIT_INFO));
+ Assertions.assertEquals(format, fields.get(TableEntity.FORMAT));
+ Assertions.assertEquals(tableProperties,
fields.get(TableEntity.PROPERTIES));
+ Assertions.assertEquals(comment, fields.get(TableEntity.COMMENT));
+ Assertions.assertEquals(sortOrders, fields.get(TableEntity.SORT_ORDER));
+ Assertions.assertEquals(indexes, fields.get(TableEntity.INDEXES));
+ Assertions.assertEquals(distribution,
fields.get(TableEntity.DISTRIBUTION));
}
@Test
diff --git
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index ce9f0f5c73..ad9cd173bc 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -79,7 +79,6 @@ import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.FilesetEntity;
-import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
@@ -2686,8 +2685,8 @@ public class TestEntityStorage {
store.put(schemaEntity, false);
long column1Id = RandomIdGenerator.INSTANCE.nextId();
- GenericTableEntity table =
- GenericTableEntity.getBuilder()
+ TableEntity table =
+ TableEntity.builder()
.withId(RandomIdGenerator.INSTANCE.nextId())
.withNamespace(NamespaceUtil.ofTable("metalake", "catalog",
"schema"))
.withName("table")
@@ -2707,8 +2706,8 @@ public class TestEntityStorage {
.withProperties(ImmutableMap.of("location", "/tmp/test",
"format", "lance"))
.build();
store.put(table, false);
- GenericTableEntity fetchedTable =
- store.get(table.nameIdentifier(), Entity.EntityType.TABLE,
GenericTableEntity.class);
+ TableEntity fetchedTable =
+ store.get(table.nameIdentifier(), Entity.EntityType.TABLE,
TableEntity.class);
// check table properties
Assertions.assertEquals("/tmp/test",
fetchedTable.getProperties().get("location"));
@@ -2718,8 +2717,8 @@ public class TestEntityStorage {
Assertions.assertEquals("column1", fetchedTable.columns().get(0).name());
// Now try to update the table
- GenericTableEntity updatedTable =
- GenericTableEntity.getBuilder()
+ TableEntity updatedTable =
+ TableEntity.builder()
.withId(table.id())
.withNamespace(table.namespace())
.withName(table.name())
@@ -2748,12 +2747,9 @@ public class TestEntityStorage {
.build();
store.update(
- table.nameIdentifier(),
- GenericTableEntity.class,
- Entity.EntityType.TABLE,
- e -> updatedTable);
- GenericTableEntity fetchedUpdatedTable =
- store.get(table.nameIdentifier(), Entity.EntityType.TABLE,
GenericTableEntity.class);
+ table.nameIdentifier(), TableEntity.class, Entity.EntityType.TABLE,
e -> updatedTable);
+ TableEntity fetchedUpdatedTable =
+ store.get(table.nameIdentifier(), Entity.EntityType.TABLE,
TableEntity.class);
// check updated table properties
Assertions.assertEquals(
@@ -2772,6 +2768,11 @@ public class TestEntityStorage {
.filter(c -> c.name().equals("column2"))
.findFirst()
.isPresent());
+
+ // Test dropping the table
+ Assertions.assertTrue(store.delete(table.nameIdentifier(),
Entity.EntityType.TABLE));
+ Assertions.assertFalse(store.exists(table.nameIdentifier(),
Entity.EntityType.TABLE));
+
destroy(type);
} catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index a552598709..b2916cfcaf 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -61,6 +61,7 @@ import org.apache.gravitino.Catalog;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
+import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
@@ -1195,6 +1196,128 @@ public class TestJDBCBackend {
assertEquals(1, countActiveOwnerRel(user.id()));
}
+ @Test
+ void testUpdateAndDropLanceTable() throws IOException, InterruptedException {
+ AuditInfo auditInfo =
+
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+ String metalakeName = "metalake" + RandomIdGenerator.INSTANCE.nextId();
+ BaseMetalake metalake =
+ BaseMetalake.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName(metalakeName)
+ .withAuditInfo(auditInfo)
+ .withComment(null)
+ .withProperties(null)
+ .withVersion(SchemaVersion.V_0_1)
+ .build();
+ backend.insert(metalake, false);
+
+ CatalogEntity catalog =
+ CatalogEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withNamespace(NamespaceUtil.ofCatalog(metalakeName))
+ .withName("catalog")
+ .withAuditInfo(auditInfo)
+ .withComment(null)
+ .withProperties(null)
+ .withType(Catalog.Type.RELATIONAL)
+ .withProvider("generic-lakehouse")
+ .build();
+
+ backend.insert(catalog, false);
+
+ SchemaEntity schema =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withNamespace(NamespaceUtil.ofSchema(metalakeName,
catalog.name()))
+ .withName("schema")
+ .withAuditInfo(auditInfo)
+ .withComment(null)
+ .withProperties(null)
+ .build();
+
+ backend.insert(schema, false);
+
+ TableEntity table =
+ TableEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withNamespace(NamespaceUtil.ofTable(metalakeName, catalog.name(),
schema.name()))
+ .withName("table")
+ .withAuditInfo(auditInfo)
+ .withFormat("lance")
+ .withComment(null)
+ .withProperties(ImmutableMap.of("format", "LANCE", "location",
"/tmp/test/lance"))
+ .build();
+
+ backend.insert(table, false);
+
+ TableEntity fetchedTable = backend.get(table.nameIdentifier(),
Entity.EntityType.TABLE);
+ Assertions.assertEquals("LANCE",
fetchedTable.getProperties().get("format"));
+
+ TableEntity updatedTable =
+ TableEntity.builder()
+ .withId(table.id())
+ .withNamespace(table.namespace())
+ .withName(table.name())
+ .withAuditInfo(auditInfo)
+ .withComment("update comment")
+ .withProperties(ImmutableMap.of("format", "LANCE", "location",
"/tmp/test/lance"))
+ .build();
+
+ backend.update(table.nameIdentifier(), EntityType.TABLE, e ->
updatedTable);
+ Thread.sleep(1000);
+ long now = System.currentTimeMillis();
+
+ // For updating a lance table, we will do the following things:
+ // 1. Update info in table table_meta;
+ // 2. Soft drop info in the table table_version_info with an old version.
+ // 3. Insert new version info in the table table_version_info.
+ // Let us check it.
+ int tableInfoCount =
+ countTableVersionInfoRow(
+ String.format(
+ "SELECT count(*) FROM table_version_info WHERE table_id = %d
AND deleted_at = 0",
+ table.id()));
+ Assertions.assertEquals(1, tableInfoCount);
+ int tableInfoDeletedCount =
+ countTableVersionInfoRow(
+ String.format(
+ "SELECT count(*) FROM table_version_info WHERE table_id = %d
AND deleted_at != 0",
+ table.id()));
+ Assertions.assertEquals(1, tableInfoDeletedCount);
+
+ // Now try to clean up old version data
+ int deletedCount = backend.hardDeleteLegacyData(Entity.EntityType.TABLE,
now);
+ // Only one old version record should be cleaned up in the
table_version_info table.
+ Assertions.assertEquals(1, deletedCount);
+
+ // Try to drop the table
+ backend.delete(table.nameIdentifier(), Entity.EntityType.TABLE, false);
+
+ Thread.sleep(1000);
+ now = System.currentTimeMillis();
+ // After drop, there should be one more deleted record in
table_version_info
+ int deleteCountNow = backend.hardDeleteLegacyData(Entity.EntityType.TABLE,
now);
+ // One row from table_meta and one row from table_version_info should be
cleaned up.
+ Assertions.assertEquals(2, deleteCountNow);
+ }
+
+ private int countTableVersionInfoRow(String sql) {
+ try (SqlSession sqlSession =
+
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+ Connection connection = sqlSession.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet rs = statement.executeQuery(sql)) {
+ while (rs.next()) {
+ return rs.getInt(1);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("SQL execution failed", e);
+ }
+ return 0;
+ }
+
private boolean legacyRecordExistsInDB(Long id, Entity.EntityType
entityType) {
String tableName;
String idColumnName;
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index a2d8a259da..db9d0cce86 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -694,8 +694,7 @@ public class TestPOConverters {
TablePO.Builder builder =
TablePO.builder().withMetalakeId(1L).withCatalogId(1L).withSchemaId(1L);
TablePO initPO = POConverters.initializeTablePOWithVersion(tableEntity,
builder);
- TablePO updatePO =
- POConverters.updateTablePOWithVersionAndSchemaId(initPO, updatedTable,
false, 1L);
+ TablePO updatePO =
POConverters.updateTablePOWithVersionAndSchemaId(initPO, updatedTable, 1L);
assertEquals(1, initPO.getCurrentVersion());
assertEquals(1, initPO.getLastVersion());
assertEquals(0, initPO.getDeletedAt());
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
index 493d0acace..4c3b96ce9a 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
+++
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
@@ -163,6 +163,8 @@ public class LanceNamespaceOperations {
@GET
@Path("{id}/table/list")
+ @Timed(name = "list-tables." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
+ @ResponseMetered(name = "list-tables", absolute = true)
public Response listTables(
@PathParam("id") String namespaceId,
@DefaultValue("$") @QueryParam("delimiter") String delimiter,
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 359fc94c42..dd10a31534 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.lance.service.rest;
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.type.TypeReference;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
@@ -37,6 +39,7 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
import org.apache.gravitino.lance.service.LanceExceptionMapper;
+import org.apache.gravitino.metrics.MetricNames;
@Path("/v1/table/{id}")
@Consumes(MediaType.APPLICATION_JSON)
@@ -52,6 +55,8 @@ public class LanceTableOperations {
@POST
@Path("/describe")
+ @Timed(name = "describe-table." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "describe-table", absolute = true)
public Response describeTable(
@PathParam("id") String tableId,
@DefaultValue("$") @QueryParam("delimiter") String delimiter) {
@@ -68,6 +73,8 @@ public class LanceTableOperations {
@Path("/create")
@Consumes("application/vnd.apache.arrow.stream")
@Produces("application/json")
+ @Timed(name = "create-table." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
+ @ResponseMetered(name = "create-table", absolute = true)
public Response createTable(
@PathParam("id") String tableId,
@QueryParam("mode") @DefaultValue("create") String mode, // create,
exist_ok, overwrite
@@ -92,6 +99,8 @@ public class LanceTableOperations {
@POST
@Path("/create-empty")
+ @Timed(name = "create-empty-table." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "create-empty-table", absolute = true)
public Response createEmptyTable(
@PathParam("id") String tableId,
@QueryParam("mode") @DefaultValue("create") String mode, // create,
exist_ok, overwrite
diff --git a/scripts/h2/schema-1.1.0-h2.sql b/scripts/h2/schema-1.1.0-h2.sql
index 6172915f1f..2a8db7fa07 100644
--- a/scripts/h2/schema-1.1.0-h2.sql
+++ b/scripts/h2/schema-1.1.0-h2.sql
@@ -432,7 +432,7 @@ CREATE TABLE IF NOT EXISTS `job_run_meta` (
CREATE TABLE IF NOT EXISTS `table_version_info` (
`table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
- `format` VARCHAR(64) NOT NULL COMMENT 'table format, such as
Lance, Iceberg and so on',
+ `format` VARCHAR(64) COMMENT 'table format, such as Lance,
Iceberg and so on, it will be null if it is not a lakehouse table' ,
`properties` CLOB DEFAULT NULL COMMENT 'table properties',
`partitioning` CLOB DEFAULT NULL COMMENT 'table partition info',
`distribution` CLOB DEFAULT NULL COMMENT 'table distribution info',
@@ -441,6 +441,5 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
`comment` CLOB DEFAULT NULL COMMENT 'table comment',
`version` BIGINT(20) UNSIGNED COMMENT 'table current version',
`deleted_at` BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion
timestamp, 0 means not deleted',
- PRIMARY KEY (table_id),
- UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `deleted_at`)
+ UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`,
`deleted_at`)
) ENGINE=InnoDB COMMENT 'table detail information including format, location,
properties, partition, distribution, sort order, index and so on';
\ No newline at end of file
diff --git a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
index cf42a02b57..2274b64ab0 100644
--- a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
+++ b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
@@ -19,7 +19,7 @@
CREATE TABLE IF NOT EXISTS `table_version_info` (
`table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
- `format` VARCHAR(64) NOT NULL COMMENT 'table format, such as
Lance, Iceberg and so on',
+ `format` VARCHAR(64) COMMENT 'table format, such as Lance,
Iceberg and so on, it will be null if it is not a lakehouse table',
`properties` CLOB DEFAULT NULL COMMENT 'table properties',
`partitioning` MEDIUMTEXT DEFAULT NULL COMMENT 'table partition info',
`distribution` MEDIUMTEXT DEFAULT NULL COMMENT 'table distribution info',
@@ -28,6 +28,5 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
`comment` MEDIUMTEXT DEFAULT NULL COMMENT 'table comment',
`version` BIGINT(20) UNSIGNED COMMENT 'table current version',
`deleted_at` BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion
timestamp, 0 means not deleted',
- PRIMARY KEY (table_id),
- UNIQUE KEY `uk_table_id_deleted_at` (`table_id`, `deleted_at`)
+ UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`,
`deleted_at`)
) ENGINE=InnoDB COMMENT 'table detail information including format, location,
properties, partition, distribution, sort order, index and so on';
\ No newline at end of file
diff --git a/scripts/mysql/schema-1.1.0-mysql.sql
b/scripts/mysql/schema-1.1.0-mysql.sql
index ca9b351b03..5659d71ea2 100644
--- a/scripts/mysql/schema-1.1.0-mysql.sql
+++ b/scripts/mysql/schema-1.1.0-mysql.sql
@@ -423,7 +423,7 @@ CREATE TABLE IF NOT EXISTS `job_run_meta` (
CREATE TABLE IF NOT EXISTS `table_version_info` (
`table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
- `format` VARCHAR(64) NOT NULL COMMENT 'table format, such as
Lance, Iceberg and so on',
+ `format` VARCHAR(64) COMMENT 'table format, such as Lance,
Iceberg and so on, it will be null if it is not a lakehouse table',
`properties` MEDIUMTEXT DEFAULT NULL COMMENT 'table properties',
`partitioning` MEDIUMTEXT DEFAULT NULL COMMENT 'table partition info',
`distribution` MEDIUMTEXT DEFAULT NULL COMMENT 'table distribution info',
@@ -432,7 +432,6 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
`comment` MEDIUMTEXT DEFAULT NULL COMMENT 'table comment',
`version` BIGINT(20) UNSIGNED COMMENT 'table current version',
`deleted_at` BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion
timestamp, 0 means not deleted',
- PRIMARY KEY (table_id),
- UNIQUE KEY `uk_table_id_deleted_at` (`table_id`, `deleted_at`)
+ UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`,
`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table
detail information including format, location, properties, partition,
distribution, sort order, index and so on';
diff --git a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
index 6663150f15..a883bafcd3 100644
--- a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
+++ b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
@@ -19,7 +19,7 @@
CREATE TABLE IF NOT EXISTS `table_version_info` (
`table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
- `format` VARCHAR(64) NOT NULL COMMENT 'table format, such as
Lance, Iceberg and so on',
+ `format` VARCHAR(64) COMMENT 'table format, such as Lance,
Iceberg and so on, it will be null if it is not a lakehouse table',
`properties` MEDIUMTEXT DEFAULT NULL COMMENT 'table properties',
`partitioning` MEDIUMTEXT DEFAULT NULL COMMENT 'table partition info',
`distribution` MEDIUMTEXT DEFAULT NULL COMMENT 'table distribution info',
@@ -28,6 +28,5 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
`comment` MEDIUMTEXT DEFAULT NULL COMMENT 'table comment',
`version` BIGINT(20) UNSIGNED COMMENT 'table current version',
`deleted_at` BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion
timestamp, 0 means not deleted',
- PRIMARY KEY (table_id),
- UNIQUE KEY `uk_table_id_deleted_at` (`table_id`, `deleted_at`)
+ UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`,
`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table
detail information including format, location, properties, partition,
distribution, sort order, index and so on';
\ No newline at end of file
diff --git a/scripts/postgresql/schema-1.1.0-postgresql.sql
b/scripts/postgresql/schema-1.1.0-postgresql.sql
index c5bc6b3205..2f3bf16fab 100644
--- a/scripts/postgresql/schema-1.1.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.1.0-postgresql.sql
@@ -750,8 +750,8 @@ COMMENT ON COLUMN job_run_meta.last_version IS 'job run
last version';
COMMENT ON COLUMN job_run_meta.deleted_at IS 'job run deleted at';
CREATE TABLE IF NOT EXISTS table_version_info (
- table_id BIGINT PRIMARY KEY,
- format VARCHAR(64) NOT NULL,
+ table_id BIGINT NOT NULL,
+ format VARCHAR(64),
properties TEXT,
partitioning TEXT,
distribution TEXT,
@@ -760,11 +760,11 @@ CREATE TABLE IF NOT EXISTS table_version_info (
"comment" TEXT,
version BIGINT,
deleted_at BIGINT DEFAULT 0,
- UNIQUE (table_id, deleted_at)
+ UNIQUE (table_id, version, deleted_at)
);
COMMENT ON TABLE table_version_info IS 'table detail
information including format, location, properties, partition, distribution,
sort order, index and so on';
COMMENT ON COLUMN table_version_info.table_id IS 'table id';
-COMMENT ON COLUMN table_version_info.format IS 'table format, such as
Lance, Iceberg and so on';
+COMMENT ON COLUMN table_version_info.format IS 'table format, such as
Lance, Iceberg and so on, it will be null if it is not a lakehouse table';
COMMENT ON COLUMN table_version_info.properties IS 'table properties';
COMMENT ON COLUMN table_version_info.partitioning IS 'table partition
info';
COMMENT on COLUMN table_version_info.distribution IS 'table distribution
info';
diff --git a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
index 42d06e30a8..80c279af57 100644
--- a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
@@ -19,8 +19,8 @@
CREATE TABLE IF NOT EXISTS table_version_info (
- table_id BIGINT PRIMARY KEY,
- format VARCHAR(64) NOT NULL,
+ table_id BIGINT NOT NULL,
+ format VARCHAR(64),
properties TEXT,
partitions TEXT,
distribution TEXT,
@@ -29,11 +29,11 @@ CREATE TABLE IF NOT EXISTS table_version_info (
"comment" TEXT,
version BIGINT,
deleted_at BIGINT DEFAULT 0,
- UNIQUE (table_id, deleted_at)
+ UNIQUE (table_id, version, deleted_at)
);
COMMENT ON TABLE table_version_info IS 'table detail
information including format, location, properties, partition, distribution,
sort order, index and so on';
COMMENT ON COLUMN table_version_info.table_id IS 'table id';
-COMMENT ON COLUMN table_version_info.format IS 'table format, such as
Lance, Iceberg and so on';
+COMMENT ON COLUMN table_version_info.format IS 'table format, such as
Lance, Iceberg and so on, it will be null if it is not a lakehouse table';
COMMENT ON COLUMN table_version_info.properties IS 'table properties';
COMMENT ON COLUMN table_version_info.partitions IS 'table partition info';
COMMENT on COLUMN table_version_info.distribution IS 'table distribution
info';