This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 23d4675  [FLINK-17756][table] Drop table/view shouldn't take effect on each other
23d4675 is described below

commit 23d4675520e3edbd3171374ed7a81444965b3d96
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 27 09:51:23 2020 +0800

    [FLINK-17756][table] Drop table/view shouldn't take effect on each other
    
    
    This closes #12314
---
 .../table/api/internal/TableEnvironmentImpl.java   |  2 +-
 .../apache/flink/table/catalog/CatalogManager.java | 82 +++++++++++++++------
 .../table/planner/catalog/CatalogTableITCase.scala | 69 ++++++++++++++++++
 .../flink/table/api/internal/TableEnvImpl.scala    |  2 +-
 .../flink/table/catalog/CatalogTableITCase.scala   | 85 ++++++++++++++++++++--
 5 files changed, 207 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c10552b..6fc2e86 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -889,7 +889,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                                                
dropViewOperation.getViewIdentifier(),
                                                dropViewOperation.isIfExists());
                        } else {
-                               catalogManager.dropTable(
+                               catalogManager.dropView(
                                                
dropViewOperation.getViewIdentifier(),
                                                dropViewOperation.isIfExists());
                        }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 4b6632e..44dc6c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -336,16 +336,12 @@ public final class CatalogManager {
         * @return table that the path points to.
         */
        public Optional<TableLookupResult> getTable(ObjectIdentifier 
objectIdentifier) {
-               try {
-                       CatalogBaseTable temporaryTable = 
temporaryTables.get(objectIdentifier);
-                       if (temporaryTable != null) {
-                               return 
Optional.of(TableLookupResult.temporary(temporaryTable));
-                       } else {
-                               return getPermanentTable(objectIdentifier);
-                       }
-               } catch (TableNotExistException ignored) {
+               CatalogBaseTable temporaryTable = 
temporaryTables.get(objectIdentifier);
+               if (temporaryTable != null) {
+                       return 
Optional.of(TableLookupResult.temporary(temporaryTable));
+               } else {
+                       return getPermanentTable(objectIdentifier);
                }
-               return Optional.empty();
        }
 
        /**
@@ -367,13 +363,15 @@ public final class CatalogManager {
                return Optional.empty();
        }
 
-       private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier 
objectIdentifier)
-                       throws TableNotExistException {
+       private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier 
objectIdentifier) {
                Catalog currentCatalog = 
catalogs.get(objectIdentifier.getCatalogName());
                ObjectPath objectPath = objectIdentifier.toObjectPath();
-
-               if (currentCatalog != null && 
currentCatalog.tableExists(objectPath)) {
-                       return 
Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+               if (currentCatalog != null) {
+                       try {
+                               return 
Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+                       } catch (TableNotExistException e) {
+                               // Ignore.
+                       }
                }
                return Optional.empty();
        }
@@ -682,20 +680,58 @@ public final class CatalogManager {
         * Drops a table in a given fully qualified path.
         *
         * @param objectIdentifier The fully qualified path of the table to 
drop.
-        * @param ignoreIfNotExists If false exception will be thrown if the 
table or database or catalog to be altered
+        * @param ignoreIfNotExists If false exception will be thrown if the 
table to drop
         *                          does not exist.
         */
        public void dropTable(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
-               if (temporaryTables.containsKey(objectIdentifier)) {
+               dropTableInternal(
+                               objectIdentifier,
+                               ignoreIfNotExists,
+                               true);
+       }
+
+       /**
+        * Drops a view in a given fully qualified path.
+        *
+        * @param objectIdentifier The fully qualified path of the view to drop.
+        * @param ignoreIfNotExists If false exception will be thrown if the 
view to drop
+        *                          does not exist.
+        */
+       public void dropView(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+               dropTableInternal(
+                               objectIdentifier,
+                               ignoreIfNotExists,
+                               false);
+       }
+
+       private void dropTableInternal(
+                       ObjectIdentifier objectIdentifier,
+                       boolean ignoreIfNotExists,
+                       boolean isDropTable) {
+               Predicate<CatalogBaseTable> filter = isDropTable
+                               ? table -> table instanceof CatalogTable
+                               : table -> table instanceof CatalogView;
+               // Same name temporary table or view exists.
+               if (filter.test(temporaryTables.get(objectIdentifier))) {
+                       String tableOrView = isDropTable ? "table" : "view";
                        throw new ValidationException(String.format(
-                               "Temporary table with identifier '%s' exists. 
Drop it first before removing the permanent table.",
-                               objectIdentifier));
+                                       "Temporary %s with identifier '%s' 
exists. "
+                                                       + "Drop it first before 
removing the permanent %s.",
+                                       tableOrView, objectIdentifier, 
tableOrView));
+               }
+               final Optional<TableLookupResult> resultOpt = 
getPermanentTable(objectIdentifier);
+               if (resultOpt.isPresent() && 
filter.test(resultOpt.get().getTable())) {
+                       execute(
+                                       (catalog, path) -> 
catalog.dropTable(path, ignoreIfNotExists),
+                                       objectIdentifier,
+                                       ignoreIfNotExists,
+                                       "DropTable");
+               } else if (!ignoreIfNotExists) {
+                       String tableOrView = isDropTable ? "Table" : "View";
+                       throw new ValidationException(String.format(
+                                       "%s with identifier '%s' does not 
exist.",
+                                       tableOrView, 
objectIdentifier.asSummaryString()));
                }
-               execute(
-                       (catalog, path) -> catalog.dropTable(path, 
ignoreIfNotExists),
-                       objectIdentifier,
-                       ignoreIfNotExists,
-                       "DropTable");
        }
 
        /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 45db348..ae721a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -880,6 +880,75 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends 
AbstractTestBase {
   }
 
   @Test
+  def testDropTableSameNameWithTemporaryTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val createTable2 =
+      """
+        |create temporary table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql(createTable2)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Temporary table with identifier "
+      + "'`default_catalog`.`default_database`.`t1`' exists. "
+      + "Drop it first before removing the permanent table.")
+    tableEnv.executeSql("drop table t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("View with identifier "
+      + "'default_catalog.default_database.t1' does not exist.")
+    tableEnv.executeSql("drop view t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTableIfNotExists(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql("drop view if exists t1")
+    assert(tableEnv.listTables().sameElements(Array("t1")))
+  }
+
+  @Test
   def testAlterTable(): Unit = {
     val ddl1 =
       """
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index b57f906..5d97ede 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -765,7 +765,7 @@ abstract class TableEnvImpl(
             dropViewOperation.getViewIdentifier,
             dropViewOperation.isIfExists)
         } else {
-          catalogManager.dropTable(
+          catalogManager.dropView(
             dropViewOperation.getViewIdentifier,
             dropViewOperation.isIfExists)
         }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index 23a6cd9..ce50b33 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -23,15 +23,16 @@ import 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, 
StreamTableEnvironment}
 import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, 
ValidationException}
 import org.apache.flink.table.factories.utils.TestCollectionTableFactory
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 
 import org.junit.Assert.{assertEquals, fail}
