stayrascal commented on a change in pull request #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r801731047



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -141,27 +142,41 @@ public void snapshotState(FunctionSnapshotContext context) {
 
   @Override
   public void initializeState(FunctionInitializationContext context) {
-    ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
+    ValueStateDescriptor<HoodieRecord> indexStateDesc =

Review comment:
       Thanks @danny0405 for reviewing this.
   
   Yeah, as mentioned in this issue https://github.com/apache/hudi/issues/4030, the reason we need the full record here is to handle the case where the record's partition path changes.
   
   In the original logic, once the record's partition path changed, we would sink a delete record to the old partition file to delete the old record, and then sink the incoming record to the new partition file. The final record would then only contain the fields from the incoming record and lose the fields from the old record. (Note that `OverwriteNonDefaultsWithLatestAvroPayload` also has this issue.)
   
   So we need to retrieve the old/existing record from the base file and then merge/combine it with the incoming record. Since we currently don't support looking up a record from the base file, we have to store the old/existing record somewhere, e.g. Flink state. `BucketAssignFunction` is the only place where we can store the old/existing record and change its location from the old partition file to the new partition file.
   
   So the new logic is:
   - store the old record (from the source, or from the base file when bootstrap is enabled) in Flink state
   - once a new record arrives with the same record key but a changed partition:
     - sink a delete record to the old partition file to delete the old record
     - retrieve & copy the old record from state, change its location to the new partition, and sink it to the new partition file
     - the copied record and the incoming record will be merged by `#preCombine`
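The steps above could be sketched roughly like this. This is a minimal, self-contained sketch with simplified stand-in types (`SimpleRecord`, the `indexState` map, the `emitted` list are all hypothetical), not the actual Hudi/Flink API; merging by `#preCombine` is assumed to happen downstream in the writer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the partition-change handling described above.
public class BucketAssignSketch {
  // Hypothetical stand-in for HoodieRecord: key, partition path, payload fields.
  static class SimpleRecord {
    final String key;
    final String partition;
    final Map<String, String> fields;

    SimpleRecord(String key, String partition, Map<String, String> fields) {
      this.key = key;
      this.partition = partition;
      this.fields = new HashMap<>(fields);
    }
  }

  // Stand-in for keyed Flink state: record key -> last seen full record.
  final Map<String, SimpleRecord> indexState = new HashMap<>();
  // Records emitted downstream; delete markers are modeled as "DELETE:" entries.
  final List<String> emitted = new ArrayList<>();

  void processRecord(SimpleRecord incoming) {
    SimpleRecord old = indexState.get(incoming.key);
    if (old != null && !old.partition.equals(incoming.partition)) {
      // 1. Sink a delete record to the old partition file.
      emitted.add("DELETE:" + old.partition + "/" + old.key);
      // 2. Copy the old record from state, redirect it to the new partition,
      //    and sink it; the writer would merge it with the incoming record
      //    via #preCombine downstream.
      SimpleRecord moved = new SimpleRecord(old.key, incoming.partition, old.fields);
      emitted.add("UPSERT:" + moved.partition + "/" + moved.key);
    }
    // 3. Sink the incoming record to its (new) partition.
    emitted.add("UPSERT:" + incoming.partition + "/" + incoming.key);
    // Update the state with the full latest record.
    indexState.put(incoming.key, incoming);
  }
}
```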
   
   The drawback here is that it increases the state size, but if we don't use the state to store the full record, it seems we have no way to merge the incoming record with the existing record in the base file when the partition changes.
   
   I also considered this problem. What I'm thinking, to avoid impacting the current logic (overwrite with the latest payload), is to create an `updateState` abstract method and treat `indexState` as an abstract field; different subclasses would implement the logic with `ValueState<HoodieRecordGlobalLocation>` or `ValueState<HoodieRecord>`, or a `StateHelper` could handle the state operations.
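One way that abstraction could look is sketched below. This is a rough, self-contained illustration only; `State<T>`, `Location`, `Record`, `AbstractBucketAssigner`, `LocationOnlyAssigner`, and `FullRecordAssigner` are hypothetical stand-ins for Flink's `ValueState`, `HoodieRecordGlobalLocation`, `HoodieRecord`, and the proposed subclasses:

```java
// Sketch of splitting the state handling into subclasses, so the existing
// location-only state and the new full-record state can coexist.
public class AssignerSketch {
  // Hypothetical stand-in for Flink's ValueState<T>.
  static class State<T> {
    private T value;
    T value() { return value; }
    void update(T v) { this.value = v; }
  }

  // Stand-in for HoodieRecordGlobalLocation.
  static class Location {
    final String partition;
    Location(String partition) { this.partition = partition; }
  }

  // Stand-in for HoodieRecord: a location plus the full payload.
  static class Record {
    final Location loc;
    final String payload;
    Record(Location loc, String payload) { this.loc = loc; this.payload = payload; }
  }

  // The abstract base only knows it keeps *some* state per record key;
  // the state type is a type parameter, and updateState is abstract.
  abstract static class AbstractBucketAssigner<S> {
    final State<S> indexState = new State<>();
    abstract void updateState(Record record);
  }

  // Current behavior: store only the record's location (small state).
  static class LocationOnlyAssigner extends AbstractBucketAssigner<Location> {
    @Override void updateState(Record record) { indexState.update(record.loc); }
  }

  // New behavior: store the full record, so it can be re-sunk on partition change.
  static class FullRecordAssigner extends AbstractBucketAssigner<Record> {
    @Override void updateState(Record record) { indexState.update(record); }
  }
}
```

A `StateHelper` variant would move the same choice into a composed helper object instead of a subclass hierarchy; either way, the overwrite-with-latest path keeps its small location-only state.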




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

