[
https://issues.apache.org/jira/browse/HUDI-480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213102#comment-17213102
]
cdmikechen edited comment on HUDI-480 at 10/14/20, 6:55 AM:
------------------------------------------------------------
[~vinoth]
[~yanghua]
Sorry for taking so long to get back to this topic. I have thought through a
general Spark-based implementation of incremental queries for deleted data:
1. We need a new method that uses *HoodieWriteStat.getNumDeletes()* to find the
files that contain deleted rows.
2. We also need a new method that uses *HoodieWriteStat.getPrevCommit()* to
find the previous file with the same fileId.
3. We need a new method that compares the current files with the previous files
that have the same fileId and picks out the deleted rows. The implementation is
similar to the Spark merge method: it builds an *ExternalSpillableMap* (covering
COW parquet and MOR log files) from the current file's data, and then checks
whether each key of the previous file is present in it. If an old key does not
exist in the *ExternalSpillableMap*, it is a deleted row.
4. Add a new column named *_ hoodie_ delete_* that holds the commit time of the
current files. It indicates when the row was deleted.
5. Add a new incremental view called *DELETE*. For this we can add a new query
type, *DataSourceReadOptions.QUERY_TYPE_DELETE_OPT_VAL*:
{code:java}
// QUERY_TYPE_DELETE_OPT_VAL is the query type proposed above; it does not exist yet.
Dataset<Row> hudiIncQueryDF = spark.read()
    .format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
        DataSourceReadOptions.QUERY_TYPE_DELETE_OPT_VAL())
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
        <beginInstantTime>)
    .load(tablePath);
hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental");
spark.sql("select * from hudi_trips_incremental where fare > 20.0").show();
{code}
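To make steps 1, 3 and 4 above more concrete, here is a minimal sketch. It is
self-contained Java, not Hudi code: `WriteStat` is a hypothetical stand-in for
*HoodieWriteStat*, ordinary in-memory maps stand in for *ExternalSpillableMap*,
and the column name `delete_commit_time` is only a placeholder. Reading the
parquet/log files and locating the previous file through getPrevCommit()
(step 2) are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: nothing in this class is a real Hudi class or API.
final class DeletedRowsSketch {

    // Placeholder name for the column holding the deletion commit time (an assumption).
    static final String DELETE_COLUMN = "delete_commit_time";

    // Hypothetical stand-in for HoodieWriteStat with only the fields used here.
    static final class WriteStat {
        final String fileId;
        final String prevCommit;
        final long numDeletes;

        WriteStat(String fileId, String prevCommit, long numDeletes) {
            this.fileId = fileId;
            this.prevCommit = prevCommit;
            this.numDeletes = numDeletes;
        }
    }

    // Step 1: keep only the write stats that recorded at least one delete.
    static List<WriteStat> withDeletes(List<WriteStat> stats) {
        List<WriteStat> result = new ArrayList<>();
        for (WriteStat stat : stats) {
            if (stat.numDeletes > 0) {
                result.add(stat);
            }
        }
        return result;
    }

    // Steps 3 and 4: a key present in the previous file but missing from the
    // current one is treated as deleted and tagged with the current commit time.
    static List<Map<String, Object>> deletedRows(
            Map<String, Map<String, Object>> previous,
            Map<String, Map<String, Object>> current,
            String commitTime) {
        List<Map<String, Object>> deleted = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> entry : previous.entrySet()) {
            if (!current.containsKey(entry.getKey())) {
                Map<String, Object> row = new LinkedHashMap<>(entry.getValue());
                row.put(DELETE_COLUMN, commitTime);
                deleted.add(row);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        // Rows of the same fileId, keyed by record key, before and after a commit.
        Map<String, Map<String, Object>> previous = new LinkedHashMap<>();
        previous.put("trip-1", Map.<String, Object>of("fare", 10.0));
        previous.put("trip-2", Map.<String, Object>of("fare", 30.0));
        Map<String, Map<String, Object>> current = new LinkedHashMap<>();
        current.put("trip-1", Map.<String, Object>of("fare", 12.0));

        // Prints the rows that disappeared between the two versions of the file.
        System.out.println(deletedRows(previous, current, "20201014065500"));
    }
}
```

A real implementation would spill to disk instead of holding both versions in
memory, which is why *ExternalSpillableMap* is suggested above.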
I haven't figured out how to implement this in Hive yet, but I do have an idea
of how it would be queried.
We can add a consume mode called 'DELETE'. A delete view in Hive could then be
used like this:
{code:sql}
set hoodie.tableName.consume.mode=DELETE;
set hoodie.tableName.consume.start.timestamp=20101013232359;
set hoodie.tableName.consume.max.commits=1;
{code}
> Support a querying delete data method in incremental view
> ---------------------------------------------------------
>
> Key: HUDI-480
> URL: https://issues.apache.org/jira/browse/HUDI-480
> Project: Apache Hudi
> Issue Type: Improvement
> Components: Incremental Pull
> Reporter: cdmikechen
> Assignee: cdmikechen
> Priority: Minor
>
> As we know, Hudi supports many ways to query data in Spark, Hive and Presto.
> It also provides a very useful timeline concept for tracing changes in data,
> which can be used to query incremental data in the incremental view.
> Previously we only had insert and update functions to upsert data; now we
> have added new functions to delete existing data.
> *[HUDI-328] Adding delete api to HoodieWriteClient*
> https://github.com/apache/incubator-hudi/pull/1004
> *[HUDI-377] Adding Delete() support to DeltaStreamer*
> https://github.com/apache/incubator-hudi/pull/1073
> So now that we have a delete API, should we add another method to get deleted
> data in the incremental view?
> I've looked at the methods that generate new parquet files. I think the main
> idea is to combine the old and new data and then filter out the records that
> need to be deleted, so that the deleted data does not exist in the new
> dataset. However, this means the deleted data is not retained in the new
> dataset, so only inserted or modified data can be found through the existing
> timestamp field when tracing data in the incremental view.
> If we can do it, I feel that there are two ideas to consider:
> 1. Trace the dataset in the same file at different checkpoints on the
> timeline, compare the two datasets by key, and filter out the deleted data.
> This method adds no extra cost when writing, but the comparison has to run
> for each request at query time, which is expensive.
> 2. When writing data, record any deleted data in a file named something like
> *.delete_filename_version_timestamp*, so that we can give feedback
> immediately according to the time. But this requires additional processing
> at write time.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)