Zimehr Abbasi created FLINK-37323:
-------------------------------------

             Summary: Checkpoint Growth & Akka FrameSize Exception when 
Migrating from FlinkKafkaProducer to KafkaSink
                 Key: FLINK-37323
                 URL: https://issues.apache.org/jira/browse/FLINK-37323
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.14.3
            Reporter: Zimehr Abbasi


While migrating from {{FlinkKafkaProducer}} to {{KafkaSink}}, we used the same {{uid}} for both sinks to maintain continuity within our large distributed system. Initially this was not an issue, but after multiple job restarts we encountered an *Akka FrameSize exception*:

```
The rpc invocation size 16,234,343 exceeds the maximum akka framesize.
```

This occurred in a simple streaming setup with just a source and a sink and no changing state. Yet, despite the job never interacting with state explicitly, *checkpoint sizes kept increasing* with each restart. Upon deserializing the checkpointed state, I found that the partition offsets in {{FlinkKafkaProducer}}'s {{next-transactional-id-hint-v2}} state were growing continuously.
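
One way to perform this kind of inspection (not necessarily how it was done here) is Flink's State Processor API. The following is a minimal sketch, assuming Flink 1.14's batch-based {{Savepoint.load}} API, that {{FlinkKafkaProducer.NextTransactionalIdHintSerializer}} from the Kafka connector is on the classpath, and placeholders for the checkpoint/savepoint path and the uid shared by the old and new sink:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class InspectLegacySinkState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path to the retained checkpoint / savepoint to inspect.
        ExistingSavepoint savepoint =
                Savepoint.load(env, "file:///tmp/checkpoint-or-savepoint-path", new HashMapStateBackend());

        // Read the legacy union list state written under the (shared) sink uid.
        DataSet<FlinkKafkaProducer.NextTransactionalIdHint> hints =
                savepoint.readUnionState(
                        "legacy-sink-uid",
                        "next-transactional-id-hint-v2",
                        TypeInformation.of(FlinkKafkaProducer.NextTransactionalIdHint.class),
                        new FlinkKafkaProducer.NextTransactionalIdHintSerializer());

        // The entry count makes the per-restart multiplication visible.
        System.out.println("next-transactional-id-hint-v2 entries: " + hints.count());
    }
}
```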

The root cause appears to be that {{next-transactional-id-hint-v2}} is stored as a *UnionListState*: on restore, every subtask receives the full union of the entries written by all subtasks of the previous run, so the number of partition offsets in the state is *multiplied by the parallelism* with each restart. With parallelism 10, for instance, 10 entries become 100 after one restore/checkpoint cycle, 1,000 after the next, and so on.

This does not happen with {{FlinkKafkaProducer}} because it explicitly calls {{clear()}} before writing its own entries, whereas {{KafkaSink}} never interacts with this legacy state, so the restored union is simply written back in full on every checkpoint.

h4. Workarounds Considered
 * Setting a different {{uid}} for the two sinks avoids the issue but requires {{--allow-non-restored-state}}, which is not viable as we *cannot afford any data loss*.
 * Restarting the job from scratch resolves the issue but is *not ideal*.
 * Adding a *custom operator before {{KafkaSink}}* that carries the {{uid}} of the old {{FlinkKafkaProducer}}, acts as a *NOOP* forwarder, and clears the legacy state before forwarding records (a hedged sketch follows this list).

h4. Question

What would be the recommended approach to safely transition from {{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state and without requiring {{--allow-non-restored-state}}? Would introducing a NOOP operator to clear the legacy state be a valid approach?


