This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 978a915 [SPARK-32712][SQL] Support writing Hive bucketed table (Hive
file formats with Hive hash)
978a915 is described below
commit 978a9154069e12812d2d2976b0dd33c1e5700d21
Author: Cheng Su <[email protected]>
AuthorDate: Mon Sep 27 15:58:41 2021 +0800
[SPARK-32712][SQL] Support writing Hive bucketed table (Hive file formats
with Hive hash)
### What changes were proposed in this pull request?
This is to support writing Hive bucketed tables with Hive file formats (the
code path for Hive table write - `InsertIntoHiveTable`). The bucketed table is
partitioned with Hive hash, the same as Hive, Presto and Trino.
### Why are the changes needed?
To make Spark write bucketed tables that are compatible with other SQL engines.
Same motivation as https://github.com/apache/spark/pull/33432 .
### Does this PR introduce _any_ user-facing change?
Yes. Before this PR, writing to these Hive bucketed tables would throw an
exception in Spark if the config "hive.enforce.bucketing" or
"hive.enforce.sorting" is set to true. After this PR, writing to these Hive
bucketed tables succeeds. The tables can be read back efficiently by Presto
and Trino, like any other Hive bucketed table.
### How was this patch tested?
Modified the unit test in `BucketedWriteWithHiveSupportSuite.scala` to verify
that bucket file names and each row in each bucket are written properly, for
the Hive write code path as well.
Closes #34103 from c21/hive-bucket.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/hive/execution/InsertIntoHiveTable.scala | 24 +---------
.../spark/sql/hive/execution/SaveAsHiveFile.scala | 14 ++++--
.../org/apache/spark/sql/hive/InsertSuite.scala | 32 ++++----------
.../BucketedWriteWithHiveSupportSuite.scala | 51 +++++++++++++---------
4 files changed, 50 insertions(+), 71 deletions(-)
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 108401c..7484a1e 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -168,27 +168,6 @@ case class InsertIntoHiveTable(
}
}
- table.bucketSpec match {
- case Some(bucketSpec) =>
- // Writes to bucketed hive tables are allowed only if user does not
care about maintaining
- // table's bucketing i.e. both "hive.enforce.bucketing" and
"hive.enforce.sorting" are
- // set to false
- val enforceBucketingConfig = "hive.enforce.bucketing"
- val enforceSortingConfig = "hive.enforce.sorting"
-
- val message = s"Output Hive table ${table.identifier} is bucketed but
Spark " +
- "currently does NOT populate bucketed output which is compatible
with Hive."
-
- if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
- hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
- throw new AnalysisException(message)
- } else {
- logWarning(message + s" Inserting data anyways since both
$enforceBucketingConfig and " +
- s"$enforceSortingConfig are set to false.")
- }
- case _ => // do nothing since table has no bucketing
- }
-
val partitionAttributes =
partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
val attr = query.resolve(name :: Nil,
sparkSession.sessionState.analyzer.resolver).getOrElse {
throw QueryCompilationErrors.cannotResolveAttributeError(
@@ -207,7 +186,8 @@ case class InsertIntoHiveTable(
hadoopConf = hadoopConf,
fileSinkConf = fileSinkConf,
outputLocation = tmpLocation.toString,
- partitionAttributes = partitionAttributes)
+ partitionAttributes = partitionAttributes,
+ bucketSpec = table.bucketSpec)
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index b30de0c..7f88572 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -31,12 +31,13 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.execution.datasources.{BucketingUtils,
FileFormatWriter}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveVersion
@@ -53,7 +54,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand
{
fileSinkConf: FileSinkDesc,
outputLocation: String,
customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
- partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
+ partitionAttributes: Seq[Attribute] = Nil,
+ bucketSpec: Option[BucketSpec] = None): Set[String] = {
val isCompressed =
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT)
match {
@@ -84,6 +86,10 @@ private[hive] trait SaveAsHiveFile extends
DataWritingCommand {
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputLocation)
+ val options = bucketSpec
+ .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite ->
"true"))
+ .getOrElse(Map.empty)
+
FileFormatWriter.write(
sparkSession = sparkSession,
plan = plan,
@@ -93,9 +99,9 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand
{
FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations,
outputColumns),
hadoopConf = hadoopConf,
partitionColumns = partitionAttributes,
- bucketSpec = None,
+ bucketSpec = bucketSpec,
statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
- options = Map.empty)
+ options = options)
}
protected def getExternalTmpPath(
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index b715f48..101cf0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -514,31 +514,15 @@ class InsertSuite extends QueryTest with
TestHiveSingleton with BeforeAndAfter
}
}
- testBucketedTable("INSERT should NOT fail if strict bucketing is NOT
enforced") {
- tableName =>
- withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting"
-> "false") {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
- checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
- }
- }
-
- testBucketedTable("INSERT should fail if strict bucketing / sorting is
enforced") {
- tableName =>
- withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting"
-> "false") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting"
-> "true") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting"
-> "true") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
+ Seq("true", "false").foreach { enableHiveEnforce =>
+ withSQLConf("hive.enforce.bucketing" -> enableHiveEnforce,
+ "hive.enforce.sorting" -> enableHiveEnforce) {
+ testBucketedTable(s"INSERT should NOT fail if strict bucketing is
$enableHiveEnforce") {
+ tableName =>
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+ checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3,
4))
}
+ }
}
test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
index c12caaa..dcd00d3 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression,
HiveHash, Literal, Pmod}
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -48,29 +49,37 @@ class BucketedWriteWithHiveSupportSuite extends
BucketedWriteSuite with TestHive
val table = "hive_bucketed_table"
fileFormatsToTest.foreach { format =>
- withTable(table) {
- sql(
- s"""
- |CREATE TABLE IF NOT EXISTS $table (i int, j string)
- |PARTITIONED BY(k string)
- |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
- |STORED AS $format
- """.stripMargin)
+ Seq("true", "false").foreach { enableConvertMetastore =>
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key ->
enableConvertMetastore,
+ HiveUtils.CONVERT_METASTORE_ORC.key -> enableConvertMetastore) {
+ withTable(table) {
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS $table (i int, j string)
+ |PARTITIONED BY(k string)
+ |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+ |STORED AS $format
+ """.stripMargin)
- val df =
- (0 until 50).map(i => (i % 13, i.toString, i % 5)).toDF("i", "j",
"k")
- df.write.mode(SaveMode.Overwrite).insertInto(table)
+ val df =
+ (0 until 50).map(i => (i % 13, i.toString, i % 5)).toDF("i",
"j", "k")
- for (k <- 0 until 5) {
- testBucketing(
- new File(tableDir(table), s"k=$k"),
- format,
- 8,
- Seq("i", "j"),
- Seq("i"),
- df,
- bucketIdExpression,
- getBucketIdFromFileName)
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ df.write.mode(SaveMode.Overwrite).insertInto(table)
+ }
+
+ for (k <- 0 until 5) {
+ testBucketing(
+ new File(tableDir(table), s"k=$k"),
+ format,
+ 8,
+ Seq("i", "j"),
+ Seq("i"),
+ df,
+ bucketIdExpression,
+ getBucketIdFromFileName)
+ }
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]