[
https://issues.apache.org/jira/browse/HUDI-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380985#comment-17380985
]
ASF GitHub Bot commented on HUDI-2059:
--------------------------------------
xiarixiaoyao commented on pull request #3181:
URL: https://github.com/apache/hudi/pull/3181#issuecomment-880359721
@garyli1019 thanks for your review. Closing this PR, since HUDI-2170
solved this problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> When a log file exists in a MOR table and clustering is triggered, update
> records in the log are lost from query results
> --------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-2059
> URL: https://issues.apache.org/jira/browse/HUDI-2059
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.8.0
> Environment: hadoop 3.1.1
> spark3.1.1/spark2.4.5
> hive3.1.1
> Reporter: tao meng
> Assignee: tao meng
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.9.0
>
>
> When a log file exists in a MOR table and clustering is triggered, the
> query result shows that update records in the log are lost.
> The cause: Hudi uses HoodieFileSliceReader to read the table data and then
> performs clustering. HoodieFileSliceReader calls
> HoodieMergedLogRecordScanner.processNextRecord to merge updated values with
> old values, but that function keeps the old value and discards the updated
> value, which is wrong.
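> A minimal sketch of the intended merge semantics (illustrative only: the
> names LogRecord and mergeRecords, and this Scala shape of
> processNextRecord, are hypothetical, not Hudi's actual internals):
> // When the same key is seen twice, the record with the larger precombine
> // value (here "col3") must win; the buggy code kept the old value
> // unconditionally, so updates from the log were dropped.
> case class LogRecord(key: String, precombine: Long, value: Map[String, Any])
> def mergeRecords(existing: LogRecord, incoming: LogRecord): LogRecord =
>   // on a tie, prefer the incoming (later) record so updates are kept
>   if (incoming.precombine >= existing.precombine) incoming else existing
> def processNextRecord(records: scala.collection.mutable.Map[String, LogRecord],
>                       rec: LogRecord): Unit =
>   records.put(rec.key,
>     records.get(rec.key).map(mergeRecords(_, rec)).getOrElse(rec))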
> Test steps:
> // step 1: create a Hudi MOR table
> val df = spark.range(0, 1000).toDF("keyid")
>   .withColumn("col3", expr("keyid"))
>   .withColumn("age", lit(1))
>   .withColumn("p", lit(2))
> df.write.format("hudi").
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
>     DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>   option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>   option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
>   option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP,
>     classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>   option("hoodie.insert.shuffle.parallelism", "4").
>   option("hoodie.upsert.shuffle.parallelism", "4").
>   option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>   .mode(SaveMode.Overwrite).save(basePath)
> // step 2: update age for keyid < 5 to produce log files
> val df1 = spark.range(0, 5).toDF("keyid")
>   .withColumn("col3", expr("keyid"))
>   .withColumn("age", lit(1 + 1000))
>   .withColumn("p", lit(2))
> df1.write.format("hudi").
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
>     DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>   option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>   option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
>   option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP,
>     classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>   option("hoodie.insert.shuffle.parallelism", "4").
>   option("hoodie.upsert.shuffle.parallelism", "4").
>   option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>   .mode(SaveMode.Append).save(basePath)
> // step 3: write new records (keyid 6 to 9) with inline clustering enabled
> val df2 = spark.range(6, 10).toDF("keyid")
>   .withColumn("col3", expr("keyid"))
>   .withColumn("age", lit(1 + 2000))
>   .withColumn("p", lit(2))
> df2.write.format("hudi").
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
>     DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>   option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>   option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
>   option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP,
>     classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>   option("hoodie.insert.shuffle.parallelism", "4").
>   option("hoodie.upsert.shuffle.parallelism", "4").
>   option("hoodie.parquet.small.file.limit", "0").
>   option("hoodie.clustering.inline", "true").
>   option("hoodie.clustering.inline.max.commits", "1").
>   option("hoodie.clustering.plan.strategy.target.file.max.bytes",
>     "1073741824").
>   option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
>   option("hoodie.clustering.plan.strategy.max.bytes.per.group",
>     Long.MaxValue.toString)
>   .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>   .mode(SaveMode.Append).save(basePath)
> spark.read.format("hudi")
>   .load(basePath).select("age").where("keyid = 0").show(100, false)
> +---+
> |age|
> +---+
> |1 |
> +---+
> The result is wrong: we updated age to 1001 in step 2, so the query should
> return 1001.
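> After the fix, the same query should return the updated value. A hedged
> sanity check based on the values written in step 2:
> // expected: age == 1001 for keyid = 0, since step 2 upserted age = 1 + 1000
> val age = spark.read.format("hudi").load(basePath)
>   .select("age").where("keyid = 0").collect().head.getAs[Int]("age")
> assert(age == 1001, s"expected age 1001 but got $age")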
--
This message was sent by Atlassian Jira
(v8.3.4#803005)