tao meng created HUDI-2086:
------------------------------

             Summary: Rework the logic of the MOR incremental view for Hive
                 Key: HUDI-2086
                 URL: https://issues.apache.org/jira/browse/HUDI-2086
             Project: Apache Hudi
          Issue Type: Bug
          Components: Hive Integration
         Environment: spark3.1.1
hive3.1.1
hadoop3.1.1
os: suse
            Reporter: tao meng


There are currently some problems with the MOR incremental view for Hive.

For example:

1) *Hudi cannot read the latest incremental data when it is stored only in log files*

Consider this scenario: create a MOR table with bulk_insert, then do an upsert on that table.

Now we want to query the latest incremental data with Hive/Spark SQL. However, the latest incremental data is stored in log files, so the query returns nothing.

step1: prepare data

val df = spark.sparkContext.parallelize(0 to 20, 2)
  .map(x => testCase(x, x + "jack", Random.nextInt(2)))
  .toDF()
  .withColumn("col3", expr("keyid + 3000"))
  .withColumn("p", lit(1))

step2: do bulk_insert

mergePartitionTable(df, 4, "default", "inc",
  tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

step3: do upsert

mergePartitionTable(df, 4, "default", "inc",
  tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")

step4: check the latest commit time and run the query

spark.sql("set hoodie.inc.consume.mode=INCREMENTAL")
spark.sql("set hoodie.inc.consume.max.commits=1")
spark.sql("set hoodie.inc.consume.start.timestamp=20210628103935")
spark.sql("select keyid, col3 from inc_rt where `_hoodie_commit_time` > '20210628103935' order by keyid").show(100, false)

+-----+----+
|keyid|col3|
+-----+----+
+-----+----+

 

2) *If we do insert_overwrite/insert_overwrite_table on a Hudi MOR table, the incremental query result is wrong when we query the data written before the insert_overwrite/insert_overwrite_table*

step1: do bulk_insert 

mergePartitionTable(df, 4, "default", "overInc",
  tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

now the commits are

[20210628160614.deltacommit ]

step2: do insert_overwrite_table

mergePartitionTable(df, 4, "default", "overInc",
  tableType = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "insert_overwrite_table")

now the commits are

[20210628160614.deltacommit, 20210628160923.replacecommit ]

step3: query the data written before the insert_overwrite_table

spark.sql("set hoodie.overInc.consume.mode=INCREMENTAL")
spark.sql("set hoodie.overInc.consume.max.commits=1")
spark.sql("set hoodie.overInc.consume.start.timestamp=0")
spark.sql("select keyid, col3 from overInc_rt where `_hoodie_commit_time` > '0' order by keyid").show(100, false)

+-----+----+
|keyid|col3|
+-----+----+
+-----+----+

 

3) *Hive/Presto/Flink cannot read file groups that contain only log files*

When we use the HBase or in-memory index, a MOR table produces log files instead of parquet base files. Currently, Hive/Presto cannot read such file groups, because they consist only of log files.

*HUDI-2048* mentions this problem.
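For context, a minimal sketch of a write that produces log-only file groups; the table name, path, and field choices here are hypothetical, while `hoodie.index.type` is the standard Hudi write config key:

{code:scala}
// With the in-memory (or HBase) index, a MOR table routes records to log
// files rather than parquet base files, so file groups may contain only logs.
df.write.format("hudi")
  .option("hoodie.table.name", "inc")                            // hypothetical table name
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.index.type", "INMEMORY")                       // or "HBASE"
  .option("hoodie.datasource.write.recordkey.field", "keyid")
  .option("hoodie.datasource.write.partitionpath.field", "p")
  .mode("append")
  .save("/tmp/hudi/inc")                                         // hypothetical base path
{code}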

 

However, when we use the Spark datasource to execute an incremental query, none of the problems above occur. It is therefore necessary to keep the logic of the MOR incremental view for Hive consistent with the Spark datasource logic.

We should rework the logic of the MOR incremental view for Hive to solve the above problems and align its behavior with the Spark datasource.
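For comparison, the Spark datasource incremental query path can be sketched as follows (the base path is hypothetical; the option constants come from `DataSourceReadOptions`, mirroring the `DataSourceWriteOptions` constants used above):

{code:scala}
import org.apache.hudi.DataSourceReadOptions

// Incremental read through the Spark datasource; this path correctly
// returns data that lives only in log files.
val incDf = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
    DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20210628103935")
  .load("/tmp/hudi/inc")  // hypothetical base path
incDf.select("keyid", "col3").orderBy("keyid").show(100, false)
{code}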

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
