Airblader commented on a change in pull request #18088:
URL: https://github.com/apache/flink/pull/18088#discussion_r767538397



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MANAGED;
+import static 
org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory;
+
+/** The listener for managed table operations. */
+public class ManagedTableListener {
+
+    private final ClassLoader classLoader;
+
+    private final ReadableConfig config;
+
+    public ManagedTableListener(ClassLoader classLoader, ReadableConfig 
config) {
+        this.classLoader = classLoader;
+        this.config = config;
+    }
+
+    /** Notify for creating managed table. */
+    public ResolvedCatalogBaseTable<?> notifyTableCreation(
+            @Nullable Catalog catalog,
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            boolean isTemporary,
+            boolean ignoreIfExists) {
+        if (isNewlyManagedTable(catalog, table)) {
+            ResolvedCatalogTable managedTable = createManagedTable(identifier, 
table, isTemporary);
+            discoverManagedTableFactory(classLoader)
+                    .onCreateTable(
+                            createTableFactoryContext(identifier, 
managedTable, isTemporary),
+                            ignoreIfExists);
+            return managedTable;
+        }
+        return table;
+    }
+
+    /** Notify for dropping managed table. */
+    public void notifyTableDrop(
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            boolean isTemporary,
+            boolean ignoreIfNotExists) {
+        if (table.getTableKind() == MANAGED) {
+            discoverManagedTableFactory(classLoader)
+                    .onDropTable(
+                            createTableFactoryContext(
+                                    identifier, (ResolvedCatalogTable) table, 
isTemporary),
+                            ignoreIfNotExists);
+        }
+    }
+
+    private boolean isNewlyManagedTable(
+            @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
+        if (catalog == null || !catalog.supportsManagedTable()) {
+            // catalog not support managed table
+            return false;
+        }
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            // view is not managed table
+            return false;
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(
+                table.getOptions().get(FactoryUtil.CONNECTOR.key()))) {
+            // with connector is not managed table
+            return false;
+        }
+        CatalogBaseTable origin = table.getOrigin();
+        // only DefaultCatalogTable or CatalogTableImpl is managed table
+        return origin instanceof DefaultCatalogTable || origin instanceof 
CatalogTableImpl;

Review comment:
       What's the motivation for this restriction? How does a catalog 
implementor learn about this?

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TestManagedTableFactory.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.ManagedTableFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** A test {@link ManagedTableFactory}. */
+public class TestManagedTableFactory implements ManagedTableFactory {
+
+    public static final String ENRICHED_KEY = "ENRICHED_KEY";
+
+    public static final String ENRICHED_VALUE = "ENRICHED_VALUE";
+
+    public static final Map<ObjectIdentifier, Map<String, String>> 
MANAGED_TABLES =
+            new ConcurrentHashMap<>();
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Map<String, String> enrichOptions(Context context) {
+        Map<String, String> newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(ENRICHED_KEY, ENRICHED_VALUE);
+        return newOptions;
+    }
+
+    @Override
+    public void onCreateTable(Context context, boolean ignoreIfExists) {
+        MANAGED_TABLES.compute(
+                context.getObjectIdentifier(),
+                (k, v) -> {
+                    if (v == null) {
+                        return context.getCatalogTable().toProperties();
+                    } else if (!ignoreIfExists) {
+                        throw new TableException("Table exists.");
+                    } else {
+                        return v;
+                    }
+                });
+    }
+
+    @Override
+    public void onDropTable(Context context, boolean ignoreIfNotExists) {
+        boolean remove =
+                MANAGED_TABLES.remove(
+                        context.getObjectIdentifier(), 
context.getCatalogTable().toProperties());
+        if (!remove && !ignoreIfNotExists) {
+            throw new TableException("Table not exists.");

Review comment:
       ```suggestion
               throw new TableException("Table does not exist.");
   ```

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
##########
@@ -110,4 +118,96 @@ public void testTableFromDescriptor() {
 
         assertEquals("fake", 
lookupResult.get().getTable().getOptions().get("connector"));
     }
+
+    @Test
+    public void testCreateManagedTableFromDescriptor() {

Review comment:
       I would prefer splitting this into separate test cases. If one case 
fails, we'll immediately know which one. Right now this is one giant test case 
that actually tests several cases, and several things per case.

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
##########
@@ -110,4 +118,96 @@ public void testTableFromDescriptor() {
 
         assertEquals("fake", 
lookupResult.get().getTable().getOptions().get("connector"));
     }
+
+    @Test
+    public void testCreateManagedTableFromDescriptor() {
+        innerTestCreateManagedTableFromDescriptor(false, false);
+        innerTestCreateManagedTableFromDescriptor(true, false);
+        innerTestCreateManagedTableFromDescriptor(true, true);
+        innerTestCreateManagedTableFromDescriptor(false, true);
+    }
+
+    private void innerTestCreateManagedTableFromDescriptor(
+            boolean ignoreIfExists, boolean isTemporary) {
+        final TableEnvironmentMock tEnv = 
TableEnvironmentMock.getStreamingInstance();
+        final String catalog = tEnv.getCurrentCatalog();
+        final String database = tEnv.getCurrentDatabase();
+
+        final Schema schema = Schema.newBuilder().column("f0", 
DataTypes.INT()).build();
+        final String tableName = UUID.randomUUID().toString();
+        ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, 
tableName);
+
+        // create table
+
+        CreateTableOperation createOperation =
+                new CreateTableOperation(
+                        identifier,
+                        TableDescriptor.forManaged()
+                                .schema(schema)
+                                .option("a", "Test")
+                                .build()
+                                .toCatalogTable(),
+                        ignoreIfExists,
+                        isTemporary);
+
+        tEnv.executeInternal(createOperation);
+
+        // test ignore: create again
+        if (ignoreIfExists) {
+            tEnv.executeInternal(createOperation);
+        } else {
+            try {

Review comment:
       This entire try-catch block could be written in a nicer way using 
AssertJ's `assertThatThrownBy`.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
##########
@@ -311,6 +311,14 @@ void createTable(ObjectPath tablePath, CatalogBaseTable 
table, boolean ignoreIfE
     void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, CatalogException;
 
+    /**
+     * If return true, the Table without specified connector will be 
translated to the Flink managed
+     * table. See {@link CatalogBaseTable.TableKind#MANAGED}

Review comment:
       Use `@see`?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
##########
@@ -39,16 +39,19 @@
     private final @Nullable String comment;
     private final List<String> partitionKeys;
     private final Map<String, String> options;
+    private final TableKind tableKind;
 
     protected DefaultCatalogTable(
             Schema schema,
             @Nullable String comment,
             List<String> partitionKeys,
-            Map<String, String> options) {
+            Map<String, String> options,
+            TableKind tableKind) {
         this.schema = checkNotNull(schema, "Schema must not be null.");
         this.comment = comment;
         this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must 
not be null.");
         this.options = checkNotNull(options, "Options must not be null.");
+        this.tableKind = checkNotNull(tableKind, "Table kind must not be 
null.");

Review comment:
       We should validate that it isn't `VIEW` either. In fact, would it maybe 
make sense to introduce a separate `DefaultManagedTable` (which maybe inherits 
from `DefaultCatalogTable`) instead of adding the table kind here?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MANAGED;
+import static 
org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory;
+
+/** The listener for managed table operations. */
+public class ManagedTableListener {

Review comment:
       Please add an API stability annotation here.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
##########
@@ -311,6 +311,14 @@ void createTable(ObjectPath tablePath, CatalogBaseTable 
table, boolean ignoreIfE
     void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, CatalogException;
 
+    /**
+     * If return true, the Table without specified connector will be 
translated to the Flink managed
+     * table. See {@link CatalogBaseTable.TableKind#MANAGED}

Review comment:
       ```suggestion
        * If true, tables which do not specify a connector will be translated 
to managed tables.
   ```

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
##########
@@ -39,6 +39,7 @@
     @PublicEvolving
     enum TableKind {
         TABLE,
+        MANAGED,

Review comment:
       It'd be nice to add some JavaDoc for this value what a managed table is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to