This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 3c75ca773a6cb4f337adde7705b2b822ef8ff7ad Author: Zouxxyy <[email protected]> AuthorDate: Mon May 13 11:56:46 2024 +0800 [spark] Fix drop database cascade when database is not empty with SparkGenericCatalog (#3324) --- .../apache/paimon/spark/SparkGenericCatalog.java | 5 ++++ .../apache/paimon/spark/PaimonHiveTestBase.scala | 10 ++++---- .../apache/paimon/spark/PaimonSparkTestBase.scala | 8 +++++-- .../spark/sql/DDLWithHiveCatalogTestBase.scala | 23 ++++++++++++++---- .../paimon/spark/sql/SparkVersionSupport.scala | 27 ++++++++++++++++++++++ 5 files changed, 63 insertions(+), 10 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index cf8ee38ab..f8a6b203c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -132,6 +132,11 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte @Override public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException { + if (cascade && namespace.length == 1 && namespaceExists(namespace)) { + for (Identifier table : listTables(namespace)) { + dropTable(table); + } + } return asNamespaceCatalog().dropNamespace(namespace, cascade); } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala index a87502b22..842147615 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala @@ -34,6 +34,8 @@ class PaimonHiveTestBase
extends PaimonSparkTestBase { protected lazy val testHiveMetastore: TestHiveMetastore = new TestHiveMetastore + protected val sparkCatalogName: String = "spark_catalog" + protected val paimonHiveCatalogName: String = "paimon_hive" protected val hiveDbName: String = "test_hive" @@ -46,7 +48,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase { super.sparkConf .set("spark.sql.warehouse.dir", tempHiveDBDir.getCanonicalPath) .set("spark.sql.catalogImplementation", "hive") - .set("spark.sql.catalog.spark_catalog", "org.apache.paimon.spark.SparkGenericCatalog") + .set(s"spark.sql.catalog.$sparkCatalogName", "org.apache.paimon.spark.SparkGenericCatalog") .set(s"spark.sql.catalog.$paimonHiveCatalogName", classOf[SparkCatalog].getName) .set(s"spark.sql.catalog.$paimonHiveCatalogName.metastore", "hive") .set(s"spark.sql.catalog.$paimonHiveCatalogName.warehouse", tempHiveDBDir.getCanonicalPath) @@ -56,13 +58,13 @@ class PaimonHiveTestBase extends PaimonSparkTestBase { override protected def beforeAll(): Unit = { testHiveMetastore.start(hivePort) super.beforeAll() - spark.sql(s"USE spark_catalog") + spark.sql(s"USE $sparkCatalogName") spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName") } override protected def afterAll(): Unit = { try { - spark.sql(s"USE spark_catalog") + spark.sql(s"USE $sparkCatalogName") spark.sql("USE default") spark.sql(s"DROP DATABASE $hiveDbName CASCADE") } finally { @@ -73,7 +75,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase { /** Default is spark_catalog */ override protected def beforeEach(): Unit = { - spark.sql(s"USE spark_catalog") + spark.sql(s"USE $sparkCatalogName") spark.sql(s"USE $hiveDbName") } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index 0e79d1080..6ccf08016 100644 --- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -22,7 +22,7 @@ import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Ident import org.apache.paimon.options.Options import org.apache.paimon.spark.catalog.Catalogs import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -import org.apache.paimon.spark.sql.WithTableOptions +import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions} import org.apache.paimon.table.FileStoreTable import org.apache.spark.SparkConf @@ -39,7 +39,11 @@ import java.io.File import scala.util.Random -class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTableOptions { +class PaimonSparkTestBase + extends QueryTest + with SharedSparkSession + with WithTableOptions + with SparkVersionSupport { protected lazy val tempDBDir: File = Utils.createTempDir diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index a4ef0a55d..c03f61bf6 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { test("Paimon DDL with hive catalog: create database with location and comment") { - Seq("spark_catalog", paimonHiveCatalogName).foreach { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { catalogName => spark.sql(s"USE $catalogName") withTempDir { @@ -54,7 +54,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { } test("Paimon 
DDL with hive catalog: create database with props") { - Seq("spark_catalog", paimonHiveCatalogName).foreach { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { catalogName => spark.sql(s"USE $catalogName") withDatabase("paimon_db") { @@ -69,7 +69,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { test("Paimon DDL with hive catalog: set default database") { var reusedSpark = spark - Seq("paimon", "spark_catalog", paimonHiveCatalogName).foreach { + Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach { catalogName => { val dbName = s"${catalogName}_default_db" @@ -89,7 +89,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { .config(s"spark.sql.catalog.$catalogName.defaultDatabase", dbName) .getOrCreate() - if (catalogName.equals("spark_catalog") && !supportDefaultDatabaseWithSessionCatalog) { + if (catalogName.equals(sparkCatalogName) && !supportDefaultDatabaseWithSessionCatalog) { checkAnswer(reusedSpark.sql("show tables").select("tableName"), Nil) reusedSpark.sql(s"use $dbName") } @@ -102,6 +102,26 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { reusedSpark.stop() } + test("Paimon DDL with hive catalog: drop database cascade which contains paimon table") { + // Spark supports DROP DATABASE CASCADE since 3.3 + if (gteqSpark3_3) { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + spark.sql(s"USE $catalogName") + // withDatabase should drop paimon_db on exit even if a step below fails. + withDatabase("paimon_db") { + spark.sql("CREATE DATABASE paimon_db") + spark.sql("USE paimon_db") + spark.sql("CREATE TABLE paimon_tbl (id int, name string, dt string) using paimon") + spark.sql("USE default") + spark.sql("DROP DATABASE paimon_db CASCADE") + // Not throwing is not enough: the non-empty database must really be gone. + checkAnswer(spark.sql("SHOW DATABASES LIKE 'paimon_db'"), Nil) + } + } + } + } + def supportDefaultDatabaseWithSessionCatalog = true def getDatabaseLocation(dbName: String): String = { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala new file mode 100644 index 000000000..c8f5711a6 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.spark.SPARK_VERSION + +trait SparkVersionSupport { + lazy val sparkVersion: String = SPARK_VERSION + + /** + * Whether the running Spark is 3.3 or newer. The (major, minor) pair is compared numerically: + * as plain strings, "3.10" >= "3.3" and "10.0" >= "3.3" would both be false. + */ + lazy val gteqSpark3_3: Boolean = { + val Array(major, minor) = sparkVersion.split("\\.").take(2).map(_.takeWhile(_.isDigit).toInt) + major > 3 || (major == 3 && minor >= 3) + } +}
