xiarixiaoyao opened a new pull request #3181:
URL: https://github.com/apache/hudi/pull/3181


   
   
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a 
pull request.*
   
   ## What is the purpose of the pull request
   When log files exist in a MOR table and clustering is triggered, the query result shows that the updated records from the logs are lost.
   
   The cause of this problem: Hudi uses HoodieFileSliceReader to read table data before clustering, and HoodieFileSliceReader calls HoodieMergedLogRecordScanner.processNextRecord to merge the updated value with the old value. When that function is called, the old value is kept and the updated value is discarded, which is wrong.
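   The intended merge semantics can be sketched as follows. This is an illustrative model only; the `Rec` case class and `merge` helper below are hypothetical and are not Hudi's actual API. The point it demonstrates: when a base-file record and a log record share the same key, the log-side record must survive whenever its ordering (precombine) value is greater than or equal to the base record's.
   
   ```scala
   // Hypothetical model of the intended merge (not Hudi's actual classes):
   // keyid    -> record key
   // ordering -> precombine field value (col3 in the repro below)
   case class Rec(keyid: Long, ordering: Long, age: Int)
   
   // The log record must win whenever its ordering value is >= the base
   // record's; the bug did the opposite and kept the old base value.
   def merge(base: Rec, logUpdate: Rec): Rec =
     if (logUpdate.ordering >= base.ordering) logUpdate else base
   
   // Mirrors the repro: both records carry ordering = keyid = 0, and the
   // log update sets age = 1001; the merged result must keep 1001.
   val merged = merge(Rec(0, 0, age = 1), Rec(0, 0, age = 1001))
   ```
   
   Under these semantics the query in the repro below would return 1001 for keyid = 0 after clustering.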
   
   Steps to reproduce:
   
   ```scala
   // step 1: create a Hudi MOR table
   val df = spark.range(0, 1000).toDF("keyid")
     .withColumn("col3", expr("keyid"))
     .withColumn("age", lit(1))
     .withColumn("p", lit(2))
   
   df.write.format("hudi")
     .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
     .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
     .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
     .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
     .option("hoodie.insert.shuffle.parallelism", "4")
     .option("hoodie.upsert.shuffle.parallelism", "4")
     .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
     .mode(SaveMode.Overwrite)
     .save(basePath)
   ```
   
   ```scala
   // step 2: update age where keyid < 5 to produce log files
   val df1 = spark.range(0, 5).toDF("keyid")
     .withColumn("col3", expr("keyid"))
     .withColumn("age", lit(1 + 1000))
     .withColumn("p", lit(2))
   
   df1.write.format("hudi")
     .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
     .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
     .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
     .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
     .option("hoodie.insert.shuffle.parallelism", "4")
     .option("hoodie.upsert.shuffle.parallelism", "4")
     .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
     .mode(SaveMode.Append)
     .save(basePath)
   ```
   
   ```scala
   // step 3: upsert new records (keyid 6..9) with inline clustering enabled
   val df2 = spark.range(6, 10).toDF("keyid")
     .withColumn("col3", expr("keyid"))
     .withColumn("age", lit(1 + 2000))
     .withColumn("p", lit(2))
   
   df2.write.format("hudi")
     .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
     .option(PRECOMBINE_FIELD_OPT_KEY, "col3")
     .option(RECORDKEY_FIELD_OPT_KEY, "keyid")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "p")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
     .option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName)
     .option("hoodie.insert.shuffle.parallelism", "4")
     .option("hoodie.upsert.shuffle.parallelism", "4")
     .option("hoodie.parquet.small.file.limit", "0")
     .option("hoodie.clustering.inline", "true")
     .option("hoodie.clustering.inline.max.commits", "1")
     .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
     .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
     .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
     .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
     .mode(SaveMode.Append)
     .save(basePath)
   ```
   
   ```scala
   spark.read.format("hudi")
     .load(basePath)
     .select("age")
     .where("keyid = 0")
     .show(100, false)
   ```
   
   ```text
   +---+
   |age|
   +---+
   |1  |
   +---+
   ```
   
   The result is wrong: age should be 1001, since we updated it in step 2.
   
   
   ## Brief change log
   
     - *Fix the merge in HoodieMergedLogRecordScanner.processNextRecord (used by HoodieFileSliceReader during clustering) so the updated log record is kept instead of the old value*
   
   ## Verify this pull request
   
   A new unit test has been added to cover this scenario.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA.

