Zimehr Abbasi created FLINK-37323:
-------------------------------------
Summary: Checkpoint Growth & Akka FrameSize Exception when
Migrating from FlinkKafkaProducer to KafkaSink
Key: FLINK-37323
URL: https://issues.apache.org/jira/browse/FLINK-37323
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.3
Reporter: Zimehr Abbasi
While migrating from {{FlinkKafkaProducer}} to {{KafkaSink}}, we used the
same {{uid}} for both sinks to maintain continuity within our large distributed
system. Initially, this was not an issue, but after multiple job restarts, we
encountered an *Akka FrameSize exception*:
```
The rpc invocation size 16,234,343 exceeds the maximum akka framesize.
```
This occurred in a simple streaming job consisting of a source and a sink
function, with no state that changes over time. However, even though the job
never interacts with state explicitly, *checkpoint sizes kept increasing* with
each restart. After deserializing the state, I found that the partition offsets
in {{FlinkKafkaProducer}}’s {{next-transactional-id-hint-v2}} were growing
continuously.
The root cause appears to be that {{next-transactional-id-hint-v2}} is stored
as a *UnionListState*. On restore, every operator subtask is assigned the full
union of this state and writes all of it back in the next checkpoint, so upon
each restart the number of partition offsets in the state is *multiplied by the
parallelism*.
This issue does not occur with {{FlinkKafkaProducer}} because it explicitly
calls {{clear()}}, whereas {{KafkaSink}} does not interact with this state.
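To illustrate the growth described above, here is a minimal plain-Java simulation. It does not use any Flink API. The class {{UnionStateGrowth}} and its methods are hypothetical names, and the clearing branch (subtask 0 writes back a single entry) is a simplifying assumption standing in for what {{FlinkKafkaProducer}} does with {{clear()}}.

```java
import java.util.ArrayList;
import java.util.List;

public class UnionStateGrowth {

    /**
     * Simulates one restart followed by one checkpoint.
     * Input and output are the per-subtask state lists stored in a checkpoint.
     */
    static List<List<String>> restartAndCheckpoint(
            List<List<String>> checkpoint, int parallelism, boolean clearBeforeSnapshot) {
        // Union redistribution: gather the entries of all previous subtasks.
        List<String> union = new ArrayList<>();
        for (List<String> subtaskState : checkpoint) {
            union.addAll(subtaskState);
        }

        List<List<String>> next = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Every subtask is handed a full copy of the union.
            List<String> state = new ArrayList<>(union);
            if (clearBeforeSnapshot) {
                // Assumed legacy-producer behaviour: drop everything and let
                // only subtask 0 write back a single entry.
                state.clear();
                if (subtask == 0) {
                    state.add("hint");
                }
            }
            // Without clearing, the full copy is written back untouched.
            next.add(state);
        }
        return next;
    }

    /** Total number of entries in the checkpoint after the given number of restarts. */
    static int entriesAfterRestarts(int parallelism, int restarts, boolean clearBeforeSnapshot) {
        // Initial checkpoint: a single subtask holding a single entry.
        List<String> first = new ArrayList<>();
        first.add("hint");
        List<List<String>> checkpoint = new ArrayList<>();
        checkpoint.add(first);

        for (int i = 0; i < restarts; i++) {
            checkpoint = restartAndCheckpoint(checkpoint, parallelism, clearBeforeSnapshot);
        }

        int total = 0;
        for (List<String> subtaskState : checkpoint) {
            total += subtaskState.size();
        }
        return total;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restarts = 0; restarts <= 3; restarts++) {
            System.out.println("restarts=" + restarts
                    + " untouched=" + entriesAfterRestarts(parallelism, restarts, false)
                    + " cleared=" + entriesAfterRestarts(parallelism, restarts, true));
        }
    }
}
```

With a parallelism of 4, the untouched variant goes from 1 entry to 4, 16, and 64 over three restarts, while the clearing variant stays at 1. This matches the multiplication by parallelism we observed, although the real entry counts depend on the job.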
h4. Workarounds Considered
* Setting a different {{uid}} for the two sinks avoids the issue but requires
{{--allow-non-restored-state}}, which is not viable as we *cannot afford
any data loss*.
* Restarting the job from scratch resolves the issue but is *not ideal*.
* Adding a *custom operator before {{KafkaSink}}* with the {{uid}} of
{{FlinkKafkaProducer}} that acts as a *NOOP* and clears the old state before
forwarding records.
h4. Question
What would be the recommended approach to safely transition from
{{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state
and without requiring {{--allow-non-restored-state}}? Would introducing a
NOOP operator to clear the legacy state be a valid approach?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)