This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new a0804e55214 Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI
a0804e55214 is described below
commit a0804e55214e75d9e0f8a6bd2b4fef0b435e3c51
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Apr 21 10:28:22 2023 +0900
    Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI
### What changes were proposed in this pull request?
This reverts https://github.com/apache/spark/pull/36625 and its follow-up https://github.com/apache/spark/pull/38321.
### Why are the changes needed?
An external table's location can be arbitrary and has no connection to the database location, so it can be wrong to qualify the external table location based on the database location.
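As a minimal illustration (a standalone sketch, not Spark API: the helper body is copied from the removed `toAbsoluteURI` below, and the `viewfs` URIs are hypothetical), qualifying a scheme-less external location with the database URI can silently point the table at the wrong cluster:

```scala
import java.net.URI

// Copy of the removed toAbsoluteURI helper: qualify a relative URI with the
// scheme/authority of the database location.
def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
  if (!uri.isAbsolute && absoluteUri.isDefined) {
    val aUri = absoluteUri.get
    new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
      uri.getPath, uri.getQuery, uri.getFragment)
  } else {
    uri
  }
}

val dbLocation = new URI("viewfs://clusterA/user/hive/warehouse/")
// An external table stored with a scheme-less path by an old Spark version,
// whose data actually lives on a different cluster's default filesystem:
val extTableLocation = new URI("/data/external/t1")

// The reverted logic rewrites it to clusterA, which may simply be wrong:
toAbsoluteURI(extTableLocation, Some(dbLocation))
// => viewfs://clusterA/data/external/t1
```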
If a table written by an old Spark version does not have a qualified location, there is no way to restore it automatically, as the information is already lost. Users can manually fix such table locations themselves, assuming the tables live under the same HDFS cluster as the database location.
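For example (hypothetical table name and path; this assumes the table data really does reside on the same cluster as the database):

```scala
// Hypothetical manual fix: explicitly re-qualify the stored location.
spark.sql("ALTER TABLE t1 SET LOCATION 'viewfs://clusterA/user/hive/warehouse/t1'")
```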
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
Closes #40871 from cloud-fan/minor.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit afd9e2cc0a73069514eef5c5eb7a3ebed8e4b8cf)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/hive/HiveExternalCatalog.scala | 27 ++--------------
.../spark/sql/hive/client/HiveClientImpl.scala | 36 +++++-----------------
.../spark/sql/hive/HiveExternalCatalogSuite.scala | 15 ---------
3 files changed, 10 insertions(+), 68 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9af34ba3b20..0c556cd34ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
-import java.net.URI
 import java.util
 import java.util.Locale
@@ -850,15 +849,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // source tables. Here we set the table location to `locationUri` field and filter out the
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
-      val tableLocation = getLocationFromStorageProps(table).map { path =>
-        // Before SPARK-19257, created data source table does not use absolute uri.
-        // This makes Spark can't read these tables across HDFS clusters.
-        // Rewrite table path to absolute uri based on location uri (The location uri has been
-        // rewritten by HiveClientImpl.convertHiveTableToCatalogTable) to fix this issue.
-        toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
-      }
+      val tableLocation = getLocationFromStorageProps(table)
       // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+      updateLocationInStorageProps(table, newPath = None).copy(
+        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
@@ -1445,19 +1439,4 @@ object HiveExternalCatalog {
         isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
       case _ => true
     }
-
-  /** Rewrite uri to absolute location. For example:
-   * uri: /user/hive/warehouse/test_table
-   * absoluteUri: viewfs://clusterA/user/hive/warehouse/
-   * The result is: viewfs://clusterA/user/hive/warehouse/test_table
-   */
-  private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
-    if (!uri.isAbsolute && absoluteUri.isDefined) {
-      val aUri = absoluteUri.get
-      new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
-        uri.getPath, uri.getQuery, uri.getFragment)
-    } else {
-      uri
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index aaa0afc344d..48902a94593 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
 import java.io.PrintStream
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
-import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{HashMap => JHashMap, Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -54,7 +53,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -539,21 +537,7 @@ private[hive] class HiveClientImpl(
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
-        locationUri = shim.getDataLocation(h).map { loc =>
-          val tableUri = stringToURI(loc)
-          if (h.getTableType == HiveTableType.VIRTUAL_VIEW) {
-            // Data location of SQL view is useless. Do not qualify it even if it's present, as
-            // it can be an invalid path.
-            tableUri
-          } else {
-            // Before SPARK-19257, created data source table does not use absolute uri.
-            // This makes Spark can't read these tables across HDFS clusters.
-            // Rewrite table location to absolute uri based on database uri to fix this issue.
-            val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
-              .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
-            HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
-          }
-        },
+        locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
         // the format class name, so we may still throw ClassNotFound in this case.
@@ -793,8 +777,7 @@ private[hive] class HiveClientImpl(
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition)
-      .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+    Option(hivePartition).map(fromHivePartition)
   }
 
   override def getPartitions(
@@ -816,10 +799,7 @@ private[hive] class HiveClientImpl(
       assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
       s
     }
-    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
-      .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
-    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
-      .map(fromHivePartition(_, absoluteUri))
+    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -829,9 +809,8 @@ private[hive] class HiveClientImpl(
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     hiveTable.setOwner(userName)
-    val parts =
-      shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable)
-        .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+    val parts = shim.getPartitionsByFilter(
+      client, hiveTable, predicates, rawHiveTable.toCatalogTable).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -1212,7 +1191,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
    */
-  def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1222,8 +1201,7 @@ private[hive] object HiveClientImpl extends Logging {
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-          stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
+        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 7c36198c326..e413e0ee73c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -200,19 +200,4 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
-
-  test("SPARK-39203: Rewrite table location to absolute location based on database location") {
-    val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
-    val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
-    val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
-      .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
-      .equals(tableLocation1))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
-      .equals(tableLocation2))
-  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]