This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cdaaa3c4c7 [HUDI-4346] Fix params not update BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED (#5999)
cdaaa3c4c7 is described below
commit cdaaa3c4c7f31e7bfdcc9ecd06df347e037e34bd
Author: RexAn <[email protected]>
AuthorDate: Thu Jun 30 10:26:00 2022 +0800
[HUDI-4346] Fix params not update BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED (#5999)
---
.../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fe4391c0a5..005d21d862 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -520,7 +520,8 @@ object HoodieSparkSqlWriter {
     if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
-    val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
+    val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
+    params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
     val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
       val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
@@ -535,7 +536,7 @@ object HoodieSparkSqlWriter {
         new NonSortPartitionerWithRows()
       }
     val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
-    parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
+    params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
     val isGlobalIndex = if (populateMetaFields) {
       SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
     } else {