This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f01e911a0d56ecd2e40aa81d1808067515c9610c Author: JingsongLi <[email protected]> AuthorDate: Fri Dec 17 15:56:01 2021 +0800 [FLINK-25174][table] Implement callback for managed table This closes #18088 --- .../apache/flink/table/catalog/CatalogManager.java | 54 ++++++-- .../flink/table/catalog/ManagedTableListener.java | 137 ++++++++++++++++++ .../flink/table/api/TableEnvironmentTest.java | 153 +++++++++++++++++---- 3 files changed, 312 insertions(+), 32 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index f0648b8..496c936 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -84,8 +84,13 @@ public final class CatalogManager { private final DataTypeFactory typeFactory; + private final ManagedTableListener managedTableListener; + private CatalogManager( - String defaultCatalogName, Catalog defaultCatalog, DataTypeFactory typeFactory) { + String defaultCatalogName, + Catalog defaultCatalog, + DataTypeFactory typeFactory, + ManagedTableListener managedTableListener) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName), "Default catalog name cannot be null or empty"); @@ -101,6 +106,7 @@ public final class CatalogManager { builtInCatalogName = defaultCatalogName; this.typeFactory = typeFactory; + this.managedTableListener = managedTableListener; } public static Builder newBuilder() { @@ -147,7 +153,8 @@ public final class CatalogManager { return new CatalogManager( defaultCatalogName, defaultCatalog, - new DataTypeFactoryImpl(classLoader, config, executionConfig)); + new DataTypeFactoryImpl(classLoader, config, executionConfig), + new ManagedTableListener(classLoader, config)); } } @@ -656,8 +663,18 @@ public final class CatalogManager { public void createTable( CatalogBaseTable table, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { execute( - (catalog, path) -> - catalog.createTable(path, resolveCatalogBaseTable(table), ignoreIfExists), + (catalog, path) -> { + ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(table); + ResolvedCatalogBaseTable<?> resolvedListenedTable = + managedTableListener.notifyTableCreation( + catalog, + objectIdentifier, + resolvedTable, + false, + ignoreIfExists); + + catalog.createTable(path, resolvedListenedTable, ignoreIfExists); + }, objectIdentifier, false, "CreateTable"); @@ -686,13 +703,21 @@ public final class CatalogManager { } return v; } else { - final CatalogBaseTable resolvedTable = resolveCatalogBaseTable(table); + ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(table); + ResolvedCatalogBaseTable<?> resolvedListenedTable = + managedTableListener.notifyTableCreation( + getCatalog(objectIdentifier.getCatalogName()).orElse(null), + objectIdentifier, + resolvedTable, + true, + ignoreIfExists); + if (listener.isPresent()) { return listener.get() .onCreateTemporaryTable( - objectIdentifier.toObjectPath(), resolvedTable); + objectIdentifier.toObjectPath(), resolvedListenedTable); } - return resolvedTable; + return resolvedListenedTable; } }); } @@ -729,6 +754,12 @@ public final class CatalogManager { if (filter.test(catalogBaseTable)) { getTemporaryOperationListener(objectIdentifier) .ifPresent(l -> l.onDropTemporaryTable(objectIdentifier.toObjectPath())); + + Catalog catalog = catalogs.get(objectIdentifier.getCatalogName()); + ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(catalogBaseTable); + managedTableListener.notifyTableDrop( + catalog, objectIdentifier, resolvedTable, true, ignoreIfNotExists); + temporaryTables.remove(objectIdentifier); } else if (!ignoreIfNotExists) { throw new ValidationException( @@ -808,7 +839,14 @@ public final class CatalogManager { final Optional<CatalogBaseTable> resultOpt = getUnresolvedTable(objectIdentifier); if (resultOpt.isPresent() && filter.test(resultOpt.get())) { execute( - (catalog, path) -> catalog.dropTable(path, ignoreIfNotExists), + (catalog, path) -> { + ResolvedCatalogBaseTable<?> resolvedTable = + resolveCatalogBaseTable(resultOpt.get()); + managedTableListener.notifyTableDrop( + catalog, objectIdentifier, resolvedTable, false, ignoreIfNotExists); + + catalog.dropTable(path, ignoreIfNotExists); + }, objectIdentifier, ignoreIfNotExists, "DropTable"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java new file mode 100644 index 0000000..211dce7 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory; + +/** The listener for managed table operations. */ +@Internal +public class ManagedTableListener { + + private final ClassLoader classLoader; + + private final ReadableConfig config; + + public ManagedTableListener(ClassLoader classLoader, ReadableConfig config) { + this.classLoader = classLoader; + this.config = config; + } + + /** Notify for creating managed table. */ + public ResolvedCatalogBaseTable<?> notifyTableCreation( + @Nullable Catalog catalog, + ObjectIdentifier identifier, + ResolvedCatalogBaseTable<?> table, + boolean isTemporary, + boolean ignoreIfExists) { + if (isManagedTable(catalog, table)) { + ResolvedCatalogTable managedTable = enrichOptions(identifier, table, isTemporary); + discoverManagedTableFactory(classLoader) + .onCreateTable( + createTableFactoryContext(identifier, managedTable, isTemporary), + ignoreIfExists); + return managedTable; + } + return table; + } + + /** Notify for dropping managed table. */ + public void notifyTableDrop( + @Nullable Catalog catalog, + ObjectIdentifier identifier, + ResolvedCatalogBaseTable<?> table, + boolean isTemporary, + boolean ignoreIfNotExists) { + if (isManagedTable(catalog, table)) { + discoverManagedTableFactory(classLoader) + .onDropTable( + createTableFactoryContext( + identifier, (ResolvedCatalogTable) table, isTemporary), + ignoreIfNotExists); + } + } + + private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) { + if (catalog == null || !catalog.supportsManagedTable()) { + // catalog not support managed table + return false; + } + + if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + // view is not managed table + return false; + } + + Map<String, String> options; + try { + options = table.getOptions(); + } catch (TableException ignore) { + // exclude abnormal tables, such as InlineCatalogTable that does not have the options + return false; + } + + if (!StringUtils.isNullOrWhitespaceOnly( + options.get(ConnectorDescriptorValidator.CONNECTOR_TYPE))) { + // legacy connector is not managed table + return false; + } + + if (!StringUtils.isNullOrWhitespaceOnly(options.get(FactoryUtil.CONNECTOR.key()))) { + // with connector is not managed table + return false; + } + + // ConnectorCatalogTable is not managed table + return !(table.getOrigin() instanceof ConnectorCatalogTable); + } + + /** Enrich options for creating managed table. */ + private ResolvedCatalogTable enrichOptions( + ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> table, boolean isTemporary) { + if (!(table instanceof ResolvedCatalogTable)) { + throw new UnsupportedOperationException( + "Managed table only supports catalog table, unsupported table type: " + + table.getClass()); + } + ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) table; + Map<String, String> newOptions = + discoverManagedTableFactory(classLoader) + .enrichOptions( + createTableFactoryContext(identifier, resolvedTable, isTemporary)); + return resolvedTable.copy(newOptions); + } + + private DynamicTableFactory.Context createTableFactoryContext( + ObjectIdentifier identifier, ResolvedCatalogTable table, boolean isTemporary) { + return new FactoryUtil.DefaultDynamicTableContext( + identifier, table, config, classLoader, isTemporary); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java index 18542bc..c4c12bb 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java @@ -24,15 +24,22 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.operations.CatalogQueryOperation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.utils.TableEnvironmentMock; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import java.util.Map; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_KEY; +import static org.apache.flink.table.factories.TestManagedTableFactory.ENRICHED_VALUE; +import static org.apache.flink.table.factories.TestManagedTableFactory.MANAGED_TABLES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link TableEnvironment}. */ public class TableEnvironmentTest { @@ -48,20 +55,21 @@ public class TableEnvironmentTest { "T", TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build()); - assertFalse( - tEnv.getCatalog(catalog) - .orElseThrow(AssertionError::new) - .tableExists(new ObjectPath(database, "T"))); + assertThat( + tEnv.getCatalog(catalog) + .orElseThrow(AssertionError::new) + .tableExists(new ObjectPath(database, "T"))) + .isFalse(); final Optional<CatalogManager.TableLookupResult> lookupResult = tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T")); - assertTrue(lookupResult.isPresent()); + assertThat(lookupResult.isPresent()).isTrue(); final CatalogBaseTable catalogTable = lookupResult.get().getTable(); - assertTrue(catalogTable instanceof CatalogTable); - assertEquals(schema, catalogTable.getUnresolvedSchema()); - assertEquals("fake", catalogTable.getOptions().get("connector")); - assertEquals("Test", catalogTable.getOptions().get("a")); + assertThat(catalogTable instanceof CatalogTable).isTrue(); + assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); + assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); + assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); } @Test @@ -76,15 +84,18 @@ public class TableEnvironmentTest { TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build()); final ObjectPath objectPath = new ObjectPath(database, "T"); - assertTrue( - tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).tableExists(objectPath)); + assertThat( + tEnv.getCatalog(catalog) + .orElseThrow(AssertionError::new) + .tableExists(objectPath)) + .isTrue(); final CatalogBaseTable catalogTable = tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath); - assertTrue(catalogTable instanceof CatalogTable); - assertEquals(schema, catalogTable.getUnresolvedSchema()); - assertEquals("fake", catalogTable.getOptions().get("connector")); - assertEquals("Test", catalogTable.getOptions().get("a")); + assertThat(catalogTable instanceof CatalogTable).isTrue(); + assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); + assertThat(catalogTable.getOptions().get("connector")).isEqualTo("fake"); + assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); } @Test @@ -97,17 +108,111 @@ public class TableEnvironmentTest { final Table table = tEnv.from(descriptor); - assertEquals( - schema, Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build()); + assertThat(Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build()) + .isEqualTo(schema); - assertTrue(table.getQueryOperation() instanceof CatalogQueryOperation); + assertThat(table.getQueryOperation() instanceof CatalogQueryOperation).isTrue(); final ObjectIdentifier tableIdentifier = ((CatalogQueryOperation) table.getQueryOperation()).getTableIdentifier(); final Optional<CatalogManager.TableLookupResult> lookupResult = tEnv.getCatalogManager().getTable(tableIdentifier); - assertTrue(lookupResult.isPresent()); + assertThat(lookupResult.isPresent()).isTrue(); + + assertThat(lookupResult.get().getTable().getOptions().get("connector")).isEqualTo("fake"); + } + + @Test + public void testManagedTable() { + innerTestManagedTableFromDescriptor(false, false); + } - assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector")); + @Test + public void testManagedTableWithIgnoreExists() { + innerTestManagedTableFromDescriptor(true, false); + } + + @Test + public void testTemporaryManagedTableWithIgnoreExists() { + innerTestManagedTableFromDescriptor(true, true); + } + + @Test + public void testTemporaryManagedTable() { + innerTestManagedTableFromDescriptor(true, true); + } + + private void innerTestManagedTableFromDescriptor(boolean ignoreIfExists, boolean isTemporary) { + final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + final String catalog = tEnv.getCurrentCatalog(); + final String database = tEnv.getCurrentDatabase(); + + final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); + final String tableName = UUID.randomUUID().toString(); + ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, tableName); + + // create table + MANAGED_TABLES.put(identifier, new AtomicReference<>()); + CreateTableOperation createOperation = + new CreateTableOperation( + identifier, + TableDescriptor.forManaged() + .schema(schema) + .option("a", "Test") + .build() + .toCatalogTable(), + ignoreIfExists, + isTemporary); + + tEnv.executeInternal(createOperation); + + // test ignore: create again + if (ignoreIfExists) { + tEnv.executeInternal(createOperation); + } else { + assertThatThrownBy( + () -> tEnv.executeInternal(createOperation), + isTemporary ? "already exists" : "Could not execute CreateTable"); + } + + // lookup table + + boolean isInCatalog = + tEnv.getCatalog(catalog) + .orElseThrow(AssertionError::new) + .tableExists(new ObjectPath(database, tableName)); + if (isTemporary) { + assertThat(isInCatalog).isFalse(); + } else { + assertThat(isInCatalog).isTrue(); + } + + final Optional<CatalogManager.TableLookupResult> lookupResult = + tEnv.getCatalogManager().getTable(identifier); + assertThat(lookupResult.isPresent()).isTrue(); + + final CatalogBaseTable catalogTable = lookupResult.get().getTable(); + assertThat(catalogTable instanceof CatalogTable).isTrue(); + assertThat(catalogTable.getUnresolvedSchema()).isEqualTo(schema); + assertThat(catalogTable.getOptions().get("a")).isEqualTo("Test"); + assertThat(catalogTable.getOptions().get(ENRICHED_KEY)).isEqualTo(ENRICHED_VALUE); + + AtomicReference<Map<String, String>> reference = MANAGED_TABLES.get(identifier); + assertThat(reference.get()).isNotNull(); + assertThat(reference.get().get("a")).isEqualTo("Test"); + assertThat(reference.get().get(ENRICHED_KEY)).isEqualTo(ENRICHED_VALUE); + + DropTableOperation dropOperation = + new DropTableOperation(identifier, ignoreIfExists, isTemporary); + tEnv.executeInternal(dropOperation); + assertThat(MANAGED_TABLES.get(identifier).get()).isNull(); + + // test ignore: drop again + if (ignoreIfExists) { + tEnv.executeInternal(dropOperation); + } else { + assertThatThrownBy(() -> tEnv.executeInternal(dropOperation), "does not exist"); + } + MANAGED_TABLES.remove(identifier); } }
