tao meng created HUDI-2058:
------------------------------

             Summary: support incremental query for insert_overwrite_table/insert_overwrite operation on cow table
                 Key: HUDI-2058
                 URL: https://issues.apache.org/jira/browse/HUDI-2058
             Project: Apache Hudi
          Issue Type: Bug
          Components: Incremental Pull
    Affects Versions: 0.8.0
         Environment: hadoop 3.1.1
spark 3.1.1
hive 3.1.1
            Reporter: tao meng
            Assignee: tao meng
             Fix For: 0.9.0


When an incremental query spans multiple commits before and after a replacecommit, the query result contains the data of the old (replaced) files. Note: MOR tables are fine; only COW tables have this problem.
 
When querying the incremental view of a COW table, the replacecommit is ignored, which leads to a wrong result.
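A possible direction for the fix: when building the COW incremental view, collect the file ids invalidated by completed replacecommits and exclude those file groups. A rough sketch against the 0.8 timeline APIs (method names such as getCompletedReplaceTimeline and HoodieReplaceCommitMetadata.fromBytes are my reading of the code, not taken from this report):

import scala.collection.JavaConverters._
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient

// sketch: file ids replaced by completed replacecommits; the COW incremental
// relation should skip these file groups instead of ignoring the action
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath(basePath).build()
val replaceTimeline = metaClient.getActiveTimeline.getCompletedReplaceTimeline
val replacedFileIds = replaceTimeline.getInstants.iterator().asScala.flatMap { instant =>
  val metadata = HoodieReplaceCommitMetadata.fromBytes(
    replaceTimeline.getInstantDetails(instant).get(), classOf[HoodieReplaceCommitMetadata])
  metadata.getPartitionToReplaceFileIds.values().asScala.flatMap(_.asScala)
}.toSet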
 
 
test step:
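(The snippets below assume a SparkSession named spark and the standard imports:)

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{expr, lit}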
step1: create a DataFrame (keyid 0..9)
val df = spark.range(0, 10).toDF("keyid")
 .withColumn("col3", expr("keyid"))
 .withColumn("age", lit(1))
 .withColumn("p", lit(2))
 
step2: insert df into an empty hoodie table
df.write.format("hudi").
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3").
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "").
  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
  option("hoodie.insert.shuffle.parallelism", "4").
  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test").
  mode(SaveMode.Overwrite).save(basePath)
 
step3: do an insert_overwrite_table with the same df
df.write.format("hudi").
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3").
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "").
  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert_overwrite_table").
  option("hoodie.insert.shuffle.parallelism", "4").
  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test").
  mode(SaveMode.Append).save(basePath)
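step4 below references currentCommits(0), the latest completed instant time. One way to fetch it with plain Spark (a sketch; the original test presumably reads the timeline directly) is via the _hoodie_commit_time metadata column:

// completed commit times visible in the current snapshot, latest first
val currentCommits = spark.read.format("hudi").load(basePath)
  .select("_hoodie_commit_time").distinct()
  .collect().map(_.getString(0)).sorted.reverse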
 
step4: run an incremental query over the whole timeline
spark.read.format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "0000").
  option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, currentCommits(0)).
  load(basePath).select("keyid").orderBy("keyid").show(100, false)
 
result: the query also returns the old data; each keyid appears twice
+-----+
|keyid|
+-----+
|0    |
|0    |
|1    |
|1    |
|2    |
|2    |
|3    |
|3    |
|4    |
|4    |
|5    |
|5    |
|6    |
|6    |
|7    |
|7    |
|8    |
|8    |
|9    |
|9    |
+-----+
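Expected result (and what the same steps return on a MOR table): each keyid from 0 to 9 exactly once, since the insert_overwrite_table replaced the file groups written in step2.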
 