+import org.junit.rules.ExpectedException
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Ignore, Rule, Test}
 
 import java.util
-import org.apache.flink.test.util.AbstractTestBase
 
 import scala.collection.JavaConversions._
 
@@ -52,18 +53,17 @@ class CatalogTableITCase(isStreaming: Boolean) extends 
AbstractTestBase {
       toRow(3, "c")
   )
 
-  private val DIM_DATA = List(
-    toRow(1, "aDim"),
-    toRow(2, "bDim"),
-    toRow(3, "cDim")
-  )
-
   implicit def rowOrdering: Ordering[Row] = Ordering.by((r : Row) => {
     val builder = new StringBuilder
     0 until r.getArity foreach(idx => builder.append(r.getField(idx)))
     builder.toString()
   })
 
+  var _expectedEx: ExpectedException = ExpectedException.none
+
+  @Rule
+  def expectedEx: ExpectedException = _expectedEx
+
   @Before
   def before(): Unit = {
     batchExec.setParallelism(4)
@@ -538,6 +538,75 @@ class CatalogTableITCase(isStreaming: Boolean) extends 
AbstractTestBase {
   }
 
   @Test
+  def testDropTableSameNameWithTemporaryTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val createTable2 =
+      """
+        |create temporary table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql(createTable2)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Temporary table with identifier "
+      + "'`default_catalog`.`default_database`.`t1`' exists. "
+      + "Drop it first before removing the permanent table.")
+    tableEnv.executeSql("drop table t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("View with identifier "
+      + "'default_catalog.default_database.t1' does not exist.")
+    tableEnv.executeSql("drop view t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTableIfNotExists(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql("drop view if exists t1")
+    assert(tableEnv.listTables().sameElements(Array("t1")))
+  }
+
+  @Test
   def testAlterTable(): Unit = {
     val ddl1 =
       """

Reply via email to