CTTY commented on code in PR #12195:
URL: https://github.com/apache/hudi/pull/12195#discussion_r1827198929


##########
rfc/rfc-83/rfc-83.md:
##########
@@ -0,0 +1,209 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-83: Incremental Query with Completion Time
+
+## Proposers
+
+- @yihua
+- @CTTY
+
+## Approvers
+- @<approver1 github username>
+- @<approver2 github username>
+
+## Status
+
+JIRA:
+- [HUDI-8141](https://issues.apache.org/jira/browse/HUDI-8141) (Closed) ([PR](https://github.com/apache/hudi/pull/11947))
+- [HUDI-8354](https://issues.apache.org/jira/browse/HUDI-8354)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Hudi incremental queries require scanning the timeline to find the relevant 
commits.
+Current incremental queries use commits' start times (instant times) to sort the timeline and find eligible commits,
+but this makes it necessary to handle hollow commits, which complicates the incremental logic.
+
+Since Hudi 1.x, each instant has two important timestamps: start time and 
completion time. 
+This change makes an instant a "range" rather than a single "dot" in the 
timeline, enabling finer control and management.
+
+This RFC aims to change Hudi incremental-related logic to use commits' 
completion times by default instead of start times 
+to simplify the timeline scanning logic and improve the overall incremental 
experience.
+
+## Background
+### Problem 1: Hollow Commits
+Let's take a look at the example Hudi timeline below:
+```
+Timeline -----------------------
+C1              [------]
+C2                    [---------
+C3                       [----]
+```
+In the timeline above, there are three Hudi commits: C1, C2, and C3. C1 and C3 
are completed instants, while C2 is still inflight.
+Suppose a query is scanning the entire timeline that includes all three 
commits, then C2 would be considered a hollow commit 
+as it hasn't completed yet. 
+
+An incremental query relies on checkpointing, which also uses start times, and returns a new checkpoint when it finishes.
+If the query reads up to C3, the next checkpoint will be `C3.start`, and future incremental queries will only read commits that start after `C3.start`.
+However, if C2 completes later, it will never be read, because `C2.start` is earlier than the checkpoint `C3.start`.
+This causes data loss.
+```
+If we ignore the hollow commit, then the checkpoint would be moved to C3.start
+Timeline -----------------------
+C1              [------]
+C2                    [---------++++]   C2 would be skipped even if it 
completes normally later
+C3                       [----]
+                         *
+                     Checkpoint
+```
+
+To avoid data loss, Hudi can either 1) read only up to the hollow commit or 2) fail the incremental query. Either option
+requires Hudi to detect hollow commits first by scanning the timeline before running the actual incremental read,
+which adds an extra step to every incremental query.
+
+Let's revisit the same example, this time using completion time as the checkpoint.
+```
+                                Now
+                                 *
+Timeline -------------------------
+C1              [------]
+C2                    [-----------++++] 
+C3                       [----]
+                              *
+                         Checkpoint
+```
+Assuming the input time range covers all three commits, only C1 and C3 are eligible as of now. The checkpoint
+moves to `C3.completion`. Even if C2 completes some time in the future, `C2.completion` will still be
+later than `C3.completion`, so C2 can be picked up by a future incremental query without issues. With this approach,
+we no longer need to worry about handling hollow commits when running incremental queries.
+
+### Problem 2: Cloud Events Source Incremental Query
+Hudi currently supports an S3/GCS events incremental source, which is tightly coupled with `QueryInfo`
+and uses instant start times throughout.
+To better support the new incremental assumptions and cleaner modularization, 
+it's necessary to refactor the existing cloud events incremental classes and 
provide a unified interface that 
+can work across Hudi incremental source, Flink incremental source, and cloud 
events incremental source.
+
+For background, let's first review how the existing cloud events source operates. We will cover the refactoring
+design in the implementation section.
+
+The existing cloud events source consists of five main steps:
+1. Generate `QueryInfo` based on the last checkpoint and configs
+2. Pass `QueryInfo` to `QueryRunner` and get events info from the Hudi cloud metadata table
+3. Apply filters (e.g. source limiting filters, file filters, etc.) to the events info returned by `QueryRunner`
+4. Fetch the cloud data from the actual Hudi data table with the cloud events info
+5. Run post actions such as dropping Hudi meta fields from the returned dataset, reporting metrics, setting the next checkpoint, etc.
+
+These steps are summarized in the following diagram:
+![old_events_source](./old_events_source.png)
+
+
+## Implementation
+### Integrate with IncrementalQueryAnalyzer
+#### Glossary
+- `IncrementalQueryAnalyzer`: An existing Hudi class that is currently used only by Flink's incremental logic.
+It reads and filters the timeline to obtain commits that fall within a completion time range.
+- `QueryContext`: The result of `IncrementalQueryAnalyzer#analyze`. It contains a list of eligible commits.
+- `IncrementalRelation`: Hudi's main class for incremental query logic. It accepts a time range (start, end],
+scans the timeline to find eligible commits, and uses commit metadata to fetch the actual table data.
+- `HoodieIncrSource`: An implementation of Hudi's `RowSource` that helps fetch 
incremental data from the source 
+Hudi table. This class was the focus of 
[HUDI-8141](https://issues.apache.org/jira/browse/HUDI-8141).
+- `QueryInfo`: This class contains the logic for finding the timeline based on a start time range.
+
+`HoodieIncrSource` analyzes the table timeline and generates a `QueryInfo` containing start and end instant times.
+This range is passed to `IncrementalRelation` for further analysis and filtering.
+`IncrementalRelation` fetches the available commits within the range and builds a scan RDD that covers the incremental data.
+The following filters are then applied to the returned dataset:
+```
+spark.read.load(...)
+...
+.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
startInstantTime))
+.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
endInstantTime))
+```
+
+Switching to completion time introduces the following key changes:
+1. `HoodieIncrSource` would use `IncrementalQueryAnalyzer` and `QueryContext`, which work under completion time
+semantics, instead of `QueryInfo`.
+2. For filtering, we cannot rely on a range such as (start, end] because there may be hollow commits in the range.
+We need to rely on the list of eligible commits stored in the `QueryContext`, so the new filter would look like this:
+```
+.filter(String.format("%s IN ('%s')", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              String.join("','", instantTimeList)));
+```
+This change is enough to cover the incremental query logic for `HoodieIncrSource` and `IncrementalRelation`.
+After the change, `HoodieIncrSource` would behave as follows:
+1. Use the last checkpoint and query configs to build an `IncrementalQueryAnalyzer` and get a `QueryContext`.
+2. Run a snapshot/incremental query with the info from `QueryContext`, which triggers the underlying `IncrementalRelation`.
+3. Filter the returned data with plugins like `SnapshotLoadQuerySplitter`, drop metadata fields, report metrics, etc.
+
+### Refactor Cloud Events Incremental Source
+#### Incremental Logic Abstraction
+Now let's move on to the cloud events incremental source for S3/GCS. Because its logic is vastly different,
+it is very challenging to plug in `IncrementalQueryAnalyzer` and migrate the logic we have for `HoodieIncrSource` to
+the cloud events incremental source. Even if we managed to do so, the code would not be maintainable. Hence, a refactoring is necessary.
+
+We've already covered the updated logic flow of `HoodieIncrSource` and the cloud events incremental source.
+Comparing these sources reveals three abstract steps:
+- Get metadata
+- Fetch data
+- Post actions
+
+The following table describes what `HoodieIncrSource` and the cloud events incremental source need to do for each step.
+
+| Source                   | Get Metadata | Fetch Data | Post Actions |
+|--------------------------|--------------|------------|--------------|
+| `HoodieIncrSource`       | - Pass checkpoint and query config to `IncrementalQueryAnalyzer` and get `QueryContext` | - Use `QueryContext` and `IncrementalRelation` to query data | - Filter the result<br/>- Report metrics<br/>- etc. |
+| Cloud Events Incr Source | - Pass checkpoint and query config to `IncrementalQueryAnalyzer` and get `QueryContext`<br/>- Fetch eligible `CloudObjectMetadata` from cloud metadata Hudi table<br/>- Filter the results | - Use `QueryContext` and `CloudObjectMetadata` to fetch objects from cloud storage | - Report metrics<br/>- etc. |
+
+Note that Flink's `IncrementalInputSplits` can also be abstracted into this flow, but that is out of scope for this RFC.
+
+#### IncrementalQueryRunner
+With the abstraction above, we can define the `IncrementalQueryRunner` interface.
+```java
+public interface IncrementalQueryRunner<M, D> {
+  
+  M getMetadata();
+
+  D fetchData();

Review Comment:
   I think the `D` here can be a single row; it mainly depends on the implementation. If we are talking about buffering, then I think we will need to add more plugins like `IncrementalQueryBuffer` and put them into the implementation of `IncrementalQueryRunner`.
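
   To make the buffering idea concrete, here is a rough sketch, not a proposal for the final API. It redeclares only the two interface methods visible in the diff above. `IncrementalQueryBuffer`, `SingleRowRunner`, the `fill`/`poll` methods, and the use of `String` for rows and commits are all hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Only the two methods visible in the RFC diff: M is the metadata type, D is the data type.
interface IncrementalQueryRunner<M, D> {
  M getMetadata();

  D fetchData();
}

// Hypothetical plugin (not an existing Hudi class): holds rows between fetchData() calls.
final class IncrementalQueryBuffer<R> {
  private final Queue<R> rows = new ArrayDeque<>();

  // Pulls rows from the source until the buffer holds maxRows or the source is exhausted.
  void fill(Iterator<R> source, int maxRows) {
    while (rows.size() < maxRows && source.hasNext()) {
      rows.add(source.next());
    }
  }

  // Returns the next buffered row, or null if the buffer is empty.
  R poll() {
    return rows.poll();
  }

  boolean isEmpty() {
    return rows.isEmpty();
  }
}

// Hypothetical runner where D is a single row (a String here to keep the sketch small).
final class SingleRowRunner implements IncrementalQueryRunner<List<String>, String> {
  private final List<String> eligibleCommits;
  private final Iterator<String> source;
  private final int batchSize;
  private final IncrementalQueryBuffer<String> buffer = new IncrementalQueryBuffer<>();

  SingleRowRunner(List<String> eligibleCommits, List<String> rows, int batchSize) {
    this.eligibleCommits = eligibleCommits;
    this.source = rows.iterator();
    this.batchSize = batchSize;
  }

  @Override
  public List<String> getMetadata() {
    return eligibleCommits;
  }

  // Returns one row per call, refilling the buffer when it runs dry; null means no more data.
  @Override
  public String fetchData() {
    if (buffer.isEmpty()) {
      buffer.fill(source, batchSize);
    }
    return buffer.poll();
  }
}
```

   With this shape the caller keeps calling `fetchData()` until it returns `null`, and the buffering stays an implementation detail of the runner rather than part of the interface.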
   
   Do you have any examples/code I can refer to?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
