tao meng created HUDI-2059:
------------------------------
Summary: When logs exist in a MOR table and clustering is triggered,
the query result shows that update records in the logs are lost
Key: HUDI-2059
URL: https://issues.apache.org/jira/browse/HUDI-2059
Project: Apache Hudi
Issue Type: Bug
Affects Versions: 0.8.0
Environment: hadoop 3.1.1
spark3.1.1/spark2.4.5
hive3.1.1
Reporter: tao meng
Assignee: tao meng
Fix For: 0.9.0
When log files exist in a MOR table and clustering is triggered, the query
result shows that update records in the logs are lost.

The cause of this problem: Hudi uses HoodieFileSliceReader to read table data
before clustering. HoodieFileSliceReader calls
HoodieMergedLogRecordScanner.processNextRecord to merge the update value with
the old value, but when that function is called the old value is kept and the
update value is discarded, which is wrong.
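The faulty precedence can be illustrated with a minimal, self-contained sketch. This is not Hudi's actual API: `Record`, `buggyMerge`, and `fixedMerge` are illustrative names standing in for the base-file record, the reported behavior of `processNextRecord`, and the expected merge, respectively.

```scala
// Illustrative stand-in for a Hudi record: key, precombine value, payload field.
case class Record(key: Long, precombine: Long, age: Int)

object MergeSketch {
  // Mirrors the reported bug: the old (base-file) record is kept
  // and the incoming log update is discarded.
  def buggyMerge(old: Record, update: Record): Record = old

  // Expected behavior: the update wins when its precombine value is
  // greater than or equal to the old record's.
  def fixedMerge(old: Record, update: Record): Record =
    if (update.precombine >= old.precombine) update else old

  def main(args: Array[String]): Unit = {
    val base = Record(0L, 0L, age = 1)         // row written in step 1
    val logUpdate = Record(0L, 0L, age = 1001) // update written in step 2
    println(buggyMerge(base, logUpdate).age)   // prints 1: the update is lost
    println(fixedMerge(base, logUpdate).age)   // prints 1001: the update survives
  }
}
```

With both writes using the same precombine value (col3 = keyid in the repro below), only the fixed merge returns the updated age.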
Steps to reproduce:
// step 1: create a Hudi MOR table
val df = spark.range(0, 1000).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1))
  .withColumn("p", lit(2))

df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
  .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
  .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
  .option("hoodie.insert.shuffle.parallelism", "4")
  .option("hoodie.upsert.shuffle.parallelism", "4")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Overwrite).save(basePath)
// step 2: update age where keyid < 5 to produce log files
val df1 = spark.range(0, 5).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1 + 1000))
  .withColumn("p", lit(2))

df1.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
  .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
  .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
  .option("hoodie.insert.shuffle.parallelism", "4")
  .option("hoodie.upsert.shuffle.parallelism", "4")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Append).save(basePath)
// step 3: upsert new records and trigger inline clustering
val df2 = spark.range(6, 10).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1 + 2000))
  .withColumn("p", lit(2))

df2.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
  .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
  .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
  .option("hoodie.insert.shuffle.parallelism", "4")
  .option("hoodie.upsert.shuffle.parallelism", "4")
  .option("hoodie.parquet.small.file.limit", "0")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
  .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
  .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Append).save(basePath)
spark.read.format("hudi")
  .load(basePath)
  .select("age")
  .where("keyid = 0")
  .show(100, false)

+---+
|age|
+---+
|1  |
+---+
This result is wrong: age was updated to 1001 for this key in step 2.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)