This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8f4409e717c [FLINK-32407][table] Notify catalog modification listener
for table ddl in CatalogManager (#23041)
8f4409e717c is described below
commit 8f4409e717c0d9f820915214fb4e95f5c2d1fcd4
Author: Shammon FY <[email protected]>
AuthorDate: Sun Jul 23 22:37:54 2023 +0800
[FLINK-32407][table] Notify catalog modification listener for table ddl in
CatalogManager (#23041)
---
.../apache/flink/table/catalog/CatalogManager.java | 97 ++++++++++-
.../table/catalog/listener/AlterTableEvent.java | 70 ++++++++
.../table/catalog/listener/CatalogContext.java | 10 +-
.../table/catalog/listener/CreateTableEvent.java | 63 +++++++
.../table/catalog/listener/DropTableEvent.java | 66 +++++++
.../catalog/listener/TableModificationEvent.java | 39 +++++
.../flink/table/catalog/CatalogManagerTest.java | 192 ++++++++++++++++++++-
7 files changed, 527 insertions(+), 10 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index f799b106624..fa07df13f40 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -36,10 +36,13 @@ import
org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
+import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogContext;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
+import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
+import org.apache.flink.table.catalog.listener.DropTableEvent;
import org.apache.flink.table.delegation.Planner;
import
org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.util.Preconditions;
@@ -810,6 +813,19 @@ public final class CatalogManager implements
CatalogRegistry {
ignoreIfExists);
catalog.createTable(path, resolvedListenedTable,
ignoreIfExists);
+ if (resolvedListenedTable instanceof CatalogTable) {
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ CreateTableEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedListenedTable,
+ ignoreIfExists,
+ false)));
+ }
},
objectIdentifier,
false,
@@ -840,9 +856,11 @@ public final class CatalogManager implements
CatalogRegistry {
return v;
} else {
ResolvedCatalogBaseTable<?> resolvedTable =
resolveCatalogBaseTable(table);
+ Catalog catalog =
+
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
ResolvedCatalogBaseTable<?> resolvedListenedTable =
managedTableListener.notifyTableCreation(
-
getCatalog(objectIdentifier.getCatalogName()).orElse(null),
+ catalog,
objectIdentifier,
resolvedTable,
true,
@@ -853,6 +871,20 @@ public final class CatalogManager implements
CatalogRegistry {
.onCreateTemporaryTable(
objectIdentifier.toObjectPath(),
resolvedListenedTable);
}
+ if (resolvedListenedTable instanceof CatalogTable) {
+ catalogModificationListeners.forEach(
+ l ->
+ l.onEvent(
+
CreateTableEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier
+
.getCatalogName(),
+ catalog),
+ objectIdentifier,
+
resolvedListenedTable,
+ ignoreIfExists,
+ true)));
+ }
return resolvedListenedTable;
}
});
@@ -887,7 +919,10 @@ public final class CatalogManager implements
CatalogRegistry {
*/
public void dropTemporaryTable(ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
dropTemporaryTableInternal(
- objectIdentifier, (table) -> table instanceof CatalogTable,
ignoreIfNotExists);
+ objectIdentifier,
+ (table) -> table instanceof CatalogTable,
+ ignoreIfNotExists,
+ true);
}
/**
@@ -899,13 +934,17 @@ public final class CatalogManager implements
CatalogRegistry {
*/
public void dropTemporaryView(ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
dropTemporaryTableInternal(
- objectIdentifier, (table) -> table instanceof CatalogView,
ignoreIfNotExists);
+ objectIdentifier,
+ (table) -> table instanceof CatalogView,
+ ignoreIfNotExists,
+ false);
}
private void dropTemporaryTableInternal(
ObjectIdentifier objectIdentifier,
Predicate<CatalogBaseTable> filter,
- boolean ignoreIfNotExists) {
+ boolean ignoreIfNotExists,
+ boolean isDropTable) {
CatalogBaseTable catalogBaseTable =
temporaryTables.get(objectIdentifier);
if (filter.test(catalogBaseTable)) {
getTemporaryOperationListener(objectIdentifier)
@@ -917,6 +956,18 @@ public final class CatalogManager implements
CatalogRegistry {
catalog, objectIdentifier, resolvedTable, true,
ignoreIfNotExists);
temporaryTables.remove(objectIdentifier);
+ if (isDropTable) {
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ DropTableEvent.createEvent(
+ CatalogContext.createContext(
+
objectIdentifier.getCatalogName(), catalog),
+ objectIdentifier,
+ resolvedTable,
+ ignoreIfNotExists,
+ true)));
+ }
} else if (!ignoreIfNotExists) {
throw new ValidationException(
String.format(
@@ -949,6 +1000,18 @@ public final class CatalogManager implements
CatalogRegistry {
(catalog, path) -> {
final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
catalog.alterTable(path, resolvedTable, ignoreIfNotExists);
+ if (resolvedTable instanceof CatalogTable) {
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ AlterTableEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedTable,
+ ignoreIfNotExists)));
+ }
},
objectIdentifier,
ignoreIfNotExists,
@@ -973,6 +1036,18 @@ public final class CatalogManager implements
CatalogRegistry {
(catalog, path) -> {
final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
catalog.alterTable(path, resolvedTable, changes,
ignoreIfNotExists);
+ if (resolvedTable instanceof CatalogTable) {
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ AlterTableEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedTable,
+ ignoreIfNotExists)));
+ }
},
objectIdentifier,
ignoreIfNotExists,
@@ -1026,6 +1101,20 @@ public final class CatalogManager implements
CatalogRegistry {
catalog, objectIdentifier, resolvedTable,
false, ignoreIfNotExists);
catalog.dropTable(path, ignoreIfNotExists);
+ if (isDropTable) {
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ DropTableEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier
+
.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedTable,
+ ignoreIfNotExists,
+ false)));
+ }
},
objectIdentifier,
ignoreIfNotExists,
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterTableEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterTableEvent.java
new file mode 100644
index 00000000000..3b8b5858366
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterTableEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** When a table is altered, an {@link AlterTableEvent} event will be created
and fired. */
+@PublicEvolving
+public interface AlterTableEvent extends TableModificationEvent {
+ CatalogBaseTable newTable();
+
+ boolean ignoreIfNotExists();
+
+ static AlterTableEvent createEvent(
+ final CatalogContext context,
+ final ObjectIdentifier identifier,
+ final CatalogBaseTable newTable,
+ final boolean ignoreIfNotExists) {
+ return new AlterTableEvent() {
+ @Override
+ public CatalogBaseTable newTable() {
+ return newTable;
+ }
+
+ @Override
+ public boolean ignoreIfNotExists() {
+ return ignoreIfNotExists;
+ }
+
+ @Override
+ public ObjectIdentifier identifier() {
+ return identifier;
+ }
+
+ @Override
+ public CatalogBaseTable table() {
+ throw new IllegalStateException(
+ "There is no table in AlterTableEvent, use
identifier() instead.");
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return false;
+ }
+
+ @Override
+ public CatalogContext context() {
+ return context;
+ }
+ };
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
index a1c109ed976..1b0aa2a1c57 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.Factory;
+import javax.annotation.Nullable;
+
import java.util.Optional;
/**
@@ -43,6 +45,7 @@ public interface CatalogContext {
Optional<String> getFactoryIdentifier();
/** Class of the catalog. */
+ @Nullable
Class<? extends Catalog> getClazz();
/** The catalog configuration. */
@@ -58,12 +61,15 @@ public interface CatalogContext {
@Override
public Optional<String> getFactoryIdentifier() {
- return catalog.getFactory().map(Factory::factoryIdentifier);
+ return catalog == null
+ ? Optional.empty()
+ : catalog.getFactory().map(Factory::factoryIdentifier);
}
@Override
+ @Nullable
public Class<? extends Catalog> getClazz() {
- return catalog.getClass();
+ return catalog == null ? null : catalog.getClass();
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateTableEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateTableEvent.java
new file mode 100644
index 00000000000..0805e917d17
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateTableEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** When a table is created, a {@link CreateTableEvent} event will be created
and fired. */
+@PublicEvolving
+public interface CreateTableEvent extends TableModificationEvent {
+ boolean ignoreIfExists();
+
+ static CreateTableEvent createEvent(
+ final CatalogContext context,
+ final ObjectIdentifier identifier,
+ final CatalogBaseTable table,
+ final boolean ignoreIfExists,
+ final boolean isTemporary) {
+ return new CreateTableEvent() {
+ @Override
+ public boolean ignoreIfExists() {
+ return ignoreIfExists;
+ }
+
+ @Override
+ public ObjectIdentifier identifier() {
+ return identifier;
+ }
+
+ @Override
+ public CatalogBaseTable table() {
+ return table;
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+
+ @Override
+ public CatalogContext context() {
+ return context;
+ }
+ };
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropTableEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropTableEvent.java
new file mode 100644
index 00000000000..807264226c4
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropTableEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import javax.annotation.Nullable;
+
+/** When a table is dropped, a {@link DropTableEvent} event will be created
and fired. */
+@PublicEvolving
+public interface DropTableEvent extends TableModificationEvent {
+ boolean ignoreIfNotExists();
+
+ static DropTableEvent createEvent(
+ final CatalogContext context,
+ final ObjectIdentifier identifier,
+ @Nullable final CatalogBaseTable table,
+ final boolean ignoreIfNotExists,
+ final boolean isTemporary) {
+ return new DropTableEvent() {
+ @Override
+ public boolean ignoreIfNotExists() {
+ return ignoreIfNotExists;
+ }
+
+ @Override
+ public ObjectIdentifier identifier() {
+ return identifier;
+ }
+
+ @Override
+ @Nullable
+ public CatalogBaseTable table() {
+ return table;
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+
+ @Override
+ public CatalogContext context() {
+ return context;
+ }
+ };
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/TableModificationEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/TableModificationEvent.java
new file mode 100644
index 00000000000..87bb97d2422
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/TableModificationEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base table event. It provides the column list, primary keys, partition keys, watermarks and
+ * properties via the {@link CatalogBaseTable}. The table can be a source or sink connector.
+ */
+@PublicEvolving
+public interface TableModificationEvent extends CatalogModificationEvent {
+ ObjectIdentifier identifier();
+
+ @Nullable
+ CatalogBaseTable table();
+
+ boolean isTemporary();
+}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 5da0239cb08..bd9ffa8c85e 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -19,11 +19,16 @@
package org.apache.flink.table.catalog;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
+import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
+import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
+import org.apache.flink.table.catalog.listener.DropTableEvent;
+import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.junit.jupiter.api.Test;
@@ -43,7 +48,7 @@ class CatalogManagerTest {
CompletableFuture<DropDatabaseEvent> dropFuture = new
CompletableFuture<>();
CatalogManager catalogManager =
createCatalogManager(
- new TestingCatalogModificationListener(
+ new TestingDatabaseModificationListener(
createFuture, alterFuture, dropFuture));
// Validate create a database
@@ -97,6 +102,142 @@ class CatalogManagerTest {
assertThat(dropDatabaseEvent.cascade()).isTrue();
}
+ @Test
+ void testTableModificationListener() throws Exception {
+ CompletableFuture<CreateTableEvent> createFuture = new
CompletableFuture<>();
+ CompletableFuture<CreateTableEvent> createTemporaryFuture = new
CompletableFuture<>();
+ CompletableFuture<AlterTableEvent> alterFuture = new
CompletableFuture<>();
+ CompletableFuture<DropTableEvent> dropFuture = new
CompletableFuture<>();
+ CompletableFuture<DropTableEvent> dropTemporaryFuture = new
CompletableFuture<>();
+ CatalogManager catalogManager =
+ CatalogManager.newBuilder()
+ .defaultCatalog("default", new
GenericInMemoryCatalog("default"))
+ .classLoader(CatalogManagerTest.class.getClassLoader())
+ .config(new Configuration())
+ .catalogModificationListeners(
+ Collections.singletonList(
+ new TestingTableModificationListener(
+ createFuture,
+ createTemporaryFuture,
+ alterFuture,
+ dropFuture,
+ dropTemporaryFuture)))
+ .build();
+
+ catalogManager.initSchemaResolver(true,
ExpressionResolverMocks.dummyResolver());
+ // Create a view, which must not fire a CreateTableEvent
+ catalogManager.createTable(
+ CatalogView.of(Schema.newBuilder().build(), null, "", "",
Collections.emptyMap()),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "view1"),
+ true);
+ assertThat(createFuture.isDone()).isFalse();
+
+ // Create a table
+ catalogManager.createTable(
+ CatalogTable.of(
+ Schema.newBuilder().build(),
+ null,
+ Collections.emptyList(),
+ Collections.emptyMap()),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "table1"),
+ true);
+ CreateTableEvent createEvent = createFuture.get(10, TimeUnit.SECONDS);
+ assertThat(createEvent.isTemporary()).isFalse();
+
assertThat(createEvent.identifier().getObjectName()).isEqualTo("table1");
+ assertThat(createEvent.ignoreIfExists()).isTrue();
+
+ // Create a temporary table
+ catalogManager.createTemporaryTable(
+ CatalogTable.of(
+ Schema.newBuilder().build(),
+ null,
+ Collections.emptyList(),
+ Collections.emptyMap()),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "table2"),
+ false);
+ CreateTableEvent createTemporaryEvent = createTemporaryFuture.get(10,
TimeUnit.SECONDS);
+ assertThat(createTemporaryEvent.isTemporary()).isTrue();
+
assertThat(createTemporaryEvent.identifier().getObjectName()).isEqualTo("table2");
+ assertThat(createTemporaryEvent.ignoreIfExists()).isFalse();
+
+ // Alter a table
+ catalogManager.alterTable(
+ CatalogTable.of(
+ Schema.newBuilder().build(),
+ "table1 comment",
+ Collections.emptyList(),
+ Collections.emptyMap()),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "table1"),
+ false);
+ AlterTableEvent alterEvent = alterFuture.get(10, TimeUnit.SECONDS);
+ assertThat(alterEvent.isTemporary()).isFalse();
+
assertThat(alterEvent.identifier().getObjectName()).isEqualTo("table1");
+ assertThat(alterEvent.newTable().getComment()).isEqualTo("table1
comment");
+ assertThat(alterEvent.ignoreIfNotExists()).isFalse();
+
+ // Drop a view using the name of the table `table1`, which must not fire a DropTableEvent
+ catalogManager.dropView(
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "table1"),
+ true);
+ assertThat(dropFuture.isDone()).isFalse();
+
+ // Drop a table
+ catalogManager.dropTable(
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "table1"),
+ true);
+ DropTableEvent dropEvent = dropFuture.get(10, TimeUnit.SECONDS);
+ assertThat(dropEvent.isTemporary()).isFalse();
+ assertThat(dropEvent.ignoreIfNotExists()).isTrue();
+ assertThat(dropEvent.identifier().getObjectName()).isEqualTo("table1");
+
+ // Create a temporary view named `view2`
+ catalogManager.createTemporaryTable(
+ CatalogView.of(Schema.newBuilder().build(), null, "", "",
Collections.emptyMap()),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "view2"),
+ false);
+ // Drop the temporary view, which must not fire a DropTableEvent
+ catalogManager.dropTemporaryView(
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "view2"),
+ true);
+ assertThat(dropTemporaryFuture.isDone()).isFalse();
+
+ // Drop a temporary table
+ catalogManager.dropTemporaryTable(
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "table2"),
+ false);
+ DropTableEvent dropTemporaryEvent = dropTemporaryFuture.get(10,
TimeUnit.SECONDS);
+ assertThat(dropTemporaryEvent.isTemporary()).isTrue();
+ assertThat(dropTemporaryEvent.ignoreIfNotExists()).isFalse();
+
assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("table2");
+ }
+
private CatalogManager createCatalogManager(CatalogModificationListener
listener) {
return CatalogManager.newBuilder()
.classLoader(CatalogManagerTest.class.getClassLoader())
@@ -106,13 +247,13 @@ class CatalogManagerTest {
.build();
}
- /** Testing catalog modification listener. */
- static class TestingCatalogModificationListener implements
CatalogModificationListener {
+ /** Testing database modification listener. */
+ static class TestingDatabaseModificationListener implements
CatalogModificationListener {
private final CompletableFuture<CreateDatabaseEvent> createFuture;
private final CompletableFuture<AlterDatabaseEvent> alterFuture;
private final CompletableFuture<DropDatabaseEvent> dropFuture;
- TestingCatalogModificationListener(
+ TestingDatabaseModificationListener(
CompletableFuture<CreateDatabaseEvent> createFuture,
CompletableFuture<AlterDatabaseEvent> alterFuture,
CompletableFuture<DropDatabaseEvent> dropFuture) {
@@ -134,4 +275,47 @@ class CatalogManagerTest {
}
}
}
+
+ /** Testing table modification listener. */
+ static class TestingTableModificationListener implements
CatalogModificationListener {
+ private final CompletableFuture<CreateTableEvent> createFuture;
+ private final CompletableFuture<CreateTableEvent>
createTemporaryFuture;
+ private final CompletableFuture<AlterTableEvent> alterFuture;
+ private final CompletableFuture<DropTableEvent> dropFuture;
+ private final CompletableFuture<DropTableEvent> dropTemporaryFuture;
+
+ TestingTableModificationListener(
+ CompletableFuture<CreateTableEvent> createFuture,
+ CompletableFuture<CreateTableEvent> createTemporaryFuture,
+ CompletableFuture<AlterTableEvent> alterFuture,
+ CompletableFuture<DropTableEvent> dropFuture,
+ CompletableFuture<DropTableEvent> dropTemporaryFuture) {
+ this.createFuture = createFuture;
+ this.createTemporaryFuture = createTemporaryFuture;
+ this.alterFuture = alterFuture;
+ this.dropFuture = dropFuture;
+ this.dropTemporaryFuture = dropTemporaryFuture;
+ }
+
+ @Override
+ public void onEvent(CatalogModificationEvent event) {
+ if (event instanceof CreateTableEvent) {
+ if (((CreateTableEvent) event).isTemporary()) {
+ createTemporaryFuture.complete((CreateTableEvent) event);
+ } else {
+ createFuture.complete((CreateTableEvent) event);
+ }
+ } else if (event instanceof AlterTableEvent) {
+ alterFuture.complete((AlterTableEvent) event);
+ } else if (event instanceof DropTableEvent) {
+ if (((DropTableEvent) event).isTemporary()) {
+ dropTemporaryFuture.complete((DropTableEvent) event);
+ } else {
+ dropFuture.complete((DropTableEvent) event);
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
}