Airblader commented on a change in pull request #18088: URL: https://github.com/apache/flink/pull/18088#discussion_r767538397
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MANAGED; +import static org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory; + +/** The listener for managed table operations. */ +public class ManagedTableListener { + + private final ClassLoader classLoader; + + private final ReadableConfig config; + + public ManagedTableListener(ClassLoader classLoader, ReadableConfig config) { + this.classLoader = classLoader; + this.config = config; + } + + /** Notify for creating managed table. */ + public ResolvedCatalogBaseTable<?> notifyTableCreation( + @Nullable Catalog catalog, + ObjectIdentifier identifier, + ResolvedCatalogBaseTable<?> table, + boolean isTemporary, + boolean ignoreIfExists) { + if (isNewlyManagedTable(catalog, table)) { + ResolvedCatalogTable managedTable = createManagedTable(identifier, table, isTemporary); + discoverManagedTableFactory(classLoader) + .onCreateTable( + createTableFactoryContext(identifier, managedTable, isTemporary), + ignoreIfExists); + return managedTable; + } + return table; + } + + /** Notify for dropping managed table. */ + public void notifyTableDrop( + ObjectIdentifier identifier, + ResolvedCatalogBaseTable<?> table, + boolean isTemporary, + boolean ignoreIfNotExists) { + if (table.getTableKind() == MANAGED) { + discoverManagedTableFactory(classLoader) + .onDropTable( + createTableFactoryContext( + identifier, (ResolvedCatalogTable) table, isTemporary), + ignoreIfNotExists); + } + } + + private boolean isNewlyManagedTable( + @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) { + if (catalog == null || !catalog.supportsManagedTable()) { + // catalog not support managed table + return false; + } + if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + // view is not managed table + return false; + } + if (!StringUtils.isNullOrWhitespaceOnly( + table.getOptions().get(FactoryUtil.CONNECTOR.key()))) { + // with connector is not managed table + return false; + } + CatalogBaseTable origin = table.getOrigin(); + // only DefaultCatalogTable or CatalogTableImpl is managed table + return origin instanceof DefaultCatalogTable || origin instanceof CatalogTableImpl; Review comment: What's the motivation for this restriction? How does a catalog implementor learn about this? ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TestManagedTableFactory.java ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.factories.ManagedTableFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** A test {@link ManagedTableFactory}. */ +public class TestManagedTableFactory implements ManagedTableFactory { + + public static final String ENRICHED_KEY = "ENRICHED_KEY"; + + public static final String ENRICHED_VALUE = "ENRICHED_VALUE"; + + public static final Map<ObjectIdentifier, Map<String, String>> MANAGED_TABLES = + new ConcurrentHashMap<>(); + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return new HashSet<>(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return new HashSet<>(); + } + + @Override + public Map<String, String> enrichOptions(Context context) { + Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions()); + newOptions.put(ENRICHED_KEY, ENRICHED_VALUE); + return newOptions; + } + + @Override + public void onCreateTable(Context context, boolean ignoreIfExists) { + MANAGED_TABLES.compute( + context.getObjectIdentifier(), + (k, v) -> { + if (v == null) { + return context.getCatalogTable().toProperties(); + } else if (!ignoreIfExists) { + throw new TableException("Table exists."); + } else { + return v; + } + }); + } + + @Override + public void onDropTable(Context context, boolean ignoreIfNotExists) { + boolean remove = + MANAGED_TABLES.remove( + context.getObjectIdentifier(), context.getCatalogTable().toProperties()); + if (!remove && !ignoreIfNotExists) { + throw new TableException("Table not exists."); Review comment: ```suggestion throw new TableException("Table does not exist."); ``` ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java ########## @@ -110,4 +118,96 @@ public void testTableFromDescriptor() { assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector")); } + + @Test + public void testCreateManagedTableFromDescriptor() { Review comment: I would prefer splitting this into separate test cases. If one case fails, we'll immediately know which one. Right now this is one giant test case that actually tests several cases, and several things per case. ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java ########## @@ -110,4 +118,96 @@ public void testTableFromDescriptor() { assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector")); } + + @Test + public void testCreateManagedTableFromDescriptor() { + innerTestCreateManagedTableFromDescriptor(false, false); + innerTestCreateManagedTableFromDescriptor(true, false); + innerTestCreateManagedTableFromDescriptor(true, true); + innerTestCreateManagedTableFromDescriptor(false, true); + } + + private void innerTestCreateManagedTableFromDescriptor( + boolean ignoreIfExists, boolean isTemporary) { + final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + final String catalog = tEnv.getCurrentCatalog(); + final String database = tEnv.getCurrentDatabase(); + + final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); + final String tableName = UUID.randomUUID().toString(); + ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, tableName); + + // create table + + CreateTableOperation createOperation = + new CreateTableOperation( + identifier, + TableDescriptor.forManaged() + .schema(schema) + .option("a", "Test") + .build() + .toCatalogTable(), + ignoreIfExists, + isTemporary); + + tEnv.executeInternal(createOperation); + + // test ignore: create again + if (ignoreIfExists) { + tEnv.executeInternal(createOperation); + } else { + try { Review comment: This entire try-catch block could be written in a nicer way using AssertJ's `assertThatThrownBy`. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java ########## @@ -311,6 +311,14 @@ void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfE void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; + /** + * If return true, the Table without specified connector will be translated to the Flink managed + * table. See {@link CatalogBaseTable.TableKind#MANAGED} Review comment: Use `@see`? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java ########## @@ -39,16 +39,19 @@ private final @Nullable String comment; private final List<String> partitionKeys; private final Map<String, String> options; + private final TableKind tableKind; protected DefaultCatalogTable( Schema schema, @Nullable String comment, List<String> partitionKeys, - Map<String, String> options) { + Map<String, String> options, + TableKind tableKind) { this.schema = checkNotNull(schema, "Schema must not be null."); this.comment = comment; this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null."); this.options = checkNotNull(options, "Options must not be null."); + this.tableKind = checkNotNull(tableKind, "Table kind must not be null."); Review comment: We should validate that it isn't `VIEW` either. In fact, would it maybe make sense to introduce a separate `DefaultManagedTable` (which maybe inherits from `DefaultCatalogTable`) instead of adding the table kind here? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MANAGED; +import static org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory; + +/** The listener for managed table operations. */ +public class ManagedTableListener { Review comment: Please add an API stability annotation here. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java ########## @@ -311,6 +311,14 @@ void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfE void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; + /** + * If return true, the Table without specified connector will be translated to the Flink managed + * table. See {@link CatalogBaseTable.TableKind#MANAGED} Review comment: ```suggestion * If true, tables which do not specify a connector will be translated to managed tables. ``` ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java ########## @@ -39,6 +39,7 @@ @PublicEvolving enum TableKind { TABLE, + MANAGED, Review comment: It'd be nice to add some JavaDoc for this value what a managed table is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
