Yuwei Xiao created HUDI-4521:
--------------------------------
Summary: Fix lost re-commit in rare restart case (changing
write.tasks)
Key: HUDI-4521
URL: https://issues.apache.org/jira/browse/HUDI-4521
Project: Apache Hudi
Issue Type: Bug
Components: core, flink
Reporter: Yuwei Xiao
Assignee: Yuwei Xiao
The current `StreamWriteOperatorCoordinator` in Flink tries to re-commit the
last batch during restart, and users may rely on this behavior to achieve
exactly-once semantics when reading from sources like Kafka.
However, the re-commit may be skipped if the user sets a different
`write.tasks` value on restart, because the current implementation keeps
`write.tasks` slots to track events from the write subtasks and does not
handle a change in write parallelism (see the sketch after the example below).
For example:
# Start with `write.tasks = 4`; the application crashes (or stops) right
after a Flink checkpoint (e.g., CkpId=1) completes but before the Hudi commit.
# Restart with `write.tasks = 8`; the coordinator will receive 4 restored
bootstrap metadata events and 4 empty bootstrap events. Since the arrival
order of these events is not deterministic, the coordinator may not re-commit
the last instant.
# The source (e.g., a Kafka reader) uses the checkpoint ID to guide its
consumption, so after the restart it reads from the next offset recorded for
`CkpId=1`. All data of that batch (i.e., CkpId=1) is then lost in Hudi.
A similar problem occurs with a smaller `write.tasks` on restart,
e.g., 4 -> 2.
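
The failure mode can be modeled with a short, self-contained sketch. This is
a deliberately simplified, hypothetical model of the slot tracking described
above (class and field names are illustrative, not the actual
`StreamWriteOperatorCoordinator` code); it assumes the re-commit is gated on
every slot carrying restored metadata, which is one way the skipped re-commit
can arise:

{code:java}
import java.util.Arrays;

final class RecommitSketch {

  static final class BootstrapEvent {
    final int taskId;
    final boolean hasRestoredMetadata;

    BootstrapEvent(int taskId, boolean hasRestoredMetadata) {
      this.taskId = taskId;
      this.hasRestoredMetadata = hasRestoredMetadata;
    }
  }

  // One slot per *current* write task (8 after the restart), even though
  // only 4 subtasks restored metadata for the pending instant.
  private final BootstrapEvent[] slots;

  RecommitSketch(int writeTasks) {
    this.slots = new BootstrapEvent[writeTasks];
  }

  void onBootstrapEvent(BootstrapEvent event) {
    slots[event.taskId] = event;
    boolean allArrived = Arrays.stream(slots).allMatch(e -> e != null);
    boolean allRestored = allArrived
        && Arrays.stream(slots).allMatch(e -> e.hasRestoredMetadata);
    // With 4 restored + 4 empty bootstrap events, allRestored never
    // becomes true, so the pending instant is never re-committed.
    if (allRestored) {
      System.out.println("re-committing pending instant");
    }
  }

  public static void main(String[] args) {
    RecommitSketch coordinator = new RecommitSketch(8);
    // Subtasks 0-3 restored state from CkpId=1; subtasks 4-7 are new.
    for (int taskId = 0; taskId < 8; taskId++) {
      coordinator.onBootstrapEvent(new BootstrapEvent(taskId, taskId < 4));
    }
    // Prints nothing: the re-commit was skipped and the batch is lost.
  }
}
{code}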
This Jira fixes the implementation to ensure the re-commit is performed when
`write.tasks` changes. Although exactly-once semantics could also be restored
on the reader side (e.g., by tracking the checkpoint ID in the Hudi commit
metadata and using it to guide the reader), that would require Hudi users to
change their application code.
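
Under the same simplified model, one possible direction for the coordinator-side
fix (an illustrative sketch reusing the types above, not necessarily the actual
patch) is to trigger the re-commit once every current subtask has reported and
at least one event carries restored metadata:

{code:java}
// Replacement for onBootstrapEvent in the sketch above: re-commit as soon
// as all current subtasks have reported, as long as *any* of them carries
// restored metadata, so a changed write.tasks no longer blocks the commit.
void onBootstrapEvent(BootstrapEvent event) {
  slots[event.taskId] = event;
  boolean allArrived = Arrays.stream(slots).allMatch(e -> e != null);
  boolean anyRestored = Arrays.stream(slots)
      .anyMatch(e -> e != null && e.hasRestoredMetadata);
  if (allArrived && anyRestored) {
    // Gather write metadata only from the restored events and re-commit
    // the pending instant; empty events from new subtasks are ignored.
    System.out.println("re-committing pending instant");
  }
}
{code}

With this gate, the 4-restored/4-empty scenario above re-commits correctly,
and the shrink case (4 -> 2) behaves the same way because the gate no longer
depends on the original parallelism.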