tao meng created HUDI-2058:
------------------------------

Summary: support incremental query for insert_overwrite_table/insert_overwrite operation on cow table
Key: HUDI-2058
URL: https://issues.apache.org/jira/browse/HUDI-2058
Project: Apache Hudi
Issue Type: Bug
Components: Incremental Pull
Affects Versions: 0.8.0
Environment: hadoop 3.1.1, spark 3.1.1, hive 3.1.1
Reporter: tao meng
Assignee: tao meng
Fix For: 0.9.0
When an incremental query spans commits both before and after a replacecommit, the query result also contains data from the old (replaced) files.

Note: MOR tables are not affected; only COW tables have this problem. When querying the incremental view of a COW table, the replacecommit is ignored, so the file groups it replaced are still read and the query returns wrong results.

Test steps:

step1: create a DataFrame

    // run in spark-shell (e.g. via :paste), where `spark` is the active SparkSession
    import org.apache.spark.sql.functions.{expr, lit}

    val df = spark.range(0, 10).toDF("keyid")
      .withColumn("col3", expr("keyid"))
      .withColumn("age", lit(1))
      .withColumn("p", lit(2))

step2: insert df into an empty Hudi table

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    // basePath: path of the (initially empty) Hudi table
    df.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
      .option("hoodie.insert.shuffle.parallelism", "4")
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .mode(SaveMode.Overwrite)
      .save(basePath)

step3: run insert_overwrite_table with the same DataFrame

    df.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "keyid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert_overwrite_table")
      .option("hoodie.insert.shuffle.parallelism", "4")
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .mode(SaveMode.Append)
      .save(basePath)

step4: run an incremental query

    import org.apache.hudi.DataSourceReadOptions

    // currentCommits(0) comes from the reporter's test setup and is not defined here;
    // presumably it is the instant time of the latest completed commit (the replacecommit from step3).
    spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "0000")
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, currentCommits(0))
      .load(basePath)
      .select("keyid")
      .orderBy("keyid")
      .show(100, false)

Result: the result contains the old data. Every keyid appears twice, once from the file written in step2 and once from the file written in step3, although step3 overwrote the whole table and only one row per keyid (0-9) is expected:

    +-----+
    |keyid|
    +-----+
    |0    |
    |0    |
    |1    |
    |1    |
    |2    |
    |2    |
    |3    |
    |3    |
    |4    |
    |4    |
    |5    |
    |5    |
    |6    |
    |6    |
    |7    |
    |7    |
    |8    |
    |8    |
    |9    |
    |9    |
    +-----+
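To make the expected semantics concrete, below is a toy model in plain Scala (no Spark or Hudi dependency). It is a sketch under stated assumptions, not Hudi's IncrementalRelation: the names `Instant`, `incrementalRead`, `honorReplace`, `fg-1`, `fg-2` and the instant times are hypothetical; the query window is modeled as (begin, end] over lexically comparable instant times; each instant records the file groups it wrote and, for a replacecommit, the file group IDs it replaced. With `honorReplace = false` it reproduces the duplicated rows above; with `honorReplace = true` the file group from step2 is dropped and each keyid appears once.

```scala
// Hypothetical, simplified model of a COW incremental read; NOT Hudi's actual code.
object IncrementalReadModel {
  // A completed instant: `written` maps file group ID -> rows in the base file it wrote,
  // `replaced` holds file group IDs that a replacecommit marks as superseded.
  final case class Instant(
      time: String,
      action: String,
      written: Map[String, Seq[Int]],
      replaced: Set[String] = Set.empty)

  def incrementalRead(timeline: Seq[Instant], begin: String, end: String,
                      honorReplace: Boolean): Seq[Int] = {
    // Assumed window semantics: begin exclusive, end inclusive.
    val inRange = timeline.filter(i => i.time > begin && i.time <= end).sortBy(_.time)
    // Latest written version of each file group within the window (later instants win).
    val latestFiles: Map[String, Seq[Int]] =
      inRange.foldLeft(Map.empty[String, Seq[Int]])((acc, i) => acc ++ i.written)
    // File groups superseded by replacecommits inside the window; ignored when honorReplace is false.
    val replacedIds: Set[String] =
      if (honorReplace) inRange.filter(_.action == "replacecommit").flatMap(_.replaced).toSet
      else Set.empty
    latestFiles
      .filterNot { case (fileId, _) => replacedIds.contains(fileId) }
      .values.flatten.toSeq.sorted
  }

  def main(args: Array[String]): Unit = {
    val rows = 0 until 10
    val timeline = Seq(
      Instant("001", "commit", Map("fg-1" -> rows)),                                // step2: insert
      Instant("002", "replacecommit", Map("fg-2" -> rows), replaced = Set("fg-1"))) // step3
    println(incrementalRead(timeline, "000", "002", honorReplace = false).size) // 20
    println(incrementalRead(timeline, "000", "002", honorReplace = true).size)  // 10
  }
}
```

Filtering by file group ID rather than by instant matters for plain insert_overwrite, which replaces only the file groups in the partitions it writes; file groups in other partitions written earlier in the window stay valid.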