Zimehr Abbasi created FLINK-37323:
-------------------------------------

Summary: Checkpoint Growth & Akka FrameSize Exception when Migrating from FlinkKafkaProducer to KafkaSink
Key: FLINK-37323
URL: https://issues.apache.org/jira/browse/FLINK-37323
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.3
Reporter: Zimehr Abbasi
While migrating from {{FlinkKafkaProducer}} to {{KafkaSink}}, we used the same {{uid}} for both sinks to maintain continuity within our large distributed system. Initially this was not an issue, but after several job restarts we encountered an *Akka FrameSize exception*:

```
The rpc invocation size 16,234,343 exceeds the maximum akka framesize.
```

This occurred in a simple stream consisting only of a source and a sink function, with no state that we modify ourselves. Yet despite no explicit interaction with state, *checkpoint sizes kept increasing* with each restart. Upon deserializing the state, I found that the entries in {{FlinkKafkaProducer}}'s {{next-transactional-id-hint-v2}} state were growing continuously.

The root cause appears to be that {{next-transactional-id-hint-v2}} is stored as a *UnionListState*. On restore, the full union of such a state is assigned to every operator subtask, and if no subtask clears it, every subtask writes all of those entries back into the next checkpoint. As a result, the number of entries is *multiplied by the parallelism* on each restart, so the state grows geometrically. This does not happen with {{FlinkKafkaProducer}} itself, because it explicitly calls {{clear()}} on this state, whereas {{KafkaSink}} never interacts with it.

h4. Workarounds Considered
* Setting a different {{uid}} for the two sinks avoids the issue but requires {{--allowNonRestoredState}}, which is not viable because we *cannot afford any data loss*.
* Restarting the job from scratch resolves the issue but is *not ideal*.
* Adding a *custom operator before {{KafkaSink}}* that takes over the {{uid}} of {{FlinkKafkaProducer}}, acts as a *NOOP*, and clears the old state before forwarding records.

h4. Question
What would be the recommended approach to safely transition from {{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state and without requiring {{--allowNonRestoredState}}? Would introducing a NOOP operator to clear the legacy state be a valid approach?
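The growth attributed to the union list state can be reproduced with a small toy model. This is plain Java, not Flink code: it does not touch Flink's state APIs, and the class and method names ({{UnionListStateGrowth}}, {{restoreUnion}}, {{snapshot}}, {{entriesAfter}}) are hypothetical. It assumes, as described in this report, that on restore every subtask receives the full union of the checkpointed entries and writes all of them back unless it clears the state first, and that the parallelism stays constant across restarts.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Flink code) of union list state redistribution across restarts. */
public class UnionListStateGrowth {

    /** Restore: with union redistribution, every subtask receives ALL entries of the checkpoint. */
    static List<List<String>> restoreUnion(List<List<String>> checkpoint, int parallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> part : checkpoint) {
            union.addAll(part);
        }
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>(union)); // each subtask gets its own full copy
        }
        return perSubtask;
    }

    /** Snapshot: each subtask writes back whatever is still in its copy of the state. */
    static List<List<String>> snapshot(List<List<String>> perSubtask, boolean clearBeforeSnapshot) {
        List<List<String>> checkpoint = new ArrayList<>();
        for (List<String> state : perSubtask) {
            if (clearBeforeSnapshot) {
                state.clear(); // the clearing step the proposed NOOP operator would perform
            }
            checkpoint.add(state);
        }
        return checkpoint;
    }

    /** Runs `restarts` restore-and-checkpoint cycles, starting from one legacy entry. */
    static int entriesAfter(int restarts, int parallelism, boolean clearBeforeSnapshot) {
        List<List<String>> checkpoint = new ArrayList<>();
        checkpoint.add(new ArrayList<>(List.of("legacy-hint")));
        for (int r = 0; r < restarts; r++) {
            checkpoint = snapshot(restoreUnion(checkpoint, parallelism), clearBeforeSnapshot);
        }
        int total = 0;
        for (List<String> part : checkpoint) {
            total += part.size();
        }
        return total;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restarts = 0; restarts <= 4; restarts++) {
            System.out.printf("restarts=%d untouched=%d cleared=%d%n",
                    restarts,
                    entriesAfter(restarts, parallelism, false),
                    entriesAfter(restarts, parallelism, true));
        }
    }
}
```

With parallelism 4, the untouched column goes 1, 4, 16, 64, 256 (initial entries times parallelism to the power of the number of restarts), while the clearing variant drops to zero after the first restart. That drop is the effect the proposed NOOP operator would aim for; whether doing this is safe in a real job is the open question above.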