This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6093a78  [SPARK-34558][SQL] warehouse path should be qualified ahead of populating and use
6093a78 is described below

commit 6093a78dbd310209f574567a50e5e216021e6ae8
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Mar 2 15:14:19 2021 +0000

[SPARK-34558][SQL] warehouse path should be qualified ahead of populating and use

### What changes were proposed in this pull request?

Currently, the warehouse path only gets fully qualified on the caller side when creating a database, table, partition, etc. The unqualified path is what gets populated into the Spark and Hadoop confs, which leads to inconsistent API behaviors. We should qualify it ahead of populating and using it.

For example, when the value is a relative path such as `spark.sql.warehouse.dir=lakehouse`, behaviors become inconsistent. If the default database is absent at runtime, the app fails with:

```java
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./lakehouse
	at org.apache.hadoop.fs.Path.initialize(Path.java:263)
	at org.apache.hadoop.fs.Path.<init>(Path.java:254)
	at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:133)
	at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:137)
	at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:150)
	at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(Warehouse.java:163)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB_core(HiveMetaStore.java:636)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:655)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:431)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:148)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:79)
	... 73 more
```

If the default database is present at runtime, the app works, and a newly created database does get a fully qualified location, for example:

```sql
spark-sql> create database test;
Time taken: 0.052 seconds
spark-sql> desc database test;
Database Name	test
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.2.0-SNAPSHOT-bin-20210226/lakehouse/test.db
Owner	kentyao
Time taken: 0.023 seconds, Fetched 4 row(s)
```

Another problem is that the log output becomes ambiguous, because it echoes the raw, unqualified value:

```log
21/02/27 13:54:17 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('datalake').
21/02/27 13:54:17 INFO SharedState: Warehouse path is 'lakehouse'.
```

### Why are the changes needed?

Fix a bug and remove the ambiguity described above.

### Does this PR introduce _any_ user-facing change?

Yes, the path is now resolved in the proper order: `warehouse -> database -> table -> partition`.

### How was this patch tested?

New unit tests added.

Closes #31671 from yaooqinn/SPARK-34558.
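For reference, the heart of the fix is to qualify the configured path with Hadoop's `FileSystem.makeQualified` before it is written into the Spark and Hadoop confs. That step can be reproduced standalone; the sketch below is illustrative only (the object name and the `lakehouse` value are made up here; the `Path`/`FileSystem` calls are the ones the patch uses):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyWarehousePathSketch {
  def main(args: Array[String]): Unit = {
    // A relative warehouse dir, as in `spark.sql.warehouse.dir=lakehouse`.
    val warehousePath = "lakehouse"

    // Resolve the path against the file system's scheme and working directory.
    // This is the same makeQualified call the patch adds to SharedState before
    // the value is populated into the Spark and Hadoop confs.
    val path = new Path(warehousePath)
    val qualified = path.getFileSystem(new Configuration()).makeQualified(path).toString

    // Prints something like: file:/current/working/dir/lakehouse
    println(qualified)
  }
}
```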
Authored-by: Kent Yao <y...@apache.org>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/internal/SharedState.scala    | 16 +++++----
 .../spark/sql/SparkSessionBuilderSuite.scala       | 41 ++++++++++++++++++----
 .../spark/sql/hive/HiveSharedStateSuite.scala      | 20 ++++++-----
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      | 12 ++++---
 4 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index ac1dd4e..852a9b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
+import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
@@ -253,9 +253,8 @@ object SharedState extends Logging {
     val warehousePath = if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
       logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
-        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey.")
       hiveWarehouseDir
     } else {
       // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
@@ -264,12 +263,15 @@
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
       val sparkWarehouseDir = sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString)
       logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
-        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkConf.set(WAREHOUSE_PATH.key, sparkWarehouseDir)
-      hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+        s"${WAREHOUSE_PATH.key}.")
       sparkWarehouseDir
     }
-    logInfo(s"Warehouse path is '$warehousePath'.")
+
+    val tempPath = new Path(warehousePath)
+    val qualifiedWarehousePath = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
+    sparkConf.set(WAREHOUSE_PATH.key, qualifiedWarehousePath)
+    hadoopConf.set(hiveWarehouseKey, qualifiedWarehousePath)
+    logInfo(s"Warehouse path is '$qualifiedWarehousePath'.")
     initialConfigs -- Seq(WAREHOUSE_PATH.key, hiveWarehouseKey)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 1f16bb6..e8e2f68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
@@ -240,7 +241,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       .getOrCreate()
     assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
     assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
-    assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
+    assert(session.conf.get(WAREHOUSE_PATH) contains "SPARK-31532-db-2")
   }
 
   test("SPARK-32062: reset listenerRegistered in SparkSession") {
@@ -306,14 +307,14 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     // newly specified values
     val sharedWH = spark.sharedState.conf.get(wh)
     val sharedTD = spark.sharedState.conf.get(td)
-    assert(sharedWH === "./data2",
+    assert(sharedWH contains "data2",
       "The warehouse dir in shared state should be determined by the 1st created spark session")
     assert(sharedTD === "alice",
       "Static sql configs in shared state should be determined by the 1st created spark session")
     assert(spark.sharedState.conf.getOption(custom).isEmpty,
       "Dynamic sql configs is session specific")
-    assert(spark.conf.get(wh) === sharedWH,
+    assert(spark.conf.get(wh) contains sharedWH,
       "The warehouse dir in session conf and shared state conf should be consistent")
     assert(spark.conf.get(td) === sharedTD,
       "Static sql configs in session conf and shared state conf should be consistent")
 
@@ -321,7 +322,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     spark.sql("RESET")
 
-    assert(spark.conf.get(wh) === sharedWH,
+    assert(spark.conf.get(wh) contains sharedWH,
       "The warehouse dir in shared state should be respect after RESET")
     assert(spark.conf.get(td) === sharedTD,
       "Static sql configs in shared state should be respect after RESET")
@@ -331,7 +332,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     val spark2 = SparkSession.builder()
       .config(wh, "./data3")
       .config(custom, "kyaoo").getOrCreate()
-    assert(spark2.conf.get(wh) === sharedWH)
+    assert(spark2.conf.get(wh) contains sharedWH)
     assert(spark2.conf.get(td) === sharedTD)
     assert(spark2.conf.get(custom) === "kyaoo")
   }
@@ -352,7 +353,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     spark.sql(s"SET $custom=c1")
     assert(spark.conf.get(custom) === "c1")
     spark.sql("RESET")
-    assert(spark.conf.get(wh) === "./data0",
+    assert(spark.conf.get(wh) contains "data0",
       "The warehouse dir in shared state should be respect after RESET")
     assert(spark.conf.get(td) === "bob",
       "Static sql configs in shared state should be respect after RESET")
@@ -381,7 +382,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     spark2.sql(s"SET $custom=c1")
     assert(spark2.conf.get(custom) === "c1")
     spark2.sql("RESET")
-    assert(spark2.conf.get(wh) === "./data1")
+    assert(spark2.conf.get(wh) contains "data1")
     assert(spark2.conf.get(td) === "alice")
     assert(spark2.conf.get(custom) === "c2")
 
@@ -412,4 +413,30 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
     assert(!logAppender.loggingEvents.exists(_.getRenderedMessage.contains(msg)))
   }
+
+  Seq(".", "..", "dir0", "dir0/dir1", "/dir0/dir1", "./dir0").foreach { pathStr =>
+    test(s"SPARK-34558: warehouse path ($pathStr) should be qualified for spark/hadoop conf") {
+      val path = new Path(pathStr)
+      val conf = new SparkConf().set(WAREHOUSE_PATH, pathStr)
+      val session = SparkSession.builder()
+        .master("local")
+        .config(conf)
+        .getOrCreate()
+      val hadoopConf = session.sessionState.newHadoopConf()
+      val expected = path.getFileSystem(hadoopConf).makeQualified(path).toString
+      // session related configs
+      assert(hadoopConf.get("hive.metastore.warehouse.dir") === expected)
+      assert(session.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.sessionState.conf.warehousePath === expected)
+
+      // shared configs
+      assert(session.sharedState.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.sharedState.hadoopConf.get("hive.metastore.warehouse.dir") === expected)
+
+      // spark context configs
+      assert(session.sparkContext.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") ===
+        expected)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 4570e72..d23293b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
@@ -35,7 +37,7 @@ class HiveSharedStateSuite extends SparkFunSuite {
   test("initial configs should be passed to SharedState but not SparkContext") {
     val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
     val sc = SparkContext.getOrCreate(conf)
-    val wareHouseDir = Utils.createTempDir().toString
+    val warehousePath = Utils.createTempDir().toString
     val invalidPath = "invalid/path"
     val metastorePath = Utils.createTempDir()
     val tmpDb = "tmp_db"
@@ -45,8 +47,8 @@
     // Especially, all these configs are passed to the cloned confs inside SharedState for sharing
    // cross sessions.
val initialConfigs = Map("spark.foo" -> "bar", - WAREHOUSE_PATH.key -> wareHouseDir, - ConfVars.METASTOREWAREHOUSE.varname -> wareHouseDir, + WAREHOUSE_PATH.key -> warehousePath, + ConfVars.METASTOREWAREHOUSE.varname -> warehousePath, CATALOG_IMPLEMENTATION.key -> "hive", ConfVars.METASTORECONNECTURLKEY.varname -> s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true", @@ -56,9 +58,11 @@ class HiveSharedStateSuite extends SparkFunSuite { initialConfigs.foreach { case (k, v) => builder.config(k, v) } val ss = builder.getOrCreate() val state = ss.sharedState - assert(sc.conf.get(WAREHOUSE_PATH.key) === wareHouseDir, + val qualifiedWHPath = + FileUtils.makeQualified(new Path(warehousePath), sc.hadoopConfiguration).toString + assert(sc.conf.get(WAREHOUSE_PATH.key) === qualifiedWHPath, "initial warehouse conf in session options can affect application wide spark conf") - assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === wareHouseDir, + assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === qualifiedWHPath, "initial warehouse conf in session options can affect application wide hadoop conf") assert(!state.sparkContext.conf.contains("spark.foo"), @@ -68,7 +72,7 @@ class HiveSharedStateSuite extends SparkFunSuite { val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client assert(client.getConf("spark.foo", "") === "bar", "session level conf should be passed to catalog") - assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === wareHouseDir, + assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === qualifiedWHPath, "session level conf should be passed to catalog") assert(state.globalTempViewManager.database === tmpDb) @@ -76,12 +80,12 @@ class HiveSharedStateSuite extends SparkFunSuite { val ss2 = builder.config("spark.foo", "bar2222").config(WAREHOUSE_PATH.key, invalidPath).getOrCreate() - assert(ss2.sparkContext.conf.get(WAREHOUSE_PATH.key) !== invalidPath, + assert(!ss2.sparkContext.conf.get(WAREHOUSE_PATH.key).contains(invalidPath), "warehouse conf in session options can't affect application wide spark conf") assert(ss2.sparkContext.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath, "warehouse conf in session options can't affect application wide hadoop conf") assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog") - assert(ss.conf.get(WAREHOUSE_PATH) !== invalidPath, + assert(!ss.conf.get(WAREHOUSE_PATH).contains(invalidPath), "session level conf should be passed to catalog") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 4e64b4d..af8a23d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -23,6 +23,7 @@ import scala.util.Properties import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.common.FileUtils import org.scalatest.Assertions._ import org.scalatest.BeforeAndAfterEach import org.scalatest.matchers.must.Matchers @@ -408,10 +409,11 @@ object SetWarehouseLocationTest extends Logging { } - if (sparkSession.conf.get(WAREHOUSE_PATH.key) != expectedWarehouseLocation) { + val qualifiedWHPath = FileUtils.makeQualified( + new Path(expectedWarehouseLocation), 
+      new Path(expectedWarehouseLocation), sparkSession.sparkContext.hadoopConfiguration).toString
+    if (sparkSession.conf.get(WAREHOUSE_PATH.key) != qualifiedWHPath) {
       throw new Exception(
-        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location " +
-          s"$expectedWarehouseLocation.")
+        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location $qualifiedWHPath.")
     }
 
     val catalog = sparkSession.sessionState.catalog
@@ -424,7 +426,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
-        CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation")
+        CatalogUtils.stringToURI(s"$qualifiedWHPath/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -440,7 +442,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
       val expectedLocation = CatalogUtils.stringToURI(
-        s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation")
+        s"$qualifiedWHPath/testlocationdb.db/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
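As a quick end-to-end check of the patched behavior, a minimal sketch (assuming a Spark build that includes this commit; runnable e.g. from spark-shell or as a small app on a local master):

```scala
import org.apache.spark.sql.SparkSession

// Configure a relative warehouse dir, the case that previously leaked an
// unqualified path into the Spark and Hadoop confs.
val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.warehouse.dir", "lakehouse")
  .getOrCreate()

// With this patch, the value read back is fully qualified, e.g.
// file:/current/working/dir/lakehouse rather than the raw 'lakehouse'.
println(spark.conf.get("spark.sql.warehouse.dir"))
```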