[
https://issues.apache.org/jira/browse/HUDI-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan reassigned HUDI-1719:
-----------------------------------------
Assignee: tao meng
> Hive on Spark/MR: incremental query of a MOR table returns incorrect
> partition field values
> -------------------------------------------------------------------------------------
>
> Key: HUDI-1719
> URL: https://issues.apache.org/jira/browse/HUDI-1719
> Project: Apache Hudi
> Issue Type: Bug
> Components: Hive Integration
> Affects Versions: 0.7.0, 0.8.0
> Environment: spark2.4.5, hadoop 3.1.1, hive 3.1.1
> Reporter: tao meng
> Assignee: tao meng
> Priority: Major
> Labels: pull-request-available, sev:critical, user-support-issues
> Fix For: 0.9.0
>
>
> Hudi currently uses HoodieCombineHiveInputFormat to implement incremental
> queries on MOR tables.
> When there are small files in different partitions,
> HoodieCombineHiveInputFormat combines the readers for those small files into
> a single split. It builds the partition fields from the first file reader in
> the split, even though the split also holds readers coming from other
> partitions. When switching readers, the IOContext should be updated as well.
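>
> A minimal, self-contained sketch of the failure mode (all names are
> illustrative, not Hudi's real classes): a combined reader wraps several
> per-partition child readers but derives the partition values once, from the
> first child's path, and never refreshes them when it switches readers.
>
> // One child reader per hive-style partition directory, e.g. "p=0/p1=0/p2=6".
> final class ChildReader(val path: String, val rows: Iterator[Long])
>
> final case class Row(keyid: Long, partition: Map[String, String])
>
> // Parse "p=0/p1=0/p2=6" into Map(p -> 0, p1 -> 0, p2 -> 6).
> def partitionValuesFor(path: String): Map[String, String] =
>   path.split('/').collect { case kv if kv.contains('=') =>
>     val Array(k, v) = kv.split('='); k -> v
>   }.toMap
>
> final class CombinedReader(children: Seq[ChildReader], fixed: Boolean)
>     extends Iterator[Row] {
>   private val queue = children.iterator.buffered
>   // BUG: computed once from the first child and reused for every row.
>   private var current = partitionValuesFor(children.head.path)
>
>   def hasNext: Boolean = { advance(); queue.hasNext }
>   def next(): Row = { advance(); Row(queue.head.rows.next(), current) }
>
>   // Skip exhausted children; on a reader switch the partition values must be
>   // refreshed (in Hudi terms: update the IOContext when switching readers).
>   private def advance(): Unit =
>     while (queue.hasNext && !queue.head.rows.hasNext) {
>       queue.next()
>       if (fixed && queue.hasNext) current = partitionValuesFor(queue.head.path)
>     }
> }
>
> // With fixed = false every row reports p2=6, mirroring the query below.
> new CombinedReader(Seq(
>   new ChildReader("p=0/p1=0/p2=6", Iterator(0L, 1L)),
>   new ChildReader("p=0/p1=0/p2=7", Iterator(0L, 1L))), fixed = false)
>   .foreach(println)
>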
> test env:
> spark2.4.5, hadoop 3.1.1, hive 3.1.1
> test steps:
> step1:
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.spark.sql.functions._
> val df = spark.range(0, 10000).toDF("keyid")
> .withColumn("col3", expr("keyid + 10000000"))
> .withColumn("p", lit(0))
> .withColumn("p1", lit(0))
> .withColumn("p2", lit(6))
> .withColumn("a1", lit(Array[String]("sb1", "rz")))
> .withColumn("a2", lit(Array[String]("sb1", "rz")))
> // create a Hudi table with three partition levels: p, p1, p2
> // (`merge` is the reporter's helper; see the sketch after step 2)
> merge(df, 4, "default", "hive_8b",
> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
>
> step2:
> val df = spark.range(0, 10000).toDF("keyid")
> .withColumn("col3", expr("keyid + 10000000"))
> .withColumn("p", lit(0))
> .withColumn("p1", lit(0))
> .withColumn("p2", lit(7))
> .withColumn("a1", lit(Array[String]("sb1", "rz")))
> .withColumn("a2", lit(Array[String]("sb1", "rz")))
> // upsert into the same table, adding a new partition p2=7
> merge(df, 4, "default", "hive_8b",
> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
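>
> The `merge` helper is not included in the report. A plausible sketch, assuming
> Hudi 0.8-era datasource option names and an illustrative base path (record
> key, precombine field, and parallelism settings are guesses):
>
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.{DataFrame, SaveMode}
>
> def merge(df: DataFrame, parallelism: Int, db: String, table: String,
>           tableType: String, op: String): Unit =
>   df.write.format("hudi")
>     .option(TABLE_TYPE_OPT_KEY, tableType)
>     .option(OPERATION_OPT_KEY, op)
>     .option(RECORDKEY_FIELD_OPT_KEY, "keyid")        // guessed key field
>     .option(PRECOMBINE_FIELD_OPT_KEY, "col3")        // guessed precombine field
>     .option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2")  // three-level partitioning
>     .option(KEYGENERATOR_CLASS_OPT_KEY,
>       "org.apache.hudi.keygen.ComplexKeyGenerator")
>     .option("hoodie.bulkinsert.shuffle.parallelism", parallelism.toString)
>     .option("hoodie.upsert.shuffle.parallelism", parallelism.toString)
>     .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")       // registers hive_8b_ro/_rt
>     .option(HIVE_DATABASE_OPT_KEY, db)
>     .option(HIVE_TABLE_OPT_KEY, table)
>     .option(HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2")
>     .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
>       "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>     .option(HoodieWriteConfig.TABLE_NAME, table)
>     .mode(SaveMode.Append)
>     .save(s"/tmp/$table")                            // illustrative base path
>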
> hive beeline:
> set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
> set hoodie.hive_8b.consume.mode=INCREMENTAL;
> set hoodie.hive_8b.consume.max.commits=3;
> set hoodie.hive_8b.consume.start.timestamp=20210325141300;
> -- this timestamp is earlier than the earliest commit, so the query covers
> -- all commits
> select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where
> `_hoodie_commit_time`>'20210325141300' and `keyid` < 5;
> query result:
> +----+-----+-----+--------+
> | p  | p1  | p2  | keyid  |
> +----+-----+-----+--------+
> | 0  | 0   | 6   | 0      |
> | 0  | 0   | 6   | 1      |
> | 0  | 0   | 6   | 2      |
> | 0  | 0   | 6   | 3      |
> | 0  | 0   | 6   | 4      |
> | 0  | 0   | 6   | 4      |
> | 0  | 0   | 6   | 0      |
> | 0  | 0   | 6   | 3      |
> | 0  | 0   | 6   | 2      |
> | 0  | 0   | 6   | 1      |
> +----+-----+-----+--------+
> This result is wrong: in step 2 we inserted new data with p2=7, yet the
> query result contains no rows with p2=7; every row reports p2=6.
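>
> As a cross-check outside Hive, the same incremental window can be read through
> the Spark datasource (option names as in Hudi 0.7/0.8; the base path matches
> the sketch of `merge` above). It should return five rows with p2=6 and five
> with p2=7:
>
> import org.apache.hudi.DataSourceReadOptions._
>
> val inc = spark.read.format("hudi")
>   .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
>   .option(BEGIN_INSTANTTIME_OPT_KEY, "20210325141300")
>   .load("/tmp/hive_8b")
> inc.filter("keyid < 5").select("p", "p1", "p2", "keyid").show(20, false)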
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)