[ 
https://issues.apache.org/jira/browse/FLINK-37323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zimehr Abbasi updated FLINK-37323:
----------------------------------
    Description: 
While migrating from {{FlinkKafkaProducer}} to {{KafkaSink}}, we used the 
same {{uid}} for both sinks to maintain continuity within our large distributed 
system. Initially, this was not an issue, but after multiple job restarts, we 
encountered an *Akka FrameSize exception*:
{code:java}
The rpc invocation size 16,234,344 exceeds the maximum akka framesize.{code}
This occurred in a simple stream setup with a source and a sink function and no 
changing state. However, although we never interact with state explicitly, 
*checkpoint sizes kept increasing* with each restart. Upon deserializing the 
state, I found that the partition offsets in {{FlinkKafkaProducer}}’s 
{{next-transactional-id-hint-v2}} were growing continuously.

The root cause appears to be that {{next-transactional-id-hint-v2}} is stored 
as a *UnionListState*. On each restart, every operator subtask receives the 
complete state of all subtasks, so the number of partition offsets in the 
state is *multiplied by the parallelism* unless the restored entries are cleared.
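
The growth described above can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions, not Flink code: the class {{UnionStateGrowth}} and its {{simulate}} method are hypothetical names, each subtask is assumed to start with a fixed number of entries, and a restart is modeled as "every subtask receives the whole checkpoint and writes all of it back at the next checkpoint".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of union-list-state redistribution; it does not use any Flink API. */
public class UnionStateGrowth {

    /**
     * Returns the total number of entries in the checkpoint, first for the
     * initial run and then after each restart.
     *
     * @param parallelism      number of operator subtasks
     * @param entriesPerSubtask entries each subtask writes when it starts from empty state
     * @param restarts         number of restore cycles to simulate
     * @param clearOnRestore   true models an operator that clears the restored list
     *                         and writes back only its own entries
     */
    public static List<Long> simulate(
            int parallelism, int entriesPerSubtask, int restarts, boolean clearOnRestore) {
        List<Long> totals = new ArrayList<>();
        long perSubtask = entriesPerSubtask;
        long checkpointTotal = perSubtask * parallelism;
        totals.add(checkpointTotal);

        for (int i = 0; i < restarts; i++) {
            // Union redistribution: every subtask is handed the full checkpoint contents.
            long restoredPerSubtask = checkpointTotal;
            // Without clearing, each subtask snapshots everything it was handed.
            perSubtask = clearOnRestore ? entriesPerSubtask : restoredPerSubtask;
            checkpointTotal = perSubtask * parallelism;
            totals.add(checkpointTotal);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(simulate(4, 1, 3, false)); // prints [4, 16, 64, 256]
        System.out.println(simulate(4, 1, 3, true));  // prints [4, 4, 4, 4]
    }
}
```

In this model the entry count grows by a factor equal to the parallelism on every restart when nothing clears the restored list, and stays constant when the list is cleared first. That matches the behavior reported here, although the real operators may differ in detail.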

This issue does not occur with {{FlinkKafkaProducer}} because it explicitly 
calls {{clear()}}, whereas {{KafkaSink}} does not interact with this state.
h4. Workarounds Considered
 * Setting a different {{uid}} for the two sinks avoids the issue but requires 
{{--allow-non-restored-state}}, which is not viable as we *cannot afford 
any data loss*.
 * Restarting the job from scratch resolves the issue but is *not ideal*.
 * Adding a *custom operator before {{KafkaSink}}* with the {{uid}} of 
{{FlinkKafkaProducer}} that acts as a *NOOP* and clears the old state before 
forwarding records.

h4. Question

What would be the recommended approach to safely transition from 
{{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state 
and without requiring {{--allow-non-restored-state}}? Would introducing a 
NOOP operator to clear the legacy state be a valid approach?

  was:
While migrating from {{FlinkKafkaProducer}} to {{KafkaSink}}, we used the 
same {{uid}} for both sinks to maintain continuity within our large distributed 
system. Initially, this was not an issue, but after multiple job restarts, we 
encountered an *Akka FrameSize exception*:
{code:java}
The rpc invocation size 16,234,343 exceeds the maximum akka framesize.{code}
This occurred in a simple stream setup with a source and sink function, with no 
changing state. However, despite no explicit interaction with state, 
*checkpoint sizes kept increasing* with each restart. Upon deserializing the 
state, I found that the partition offsets in {{FlinkKafkaProducer}}’s 
{{next-transactional-id-hint-v2}} were growing continuously.

The root cause appears to be that {{next-transactional-id-hint-v2}} is stored 
as a *UnionListState*, meaning that upon each restart, the number of 
partition offsets in the state is *multiplied by the parallelism*, as all 
state is assigned to all operator subtasks.

This issue does not occur with {{FlinkKafkaProducer}} because it explicitly 
calls {{clear()}}, whereas {{KafkaSink}} does not interact with this state.
h4. Workarounds Considered
 * Setting a different {{uid}} for the two sinks avoids the issue but requires 
{{--allow-non-restored-state}}, which is not viable as we *cannot afford 
any data loss*.
 * Restarting the job from scratch resolves the issue but is *not ideal*.
 * Adding a *custom operator before {{KafkaSink}}* with the {{uid}} of 
{{FlinkKafkaProducer}} that acts as a *NOOP* and clears the old state before 
forwarding records.

h4. Question

What would be the recommended approach to safely transition from 
{{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state 
and without requiring {{--allow-non-restored-state}}? Would introducing a 
NOOP operator to clear the legacy state be a valid approach?


> Checkpoint Growth & Akka FrameSize Exception when Migrating from 
> FlinkKafkaProducer to KafkaSink
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37323
>                 URL: https://issues.apache.org/jira/browse/FLINK-37323
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.3
>            Reporter: Zimehr Abbasi
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
