garyli1019 commented on a change in pull request #2958:
URL: https://github.com/apache/hudi/pull/2958#discussion_r633391698
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -90,7 +92,7 @@
* <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
* </ul>
*/
- private MapState<HoodieKey, HoodieRecordLocation> indexState;
+ private MapState<String, GlobalHoodieRecordLocation> indexState;
Review comment:
> > Let me give an example. When a record changes its partition, we create a
> > delete record for the old location and update the index with the new
> > location, but this index state is globally distributed and only one
> > parallel subtask will be affected. If we change the parallelism and restart
> > the job, this record may land on a different subtask with a wrong location
> > in the index state.
>
> No worries, the Flink state backend would keep the semantics for us.

That's interesting! I just started learning Flink, and my understanding was
that keyed state is local to each parallel subtask. If we change the
parallelism, the data layout should change completely, since the keyBy
operation then distributes keys across a different number of subtasks. Does
Flink automatically redistribute the keyed state in this case? Is there any
documentation for this feature?
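
For context on the rescaling question, here is a minimal sketch of the key-group idea as I understand it from the Flink docs: a key maps to a key group based on the max parallelism, and each subtask owns a contiguous range of key groups, so on rescaling whole key groups (with their state) move between subtasks. The class name, the record key, and the plain `hashCode`-based hashing are assumptions for illustration only; Flink itself applies a murmur hash to the key's hash code, so the concrete group numbers here will not match a real job.

```java
public class KeyGroupSketch {

    // Hypothetical stand-in for Flink's key hashing. The key group depends only
    // on the key and the max parallelism, not on the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Range-based mapping from key group to subtask index: each subtask owns a
    // contiguous range of key groups for the given parallelism.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;       // fixed for the lifetime of the job state
        String recordKey = "uuid-001";  // hypothetical record key

        int keyGroup = keyGroupFor(recordKey, maxParallelism);
        System.out.println("key group: " + keyGroup);

        // The key group stays the same; only its owning subtask may change.
        System.out.println("subtask at parallelism 4: "
                + subtaskFor(keyGroup, maxParallelism, 4));
        System.out.println("subtask at parallelism 8: "
                + subtaskFor(keyGroup, maxParallelism, 8));
    }
}
```

If this matches what the state backend does on restore, then the index entry for a record key would follow its key group to whichever subtask owns that group after the parallelism change, which is the part I would like to confirm.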