This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f4409e717c [FLINK-32407][table] Notify catalog modification listener 
for table ddl in CatalogManager (#23041)
8f4409e717c is described below

commit 8f4409e717c0d9f820915214fb4e95f5c2d1fcd4
Author: Shammon FY <[email protected]>
AuthorDate: Sun Jul 23 22:37:54 2023 +0800

    [FLINK-32407][table] Notify catalog modification listener for table ddl in 
CatalogManager (#23041)
---
 .../apache/flink/table/catalog/CatalogManager.java |  97 ++++++++++-
 .../table/catalog/listener/AlterTableEvent.java    |  70 ++++++++
 .../table/catalog/listener/CatalogContext.java     |  10 +-
 .../table/catalog/listener/CreateTableEvent.java   |  63 +++++++
 .../table/catalog/listener/DropTableEvent.java     |  66 +++++++
 .../catalog/listener/TableModificationEvent.java   |  39 +++++
 .../flink/table/catalog/CatalogManagerTest.java    | 192 ++++++++++++++++++++-
 7 files changed, 527 insertions(+), 10 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index f799b106624..fa07df13f40 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -36,10 +36,13 @@ import 
org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
+import org.apache.flink.table.catalog.listener.AlterTableEvent;
 import org.apache.flink.table.catalog.listener.CatalogContext;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
 import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
+import org.apache.flink.table.catalog.listener.CreateTableEvent;
 import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
+import org.apache.flink.table.catalog.listener.DropTableEvent;
 import org.apache.flink.table.delegation.Planner;
 import 
org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
 import org.apache.flink.util.Preconditions;
@@ -810,6 +813,19 @@ public final class CatalogManager implements 
CatalogRegistry {
                                     ignoreIfExists);
 
                     catalog.createTable(path, resolvedListenedTable, 
ignoreIfExists);
+                    if (resolvedListenedTable instanceof CatalogTable) {
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                CreateTableEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedListenedTable,
+                                                        ignoreIfExists,
+                                                        false)));
+                    }
                 },
                 objectIdentifier,
                 false,
@@ -840,9 +856,11 @@ public final class CatalogManager implements 
CatalogRegistry {
                         return v;
                     } else {
                         ResolvedCatalogBaseTable<?> resolvedTable = 
resolveCatalogBaseTable(table);
+                        Catalog catalog =
+                                
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
                         ResolvedCatalogBaseTable<?> resolvedListenedTable =
                                 managedTableListener.notifyTableCreation(
-                                        
getCatalog(objectIdentifier.getCatalogName()).orElse(null),
+                                        catalog,
                                         objectIdentifier,
                                         resolvedTable,
                                         true,
@@ -853,6 +871,20 @@ public final class CatalogManager implements 
CatalogRegistry {
                                     .onCreateTemporaryTable(
                                             objectIdentifier.toObjectPath(), 
resolvedListenedTable);
                         }
+                        if (resolvedListenedTable instanceof CatalogTable) {
+                            catalogModificationListeners.forEach(
+                                    l ->
+                                            l.onEvent(
+                                                    
CreateTableEvent.createEvent(
+                                                            
CatalogContext.createContext(
+                                                                    
objectIdentifier
+                                                                            
.getCatalogName(),
+                                                                    catalog),
+                                                            objectIdentifier,
+                                                            
resolvedListenedTable,
+                                                            ignoreIfExists,
+                                                            true)));
+                        }
                         return resolvedListenedTable;
                     }
                 });
