This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e3b95e88a76 [HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)
e3b95e88a76 is described below
commit e3b95e88a7645b3a03bf109671e69e354079fe5d
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Feb 2 11:50:48 2023 -0500
[HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)
`deduceShuffleParallelism` could return 0 in some situations, which should never occur.
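For illustration, a minimal sketch of the failure mode being fixed (not part of this change; it assumes a local `SparkSession` named `spark`): a `DataFrame` built directly over an RDD is planned as a `LogicalRDD`, whose output partitioning is stubbed as `UnknownPartitioning(0)`, so deducing parallelism from the executed plan's `outputPartitioning.numPartitions` could yield 0 even though the underlying RDD has a well-defined partition count.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // A DataFrame created straight from an RDD is planned as a LogicalRDD, whose
    // output partitioning is stubbed as UnknownPartitioning(0)
    val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 4).map(Row(_))
    val df = spark.createDataFrame(rdd, StructType(Seq(StructField("id", IntegerType))))

    // Before this fix: executedPlan.outputPartitioning.numPartitions could be 0 here
    // After this fix: getNumPartitions(df) falls back to rdd.getNumPartitions, i.e. 4
    assert(df.rdd.getNumPartitions == 4)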
---
.../hudi/HoodieDatasetBulkInsertHelper.scala | 4 +-
.../org/apache/spark/sql/HoodieUnsafeUtils.scala | 20 ++++++-
.../apache/spark/sql/hudi/TestInsertTable.scala | 66 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 7e235993e33..a6488b07b51 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -34,7 +34,7 @@ import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
-import org.apache.spark.sql.HoodieUnsafeUtils.getOutputPartitioning
+import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -45,7 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
object HoodieDatasetBulkInsertHelper
- extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getOutputPartitioning(df).numPartitions)) with Logging {
+ extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getNumPartitions(df))) with Logging {
/**
* Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index dfd416b6f52..ee22f714c9c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -22,7 +22,8 @@ import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
@@ -38,8 +39,21 @@ object HoodieUnsafeUtils {
* but instead will just execute Spark resolution, optimization and actual execution planning stages
* returning instance of [[SparkPlan]] ready for execution
*/
- def getOutputPartitioning(df: DataFrame): Partitioning =
- df.queryExecution.executedPlan.outputPartitioning
+ def getNumPartitions(df: DataFrame): Int = {
+ // NOTE: In general we'd rely on [[outputPartitioning]] of the executable [[SparkPlan]] to determine
+ //       the number of partitions the plan is going to be executed with.
+ //       However, in case of [[LogicalRDD]], the plan's output-partitioning will be stubbed as [[UnknownPartitioning]],
+ //       and therefore we fall back to determining the number of partitions by looking at the RDD itself
+ df.queryExecution.logical match {
+ case LogicalRDD(_, rdd, outputPartitioning, _, _) =>
+ outputPartitioning match {
+ case _: UnknownPartitioning => rdd.getNumPartitions
+ case _ => outputPartitioning.numPartitions
+ }
+
+ case _ => df.queryExecution.executedPlan.outputPartitioning.numPartitions
+ }
+ }
/**
* Creates [[DataFrame]] from provided [[plan]]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b092a68e20d..b33deebdf72 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -26,9 +26,11 @@ import org.apache.hudi.common.model.WriteOperationType
import org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
+import org.scalatest.Inspectors.forAll
import java.io.File
@@ -1059,4 +1061,68 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
)
}
}
+
+ /**
+ * This test is to make sure that bulk insert doesn't create a bunch of tiny files if
+ * hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns
+ *
+ * NOTE: Additionally, this test serves as a smoke test making sure that all of the bulk-insert
+ *       modes work
+ */
+ test(s"Test Bulk Insert with all sort-modes") {
+ withTempDir { basePath =>
+ BulkInsertSortMode.values().foreach { sortMode =>
+ val tableName = generateTableName
+ // Remove these with [HUDI-5419]
+ spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
+ spark.sessionState.conf.unsetConf("hoodie.datasource.write.insert.drop.duplicates")
+ spark.sessionState.conf.unsetConf("hoodie.merge.allow.duplicate.on.inserts")
+ spark.sessionState.conf.unsetConf("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
+ // Default parallelism is 200, which means that with global sort each record will end up in a different Spark partition,
+ // so 9 files would be created. Setting parallelism to 3 so that each Spark partition will contain one Hudi partition.
+ val parallelism = if (sortMode.name.equals(BulkInsertSortMode.GLOBAL_SORT.name())) {
+ "hoodie.bulkinsert.shuffle.parallelism = 3,"
+ } else {
+ ""
+ }
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | dt string
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'name',
+ | type = 'cow',
+ | $parallelism
+ | hoodie.bulkinsert.sort.mode = '${sortMode.name}'
+ | )
+ | partitioned by (dt)
+ | location '${basePath.getCanonicalPath}/$tableName'
+ """.stripMargin)
+
+ spark.sql("set hoodie.sql.bulk.insert.enable = true")
+ spark.sql("set hoodie.sql.insert.mode = non-strict")
+
+ spark.sql(
+ s"""insert into $tableName values
+ |(5, 'a', 35, '2021-05-21'),
+ |(1, 'a', 31, '2021-01-21'),
+ |(3, 'a', 33, '2021-03-21'),
+ |(4, 'b', 16, '2021-05-21'),
+ |(2, 'b', 18, '2021-01-21'),
+ |(6, 'b', 17, '2021-03-21'),
+ |(8, 'a', 21, '2021-05-21'),
+ |(9, 'a', 22, '2021-01-21'),
+ |(7, 'a', 23, '2021-03-21')
+ |""".stripMargin)
+
+ // TODO re-enable
+ //assertResult(3)(spark.sql(s"select distinct _hoodie_file_name from $tableName").count())
+ }
+ }
+ }
}