tao meng created HUDI-2059:
------------------------------

             Summary: When log files exist in a MOR table and clustering is 
triggered, the query result shows that update records in the log are lost
                 Key: HUDI-2059
                 URL: https://issues.apache.org/jira/browse/HUDI-2059
             Project: Apache Hudi
          Issue Type: Bug
    Affects Versions: 0.8.0
         Environment: hadoop 3.1.1
spark3.1.1/spark2.4.5
hive3.1.1
            Reporter: tao meng
            Assignee: tao meng
             Fix For: 0.9.0


When log files exist in a MOR table and clustering is triggered, the query 
result shows that the update records in the log files are lost.

The cause of this problem: Hudi uses HoodieFileSliceReader to read the table 
data and then performs clustering. HoodieFileSliceReader calls 
HoodieMergedLogRecordScanner.processNextRecord to merge the update values from 
the log files with the old values from the base file, but in that call the old 
values are kept and the update values are discarded, which is wrong.
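
For illustration, here is a minimal Scala sketch of the intended merge 
semantics (not Hudi source; Rec, orderingVal and merge are hypothetical 
names): when the scanner sees the same record key twice, the record with the 
newer ordering (precombine) value must win, whereas the buggy path keeps the 
existing record unconditionally.

// Minimal sketch, not Hudi source: intended merge semantics for duplicate keys.
case class Rec(key: String, orderingVal: Long, payload: Map[String, String])

def merge(existing: Rec, incoming: Rec): Rec =
  // The newer (or equal) ordering value wins, so updates from the log survive.
  // The bug reported here effectively keeps `existing` unconditionally.
  if (incoming.orderingVal >= existing.orderingVal) incoming else existing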

Test steps:

// Imports assumed for the snippets below; basePath is a hypothetical path.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{expr, lit}
val basePath = "/tmp/hoodie_test" // any writable location works

// step 1: create a Hudi MOR table
val df = spark.range(0, 1000).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1))
  .withColumn("p", lit(2))

df.write.format("hudi").
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "col3").
  option(RECORDKEY_FIELD_OPT_KEY, "keyid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "p").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
  option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
  option("hoodie.insert.shuffle.parallelism", "4").
  option("hoodie.upsert.shuffle.parallelism", "4").
  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Overwrite).save(basePath)
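
A quick sanity check (optional, not part of the original repro): after step 1 
all 1000 rows should carry age = 1.

// Optional sanity check: the initial insert should yield 1000 rows with age = 1.
val inserted = spark.read.format("hudi").load(basePath).where("age = 1").count()
assert(inserted == 1000, s"expected 1000 inserted rows, got $inserted")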

// step 2: update age for keyid < 5 so the updates land in MOR log files
val df1 = spark.range(0, 5).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1 + 1000))
  .withColumn("p", lit(2))
df1.write.format("hudi").
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "col3").
  option(RECORDKEY_FIELD_OPT_KEY, "keyid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "p").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
  option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
  option("hoodie.insert.shuffle.parallelism", "4").
  option("hoodie.upsert.shuffle.parallelism", "4").
  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Append).save(basePath)
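
To confirm that step 2 really produced MOR delta log files rather than new 
base files, the table path can be listed recursively; a sketch (my addition) 
using the standard Hadoop FileSystem API:

// Optional check: step 2 should have written .log files under the table path.
import org.apache.hadoop.fs.Path
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listFiles(new Path(basePath), true) // recursive listing
var logFound = false
while (files.hasNext) {
  if (files.next().getPath.getName.contains(".log")) logFound = true
}
println(s"delta log files present: $logFound")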

// step 3: upsert new records with inline clustering enabled
val df2 = spark.range(6, 10).toDF("keyid")
  .withColumn("col3", expr("keyid"))
  .withColumn("age", lit(1 + 2000))
  .withColumn("p", lit(2))

df2.write.format("hudi").
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "col3").
  option(RECORDKEY_FIELD_OPT_KEY, "keyid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "p").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
  option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
  option("hoodie.insert.shuffle.parallelism", "4").
  option("hoodie.upsert.shuffle.parallelism", "4").
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "1").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .mode(SaveMode.Append).save(basePath)
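
A completed inline clustering leaves a replacecommit instant on the timeline, 
so listing the .hoodie directory confirms it ran (my addition, reusing the fs 
handle from the check above):

// Optional check: a completed clustering leaves a <instant>.replacecommit file.
val replaceCommits = fs.listStatus(new Path(basePath + "/.hoodie"))
  .map(_.getPath.getName).filter(_.endsWith(".replacecommit"))
println(s"replacecommit instants: ${replaceCommits.mkString(", ")}")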

spark.read.format("hudi")
 .load(basePath).select("age").where("keyid = 0").show(100, false)

+---+
|age|
+---+
|1  |
+---+

This result is wrong: step 2 updated age for keyid = 0 to 1001, so the query 
should return 1001.
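
Expressed as an assertion, the expected (post-fix) behavior is:

// After the fix, the update from step 2 must survive clustering.
val age = spark.read.format("hudi").load(basePath)
  .where("keyid = 0").select("age").first().get(0)
assert(age.toString == "1001", s"expected age 1001 but got $age")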


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