@@ -887,7 +919,10 @@ public final class CatalogManager implements 
CatalogRegistry {
      */
     public void dropTemporaryTable(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
         dropTemporaryTableInternal(
-                objectIdentifier, (table) -> table instanceof CatalogTable, 
ignoreIfNotExists);
+                objectIdentifier,
+                (table) -> table instanceof CatalogTable,
+                ignoreIfNotExists,
+                true);
     }
 
     /**
@@ -899,13 +934,17 @@ public final class CatalogManager implements 
CatalogRegistry {
      */
     public void dropTemporaryView(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
         dropTemporaryTableInternal(
-                objectIdentifier, (table) -> table instanceof CatalogView, 
ignoreIfNotExists);
+                objectIdentifier,
+                (table) -> table instanceof CatalogView,
+                ignoreIfNotExists,
+                false);
     }
 
     private void dropTemporaryTableInternal(
             ObjectIdentifier objectIdentifier,
             Predicate<CatalogBaseTable> filter,
-            boolean ignoreIfNotExists) {
+            boolean ignoreIfNotExists,
+            boolean isDropTable) {
         CatalogBaseTable catalogBaseTable = 
temporaryTables.get(objectIdentifier);
         if (filter.test(catalogBaseTable)) {
             getTemporaryOperationListener(objectIdentifier)
@@ -917,6 +956,18 @@ public final class CatalogManager implements 
CatalogRegistry {
                     catalog, objectIdentifier, resolvedTable, true, 
ignoreIfNotExists);
 
             temporaryTables.remove(objectIdentifier);
+            if (isDropTable) {
+                catalogModificationListeners.forEach(
+                        listener ->
+                                listener.onEvent(
+                                        DropTableEvent.createEvent(
+                                                CatalogContext.createContext(
+                                                        
objectIdentifier.getCatalogName(), catalog),
+                                                objectIdentifier,
+                                                resolvedTable,
+                                                ignoreIfNotExists,
+                                                true)));
+            }
         } else if (!ignoreIfNotExists) {
             throw new ValidationException(
                     String.format(
@@ -949,6 +1000,18 @@ public final class CatalogManager implements 
CatalogRegistry {
                 (catalog, path) -> {
                     final CatalogBaseTable resolvedTable = 
resolveCatalogBaseTable(table);
                     catalog.alterTable(path, resolvedTable, ignoreIfNotExists);
+                    if (resolvedTable instanceof CatalogTable) {
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                AlterTableEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedTable,
+                                                        ignoreIfNotExists)));
+                    }
                 },
                 objectIdentifier,
                 ignoreIfNotExists,
@@ -973,6 +1036,18 @@ public final class CatalogManager implements 
CatalogRegistry {
                 (catalog, path) -> {
                     final CatalogBaseTable resolvedTable = 
resolveCatalogBaseTable(table);
                     catalog.alterTable(path, resolvedTable, changes, 
ignoreIfNotExists);
+                    if (resolvedTable instanceof CatalogTable) {
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                AlterTableEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedTable,
+                                                        ignoreIfNotExists)));
+                    }
                 },
                 objectIdentifier,
                 ignoreIfNotExists,
@@ -1026,6 +1101,20 @@ public final class CatalogManager implements 
CatalogRegistry {
                                 catalog, objectIdentifier, resolvedTable, 
false, ignoreIfNotExists);
 
                         catalog.dropTable(path, ignoreIfNotExists);
