voonhous commented on code in PR #6856:
URL: https://github.com/apache/hudi/pull/6856#discussion_r990894756
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -279,8 +279,9 @@ private FlinkOptions() {
       .defaultValue(false)// default read as batch
       .withDescription("Whether to skip compaction instants for streaming read,\n"
           + "there are two cases that this option can be used to avoid reading duplicates:\n"
-          + "1) you are definitely sure that the consumer reads faster than any compaction instants, "
-          + "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
+          + "1) `hoodie.compaction.preserve.commit.metadata` is set to `false` and you are definitely sure that the "
+          + "consumer reads faster than any compaction instants, usually with delta time compaction strategy that is "
Review Comment:
I understand that `hoodie.compaction.preserve.commit.metadata` is `true` by
default.
The default values for the configuration keys of interest are as such:
```properties
hoodie.compaction.preserve.commit.metadata=true
read.streaming.skip_compaction=false
```
The phrasing of the `read.streaming.skip_compaction` configuration's
description is VERY CONFUSING.
As of now, the description is as such:
```txt
Whether to skip compaction instants for streaming read, there are two cases
that this option can be used to avoid reading duplicates:
1) you are definitely sure that the consumer reads faster than any
compaction instants, usually with delta time compaction strategy that is long
enough, for e.g, one week;
2) changelog mode is enabled, this option is a solution to keep data
integrity
```
What I understand from this is:
You can enable (set the configuration value to true)
`read.streaming.skip_compaction` to prevent reading of duplicates under these 2
cases:
1) you are definitely sure that the consumer reads faster than any
compaction instants, usually with delta time compaction strategy that is long
enough, for e.g, one week;
# Consumer reads FASTER than compaction instants
A consumer reading faster than any compaction instant means that compaction is
slower than the consumer's reads.
As such, compaction will complete after the read. A concrete example is shown
below. I am not sure if my understanding is correct. Please correct me on any
conceptual mistakes made.
## Read iteration 1
Say the commit timeline looks like this at read iteration 1:
1. compaction plan at `instant-0` is created but still ongoing
2. deltacommit at `instant-1` has completed
The read consumer will only read out newly inserted rows in deltacommit @
`instant-1`.
Issued instant is updated as such:
```txt
issuedInstant=1
```
**Timeline when performing read iteration 1:**
```
0.compaction.requested
0.compaction.inflight
1.deltacommit.requested
1.deltacommit.inflight
1.deltacommit
```
## Read iteration 2
On read iteration 2:
1. compaction plan at `instant-0` has completed
2. deltacommit at `instant-2` has completed
Since `issuedInstant=1`, the InstantRange is as such:
```txt
InstantRange[type=OPEN_CLOSE, start=1, end=2]
```
The read consumer will read out newly inserted rows in the deltacommit @
`instant-2`. At this point, the rows generated by the base parquet files at
compaction `instant-0` will be ignored, since the data file iterators skip
rows whose `_hoodie_commit_time` lies outside the InstantRange.
**Timeline when performing read iteration 2:**
```
0.compaction.requested
0.compaction.inflight
0.commit
1.deltacommit.requested
1.deltacommit.inflight
1.deltacommit
2.deltacommit.requested
2.deltacommit.inflight
2.deltacommit
```
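The OPEN_CLOSE filtering described above can be sketched as follows. This is a minimal illustration under my reading of the semantics: the class and method names (`OpenCloseInstantRange`, `isInRange`) are hypothetical, not Hudi's actual API.

```java
// Minimal sketch of an OPEN_CLOSE instant-range check: a record is emitted
// only when start < commitTime <= end. Names are hypothetical; this is not
// Hudi's real InstantRange implementation.
final class OpenCloseInstantRange {
    private final String start; // exclusive lower bound (the issuedInstant)
    private final String end;   // inclusive upper bound (latest instant read)

    OpenCloseInstantRange(String start, String end) {
        this.start = start;
        this.end = end;
    }

    boolean isInRange(String commitTime) {
        // Instant times are fixed-width timestamps, so lexicographic
        // comparison matches chronological order.
        return commitTime.compareTo(start) > 0 && commitTime.compareTo(end) <= 0;
    }
}
```

With `InstantRange[type=OPEN_CLOSE, start=1, end=2]`, rows from the compaction base file that kept their original `_hoodie_commit_time` of `0` fail the check and are skipped, while rows from the deltacommit @ `instant-2` pass.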
## Configuration description
Circling back to the configuration description: when a consumer is reading
faster than a compaction instant, the possibility of duplicated data being read
out (due to the same rows existing in both the base parquet file and the delta
log file) is virtually zero.
`read.streaming.skip_compaction` SHOULD NOT be used to avoid duplicates if the
**consumer reads faster than any compaction instants**. Hence why I feel the
original description is misleading.
## Proposed change
That being said, my phrasing in the initial change is pretty misleading too.
What I was trying to say is:
`read.streaming.skip_compaction` should only be enabled if
`hoodie.compaction.preserve.commit.metadata` is set to its non-default value of
`false` AND a compaction plan completes before the deltacommit to be read in
the next read iteration.
Building upon the previous examples, suppose a read iteration 3 is performed
with the following configurations:
```properties
hoodie.compaction.preserve.commit.metadata=false (non-default value)
read.streaming.skip_compaction=false (default value)
```
**Timeline when performing read iteration 3:**
`instant-3` and `instant-4` have completed and will be read in during read
iteration 3.
```txt
3.compaction.requested
3.compaction.inflight
3.commit
4.deltacommit.requested
4.deltacommit.inflight
4.deltacommit
issuedInstant=2
InstantRange[type=OPEN_CLOSE, start=2, end=4]
```
At this point, the newly compacted rows (which have already been read during
read iteration 2) in the base parquet files generated at `instant-3` will have
a `_hoodie_commit_time` of `3` (commit metadata is not preserved, hence it is
overwritten).
This falls within the InstantRange of the current read iteration, so the
records that were already read in read iteration 2 are read out again, i.e.,
duplicated data is read out.
As such, `read.streaming.skip_compaction` should be used to avoid reading
duplicates in such a case, i.e., when the user is definitely sure that
compaction instants complete faster than / before the next deltacommit to be
read in.
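To make the duplicate-read mechanism concrete, here is a hedged sketch. The predicate mirrors the OPEN_CLOSE range check from before; all names are illustrative, not Hudi's actual code.

```java
// Illustrates why read iteration 3 re-reads old rows when
// hoodie.compaction.preserve.commit.metadata=false: a row originally written
// at instant-1 is rewritten by the compaction at instant-3, so its
// _hoodie_commit_time becomes "3" and now passes the (2, 4] range check.
final class DuplicateReadDemo {
    static boolean inOpenCloseRange(String commitTime, String start, String end) {
        return commitTime.compareTo(start) > 0 && commitTime.compareTo(end) <= 0;
    }

    public static void main(String[] args) {
        String preservedTime = "1"; // preserve.commit.metadata=true: keeps "1"
        String rewrittenTime = "3"; // preserve.commit.metadata=false: stamped "3"

        // Preserved metadata: the old row falls outside (2, 4] and is skipped.
        System.out.println(inOpenCloseRange(preservedTime, "2", "4")); // false

        // Overwritten metadata: the same row re-enters the range -> duplicate.
        System.out.println(inOpenCloseRange(rewrittenTime, "2", "4")); // true
    }
}
```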
# Disclaimer
I am ignoring changelog mode's description as I have yet to test this
configuration under such a use case.
We will also need to sync up the changes here with #6855
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]