[
https://issues.apache.org/jira/browse/HUDI-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
tao meng updated HUDI-1719:
---------------------------
Description:
now hudi use HoodieCombineHiveInputFormat to achieve Incremental query of the
mor table.
when we have some small files in different partitions,
HoodieCombineHiveInputFormat will combine those small file readers.
HoodieCombineHiveInputFormat build partition field base on the first file
reader in it, however now HoodieCombineHiveInputFormat holds other file
readers which come from different partitions.
When switching readers, we should update ioctx
test env:
spark2.4.5, hadoop 3.1.1, hive 3.1.1
test step:
step1:
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(6))
.withColumn("a1", lit(Array[String]("sb1", "rz")))
.withColumn("a2", lit(Array[String]("sb1", "rz")))
// create hudi table which has three level partitions p,p1,p2
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
step2:
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String]("sb1", "rz")))
.withColumn("a2", lit(Array[String]("sb1", "rz")))
// upsert current table
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
hive beeline:
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hoodie.hive_8b.consume.mode=INCREMENTAL;
set hoodie.hive_8b.consume.max.commits=3;
set hoodie.hive_8b.consume.start.timestamp=20210325141300; // this timestamp is
smaller the earlist commit, so we can query whole commits
select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where
`_hoodie_commit_time`>'20210325141300' and `keyid` < 5;
query result:
+-----+----++-------------+
|p|p1|p2|keyid|
+-----+----++-------------+
|0|0|6|0|
|0|0|6|1|
|0|0|6|2|
|0|0|6|3|
|0|0|6|4|
|0|0|6|4|
|0|0|6|0|
|0|0|6|3|
|0|0|6|2|
|0|0|6|1|
+-----+----++-------------+
this result is wrong, since the second step we insert new data in table which
p2=7, however in the query result we cannot find p2=7, all p2= 6
was:
now hudi use HoodieCombineHiveInputFormat to achieve Incremental query of the
mor table.
when we have some small files in different partitions,
HoodieCombineHiveInputFormat will combine those small file readers.
HoodieCombineHiveInputFormat build partition field base on the first file
reader in it, however now HoodieCombineHiveInputFormat holds other file
readers which come from different partitions.
When switching readers, we should update ioctx
test env:
spark2.4.5, hadoop 3.1.1, hive 3.1.1
test step:
step1:
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(6))
.withColumn("a1", lit(Array[String]("sb1", "rz")))
.withColumn("a2", lit(Array[String]("sb1", "rz")))
// create hudi table which has three level partitions p,p1,p2
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
step2:
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String]("sb1", "rz")))
.withColumn("a2", lit(Array[String]("sb1", "rz")))
// upsert current table
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
hive beeline:
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hoodie.hive_8b.consume.mode=INCREMENTAL;
set hoodie.hive_8b.consume.max.commits=3;
set hoodie.hive_8b.consume.start.timestamp=20210325141300;
select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where
`_hoodie_commit_time`>'20210325141300' and `keyid` < 5;
query result:
+----+-----+-----+--------+
| p | p1 | p2 | keyid |
+----+-----+-----+--------+
| 0 | 0 | 6 | 0 |
| 0 | 0 | 6 | 1 |
| 0 | 0 | 6 | 2 |
| 0 | 0 | 6 | 3 |
| 0 | 0 | 6 | 4 |
| 0 | 0 | 6 | 4 |
| 0 | 0 | 6 | 0 |
| 0 | 0 | 6 | 3 |
| 0 | 0 | 6 | 2 |
| 0 | 0 | 6 | 1 |
+----+-----+-----+--------+
this result is wrong, since the second step we insert new data in table which
p2=7, however in the query result we cannot find p2=7, all p2= 6
> hive on spark/mr,Incremental query of the mor table, the partition field is
> incorrect
> -------------------------------------------------------------------------------------
>
> Key: HUDI-1719
> URL: https://issues.apache.org/jira/browse/HUDI-1719
> Project: Apache Hudi
> Issue Type: Bug
> Components: Hive Integration
> Affects Versions: 0.7.0, 0.8.0
> Environment: spark2.4.5, hadoop 3.1.1, hive 3.1.1
> Reporter: tao meng
> Priority: Major
> Fix For: 0.9.0
>
>
> now hudi use HoodieCombineHiveInputFormat to achieve Incremental query of the
> mor table.
> when we have some small files in different partitions,
> HoodieCombineHiveInputFormat will combine those small file readers.
> HoodieCombineHiveInputFormat build partition field base on the first file
> reader in it, however now HoodieCombineHiveInputFormat holds other file
> readers which come from different partitions.
> When switching readers, we should update ioctx
> test env:
> spark2.4.5, hadoop 3.1.1, hive 3.1.1
> test step:
> step1:
> val df = spark.range(0, 10000).toDF("keyid")
> .withColumn("col3", expr("keyid + 10000000"))
> .withColumn("p", lit(0))
> .withColumn("p1", lit(0))
> .withColumn("p2", lit(6))
> .withColumn("a1", lit(Array[String]("sb1", "rz")))
> .withColumn("a2", lit(Array[String]("sb1", "rz")))
> // create hudi table which has three level partitions p,p1,p2
> merge(df, 4, "default", "hive_8b",
> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
>
> step2:
> val df = spark.range(0, 10000).toDF("keyid")
> .withColumn("col3", expr("keyid + 10000000"))
> .withColumn("p", lit(0))
> .withColumn("p1", lit(0))
> .withColumn("p2", lit(7))
> .withColumn("a1", lit(Array[String]("sb1", "rz")))
> .withColumn("a2", lit(Array[String]("sb1", "rz")))
> // upsert current table
> merge(df, 4, "default", "hive_8b",
> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
> hive beeline:
> set
> hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
> set hoodie.hive_8b.consume.mode=INCREMENTAL;
> set hoodie.hive_8b.consume.max.commits=3;
> set hoodie.hive_8b.consume.start.timestamp=20210325141300; // this timestamp
> is smaller the earlist commit, so we can query whole commits
> select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where
> `_hoodie_commit_time`>'20210325141300' and `keyid` < 5;
> query result:
> +-----+----++-------------+
> |p|p1|p2|keyid|
> +-----+----++-------------+
> |0|0|6|0|
> |0|0|6|1|
> |0|0|6|2|
> |0|0|6|3|
> |0|0|6|4|
> |0|0|6|4|
> |0|0|6|0|
> |0|0|6|3|
> |0|0|6|2|
> |0|0|6|1|
> +-----+----++-------------+
> this result is wrong, since the second step we insert new data in table which
> p2=7, however in the query result we cannot find p2=7, all p2= 6
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)