+                        if (isDropTable) {
+                            catalogModificationListeners.forEach(
+                                    listener ->
+                                            listener.onEvent(
+                                                    DropTableEvent.createEvent(
+                                                            
CatalogContext.createContext(
+                                                                    
objectIdentifier
+                                                                            
.getCatalogName(),
+                                                                    catalog),
+                                                            objectIdentifier,
+                                                            resolvedTable,
+                                                            ignoreIfNotExists,
+                                                            false)));
+                        }
                     },
                     objectIdentifier,
                     ignoreIfNotExists,
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterTableEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterTableEvent.java
new file mode 100644
index 00000000000..3b8b5858366
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterTableEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** When a table is altered, a {@link AlterTableEvent} event will be created 
and fired. */
+@PublicEvolving
+public interface AlterTableEvent extends TableModificationEvent {
+    CatalogBaseTable newTable();
+
+    boolean ignoreIfNotExists();
+
+    static AlterTableEvent createEvent(
+            final CatalogContext context,
+            final ObjectIdentifier identifier,
+            final CatalogBaseTable newTable,
+            final boolean ignoreIfNotExists) {
+        return new AlterTableEvent() {
+            @Override
+            public CatalogBaseTable newTable() {
+                return newTable;
+            }
+
+            @Override
+            public boolean ignoreIfNotExists() {
+                return ignoreIfNotExists;
+            }
+
+            @Override
+            public ObjectIdentifier identifier() {
+                return identifier;
+            }
+
+            @Override
+            public CatalogBaseTable table() {
+                throw new IllegalStateException(
+                        "There is no table in AlterTableEvent, use 
identifier() instead.");
+            }
+
+            @Override
+            public boolean isTemporary() {
+                return false;
+            }
+
+            @Override
+            public CatalogContext context() {
+                return context;
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
index a1c109ed976..1b0aa2a1c57 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.factories.Factory;
 
+import javax.annotation.Nullable;
+
 import java.util.Optional;
 
 /**
@@ -43,6 +45,7 @@ public interface CatalogContext {
     Optional<String> getFactoryIdentifier();
 
     /** Class of the catalog. */
+    @Nullable
     Class<? extends Catalog> getClazz();
 
     /** The catalog configuration. */
@@ -58,12 +61,15 @@ public interface CatalogContext {
 
             @Override
             public Optional<String> getFactoryIdentifier() {
-                return catalog.getFactory().map(Factory::factoryIdentifier);
+                return catalog == null
+                        ? Optional.empty()
+                        : catalog.getFactory().map(Factory::factoryIdentifier);
             }
 
             @Override
+            @Nullable
             public Class<? extends Catalog> getClazz() {
-                return catalog.getClass();
+                return catalog == null ? null : catalog.getClass();
             }
 
             /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateTableEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateTableEvent.java
new file mode 100644
index 00000000000..0805e917d17
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateTableEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** When a table is created, a {@link CreateTableEvent} event will be created 
and fired. */
+@PublicEvolving
+public interface CreateTableEvent extends TableModificationEvent {
+    boolean ignoreIfExists();
+
+    static CreateTableEvent createEvent(
+            final CatalogContext context,
+            final ObjectIdentifier identifier,
+            final CatalogBaseTable table,
+            final boolean ignoreIfExists,
+            final boolean isTemporary) {
+        return new CreateTableEvent() {
+            @Override
+            public boolean ignoreIfExists() {
+                return ignoreIfExists;
+            }
+
+            @Override
+            public ObjectIdentifier identifier() {
+                return identifier;
+            }
+
+            @Override
+            public CatalogBaseTable table() {
+                return table;
+            }
+
+            @Override
+            public boolean isTemporary() {
+                return isTemporary;
+            }
+
+            @Override
+            public CatalogContext context() {
+                return context;
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropTableEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropTableEvent.java
new file mode 100644
index 00000000000..807264226c4
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropTableEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import javax.annotation.Nullable;
+
+/** When a table is dropped, a {@link DropTableEvent} event will be created 
and fired. */
+@PublicEvolving
+public interface DropTableEvent extends TableModificationEvent {
+    boolean ignoreIfNotExists();
+
+    static DropTableEvent createEvent(
+            final CatalogContext context,
+            final ObjectIdentifier identifier,
+            @Nullable final CatalogBaseTable table,
+            final boolean ignoreIfNotExists,
+            final boolean isTemporary) {
+        return new DropTableEvent() {
+            @Override
+            public boolean ignoreIfNotExists() {
+                return ignoreIfNotExists;
+            }
+
+            @Override
+            public ObjectIdentifier identifier() {
+                return identifier;
+            }
+
+            @Override
+            @Nullable
+            public CatalogBaseTable table() {
+                return table;
+            }
+
+            @Override
+            public boolean isTemporary() {
+                return isTemporary;
+            }
+
+            @Override
+            public CatalogContext context() {
+                return context;
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/TableModificationEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/TableModificationEvent.java
new file mode 100644
index 00000000000..87bb97d2422
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/TableModificationEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base table event, provides column list, primary keys, partition keys, 
watermarks and properties
+ * in CatalogBaseTable. The table can be a source or sink connector.
+ */
+@PublicEvolving
+public interface TableModificationEvent extends CatalogModificationEvent {
+    ObjectIdentifier identifier();
+
+    @Nullable
+    CatalogBaseTable table();
+
+    boolean isTemporary();
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 5da0239cb08..bd9ffa8c85e 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -19,11 +19,16 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
+import org.apache.flink.table.catalog.listener.AlterTableEvent;
 import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
 import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
+import org.apache.flink.table.catalog.listener.CreateTableEvent;
 import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
+import org.apache.flink.table.catalog.listener.DropTableEvent;
+import org.apache.flink.table.utils.ExpressionResolverMocks;
 
 import org.junit.jupiter.api.Test;
 
@@ -43,7 +48,7 @@ class CatalogManagerTest {
         CompletableFuture<DropDatabaseEvent> dropFuture = new 
CompletableFuture<>();
         CatalogManager catalogManager =
                 createCatalogManager(
-                        new TestingCatalogModificationListener(
+                        new TestingDatabaseModificationListener(
                                 createFuture, alterFuture, dropFuture));
 
         // Validate create a database
@@ -97,6 +102,142 @@ class CatalogManagerTest {
         assertThat(dropDatabaseEvent.cascade()).isTrue();
     }
 
+    @Test
+    void testTableModificationListener() throws Exception {
+        CompletableFuture<CreateTableEvent> createFuture = new 
CompletableFuture<>();
+        CompletableFuture<CreateTableEvent> createTemporaryFuture = new 
CompletableFuture<>();
+        CompletableFuture<AlterTableEvent> alterFuture = new 
CompletableFuture<>();
+        CompletableFuture<DropTableEvent> dropFuture = new 
CompletableFuture<>();
+        CompletableFuture<DropTableEvent> dropTemporaryFuture = new 
CompletableFuture<>();
+        CatalogManager catalogManager =
+                CatalogManager.newBuilder()
+                        .defaultCatalog("default", new 
GenericInMemoryCatalog("default"))
+                        .classLoader(CatalogManagerTest.class.getClassLoader())
+                        .config(new Configuration())
+                        .catalogModificationListeners(
+                                Collections.singletonList(
+                                        new TestingTableModificationListener(
+                                                createFuture,
+                                                createTemporaryFuture,
+                                                alterFuture,
+                                                dropFuture,
+                                                dropTemporaryFuture)))
+                        .build();
+
+        catalogManager.initSchemaResolver(true, 
ExpressionResolverMocks.dummyResolver());
+        // Create a view
+        catalogManager.createTable(
+                CatalogView.of(Schema.newBuilder().build(), null, "", "", 
Collections.emptyMap()),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "view1"),
+                true);
+        assertThat(createFuture.isDone()).isFalse();
+
+        // Create a table
+        catalogManager.createTable(
+                CatalogTable.of(
+                        Schema.newBuilder().build(),
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap()),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "table1"),
+                true);
+        CreateTableEvent createEvent = createFuture.get(10, TimeUnit.SECONDS);
+        assertThat(createEvent.isTemporary()).isFalse();
+        
assertThat(createEvent.identifier().getObjectName()).isEqualTo("table1");
+        assertThat(createEvent.ignoreIfExists()).isTrue();
+
+        // Create a temporary table
+        catalogManager.createTemporaryTable(
+                CatalogTable.of(
+                        Schema.newBuilder().build(),
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap()),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "table2"),
+                false);
+        CreateTableEvent createTemporaryEvent = createTemporaryFuture.get(10, 
TimeUnit.SECONDS);
+        assertThat(createTemporaryEvent.isTemporary()).isTrue();
+        
assertThat(createTemporaryEvent.identifier().getObjectName()).isEqualTo("table2");
+        assertThat(createTemporaryEvent.ignoreIfExists()).isFalse();
+
+        // Alter a table
+        catalogManager.alterTable(
+                CatalogTable.of(
+                        Schema.newBuilder().build(),
+                        "table1 comment",
+                        Collections.emptyList(),
+                        Collections.emptyMap()),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "table1"),
+                false);
+        AlterTableEvent alterEvent = alterFuture.get(10, TimeUnit.SECONDS);
+        assertThat(alterEvent.isTemporary()).isFalse();
+        
assertThat(alterEvent.identifier().getObjectName()).isEqualTo("table1");
+        assertThat(alterEvent.newTable().getComment()).isEqualTo("table1 
comment");
+        assertThat(alterEvent.ignoreIfNotExists()).isFalse();
+
+        // Drop a view
+        catalogManager.dropView(
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "table1"),
+                true);
+        assertThat(dropFuture.isDone()).isFalse();
+
+        // Drop a table
+        catalogManager.dropTable(
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "table1"),
+                true);
+        DropTableEvent dropEvent = dropFuture.get(10, TimeUnit.SECONDS);
+        assertThat(dropEvent.isTemporary()).isFalse();
+        assertThat(dropEvent.ignoreIfNotExists()).isTrue();
+        assertThat(dropEvent.identifier().getObjectName()).isEqualTo("table1");
+
+        // Create a temporary view with the same table name `table2`
+        catalogManager.createTemporaryTable(
+                CatalogView.of(Schema.newBuilder().build(), null, "", "", 
Collections.emptyMap()),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "view2"),
+                false);
+        // Drop a temporary view
+        catalogManager.dropTemporaryView(
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "view2"),
+                true);
+        assertThat(dropTemporaryFuture.isDone()).isFalse();
+
+        // Drop a temporary table
+        catalogManager.dropTemporaryTable(
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "table2"),
+                false);
+        DropTableEvent dropTemporaryEvent = dropTemporaryFuture.get(10, 
TimeUnit.SECONDS);
+        assertThat(dropTemporaryEvent.isTemporary()).isTrue();
+        assertThat(dropTemporaryEvent.ignoreIfNotExists()).isFalse();
+        
assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("table2");
+    }
+
     private CatalogManager createCatalogManager(CatalogModificationListener 
listener) {
         return CatalogManager.newBuilder()
                 .classLoader(CatalogManagerTest.class.getClassLoader())
@@ -106,13 +247,13 @@ class CatalogManagerTest {
                 .build();
     }
 
-    /** Testing catalog modification listener. */
-    static class TestingCatalogModificationListener implements 
CatalogModificationListener {
+    /** Testing database modification listener. */
+    static class TestingDatabaseModificationListener implements 
CatalogModificationListener {
         private final CompletableFuture<CreateDatabaseEvent> createFuture;
         private final CompletableFuture<AlterDatabaseEvent> alterFuture;
         private final CompletableFuture<DropDatabaseEvent> dropFuture;
 
-        TestingCatalogModificationListener(
+        TestingDatabaseModificationListener(
                 CompletableFuture<CreateDatabaseEvent> createFuture,
                 CompletableFuture<AlterDatabaseEvent> alterFuture,
                 CompletableFuture<DropDatabaseEvent> dropFuture) {
@@ -134,4 +275,47 @@ class CatalogManagerTest {
             }
         }
     }
+
+    /** Testing table modification listener. */
+    static class TestingTableModificationListener implements 
CatalogModificationListener {
+        private final CompletableFuture<CreateTableEvent> createFuture;
+        private final CompletableFuture<CreateTableEvent> 
createTemporaryFuture;
+        private final CompletableFuture<AlterTableEvent> alterFuture;
+        private final CompletableFuture<DropTableEvent> dropFuture;
+        private final CompletableFuture<DropTableEvent> dropTemporaryFuture;
+
+        TestingTableModificationListener(
+                CompletableFuture<CreateTableEvent> createFuture,
+                CompletableFuture<CreateTableEvent> createTemporaryFuture,
+                CompletableFuture<AlterTableEvent> alterFuture,
+                CompletableFuture<DropTableEvent> dropFuture,
+                CompletableFuture<DropTableEvent> dropTemporaryFuture) {
+            this.createFuture = createFuture;
+            this.createTemporaryFuture = createTemporaryFuture;
+            this.alterFuture = alterFuture;
+            this.dropFuture = dropFuture;
+            this.dropTemporaryFuture = dropTemporaryFuture;
+        }
+
+        @Override
+        public void onEvent(CatalogModificationEvent event) {
+            if (event instanceof CreateTableEvent) {
+                if (((CreateTableEvent) event).isTemporary()) {
+                    createTemporaryFuture.complete((CreateTableEvent) event);
+                } else {
+                    createFuture.complete((CreateTableEvent) event);
+                }
+            } else if (event instanceof AlterTableEvent) {
+                alterFuture.complete((AlterTableEvent) event);
+            } else if (event instanceof DropTableEvent) {
+                if (((DropTableEvent) event).isTemporary()) {
+                    dropTemporaryFuture.complete((DropTableEvent) event);
+                } else {
+                    dropFuture.complete((DropTableEvent) event);
+                }
+            } else {
+                throw new UnsupportedOperationException();
+            }
+        }
+    }
 }


Reply via email to