[ 
https://issues.apache.org/jira/browse/HUDI-2058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udit Mehrotra resolved HUDI-2058.
---------------------------------
    Resolution: Fixed

> support incremental query for insert_overwrite_table/insert_overwrite 
> operation on cow table
> --------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2058
>                 URL: https://issues.apache.org/jira/browse/HUDI-2058
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Incremental Pull
>    Affects Versions: 0.8.0
>         Environment: hadoop 3.1.1
> spark3.1.1
> hive 3.1.1
>            Reporter: tao meng
>            Assignee: tao meng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
>  when  incremental query contains multiple commit before and after 
> replacecommit, and the query result contains the data of the old file. 
> Notice: mor table is ok, only cow table has this problem.
>  
> when query incr_view for cow table, replacecommit is ignored which lead the 
> wrong result. 
>  
>  
> test step:
> step1:  create dataFrame
> val df = spark.range(0, 10).toDF("keyid")
>  .withColumn("col3", expr("keyid"))
>  .withColumn("age", lit(1))
>  .withColumn("p", lit(2))
>  
> step2:  insert df to a empty hoodie table
> df.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "").
>  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
> "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Overwrite).save(basePath)
>  
> step3: do insert_overwrite
> df.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "").
>  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
> "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert_overwrite_table").
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Append).save(basePath)
>  
> step4: query incrematal table 
> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
>  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "0000")
>  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, currentCommits(0))
>  .load(basePath).select("keyid").orderBy("keyid").show(100, false)
>  
> result:   the result contains old data
> +-----+
> |keyid|
> +-----+
> |0 |
> |0 |
> |1 |
> |1 |
> |2 |
> |2 |
> |3 |
> |3 |
> |4 |
> |4 |
> |5 |
> |5 |
> |6 |
> |6 |
> |7 |
> |7 |
> |8 |
> |8 |
> |9 |
> |9 |
> +-----+
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to