stayrascal commented on a change in pull request #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r801731047
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -141,27 +142,41 @@ public void snapshotState(FunctionSnapshotContext
context) {
@Override
public void initializeState(FunctionInitializationContext context) {
- ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
+ ValueStateDescriptor<HoodieRecord> indexStateDesc =
Review comment:
Thanks @danny0405 for reviewing this.
Yeah, as mentioned in this issue https://github.com/apache/hudi/issues/4030.
The reason we need the full record here is to handle the case where the
record's partition path changes.
In the original logic, once the partition path changed, a delete record was
sunk to the old partition file to remove the old record, and the incoming
record was then sunk with the new partition to the new partition file. The
final record therefore only contains the fields from the incoming record and
loses the fields from the old record.
(Note that `OverwriteNonDefaultsWithLatestAvroPayload` also has this issue.)
So we need to retrieve the old/existing record and merge/combine it with the
incoming record. Since we currently don't support looking up a record from the
base file, we have to store the old/existing record somewhere, e.g. Flink
state, and `BucketAssignFunction` is the only place where we can store the
old/existing record and change its location from the old partition file to the
new partition file.
So the new logic is:
- store the old record (from the source, or from the base file when index
bootstrap is enabled) in Flink state
- once a new record arrives with the same record key but a changed partition:
  - sink a delete record to the old partition file to delete the old record
  - retrieve and copy the old record from state, change its location to the
new partition, and sink it to the new partition file
  - the copied record and the incoming record will then be merged by `#preCombine`
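The steps above can be sketched without any Flink or Hudi dependency. This is a minimal model only: a `HashMap` stands in for the keyed `ValueState<HoodieRecord>`, `SketchRecord` and `merge` are hypothetical stand-ins for the Hoodie record and `#preCombine`, and none of these names come from the PR itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Hoodie record: key, partition path, and fields.
class SketchRecord {
    final String key;
    final String partition;
    final Map<String, String> fields;
    SketchRecord(String key, String partition, Map<String, String> fields) {
        this.key = key; this.partition = partition; this.fields = fields;
    }
}

public class PartitionChangeSketch {
    // Stand-in for Flink keyed ValueState<HoodieRecord>: record key -> last full record.
    private final Map<String, SketchRecord> indexState = new HashMap<>();
    // Collected output, in emit order: "partition:DELETE" or "partition:{fields}".
    final List<String> emitted = new ArrayList<>();

    // Mimics the #preCombine idea: incoming fields win, old fields fill the gaps.
    static SketchRecord merge(SketchRecord old, SketchRecord incoming) {
        Map<String, String> merged = new HashMap<>(old.fields);
        merged.putAll(incoming.fields);
        return new SketchRecord(incoming.key, incoming.partition, merged);
    }

    void process(SketchRecord incoming) {
        SketchRecord old = indexState.get(incoming.key);
        if (old != null && !old.partition.equals(incoming.partition)) {
            // 1. Sink a delete record to the old partition file.
            emitted.add(old.partition + ":DELETE");
            // 2. Copy the old record from state and merge it with the incoming
            //    record, relocated to the new partition.
            incoming = merge(old, incoming);
        }
        // 3. Sink to the (possibly new) partition and keep the full record in state.
        emitted.add(incoming.partition + ":" + incoming.fields);
        indexState.put(incoming.key, incoming);
    }
}
```

Processing a second record with the same key but a new partition produces a delete for the old partition followed by a merged record in the new one.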
The drawback is that it increases the state size, but if we don't use state to
store the full record, there seems to be no way to merge the incoming record
with the existing record in the base file when the partition changes.
I also considered this problem. To avoid impacting the current logic
(overwrite with the latest payload), my idea is to create an abstract
`updateState` method and treat `indexState` as an abstract field; different
subclasses would then implement the logic with
`ValueState<HoodieRecordGlobalLocation>` or `ValueState<HoodieRecord>`.
Alternatively, a `StateHelper` could handle the state operations.
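One possible shape for the `StateHelper` idea, sketched without Flink: a small interface generic over the stored value type, so one variant keeps only the location and another keeps the full record. All names here (`StateHelper`, `MapStateHelper`) are illustrative assumptions, not code from the PR, and a `HashMap` again stands in for Flink keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative: subclasses of the assign function would pick V =
// location-only vs. full-record, without changing the surrounding logic.
interface StateHelper<V> {
    void update(String recordKey, V value);
    V get(String recordKey);
}

// Backed by a plain map here; the real helper would wrap ValueState<V>
// created from a ValueStateDescriptor<V> in initializeState().
class MapStateHelper<V> implements StateHelper<V> {
    private final Map<String, V> state = new HashMap<>();
    public void update(String recordKey, V value) { state.put(recordKey, value); }
    public V get(String recordKey) { return state.get(recordKey); }
}

public class StateHelperSketch {
    public static void main(String[] args) {
        // Variant 1: location-only state (the original behavior).
        StateHelper<String> locationState = new MapStateHelper<>();
        locationState.update("k1", "p1/file-0");

        // Variant 2: full-record state, so the record can be merged later.
        StateHelper<Map<String, String>> recordState = new MapStateHelper<>();
        Map<String, String> rec = new HashMap<>();
        rec.put("a", "1");
        recordState.update("k1", rec);

        System.out.println(locationState.get("k1"));
    }
}
```

The point of the abstraction is that the existing code path pays no extra state cost unless the full-record variant is chosen.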
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]