wuchong commented on a change in pull request #14802:
URL: https://github.com/apache/flink/pull/14802#discussion_r567620870
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -693,6 +751,30 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
FileUtils.readFileUtf8(file).split("\n").toList
}
}
+
+ class ListenerCatalog(name: String)
+ extends GenericInMemoryCatalog(name) with TemporaryOperationListener {
+
+ var numTempTable = 0
+ var numTempFunc = 0
+
+ override def onCreateTemporaryTable(tablePath: ObjectPath, table: CatalogBaseTable)
+ : CatalogBaseTable = {
+ numTempTable += 1
+ table
Review comment:
Could you slightly modify the CatalogBaseTable (e.g. add a property) to
verify the modified table is stored?
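To illustrate the idea, here is a minimal, self-contained sketch. It deliberately uses stand-in types (`FakeTable`, `FakeListener`, `FakeManager`) and a made-up option key `is_modified`; none of these are the real Flink classes or the code in this PR. The listener returns a modified copy, and the check reads the marker back from what was actually stored.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in types only: none of these are the real Flink classes.
class ListenerModifiesTableSketch {

    /** Hypothetical, simplified stand-in for a catalog table: just its options. */
    static final class FakeTable {
        final Map<String, String> options;

        FakeTable(Map<String, String> options) {
            this.options = new HashMap<>(options);
        }
    }

    /** Hypothetical stand-in for the listener hook under review. */
    interface FakeListener {
        FakeTable onCreateTemporaryTable(String path, FakeTable table);
    }

    /** Hypothetical stand-in for the component that holds temporary tables. */
    static final class FakeManager {
        final Map<String, FakeTable> temporaryTables = new HashMap<>();
        final FakeListener listener;

        FakeManager(FakeListener listener) {
            this.listener = listener;
        }

        void createTemporaryTable(String path, FakeTable table) {
            // Store whatever the listener returns, not the original argument.
            temporaryTables.put(path, listener.onCreateTemporaryTable(path, table));
        }
    }

    /** Registers a table through a modifying listener and returns the stored marker. */
    static String storedMarker() {
        FakeListener listener = (path, table) -> {
            Map<String, String> options = new HashMap<>(table.options);
            options.put("is_modified", "true"); // made-up marker option
            return new FakeTable(options);
        };
        FakeManager manager = new FakeManager(listener);
        manager.createTemporaryTable("db.t", new FakeTable(new HashMap<>()));
        return manager.temporaryTables.get("db.t").options.get("is_modified");
    }

    public static void main(String[] args) {
        System.out.println(storedMarker());
    }
}
```

A test that only counts listener invocations would still pass if the original table were stored by mistake; asserting on the marker catches that case.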
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -693,6 +751,30 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
FileUtils.readFileUtf8(file).split("\n").toList
}
}
+
+ class ListenerCatalog(name: String)
+ extends GenericInMemoryCatalog(name) with TemporaryOperationListener {
+
+ var numTempTable = 0
+ var numTempFunc = 0
+
+ override def onCreateTemporaryTable(tablePath: ObjectPath, table: CatalogBaseTable)
+ : CatalogBaseTable = {
+ numTempTable += 1
+ table
+ }
+
+ override def onDropTemporaryTable(tablePath: ObjectPath): Unit = numTempTable -= 1
+
+ override def onCreateTemporaryFunction(functionPath: ObjectPath, function: CatalogFunction)
+ : CatalogFunction = {
+ numTempFunc += 1
+ function
Review comment:
Same as above.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
##########
@@ -440,7 +453,11 @@ public void dropTempCatalogFunction(ObjectIdentifier identifier, boolean ignoreI
CatalogFunction fd = tempCatalogFunctions.remove(normalizedName);
- if (fd == null && !ignoreIfNotExist) {
+ if (fd != null) {
Review comment:
What's the difference between `dropTempCatalogFunction` and
`dropTemporaryCatalogFunction`? Should we unify them?
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -697,6 +703,9 @@ private void dropTemporaryTableInternal(
CatalogBaseTable catalogBaseTable = temporaryTables.get(objectIdentifier);
if (filter.test(catalogBaseTable)) {
temporaryTables.remove(objectIdentifier);
+ Optional<TemporaryOperationListener> listener =
+ getTemporaryOperationListener(objectIdentifier);
+ listener.ifPresent(l -> l.onDropTemporaryTable(objectIdentifier.toObjectPath()));
Review comment:
I think we should move this call before
`temporaryTables.remove(objectIdentifier)`; otherwise the state could become
inconsistent if `onDropTemporaryTable` throws an exception. We also call
`onCreateTemporaryTable` before putting the table into the temporary map.
The same applies to dropping a temporary function.
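A minimal sketch of the ordering concern, using a plain `Map` and a `Consumer` as stand-ins (this is not the `CatalogManager` code, and all names here are hypothetical). With a listener that always throws, notifying first leaves the entry in place, while removing first loses it even though the drop as a whole failed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in sketch only: a Map plays the role of the temporary-table map,
// and a Consumer plays the role of the drop listener.
class DropOrderingSketch {

    /** Removes first, then notifies: a throwing listener leaves the entry already gone. */
    static void removeThenNotify(Map<String, String> tables, String path, Consumer<String> onDrop) {
        tables.remove(path);
        onDrop.accept(path);
    }

    /** Notifies first, then removes: a throwing listener leaves the map untouched. */
    static void notifyThenRemove(Map<String, String> tables, String path, Consumer<String> onDrop) {
        onDrop.accept(path);
        tables.remove(path);
    }

    /** Runs one drop with a listener that always throws; reports whether the entry survived. */
    static boolean entrySurvives(boolean notifyFirst) {
        Map<String, String> tables = new HashMap<>();
        tables.put("db.t", "table");
        Consumer<String> failing = p -> {
            throw new IllegalStateException("listener failed");
        };
        try {
            if (notifyFirst) {
                notifyThenRemove(tables, "db.t", failing);
            } else {
                removeThenNotify(tables, "db.t", failing);
            }
        } catch (IllegalStateException expected) {
            // The drop failed; what matters is the state of the map afterwards.
        }
        return tables.containsKey("db.t");
    }

    public static void main(String[] args) {
        System.out.println("remove then notify, entry survives: " + entrySurvives(false));
        System.out.println("notify then remove, entry survives: " + entrySurvives(true));
    }
}
```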
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -705,6 +714,16 @@ private void dropTemporaryTableInternal(
}
}
+ public Optional<TemporaryOperationListener> getTemporaryOperationListener(
Review comment:
Can be `protected`.