[
https://issues.apache.org/jira/browse/HUDI-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo closed HUDI-5678.
---------------------------
Resolution: Fixed
> deduceShuffleParallelism returns 0, which should never happen
> ----------------------------------------------------------------
>
> Key: HUDI-5678
> URL: https://issues.apache.org/jira/browse/HUDI-5678
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Jonathan Vexler
> Assignee: Alexey Kudinkin
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.13.0
>
> Attachments: image (1).png
>
>
> The following test
> {code:java}
> forAll(BulkInsertSortMode.values().toList) { (sortMode: BulkInsertSortMode)
> => val sortModeName = sortMode.name() test(s"Test Bulk Insert with
> BulkInsertSortMode: '$sortModeName'") { withTempDir { basePath =>
> testBulkInsertPartitioner(basePath, sortModeName) } } }
> def testBulkInsertPartitioner(basePath: File, sortModeName: String): Unit =
> { val tableName = generateTableName //Remove these with [HUDI-5419]
> spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
> spark.sessionState.conf.unsetConf("hoodie.datasource.write.insert.drop.duplicates")
>
> spark.sessionState.conf.unsetConf("hoodie.merge.allow.duplicate.on.inserts")
>
> spark.sessionState.conf.unsetConf("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
> //Default parallelism is 200 which means in global sort, each record will
> end up in a different spark partition so //9 files would be created.
> Setting parallelism to 3 so that each spark partition will contain a hudi
> partition. val parallelism = if
> (sortModeName.equals(BulkInsertSortMode.GLOBAL_SORT.name())) {
> "hoodie.bulkinsert.shuffle.parallelism = 3," } else { "" }
> spark.sql( s""" |create table $tableName ( | id int,
> | name string, | price double, | dt string |)
> using hudi | tblproperties ( | primaryKey = 'id', |
> preCombineField = 'name', | type = 'cow', | $parallelism
> | hoodie.bulkinsert.sort.mode = '$sortModeName' | ) |
> partitioned by (dt) | location
> '${basePath.getCanonicalPath}/$tableName' """.stripMargin)
> spark.sql("set hoodie.sql.bulk.insert.enable = true") spark.sql("set
> hoodie.sql.insert.mode = non-strict") spark.sql( s"""insert into
> $tableName values |(5, 'a', 35, '2021-05-21'), |(1, 'a', 31,
> '2021-01-21'), |(3, 'a', 33, '2021-03-21'), |(4, 'b', 16,
> '2021-05-21'), |(2, 'b', 18, '2021-01-21'), |(6, 'b', 17,
> '2021-03-21'), |(8, 'a', 21, '2021-05-21'), |(9, 'a', 22,
> '2021-01-21'), |(7, 'a', 23, '2021-03-21') |""".stripMargin)
> assertResult(3)(spark.sql(s"select distinct _hoodie_file_name from
> $tableName").count()) } {code}
> fails with the following error:
> {code:java}
> requirement failed: Number of partitions (0) must be positive.
> java.lang.IllegalArgumentException: requirement failed: Number of partitions
> (0) must be positive.
> at scala.Predef$.require(Predef.scala:224)
> at
> org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:951)
> at org.apache.spark.sql.Dataset.coalesce(Dataset.scala:2946)
> at
> org.apache.hudi.execution.bulkinsert.PartitionSortPartitionerWithRows.repartitionRecords(PartitionSortPartitionerWithRows.java:48)
> at
> org.apache.hudi.execution.bulkinsert.PartitionSortPartitionerWithRows.repartitionRecords(PartitionSortPartitionerWithRows.java:34)
> at
> org.apache.hudi.HoodieDatasetBulkInsertHelper$.prepareForBulkInsert(HoodieDatasetBulkInsertHelper.scala:124)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:763)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:239)
> at
> org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:107)
> at
> org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:60)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
> at
> org.apache.spark.sql.hudi.TestInsertTable.testBulkInsertPartitioner(TestInsertTable.scala:1204)
> {code}
> !image (1).png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)