This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch generic_iceberg in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit cd2027a71d834d7ca291588b18358779b0452b95 Author: fanng <[email protected]> AuthorDate: Wed Dec 10 10:46:25 2025 +0800 first PR --- .../lakehouse/iceberg/IcebergTableDelegator.java | 60 +++++ .../lakehouse/iceberg/IcebergTableOperations.java | 89 +++++++ ...talog.lakehouse.generic.LakehouseTableDelegator | 1 + .../iceberg/TestIcebergTableDelegator.java | 256 +++++++++++++++++++++ 4 files changed, 406 insertions(+) diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTableDelegator.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTableDelegator.java new file mode 100644 index 0000000000..bb5e8a18af --- /dev/null +++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTableDelegator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.iceberg; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.catalog.ManagedSchemaOperations; +import org.apache.gravitino.catalog.ManagedTableOperations; +import org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator; +import org.apache.gravitino.connector.PropertyEntry; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.storage.IdGenerator; + +public class IcebergTableDelegator implements LakehouseTableDelegator { + + public static final String ICEBERG_TABLE_FORMAT = "iceberg"; + + @Override + public String tableFormat() { + return ICEBERG_TABLE_FORMAT; + } + + @Override + public List<PropertyEntry<?>> tablePropertyEntries() { + // Currently rely on generic table properties (location, format). Additional Iceberg-specific + // properties can be added here once they are supported end-to-end. + return ImmutableList.of( + PropertyEntry.booleanPropertyEntry( + Table.PROPERTY_EXTERNAL, + "Whether the Iceberg table is managed by Gravitino", + false, + false, + false, + false, + false)); + } + + @Override + public ManagedTableOperations createTableOps( + EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator idGenerator) { + return new IcebergTableOperations(store, schemaOps, idGenerator); + } +} diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTableOperations.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTableOperations.java new file mode 100644 index 0000000000..4ef459b89f --- /dev/null +++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTableOperations.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.iceberg; + +import com.google.common.base.Preconditions; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.ManagedSchemaOperations; +import org.apache.gravitino.catalog.ManagedTableOperations; +import org.apache.gravitino.connector.SupportsSchemas; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.storage.IdGenerator; + +/** + * Minimal Iceberg table operations for the generic lakehouse catalog. These operations persist + * table metadata in the entity store and currently do not manage physical Iceberg metadata or + * data files. + */ +public class IcebergTableOperations extends ManagedTableOperations { + + private final EntityStore store; + private final ManagedSchemaOperations schemaOps; + private final IdGenerator idGenerator; + + public IcebergTableOperations( + EntityStore store, ManagedSchemaOperations schemaOps, IdGenerator idGenerator) { + this.store = store; + this.schemaOps = schemaOps; + this.idGenerator = idGenerator; + } + + @Override + protected EntityStore store() { + return store; + } + + @Override + protected SupportsSchemas schemas() { + return schemaOps; + } + + @Override + protected IdGenerator idGenerator() { + return idGenerator; + } + + @Override + public Table createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map<String, String> properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + Preconditions.checkArgument( + StringUtils.isNotBlank(properties.get(Table.PROPERTY_LOCATION)), + "Table location must be specified for Iceberg tables"); + return super.createTable( + ident, columns, comment, properties, partitions, distribution, sortOrders, indexes); + } +} diff --git a/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator index 352813f4b1..ee05ddc860 100644 --- a/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator +++ b/catalogs/catalog-lakehouse-generic/src/main/resources/META-INF/services/org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator @@ -17,3 +17,4 @@ # under the License. # org.apache.gravitino.catalog.lakehouse.lance.LanceTableDelegator +org.apache.gravitino.catalog.lakehouse.iceberg.IcebergTableDelegator diff --git a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTableDelegator.java b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTableDelegator.java new file mode 100644 index 0000000000..aaead4dc9e --- /dev/null +++ b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTableDelegator.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.iceberg; + +import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PATH; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_STORE; +import static org.apache.gravitino.Configs.ENTITY_STORE; +import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE; +import static org.apache.gravitino.Configs.STORE_DELETE_AFTER_TIME; +import static org.apache.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME; +import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.Config; +import org.apache.gravitino.Configs; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.EntityStoreFactory; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.lakehouse.generic.GenericCatalogOperations; +import org.apache.gravitino.catalog.lakehouse.generic.GenericCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.generic.GenericSchemaPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.generic.GenericTablePropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegator; +import org.apache.gravitino.catalog.lakehouse.generic.LakehouseTableDelegatorFactory; +import org.apache.gravitino.connector.CatalogInfo; +import org.apache.gravitino.connector.HasPropertyMetadata; +import org.apache.gravitino.connector.PropertiesMetadata; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.BaseMetalake; +import org.apache.gravitino.meta.CatalogEntity; +import org.apache.gravitino.meta.SchemaVersion; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.types.Types; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.storage.RandomIdGenerator; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestIcebergTableDelegator { + + private static final String STORE_PATH = + "/tmp/gravitino_test_iceberg_" + UUID.randomUUID().toString().replace("-", ""); + private static final String METALAKE_NAME = "metalake_for_iceberg_generic_test"; + private static final String CATALOG_NAME = "lakehouse_generic_catalog"; + + private static EntityStore store; + private static IdGenerator idGenerator; + private static GenericCatalogOperations ops; + + @BeforeAll + public static void setUp() throws Exception { + Config config = Mockito.mock(Config.class); + when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE); + when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PATH)).thenReturn(STORE_PATH); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL)) + .thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", STORE_PATH)); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("gravitino"); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("gravitino"); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS)).thenReturn(100); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS)).thenReturn(1000L); + + File f = FileUtils.getFile(STORE_PATH); + f.deleteOnExit(); + + when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L); + when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L); + when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + when(config.get(Configs.CACHE_ENABLED)).thenReturn(false); + + store = EntityStoreFactory.createEntityStore(config); + store.initialize(config); + idGenerator = RandomIdGenerator.INSTANCE; + + AuditInfo auditInfo = + AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build(); + BaseMetalake metalake = + BaseMetalake.builder() + .withId(idGenerator.nextId()) + .withName(METALAKE_NAME) + .withVersion(SchemaVersion.V_0_1) + .withAuditInfo(auditInfo) + .withName(METALAKE_NAME) + .build(); + store.put(metalake, false); + + CatalogEntity catalog = + CatalogEntity.builder() + .withId(idGenerator.nextId()) + .withName(CATALOG_NAME) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withProvider("lakehouse-generic") + .withType(Catalog.Type.RELATIONAL) + .withAuditInfo(auditInfo) + .build(); + store.put(catalog, false); + + ops = new GenericCatalogOperations(store, idGenerator); + ops.initialize( + Collections.emptyMap(), + Mockito.mock(CatalogInfo.class), + new GenericPropertiesMetadata()); + } + + @AfterAll + public static void tearDown() throws Exception { + if (ops != null) { + ops.close(); + } + if (store != null) { + store.close(); + } + FileUtils.deleteDirectory(new File(STORE_PATH)); + } + + @Test + public void testDelegatorIsRegistered() { + Map<String, LakehouseTableDelegator> delegators = + LakehouseTableDelegatorFactory.tableDelegators(); + Set<String> formats = delegators.keySet(); + Assertions.assertTrue(formats.contains(IcebergTableDelegator.ICEBERG_TABLE_FORMAT)); + } + + @Test + public void testCreateAndDropIcebergTable() + throws NoSuchSchemaException, SchemaAlreadyExistsException, TableAlreadyExistsException, + NoSuchTableException, IOException { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = + NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, schemaName); + Map<String, String> schemaProperties = + StringIdentifier.newPropertiesWithId(StringIdentifier.fromId(idGenerator.nextId()), null); + + Schema schema = ops.createSchema(schemaIdent, "schema comment", schemaProperties); + Assertions.assertEquals(schemaName, schema.name()); + + String tableName = "tbl_" + UUID.randomUUID().toString().replace("-", ""); + NameIdentifier tableIdent = + NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, schemaName, tableName); + + Map<String, String> tableProperties = + StringIdentifier.newPropertiesWithId(StringIdentifier.fromId(idGenerator.nextId()), null); + Path tableLocation = Files.createTempDirectory("iceberg_generic_tbl"); + tableProperties.put(Table.PROPERTY_LOCATION, tableLocation.toString()); + tableProperties.put(Table.PROPERTY_TABLE_FORMAT, IcebergTableDelegator.ICEBERG_TABLE_FORMAT); + tableProperties.put(Table.PROPERTY_EXTERNAL, "true"); + + Column[] columns = + new Column[] { + Column.of("id", Types.LongType.get(), "id column"), + Column.of("data", Types.StringType.get(), "data column") + }; + + Table createdTable = + ops.createTable( + tableIdent, + columns, + "table comment", + tableProperties, + Transforms.EMPTY_TRANSFORM, + null, + null, + null); + + Assertions.assertEquals(tableName, createdTable.name()); + Assertions.assertEquals( + IcebergTableDelegator.ICEBERG_TABLE_FORMAT, + createdTable.properties().get(Table.PROPERTY_TABLE_FORMAT)); + Assertions.assertEquals( + tableLocation.toString() + "/", + createdTable.properties().get(Table.PROPERTY_LOCATION)); + + Table loadedTable = ops.loadTable(tableIdent); + Assertions.assertEquals( + createdTable.properties().get(Table.PROPERTY_LOCATION), + loadedTable.properties().get(Table.PROPERTY_LOCATION)); + + Assertions.assertTrue(ops.dropTable(tableIdent)); + Assertions.assertFalse(ops.dropTable(tableIdent)); + FileUtils.deleteDirectory(tableLocation.toFile()); + } + + private String randomSchemaName() { + return "schema_" + UUID.randomUUID().toString().replace("-", ""); + } + + private static class GenericPropertiesMetadata implements HasPropertyMetadata { + private final PropertiesMetadata catalogProps = new GenericCatalogPropertiesMetadata(); + private final PropertiesMetadata schemaProps = new GenericSchemaPropertiesMetadata(); + private final PropertiesMetadata tableProps = new GenericTablePropertiesMetadata(); + + @Override + public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException { + return catalogProps; + } + + @Override + public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException { + return schemaProps; + } + + @Override + public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { + return tableProps; + } + } +}
