tao meng created HUDI-2086:
------------------------------
Summary: redo the logic of mor_incremental_view for Hive
Key: HUDI-2086
URL: https://issues.apache.org/jira/browse/HUDI-2086
Project: Apache Hudi
Issue Type: Bug
Components: Hive Integration
Environment: spark3.1.1
hive3.1.1
hadoop3.1.1
os: suse
Reporter: tao meng
Currently, there are some problems with the mor_incremental_view for Hive.
For example:
1) *Hudi cannot read the latest incremental data when it is stored only in log files*
Consider this scenario: create a MOR table with bulk_insert, then do an upsert on that
table.
Now we want to query the latest incremental data with Hive/Spark SQL; however, the
latest incremental data is stored only in log files, and the query returns
nothing.
step1: prepare data
val df = spark.sparkContext.parallelize(0 to 20, 2).map(x => testCase(x,
x+"jack", Random.nextInt(2))).toDF()
.withColumn("col3", expr("keyid + 3000"))
.withColumn("p", lit(1))
step2: do bulk_insert
mergePartitionTable(df, 4, "default", "inc", tableType =
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
step3: do upsert
mergePartitionTable(df, 4, "default", "inc", tableType =
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
step4: check the latest commit time and run the query
spark.sql("set hoodie.inc.consume.mode=INCREMENTAL")
spark.sql("set hoodie.inc.consume.max.commits=1")
spark.sql("set hoodie.inc.consume.start.timestamp=20210628103935")
spark.sql("select keyid, col3 from inc_rt where `_hoodie_commit_time` >
'20210628103935' order by keyid").show(100, false)
+-----+----+
|keyid|col3|
+-----+----+
+-----+----+
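The empty result above can be modelled in a few lines. In this simplified sketch (all class and method names are hypothetical, not Hudi APIs), a Hive-side incremental scan that selects only base files written after the start timestamp drops a file slice whose new data arrived purely as log files, while slice-level selection, which is effectively what the Spark DataSource path does, keeps it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class IncrementalScanSketch {
    // Hypothetical, simplified MOR file slice: one optional base parquet file
    // plus log files, each tagged with the commit that wrote it.
    static class FileSlice {
        final String baseFile;              // may be null
        final String baseCommitTime;        // commit that wrote the base file
        final Map<String, String> logFiles; // log file -> commit that wrote it

        FileSlice(String baseFile, String baseCommitTime, Map<String, String> logFiles) {
            this.baseFile = baseFile;
            this.baseCommitTime = baseCommitTime;
            this.logFiles = logFiles;
        }
    }

    // Buggy behaviour: only base files committed after startTs are scanned,
    // so an upsert that wrote nothing but log files is invisible.
    static List<String> baseFilesOnly(List<FileSlice> slices, String startTs) {
        List<String> out = new ArrayList<>();
        for (FileSlice s : slices) {
            if (s.baseFile != null && s.baseCommitTime.compareTo(startTs) > 0) {
                out.add(s.baseFile);
            }
        }
        return out;
    }

    // Slice-level selection: keep the whole slice if the base file *or any
    // log file* was committed after startTs, so the log data is merged in.
    static List<FileSlice> fullSlices(List<FileSlice> slices, String startTs) {
        List<FileSlice> out = new ArrayList<>();
        for (FileSlice s : slices) {
            boolean touched = s.baseFile != null && s.baseCommitTime.compareTo(startTs) > 0;
            for (String commit : s.logFiles.values()) {
                touched |= commit.compareTo(startTs) > 0;
            }
            if (touched) {
                out.add(s);
            }
        }
        return out;
    }
}
```

With a slice whose base file comes from the bulk_insert commit and whose only log file comes from the later upsert, base-file-only selection returns nothing for a start timestamp equal to the bulk_insert commit, while slice-level selection returns the slice.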
2) *If we do insert_overwrite/insert_overwrite_table on a Hudi MOR table, the
incremental query result is wrong when we want to query the data from before the
insert_overwrite/insert_overwrite_table*
step1: do bulk_insert
mergePartitionTable(df, 4, "default", "overInc", tableType =
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
now the commits are
[20210628160614.deltacommit ]
step2: do insert_overwrite_table
mergePartitionTable(df, 4, "default", "overInc", tableType =
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "insert_overwrite_table")
now the commits are
[20210628160614.deltacommit, 20210628160923.replacecommit ]
step3: query the data before insert_overwrite_table
spark.sql("set hoodie.overInc.consume.mode=INCREMENTAL")
spark.sql("set hoodie.overInc.consume.max.commits=1")
spark.sql("set hoodie.overInc.consume.start.timestamp=0")
spark.sql("select keyid, col3 from overInc_rt where `_hoodie_commit_time` > '0'
order by keyid").show(100, false)
+-----+----+
|keyid|col3|
+-----+----+
+-----+----+
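Sketching the replacecommit case (again with hypothetical names, not Hudi APIs): if the reader unconditionally drops every file group that any replacecommit has replaced, an incremental query whose range ends before the replacecommit returns nothing; replacecommits should only be applied when they fall inside the queried range:

```java
import java.util.ArrayList;
import java.util.List;

public class ReplaceCommitSketch {
    // Hypothetical model: a file group plus the timestamp of the
    // replacecommit that replaced it (null while it is still live).
    static class FileGroup {
        final String id;
        final String replacedByCommit;

        FileGroup(String id, String replacedByCommit) {
            this.id = id;
            this.replacedByCommit = replacedByCommit;
        }
    }

    // Buggy behaviour: every replaced group is dropped, even when the
    // replacecommit lies after the end of the queried incremental range.
    static List<String> visibleGroupsBuggy(List<FileGroup> groups) {
        List<String> out = new ArrayList<>();
        for (FileGroup g : groups) {
            if (g.replacedByCommit == null) {
                out.add(g.id);
            }
        }
        return out;
    }

    // As-of semantics: only honour replacecommits up to rangeEnd, so data
    // written before an insert_overwrite_table is still queryable.
    static List<String> visibleGroupsAsOf(List<FileGroup> groups, String rangeEnd) {
        List<String> out = new ArrayList<>();
        for (FileGroup g : groups) {
            if (g.replacedByCommit == null || g.replacedByCommit.compareTo(rangeEnd) > 0) {
                out.add(g.id);
            }
        }
        return out;
    }
}
```

With the timeline above, the only file group was replaced by 20210628160923; an incremental range ending at the deltacommit 20210628160614 should still see it, but the unconditional filter hides it.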
3) *Hive/Presto/Flink cannot read file groups that have only log files*
When we use the HBase or in-memory index, a MOR table produces log files instead
of parquet files, but currently Hive/Presto cannot read those file groups, since
the files are log files.
*HUDI-2048* mentions this problem.
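A minimal sketch of the log-only file group issue (names hypothetical): if split generation is anchored on the base file path, a file group produced under the HBase or in-memory index, which has log files but no base file, yields no split at all; one way to keep it readable, similar in spirit to what the Spark DataSource does, is to fall back to the log files as the scan unit:

```java
import java.util.ArrayList;
import java.util.List;

public class LogOnlySliceSketch {
    // Hypothetical file slice: possibly no base file, only log files, as
    // produced by a MOR table using the HBase or in-memory index.
    static class FileSlice {
        final String baseFile;      // null for a log-only slice
        final List<String> logFiles;

        FileSlice(String baseFile, List<String> logFiles) {
            this.baseFile = baseFile;
            this.logFiles = logFiles;
        }
    }

    // Buggy behaviour: splits are keyed on the base file, so a log-only
    // slice produces no split and its data can never be read.
    static List<String> splitsBuggy(List<FileSlice> slices) {
        List<String> splits = new ArrayList<>();
        for (FileSlice s : slices) {
            if (s.baseFile != null) {
                splits.add(s.baseFile);
            }
        }
        return splits;
    }

    // Fallback: when there is no base file, anchor the split on the first
    // log file so the slice is still scanned and its records returned.
    static List<String> splitsWithLogFallback(List<FileSlice> slices) {
        List<String> splits = new ArrayList<>();
        for (FileSlice s : slices) {
            if (s.baseFile != null) {
                splits.add(s.baseFile);
            } else if (!s.logFiles.isEmpty()) {
                splits.add(s.logFiles.get(0));
            }
        }
        return splits;
    }
}
```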
However, when we use the Spark DataSource to execute an incremental query, none
of the problems above occur. It is therefore necessary to keep the logic of the
mor_incremental_view for Hive consistent with the Spark DataSource.
We will redo the logic of the mor_incremental_view for Hive to solve the above
problems and keep it consistent with the Spark DataSource.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)