This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by
this push:
new d093824b72 [#8838] feat(catalogs): Support create/load/list table
operation for lance table. (#8879)
d093824b72 is described below
commit d093824b7268aad1735ead114e5e760f28832fea
Author: Mini Yu <[email protected]>
AuthorDate: Fri Oct 24 22:56:03 2025 +0800
[#8838] feat(catalogs): Support create/load/list table operation for lance
table. (#8879)
### What changes were proposed in this pull request?
Add support create and load table operations for lance table.
### Why are the changes needed?
It's a need.
Fix: #8838
Fix: #8837
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Currently, I have only tested it locally
```shell
➜ [/Users/yuqi/Downloads] curl -X POST -H "Accept:
application/vnd.gravitino.v1+json" \
-H "Content-Type: application/json" -d '{
"name": "lance_table14",
"comment": "This is an example table",
"columns": [
{
"name": "id",
"type": "integer",
"comment": "id column comment",
"nullable": false,
"autoIncrement": true,
"defaultValue": {
"type": "literal",
"dataType": "integer",
"value": "-1"
}
}
],
"indexes": [
{
"indexType": "primary_key",
"name": "PRIMARY",
"fieldNames": [["id"]]
}
],
"properties": {
"format": "lance",
"location": "/tmp/lance_catalog/schema/lance_table14"
}
}'
http://localhost:8090/api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables
{"code":0,"table":{"name":"lance_table14","comment":"This is an example
table","columns":[{"name":"id","type":"integer","comment":"id column
comment","nullable":false,"autoIncrement":true,"defaultValue":{"type":"literal","dataType":"integer","value":"-1"}}],"properties":{"format":"lance","location":"/tmp/lance_catalog/schema/lance_table14/"},"audit":{"creator":"anonymous","createTime":"2025-10-23T03:18:39.123151Z"},"distribution":{"strategy":"none","number":0,"funcArgs":[]},"sortOrder
[...]
➜ [/Users/yuqi/Downloads] curl -X GET -H "Accept:
application/vnd.gravitino.v1+json" \
-H "Content-Type: application/json"
http://localhost:8090/api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables/lance_table14
{"code":0,"table":{"name":"lance_table14","comment":"This is an example
table","columns":[{"name":"id","type":"integer","comment":"id column
comment","nullable":false,"autoIncrement":false,"defaultValue":{"type":"literal","dataType":"integer","value":"-1"}}],"properties":{"format":"lance","location":"/tmp/lance_catalog/schema/lance_table14/"},"audit":{"creator":"anonymous","createTime":"2025-10-23T03:18:39.123151Z"},"distribution":{"strategy":"none","number":0,"funcArgs":[]},"sortOrde
[...]
-H "Content-Type: application/json"
http://localhost:8090/api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables
{"code":0,"identifiers":[{"namespace":["test","lance_catalog","schema"],"name":"lance_table10"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table11"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table12"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table13"},{"namespace":["test","lance_catalog","schema"],"name":"lance_table14"}]}
➜ [/Users/yuqi/Downloads]
```
And the lance location
```shell
➜ [/tmp/lance_catalog/schema] ls
lance_table10 lance_table11 lance_table12 lance_table13 lance_table14
➜ [/tmp/lance_catalog/schema] cd lance_table14
➜ [/tmp/lance_catalog/schema/lance_table14] ls -al
total 0
drwxr-xr-x@ 4 yuqi wheel 128 10 23 11:18 .
drwxr-xr-x@ 7 yuqi wheel 224 10 23 11:18 ..
drwxr-xr-x@ 3 yuqi wheel 96 10 23 11:18 _transactions
drwxr-xr-x@ 3 yuqi wheel 96 10 23 11:18 _versions
➜ [/tmp/lance_catalog/schema/lance_table14] ls -al _versions
total 8
drwxr-xr-x@ 3 yuqi wheel 96 10 23 11:18 .
drwxr-xr-x@ 4 yuqi wheel 128 10 23 11:18 ..
-rw-r--r--@ 1 yuqi wheel 225 10 23 11:18 1.manifest
➜ [/tmp/lance_catalog/schema/lance_table14]
```
---
api/build.gradle.kts | 2 +
.../org/apache/gravitino/rel/GenericTable.java | 47 ++++++
.../org/apache/gravitino/rel/indexes/Indexes.java | 87 ++++++++++
.../java/org/apache/gravitino/rel/TestIndex.java | 57 +++++++
.../catalog-generic-lakehouse/build.gradle.kts | 1 +
.../GenericLakehouseCatalogOperations.java | 134 ++++++++++++++-
.../lakehouse/LakehouseCatalogOperations.java | 25 +++
.../lakehouse/lance/LanceCatalogOperations.java | 173 +++++++++++++++++++
.../lakehouse/lance/LanceDataTypeConverter.java | 123 ++++++++++++++
.../apache/gravitino/config/ConfigConstants.java | 5 +-
.../catalog/TableOperationDispatcher.java | 74 +++++++-
.../connector/GenericLakehouseColumn.java | 56 +++++++
.../gravitino/connector/GenericLakehouseTable.java | 86 ++++++++++
.../apache/gravitino/meta/GenericTableEntity.java | 186 +++++++++++++++++++++
.../org/apache/gravitino/meta/TableEntity.java | 10 +-
.../relational/mapper/TableVersionMapper.java | 36 ++++
.../mapper/TableVersionSQLProviderFactory.java | 62 +++++++
.../provider/DefaultMapperPackageProvider.java | 4 +-
.../provider/base/TableMetaBaseSQLProvider.java | 41 +++--
.../provider/base/TableVersionBaseSQLProvider.java | 79 +++++++++
.../postgresql/TableVersionPostgreSQLProvider.java | 24 +++
.../gravitino/storage/relational/po/TablePO.java | 46 +++++
.../relational/service/TableMetaService.java | 16 +-
.../storage/relational/utils/POConverters.java | 61 ++++++-
.../storage/relational/utils/SessionUtils.java | 12 ++
scripts/h2/schema-1.1.0-h2.sql | 19 ---
scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql | 36 ++--
scripts/mysql/schema-1.1.0-mysql.sql | 19 ---
scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql | 36 ++--
scripts/postgresql/schema-1.1.0-postgresql.sql | 19 ---
.../upgrade-1.0.0-to-1.1.0-postgresql.sql | 36 ++--
31 files changed, 1466 insertions(+), 146 deletions(-)
diff --git a/api/build.gradle.kts b/api/build.gradle.kts
index b4399b13c0..f0fe3ba5ee 100644
--- a/api/build.gradle.kts
+++ b/api/build.gradle.kts
@@ -26,6 +26,8 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.guava)
+ implementation(libs.jackson.annotations)
+ implementation(libs.jackson.databind)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
b/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
new file mode 100644
index 0000000000..4796421c53
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.rel;
+
+/** A generic table interface that extends the Table interface. */
+public interface GenericTable extends Table {
+
+ /**
+ * Formats the table as a string representation.
+ *
+ * @return the formatted string representation of the table
+ */
+ String format();
+
+ /**
+ * Gets the location of the table.
+ *
+ * @return the location of the table
+ */
+ String location();
+
+ /**
+ * Indicates whether the table is external.
+ *
+ * @return true if the table is external, false otherwise
+ */
+ default boolean external() {
+ return false;
+ }
+}
diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
index ce10fd0a0f..d1b1a1f523 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
@@ -18,6 +18,22 @@
*/
package org.apache.gravitino.rel.indexes;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
/** Helper methods to create index to pass into Apache Gravitino. */
public class Indexes {
@@ -73,10 +89,81 @@ public class Indexes {
.build();
}
+ /** Custom JSON serializer for Index objects. */
+ public static class IndexSerializer extends JsonSerializer<Index> {
+ @Override
+ public void serialize(Index value, JsonGenerator gen, SerializerProvider
serializers)
+ throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("indexType",
value.type().name().toUpperCase(Locale.ROOT));
+ if (null != value.name()) {
+ gen.writeStringField("name", value.name());
+ }
+ gen.writeFieldName("fieldNames");
+ gen.writeObject(value.fieldNames());
+ gen.writeEndObject();
+ }
+ }
+
+ /** Custom JSON deserializer for Index objects. */
+ public static class IndexDeserializer extends JsonDeserializer<Index> {
+
+ @Override
+ public Index deserialize(JsonParser p, DeserializationContext ctxt) throws
IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ Preconditions.checkArgument(
+ node != null && !node.isNull() && node.isObject(),
+ "Index must be a valid JSON object, but found: %s",
+ node);
+
+ IndexImpl.Builder builder = IndexImpl.builder();
+ Preconditions.checkArgument(
+ node.has("indexType"), "Cannot parse index from missing type: %s",
node);
+ String indexType = getString("indexType", node);
+
builder.withIndexType(Index.IndexType.valueOf(indexType.toUpperCase(Locale.ROOT)));
+ if (node.has("name")) {
+ builder.withName(getString("name", node));
+ }
+ Preconditions.checkArgument(
+ node.has("fieldNames"), "Cannot parse index from missing field
names: %s", node);
+ List<String[]> fieldNames = Lists.newArrayList();
+ node.get("fieldNames").forEach(field ->
fieldNames.add(getStringArray((ArrayNode) field)));
+ builder.withFieldNames(fieldNames.toArray(new String[0][0]));
+ return builder.build();
+ }
+
+ private static String[] getStringArray(ArrayNode node) {
+ String[] array = new String[node.size()];
+ for (int i = 0; i < node.size(); i++) {
+ array[i] = node.get(i).asText();
+ }
+ return array;
+ }
+
+ private static String getString(String property, JsonNode node) {
+ Preconditions.checkArgument(node.has(property), "Cannot parse missing
string: %s", property);
+ JsonNode pNode = node.get(property);
+ return convertToString(property, pNode);
+ }
+
+ private static String convertToString(String property, JsonNode pNode) {
+ Preconditions.checkArgument(
+ pNode != null && !pNode.isNull() && pNode.isTextual(),
+ "Cannot parse to a string value %s: %s",
+ property,
+ pNode);
+ return pNode.asText();
+ }
+ }
+
/** The user side implementation of the index. */
+ @JsonSerialize(using = IndexSerializer.class)
+ @JsonDeserialize(using = IndexDeserializer.class)
public static final class IndexImpl implements Index {
private final IndexType indexType;
+
private final String name;
+
private final String[][] fieldNames;
/**
diff --git a/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
b/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
new file mode 100644
index 0000000000..4a807fbb7b
--- /dev/null
+++ b/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.rel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.cfg.EnumFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestIndex {
+
+ @Test
+ void testIndexSerialization() throws JsonProcessingException {
+ String[][] fields = {{"column1"}, {"column2", "subcolumn"}};
+ Index index = Indexes.unique("test_index", fields);
+
+ JsonMapper jsonMapper =
+ JsonMapper.builder()
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+ .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
+ .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+
.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .build();
+
+ String json = jsonMapper.writeValueAsString(index);
+
+ Index deserializedIndex = jsonMapper.readValue(json, IndexImpl.class);
+ Assertions.assertEquals(index.type(), deserializedIndex.type());
+ Assertions.assertEquals(index.name(), deserializedIndex.name());
+ Assertions.assertArrayEquals(index.fieldNames(),
deserializedIndex.fieldNames());
+ }
+}
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index fceac14304..704dbda7e3 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,6 +43,7 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.hadoop3.client.api)
+ implementation(libs.lance)
annotationProcessor(libs.lombok)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index b626aabc16..acac35528e 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -18,11 +18,17 @@
*/
package org.apache.gravitino.catalog.lakehouse;
+import static org.apache.gravitino.Entity.EntityType.TABLE;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
@@ -30,16 +36,20 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
@@ -61,6 +71,11 @@ public class GenericLakehouseCatalogOperations
@SuppressWarnings("unused") // todo: remove this after implementing table
operations
private Optional<Path> catalogLakehouseDir;
+ private static final Map<String, LakehouseCatalogOperations>
SUPPORTED_FORMATS =
+ Maps.newHashMap();
+
+ private CatalogInfo catalogInfo;
+ private HasPropertyMetadata propertiesMetadata;
/**
* Initializes the generic lakehouse catalog operations with the provided
configuration.
*
@@ -141,7 +156,25 @@ public class GenericLakehouseCatalogOperations
@Override
public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
- throw new UnsupportedOperationException("Not implemented yet.");
+ EntityStore store = GravitinoEnv.getInstance().entityStore();
+ NameIdentifier identifier = NameIdentifier.of(namespace.levels());
+ try {
+ store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
+ } catch (NoSuchTableException e) {
+ throw new NoSuchEntityException(e, "Schema %s does not exist",
namespace);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to get schema " + identifier);
+ }
+
+ try {
+ List<GenericTableEntity> tableEntityList =
+ store.list(namespace, GenericTableEntity.class, TABLE);
+ return tableEntityList.stream()
+ .map(e -> NameIdentifier.of(namespace, e.name()))
+ .toArray(NameIdentifier[]::new);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
+ }
}
@Override
@@ -160,7 +193,66 @@ public class GenericLakehouseCatalogOperations
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- throw new UnsupportedOperationException("Not implemented yet.");
+ String format = properties.getOrDefault("format", "lance");
+ String tableLocation = calculateTableLocation(ident, properties);
+ Map<String, String> newProperties = Maps.newHashMap(properties);
+ newProperties.put("location", tableLocation);
+
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, properties, catalogInfo, propertiesMetadata)
+ : v);
+
+ return lakehouseCatalogOperations.createTable(
+ ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
+ }
+
+ private String calculateTableLocation(
+ NameIdentifier tableIdent, Map<String, String> tableProperties) {
+ String tableLocation = tableProperties.get("location");
+ if (StringUtils.isNotBlank(tableLocation)) {
+ return ensureTrailingSlash(tableLocation);
+ }
+
+ String schemaLocation;
+ try {
+ Schema schema =
loadSchema(NameIdentifier.of(tableIdent.namespace().levels()));
+ schemaLocation = schema.properties().get("location");
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to load schema for table %s to determine default
location.", tableIdent),
+ e);
+ }
+
+ // If we do not set location in table properties, and schema location is
set, use schema
+ // location
+ // as the base path.
+ if (StringUtils.isNotBlank(schemaLocation)) {
+ return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
+ }
+
+ // If the schema location is not set, use catalog lakehouse dir as the
base path. Or else, throw
+ // an exception.
+ if (catalogLakehouseDir.isEmpty()) {
+ throw new RuntimeException(
+ String.format(
+ "No location specified for table %s, you need to set location
either in catalog, schema, or table properties",
+ tableIdent));
+ }
+
+ String catalogLakehousePath = catalogLakehouseDir.get().toString();
+ String[] nsLevels = tableIdent.namespace().levels();
+ String schemaName = nsLevels[nsLevels.length - 1];
+ return ensureTrailingSlash(catalogLakehousePath)
+ + schemaName
+ + SLASH
+ + tableIdent.name()
+ + SLASH;
}
@Override
@@ -171,10 +263,46 @@ public class GenericLakehouseCatalogOperations
@Override
public boolean dropTable(NameIdentifier ident) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ EntityStore store = GravitinoEnv.getInstance().entityStore();
+ Namespace namespace = ident.namespace();
+ try {
+ GenericTableEntity tableEntity =
+ store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+ Map<String, String> tableProperties = tableEntity.getProperties();
+ String format = tableProperties.getOrDefault("format", "lance");
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, tableProperties, catalogInfo,
propertiesMetadata)
+ : v);
+ return lakehouseCatalogOperations.dropTable(ident);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
+ }
}
private String ensureTrailingSlash(String path) {
return path.endsWith(SLASH) ? path : path + SLASH;
}
+
+ private LakehouseCatalogOperations createLakehouseCatalogOperations(
+ String format,
+ Map<String, String> properties,
+ CatalogInfo catalogInfo,
+ HasPropertyMetadata propertiesMetadata) {
+ LakehouseCatalogOperations operations;
+ switch (format.toLowerCase()) {
+ case "lance":
+ operations = new LanceCatalogOperations();
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported lakehouse format:
" + format);
+ }
+
+ operations.initialize(properties, catalogInfo, propertiesMetadata);
+ return operations;
+ }
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
new file mode 100644
index 0000000000..66c7147626
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse;
+
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.rel.TableCatalog;
+
+public interface LakehouseCatalogOperations extends CatalogOperations,
TableCatalog {}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
new file mode 100644
index 0000000000..3e1146b7ad
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.collect.ImmutableMap;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.GenericLakehouseTable;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LanceCatalogOperations implements LakehouseCatalogOperations {
+
+ private Map<String, String> lancePropertiesMap;
+
+ @Override
+ public void initialize(
+ Map<String, String> config, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
+ throws RuntimeException {
+ lancePropertiesMap = ImmutableMap.copyOf(config);
+ }
+
+ @Override
+ public void testConnection(
+ NameIdentifier catalogIdent,
+ Catalog.Type type,
+ String provider,
+ String comment,
+ Map<String, String> properties)
+ throws Exception {}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
+ return new NameIdentifier[0];
+ }
+
+ @Override
+ public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+ // Should not come here.
+ return null;
+ }
+
+ @Override
+ public Table createTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+ // Ignore partitions, distributions, sortOrders, and indexes for Lance
tables;
+ String location = properties.get("location");
+ try (Dataset dataset =
+ Dataset.create(
+ new RootAllocator(),
+ location,
+ convertColumnsToSchema(columns),
+ new WriteParams.Builder().build())) {
+ GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
+ return builder
+ .withName(ident.name())
+ .withColumns(columns)
+ .withComment(comment)
+ .withProperties(properties)
+ .withDistribution(distribution)
+ .withIndexes(indexes)
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build())
+ .withPartitioning(partitions)
+ .withSortOrders(sortOrders)
+ .withFormat("lance")
+ .build();
+ }
+ }
+
+ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToSchema(Column[] columns) {
+ LanceDataTypeConverter converter = new LanceDataTypeConverter();
+ List<Field> fields =
+ Arrays.stream(columns)
+ .map(
+ col -> {
+ boolean nullable = col.nullable();
+ if (nullable) {
+ return new org.apache.arrow.vector.types.pojo.Field(
+ col.name(),
+ org.apache.arrow.vector.types.pojo.FieldType.nullable(
+ converter.fromGravitino(col.dataType())),
+ null);
+ }
+
+ // not nullable
+ return new org.apache.arrow.vector.types.pojo.Field(
+ col.name(),
+ org.apache.arrow.vector.types.pojo.FieldType.notNullable(
+ converter.fromGravitino(col.dataType())),
+ null);
+ })
+ .collect(Collectors.toList());
+ return new org.apache.arrow.vector.types.pojo.Schema(fields);
+ }
+
+ @Override
+ public Table alterTable(NameIdentifier ident, TableChange... changes)
+ throws NoSuchTableException, IllegalArgumentException {
+ // Use another PRs to implement alter table for Lance tables
+ return null;
+ }
+
+ @Override
+ public boolean dropTable(NameIdentifier ident) {
+ try {
+ String location = lancePropertiesMap.get("location");
+ // Remove the directory on storage
+ FileSystem fs = FileSystem.get(new Configuration());
+ return fs.delete(new Path(location), true);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to drop Lance table: " +
ident.name(), e);
+ }
+ }
+}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
new file mode 100644
index 0000000000..117863659e
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.gravitino.connector.DataTypeConverter;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.FixedType;
+
+public class LanceDataTypeConverter implements DataTypeConverter<ArrowType,
ArrowType> {
+
+ @Override
+ public ArrowType fromGravitino(Type type) {
+ switch (type.name()) {
+ case BOOLEAN:
+ return Bool.INSTANCE;
+ case BYTE:
+ return new Int(8, true);
+ case SHORT:
+ return new Int(16, true);
+ case INTEGER:
+ return new Int(32, true);
+ case LONG:
+ return new Int(64, true);
+ case FLOAT:
+ return new FloatingPoint(FloatingPointPrecision.SINGLE);
+ case DOUBLE:
+ return new FloatingPoint(FloatingPointPrecision.DOUBLE);
+ case DECIMAL:
+ // Lance uses FIXED_SIZE_BINARY for decimal types
+ return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for
decimal
+ case DATE:
+ return new ArrowType.Date(DateUnit.DAY);
+ case TIME:
+ return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+ case TIMESTAMP:
+ return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ case VARCHAR:
+ case STRING:
+ return new ArrowType.Utf8();
+ case FIXED:
+ FixedType fixedType = (FixedType) type;
+ return new ArrowType.FixedSizeBinary(fixedType.length());
+ case BINARY:
+ return new ArrowType.Binary();
+ default:
+ throw new UnsupportedOperationException("Unsupported Gravitino type: "
+ type.name());
+ }
+ }
+
+ @Override
+ public Type toGravitino(ArrowType arrowType) {
+ if (arrowType instanceof Bool) {
+ return Types.BooleanType.get();
+ } else if (arrowType instanceof Int intType) {
+ switch (intType.getBitWidth()) {
+ case 8 -> {
+ return Types.ByteType.get();
+ }
+ case 16 -> {
+ return Types.ShortType.get();
+ }
+ case 32 -> {
+ return Types.IntegerType.get();
+ }
+ case 64 -> {
+ return Types.LongType.get();
+ }
+ default -> throw new UnsupportedOperationException(
+ "Unsupported Int bit width: " + intType.getBitWidth());
+ }
+ } else if (arrowType instanceof FloatingPoint floatingPoint) {
+ switch (floatingPoint.getPrecision()) {
+ case SINGLE:
+ return Types.FloatType.get();
+ case DOUBLE:
+ return Types.DoubleType.get();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported FloatingPoint precision: " +
floatingPoint.getPrecision());
+ }
+ } else if (arrowType instanceof ArrowType.FixedSizeBinary) {
+ ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary)
arrowType;
+ return Types.FixedType.of(fixedSizeBinary.getByteWidth());
+ } else if (arrowType instanceof ArrowType.Date) {
+ return Types.DateType.get();
+ } else if (arrowType instanceof ArrowType.Time) {
+ return Types.TimeType.get();
+ } else if (arrowType instanceof ArrowType.Timestamp) {
+ return Types.TimestampType.withoutTimeZone();
+ } else if (arrowType instanceof ArrowType.Utf8) {
+ return Types.StringType.get();
+ } else if (arrowType instanceof ArrowType.Binary) {
+ return Types.BinaryType.get();
+ } else {
+ throw new UnsupportedOperationException("Unsupported Arrow type: " +
arrowType);
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
b/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index 17c08ac0e9..0ef761a6ca 100644
--- a/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -80,6 +80,9 @@ public final class ConfigConstants {
/** The version number for the 1.0.0 release. */
public static final String VERSION_1_0_0 = "1.0.0";
+ /** The version number for the 1.1.0 release. */
+ public static final String VERSION_1_1_0 = "1.1.0";
+
/** The current version of backend storage initialization script. */
- public static final String CURRENT_SCRIPT_VERSION = VERSION_1_0_0;
+ public static final String CURRENT_SCRIPT_VERSION = VERSION_1_1_0;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index a777f2a511..1cbab0d6ed 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -27,6 +27,7 @@ import static
org.apache.gravitino.utils.NameIdentifierUtil.getSchemaIdentifier;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
@@ -36,6 +37,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
@@ -52,8 +54,10 @@ import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.GenericTable;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -487,6 +491,19 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
}
private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ try {
+ GenericTableEntity tableEntity = store.get(ident, TABLE,
GenericTableEntity.class);
+ if (tableEntity != null) {
+ GenericTable genericTable = tableEntity.toGenericTable();
+ return EntityCombinedTable.of(genericTable).withImported(true);
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to load table entity " + ident,
ioe);
+ }
+ }
+
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Table table =
doWithCatalog(
@@ -597,18 +614,46 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i,
idGenerator.nextId(), audit))
.collect(Collectors.toList());
- TableEntity tableEntity =
- TableEntity.builder()
- .withId(uid)
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withColumns(columnEntityList)
- .withAuditInfo(audit)
- .build();
+ TableEntity tableEntity;
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ // For generic lakehouse catalog, we only create the table entity with
basic info.
+ GenericTable genericTable = (GenericTable) table;
+ tableEntity =
+ GenericTableEntity.getBuilder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withFormat(genericTable.format())
+ .withAuditInfo(audit)
+ .withColumns(columnEntityList)
+ .withIndexes(table.index())
+ .withDistribution(table.distribution())
+ .withFormat(genericTable.format())
+ .withPartitions(table.partitioning())
+ .withSortOrder(table.sortOrder())
+ .withProperties(genericTable.properties())
+ .withComment(genericTable.comment())
+ .build();
+ } else {
+ tableEntity =
+ TableEntity.builder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withColumns(columnEntityList)
+ .withAuditInfo(audit)
+ .build();
+ }
try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ // Drop table
+ doWithCatalog(
+ catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)),
RuntimeException.class);
+ }
+
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenProperties(
@@ -616,6 +661,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
}
+ // For managed table, we can use table entity to indicate the table is
created successfully.
return EntityCombinedTable.of(table, tableEntity)
.withHiddenProperties(
getHiddenPropertyNames(
@@ -630,6 +676,18 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.collect(Collectors.toList());
}
+ private boolean isGenericLakehouseCatalog(NameIdentifier catalogIdent) {
+ CatalogManager catalogManager =
GravitinoEnv.getInstance().catalogManager();
+ try {
+ Catalog catalog = catalogManager.loadCatalog(catalogIdent);
+ return catalog.type() == Catalog.Type.RELATIONAL
+ && catalog.provider().equals("generic-lakehouse");
+ } catch (NoSuchEntityException e) {
+ LOG.warn("Catalog not found: {}", catalogIdent, e);
+ return false;
+ }
+ }
+
private boolean isSameColumn(Column left, int columnPosition, ColumnEntity
right) {
return Objects.equal(left.name(), right.name())
&& columnPosition == right.position()
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
new file mode 100644
index 0000000000..b84b265256
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.tag.SupportsTags;
+
+public class GenericLakehouseColumn extends BaseColumn {
+ @Override
+ public SupportsTags supportsTags() {
+ return super.supportsTags();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder extends BaseColumnBuilder<Builder,
GenericLakehouseColumn> {
+
+ /** Creates a new instance of {@link Builder}. */
+ private Builder() {}
+
+ /**
+ * Internal method to build a HiveColumn instance using the provided
values.
+ *
+ * @return A new HiveColumn instance with the configured values.
+ */
+ @Override
+ protected GenericLakehouseColumn internalBuild() {
+ GenericLakehouseColumn hiveColumn = new GenericLakehouseColumn();
+
+ hiveColumn.name = name;
+ hiveColumn.comment = comment;
+ hiveColumn.dataType = dataType;
+ hiveColumn.nullable = nullable;
+ hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET :
defaultValue;
+ return hiveColumn;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
new file mode 100644
index 0000000000..a9379a5b31
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.rel.GenericTable;
+
+public class GenericLakehouseTable extends BaseTable implements GenericTable {
+ @SuppressWarnings("unused")
+ private String schemaName;
+
+ private String format;
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public String format() {
+ return format;
+ }
+
+ @Override
+ public String location() {
+ return properties.get("location");
+ }
+
+ @Override
+ public boolean external() {
+ return properties.get("external") != null &&
Boolean.parseBoolean(properties.get("external"));
+ }
+
+ @Override
+ protected TableOperations newOps() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ public static class Builder extends BaseTableBuilder<Builder,
GenericLakehouseTable> {
+
+ private String schemaName;
+ private String format;
+
+ public Builder withSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ return this;
+ }
+
+ public Builder withFormat(String format) {
+ this.format = format;
+ return this;
+ }
+
+ @Override
+ protected GenericLakehouseTable internalBuild() {
+ GenericLakehouseTable genericLakehouseTable = new
GenericLakehouseTable();
+ genericLakehouseTable.schemaName = this.schemaName;
+ genericLakehouseTable.format = this.format;
+ genericLakehouseTable.columns = this.columns;
+ genericLakehouseTable.comment = this.comment;
+ genericLakehouseTable.properties = this.properties;
+ genericLakehouseTable.auditInfo = this.auditInfo;
+ genericLakehouseTable.distribution = this.distribution;
+ genericLakehouseTable.indexes = this.indexes;
+ genericLakehouseTable.name = this.name;
+ genericLakehouseTable.partitioning = this.partitioning;
+ genericLakehouseTable.sortOrders = this.sortOrders;
+ return genericLakehouseTable;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
new file mode 100644
index 0000000000..4b2dd9ad03
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.connector.GenericLakehouseColumn;
+import org.apache.gravitino.connector.GenericLakehouseTable;
+import org.apache.gravitino.rel.GenericTable;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+
+@Getter
+public class GenericTableEntity extends TableEntity {
+ public static final Field FORMAT = Field.required("format", Long.class, "The
table's format");
+ public static final Field PROPERTIES =
+ Field.optional("properties", Map.class, "The table's properties");
+
+ public static final Field PARTITIONS =
+ Field.optional("partitions", Transform[].class, "The table's partition");
+
+ public static final Field SORT_ORDER =
+ Field.optional("sortOrders", SortOrder[].class, "The table's sort
order");
+
+ public static final Field DISTRIBUTION =
+ Field.optional("distribution", Distribution.class, "The table's
distribution");
+
+ public static final Field INDEXES =
+ Field.optional("indexes", Index[].class, "The table's indexes");
+
+ public static final Field COMMENT =
+ Field.optional("comment", String.class, "The table's comment");
+
+ public GenericTableEntity() {
+ super();
+ }
+
+ @Override
+ public Map<Field, Object> fields() {
+ Map<Field, Object> superFields = super.fields();
+ Map<Field, Object> result = Maps.newHashMap(superFields);
+ result.put(FORMAT, format);
+ result.put(PROPERTIES, properties);
+ result.put(PARTITIONS, partitions);
+ result.put(SORT_ORDER, sortOrder);
+ result.put(DISTRIBUTION, distribution);
+ result.put(INDEXES, indexes);
+ result.put(COMMENT, comment);
+
+ return result;
+ }
+
+ private String format;
+ @Getter private Map<String, String> properties;
+ private Transform[] partitions;
+ private SortOrder[] sortOrder;
+ private Distribution distribution;
+ private Index[] indexes;
+ private String comment;
+
+ public static class Builder {
+ private final GenericTableEntity tableEntity;
+
+ public Builder() {
+ this.tableEntity = new GenericTableEntity();
+ }
+
+ public Builder withId(Long id) {
+ tableEntity.id = id;
+ return this;
+ }
+
+ public Builder withName(String name) {
+ tableEntity.name = name;
+ return this;
+ }
+
+ public Builder withAuditInfo(AuditInfo auditInfo) {
+ tableEntity.auditInfo = auditInfo;
+ return this;
+ }
+
+ public Builder withColumns(java.util.List<ColumnEntity> columns) {
+ tableEntity.columns = columns;
+ return this;
+ }
+
+ public Builder withNamespace(org.apache.gravitino.Namespace namespace) {
+ tableEntity.namespace = namespace;
+ return this;
+ }
+
+ public Builder withFormat(String format) {
+ tableEntity.format = format;
+ return this;
+ }
+
+ public Builder withProperties(Map<String, String> properties) {
+ tableEntity.properties = properties;
+ return this;
+ }
+
+ public Builder withPartitions(Transform[] partitions) {
+ tableEntity.partitions = partitions;
+ return this;
+ }
+
+ public Builder withSortOrder(SortOrder[] sortOrder) {
+ tableEntity.sortOrder = sortOrder;
+ return this;
+ }
+
+ public Builder withDistribution(Distribution distribution) {
+ tableEntity.distribution = distribution;
+ return this;
+ }
+
+ public Builder withIndexes(Index[] indexes) {
+ tableEntity.indexes = indexes;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ tableEntity.comment = comment;
+ return this;
+ }
+
+ public GenericTableEntity build() {
+ return tableEntity;
+ }
+ }
+
+ public static GenericTableEntity.Builder getBuilder() {
+ return new GenericTableEntity.Builder();
+ }
+
+ public GenericTable toGenericTable() {
+ return GenericLakehouseTable.builder()
+ .withFormat(format)
+ .withProperties(properties)
+ .withAuditInfo(auditInfo)
+ .withSortOrders(sortOrder)
+ .withPartitioning(partitions)
+ .withDistribution(distribution)
+ .withColumns(
+ columns.stream()
+ .map(this::toGenericLakehouseColumn)
+ .toArray(GenericLakehouseColumn[]::new))
+ .withIndexes(indexes)
+ .withName(name)
+ .withComment(comment)
+ .build();
+ }
+
+ private GenericLakehouseColumn toGenericLakehouseColumn(ColumnEntity
columnEntity) {
+ return GenericLakehouseColumn.builder()
+ .withName(columnEntity.name())
+ .withComment(columnEntity.comment())
+ .withAutoIncrement(columnEntity.autoIncrement())
+ .withNullable(columnEntity.nullable())
+ .withType(columnEntity.dataType())
+ .withDefaultValue(columnEntity.defaultValue())
+ .build();
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
index 9d15be7df6..595defed08 100644
--- a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
@@ -42,15 +42,15 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
public static final Field COLUMNS =
Field.optional("columns", List.class, "The columns of the table");
- private Long id;
+ protected Long id;
- private String name;
+ protected String name;
- private AuditInfo auditInfo;
+ protected AuditInfo auditInfo;
- private Namespace namespace;
+ protected Namespace namespace;
- private List<ColumnEntity> columns;
+ protected List<ColumnEntity> columns;
/**
* Returns a map of the fields and their corresponding values for this table.
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
new file mode 100644
index 0000000000..a723c3db4a
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+
+public interface TableVersionMapper {
+ String TABLE_NAME = "table_version_info";
+
+ @InsertProvider(type = TableVersionSQLProviderFactory.class, method =
"insertTableVersion")
+ void insertTableVersion(@Param("tablePO") TablePO tablePO);
+
+ @InsertProvider(
+ type = TableVersionSQLProviderFactory.class,
+ method = "insertTableVersionOnDuplicateKeyUpdate")
+ void insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO
tablePO);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
new file mode 100644
index 0000000000..ab27353c00
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.TableVersionPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionSQLProviderFactory {
+
+ private static final Map<JDBCBackendType, TableVersionBaseSQLProvider>
+ TABLE_VERSION_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackendType.MYSQL, new
TableVersionSQLProviderFactory.TableVersionMySQLProvider(),
+ JDBCBackendType.H2, new
TableVersionSQLProviderFactory.TableVersionH2Provider(),
+ JDBCBackendType.POSTGRESQL, new
TableVersionPostgreSQLProvider());
+
+ public static TableVersionBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+ return TABLE_VERSION_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ static class TableVersionMySQLProvider extends TableVersionBaseSQLProvider {}
+
+ static class TableVersionH2Provider extends TableVersionBaseSQLProvider {}
+
+ public static String insertTableVersion(@Param("tablePO") TablePO tablePO) {
+ return getProvider().insertTableVersion(tablePO);
+ }
+
+ public static String
insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO tablePO) {
+ return getProvider().insertTableVersionOnDuplicateKeyUpdate(tablePO);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
index f214bd1962..aaf22ccda8 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
@@ -41,6 +41,7 @@ import
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
@@ -78,6 +79,7 @@ public class DefaultMapperPackageProvider implements
MapperPackageProvider {
TagMetaMapper.class,
TopicMetaMapper.class,
UserMetaMapper.class,
- UserRoleRelMapper.class);
+ UserRoleRelMapper.class,
+ TableVersionMapper.class);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 9360e2c354..8065476a61 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -21,20 +21,29 @@ package
org.apache.gravitino.storage.relational.mapper.provider.base;
import static
org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;
import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import org.apache.gravitino.storage.relational.po.TablePO;
import org.apache.ibatis.annotations.Param;
public class TableMetaBaseSQLProvider {
public String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) {
- return "SELECT table_id as tableId, table_name as tableName,"
- + " metalake_id as metalakeId, catalog_id as catalogId,"
- + " schema_id as schemaId, audit_info as auditInfo,"
- + " current_version as currentVersion, last_version as lastVersion,"
- + " deleted_at as deletedAt"
+ return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+ + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+ + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+ + " tm.current_version as currentVersion, tm.last_version as
lastVersion,"
+ + " tm.deleted_at as deletedAt,"
+ + " tv.format as format, "
+ + " tv.properties as properties,"
+ + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+ + " tv.distribution as distribution, tv.indexes as indexes,"
+ + " tv.comment as comment"
+ " FROM "
+ TABLE_NAME
- + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+ + " tm LEFT JOIN "
+ + TableVersionMapper.TABLE_NAME
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " WHERE tm.schema_id = #{schemaId} AND tm.deleted_at = 0";
}
public String listTablePOsByTableIds(List<Long> tableIds) {
@@ -65,14 +74,22 @@ public class TableMetaBaseSQLProvider {
public String selectTableMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name) {
- return "SELECT table_id as tableId, table_name as tableName,"
- + " metalake_id as metalakeId, catalog_id as catalogId,"
- + " schema_id as schemaId, audit_info as auditInfo,"
- + " current_version as currentVersion, last_version as lastVersion,"
- + " deleted_at as deletedAt"
+ return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+ + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+ + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+ + " tm.current_version as currentVersion, tm.last_version as
lastVersion,"
+ + " tm.deleted_at as deletedAt,"
+ + " tv.format as format, "
+ + " tv.properties as properties,"
+ + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+ + " tv.distribution as distribution, tv.indexes as indexes,"
+ + " tv.comment as comment"
+ " FROM "
+ TABLE_NAME
- + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND
deleted_at = 0";
+ + " tm LEFT JOIN "
+ + TableVersionMapper.TABLE_NAME
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName}
AND tm.deleted_at = 0";
}
public String selectTableMetaById(@Param("tableId") Long tableId) {
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
new file mode 100644
index 0000000000..3501abe10c
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static
org.apache.gravitino.storage.relational.mapper.TableVersionMapper.TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionBaseSQLProvider {
+
+ public String insertTableVersion(@Param("tablePO") TablePO tablePO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (table_id, format, properties, partitioning"
+ + " distribution, sort_orders, indexes, comment,"
+ + " version, last_version, deleted_at)"
+ + " VALUES ("
+ + " #{tablePO.tableId},"
+ + " #{tablePO.format},"
+ + " #{tablePO.properties},"
+ + " #{tablePO.partitions},"
+ + " #{tablePO.distribution},"
+ + " #{tablePO.sortOrders},"
+ + " #{tablePO.indexes},"
+ + " #{tablePO.comment},"
+ + " #{tablePO.currentVersion},"
+ + " #{tablePO.lastVersion},"
+ + " #{tablePO.deletedAt}"
+ + " )";
+ }
+
+ public String insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO")
TablePO tablePO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (table_id, format, properties, partitioning,"
+ + " distribution, sort_orders, indexes, comment,"
+ + " version, deleted_at)"
+ + " VALUES ("
+ + " #{tablePO.tableId},"
+ + " #{tablePO.format},"
+ + " #{tablePO.properties},"
+ + " #{tablePO.partitions},"
+ + " #{tablePO.distribution},"
+ + " #{tablePO.sortOrders},"
+ + " #{tablePO.indexes},"
+ + " #{tablePO.comment},"
+ + " #{tablePO.currentVersion},"
+ + " #{tablePO.deletedAt}"
+ + " )"
+ + " ON DUPLICATE KEY UPDATE"
+ + " format = #{tablePO.format},"
+ + " properties = #{tablePO.properties},"
+ + " partitioning = #{tablePO.partitions},"
+ + " distribution = #{tablePO.distribution},"
+ + " sort_orders = #{tablePO.sortOrders},"
+ + " indexes = #{tablePO.indexes},"
+ + " comment = #{tablePO.comment},"
+ + " version = #{tablePO.currentVersion},"
+ + " deleted_at = #{tablePO.deletedAt}";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
new file mode 100644
index 0000000000..e0a7413b1c
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+
+public class TableVersionPostgreSQLProvider extends
TableVersionBaseSQLProvider {}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
index 693105e772..56fea38337 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
@@ -20,7 +20,9 @@ package org.apache.gravitino.storage.relational.po;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import lombok.Getter;
+@Getter
public class TablePO {
private Long tableId;
private String tableName;
@@ -32,6 +34,15 @@ public class TablePO {
private Long lastVersion;
private Long deletedAt;
+ private String format;
+
+ private String properties;
+ private String partitions;
+ private String sortOrders;
+ private String distribution;
+ private String indexes;
+ private String comment;
+
public Long getTableId() {
return tableId;
}
@@ -154,6 +165,41 @@ public class TablePO {
return this;
}
+ public Builder withFormat(String format) {
+ tablePO.format = format;
+ return this;
+ }
+
+ public Builder withProperties(String properties) {
+ tablePO.properties = properties;
+ return this;
+ }
+
+ public Builder withPartitions(String partitions) {
+ tablePO.partitions = partitions;
+ return this;
+ }
+
+ public Builder withSortOrders(String sortOrders) {
+ tablePO.sortOrders = sortOrders;
+ return this;
+ }
+
+ public Builder withDistribution(String distribution) {
+ tablePO.distribution = distribution;
+ return this;
+ }
+
+ public Builder withIndexes(String indexes) {
+ tablePO.indexes = indexes;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ tablePO.comment = comment;
+ return this;
+ }
+
private void validate() {
Preconditions.checkArgument(tablePO.tableId != null, "Table id is
required");
Preconditions.checkArgument(tablePO.tableName != null, "Table name is
required");
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 326ba63b5f..f4bbf7a6f6 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -40,6 +40,7 @@ import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMap
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.po.ColumnPO;
import org.apache.gravitino.storage.relational.po.TablePO;
@@ -118,12 +119,12 @@ public class TableMetaService {
fillTablePOBuilderParentEntityId(builder, tableEntity.namespace());
AtomicReference<TablePO> tablePORef = new AtomicReference<>();
+ TablePO po = POConverters.initializeTablePOWithVersion(tableEntity,
builder);
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
TableMetaMapper.class,
mapper -> {
- TablePO po =
POConverters.initializeTablePOWithVersion(tableEntity, builder);
tablePORef.set(po);
if (overwrite) {
mapper.insertTableMetaOnDuplicateKeyUpdate(po);
@@ -131,6 +132,18 @@ public class TableMetaService {
mapper.insertTableMeta(po);
}
}),
+ () ->
+ SessionUtils.doWithCommit(
+ TableVersionMapper.class,
+ mapper -> {
+ if (po.getFormat() != null) {
+ if (overwrite) {
+ mapper.insertTableVersionOnDuplicateKeyUpdate(po);
+ } else {
+ mapper.insertTableVersion(po);
+ }
+ }
+ }),
() -> {
// We need to delete the columns first if we want to overwrite the
table.
if (overwrite) {
@@ -292,7 +305,6 @@ public class TableMetaService {
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId,
tableName));
-
if (tablePO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 127cb022e8..62bc11f891 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -45,6 +45,7 @@ import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
@@ -60,6 +61,7 @@ import org.apache.gravitino.policy.Policy;
import org.apache.gravitino.policy.PolicyContent;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.storage.relational.po.CatalogPO;
import org.apache.gravitino.storage.relational.po.ColumnPO;
@@ -390,14 +392,44 @@ public class POConverters {
public static TablePO initializeTablePOWithVersion(
TableEntity tableEntity, TablePO.Builder builder) {
try {
- return builder
+ builder
.withTableId(tableEntity.id())
.withTableName(tableEntity.name())
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.auditInfo()))
.withCurrentVersion(INIT_VERSION)
.withLastVersion(INIT_VERSION)
- .withDeletedAt(DEFAULT_DELETED_AT)
- .build();
+ .withDeletedAt(DEFAULT_DELETED_AT);
+
+ if (tableEntity instanceof GenericTableEntity genericTable) {
+ builder.withFormat(genericTable.getFormat());
+ builder.withComment(genericTable.getComment());
+ builder.withProperties(
+ genericTable.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+
+ // TODO store the following information to databases;
+ /**
+ * builder.withDistribution( genericTable.getDistribution() == null ?
null :
+ *
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getDistribution()));
+ * builder.withPartitions( genericTable.getPartitions() == null ? null
:
+ *
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getPartitions()));
+ */
+ builder.withIndexes(
+ genericTable.getIndexes() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
+ builder.withProperties(
+ genericTable.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+ builder.withSortOrders(
+ genericTable.getSortOrder() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getSortOrder()));
+ }
+
+ return builder.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize json object:", e);
}
@@ -455,6 +487,29 @@ public class POConverters {
public static TableEntity fromTableAndColumnPOs(
TablePO tablePO, List<ColumnPO> columnPOs, Namespace namespace) {
try {
+ if (tablePO.getFormat() != null) {
+ return GenericTableEntity.getBuilder()
+ .withId(tablePO.getTableId())
+ .withName(tablePO.getTableName())
+ .withNamespace(namespace)
+ .withColumns(fromColumnPOs(columnPOs))
+ .withAuditInfo(
+ JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(),
AuditInfo.class))
+ // TODO add field partition, distribution and sort order;
+ .withIndexes(
+ StringUtils.isBlank(tablePO.getIndexes())
+ ? null
+ :
JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(), IndexImpl[].class))
+ .withFormat(tablePO.getFormat())
+ .withComment(tablePO.getComment())
+ .withProperties(
+ StringUtils.isBlank(tablePO.getProperties())
+ ? null
+ :
JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
+ .withColumns(fromColumnPOs(columnPOs))
+ .build();
+ }
+
return TableEntity.builder()
.withId(tablePO.getTableId())
.withName(tablePO.getTableName())
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
index 752d89533d..0482bfecfd 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
@@ -106,4 +106,16 @@ public class SessionUtils {
throw e;
}
}
+
+ public static void beginTransaction() {
+ SqlSessions.getSqlSession();
+ }
+
+ public static void commitTransaction() {
+ SqlSessions.commitAndCloseSqlSession();
+ }
+
+ public static void rollbackTransaction() {
+ SqlSessions.rollbackAndCloseSqlSession();
+ }
}
diff --git a/scripts/h2/schema-1.1.0-h2.sql b/scripts/h2/schema-1.1.0-h2.sql
index 98a1217423..6172915f1f 100644
--- a/scripts/h2/schema-1.1.0-h2.sql
+++ b/scripts/h2/schema-1.1.0-h2.sql
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file--
diff --git a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
index f76a2c2593..cf42a02b57 100644
--- a/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
+++ b/scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
@@ -1,21 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file--
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
CREATE TABLE IF NOT EXISTS `table_version_info` (
`table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
diff --git a/scripts/mysql/schema-1.1.0-mysql.sql
b/scripts/mysql/schema-1.1.0-mysql.sql
index c6bd8a81e3..ca9b351b03 100644
--- a/scripts/mysql/schema-1.1.0-mysql.sql
+++ b/scripts/mysql/schema-1.1.0-mysql.sql
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file--
diff --git a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
index 5560993eb6..6663150f15 100644
--- a/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
+++ b/scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
@@ -1,21 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file--
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
CREATE TABLE IF NOT EXISTS `table_version_info` (
`table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id',
diff --git a/scripts/postgresql/schema-1.1.0-postgresql.sql
b/scripts/postgresql/schema-1.1.0-postgresql.sql
index bc69e7839b..c5bc6b3205 100644
--- a/scripts/postgresql/schema-1.1.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.1.0-postgresql.sql
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file--
diff --git a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
index 882c9a6cc2..42d06e30a8 100644
--- a/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-1.0.0-to-1.1.0-postgresql.sql
@@ -1,21 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file--
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
CREATE TABLE IF NOT EXISTS table_version_info (