[
https://issues.apache.org/jira/browse/FLINK-37323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zimehr Abbasi updated FLINK-37323:
----------------------------------
Description:
While migrating from {{FlinkKafkaProducer}} to {{KafkaSink}}, we used the
same {{uid}} for both sinks to maintain continuity within our large distributed
system. Initially, this was not an issue, but after multiple job restarts, we
encountered an {*}Akka FrameSize exception{*}:
{code:java}
The rpc invocation size 16,234,344 exceeds the maximum akka framesize.{code}
This occurred in a simple streaming job with just a source and a sink and no
evolving user state. Yet, despite no explicit interaction with state,
*checkpoint sizes kept increasing* with each restart. After deserializing the
state, I found that the partition offsets in {{FlinkKafkaProducer}}’s
{{next-transactional-id-hint-v2}} state were growing continuously.
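For reference, this inspection can be done without hand-rolling a deserializer, using the State Processor API of the 1.14 line. A hedged, untested sketch: the savepoint path and {{legacy-sink-uid}} are placeholders, and serializer compatibility is an assumption; if the default serializer does not match the connector's internal one, the {{readUnionState}} overload that takes an explicit {{TypeSerializer}} would be needed instead.
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class InspectLegacyState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        // Load the savepoint to inspect (path is a placeholder).
        ExistingSavepoint savepoint = Savepoint.load(
                batchEnv, "s3://my-bucket/savepoints/savepoint-abc", new HashMapStateBackend());
        // Read the union state registered under the uid shared by both sinks.
        DataSet<FlinkKafkaProducer.NextTransactionalIdHint> hints =
                savepoint.readUnionState(
                        "legacy-sink-uid",
                        "next-transactional-id-hint-v2",
                        TypeInformation.of(FlinkKafkaProducer.NextTransactionalIdHint.class));
        // Entry count grows with every restart in the scenario described above.
        System.out.println("legacy entries: " + hints.count());
    }
}
{code}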
The root cause appears to be that {{next-transactional-id-hint-v2}} is stored
as a {*}UnionListState{*}: on restore, every subtask receives the full union of
all entries, and each subtask then snapshots that complete list again. The
number of partition-offset entries in the state is therefore {*}multiplied by
the parallelism on every restart{*}.
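As a toy illustration of that multiplication (plain Java, no Flink dependencies; the entry strings are made up), the restore-then-snapshot cycle of union state with no {{clear()}} behaves like:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class UnionStateGrowth {

    // One restore/snapshot cycle of union list state when nothing calls
    // clear(): on restore, each of the `parallelism` subtasks receives the
    // FULL union, and each snapshots it back unmodified, so the next
    // checkpoint contains `parallelism` concatenated copies.
    static List<String> restoreAndSnapshot(List<String> union, int parallelism) {
        List<String> nextCheckpoint = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            nextCheckpoint.addAll(union); // every subtask re-emits everything
        }
        return nextCheckpoint;
    }

    public static void main(String[] args) {
        List<String> state = List.of("hint-0", "hint-1", "hint-2");
        for (int restart = 1; restart <= 3; restart++) {
            state = restoreAndSnapshot(state, 4);
            System.out.println("after restart " + restart + ": " + state.size() + " entries");
        }
        // 3 entries at parallelism 4 grow to 12, then 48, then 192:
        // exponential in the number of restarts, which eventually
        // exceeds the Akka frame size.
    }
}
{code}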
This issue does not occur with {{FlinkKafkaProducer}} itself because it
explicitly calls {{clear()}} on the state when snapshotting, whereas
{{KafkaSink}} never reads or writes this state, so the restored entries are
carried forward untouched into every new checkpoint.
h4. Workarounds Considered
* Setting a different {{uid}} for the two sinks avoids the issue, but restoring
then requires {{--allow-non-restored-state}}, which is not viable as we
{*}cannot afford any data loss{*}.
* Restarting the job from scratch resolves the issue but is {*}not ideal{*}.
* Adding a *custom operator in front of {{KafkaSink}}* that carries the
{{uid}} of the old {{FlinkKafkaProducer}}, acts as a *NOOP* pass-through, and
clears the legacy state before forwarding records.
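The third workaround could look roughly like the sketch below (untested, assuming 1.14-era APIs; the descriptor name must match the one {{FlinkKafkaProducer}} registered, and if the default serializer turns out to be incompatible with the connector's internal one, the descriptor would need to be built with that serializer instead):
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Pass-through operator that is given the old FlinkKafkaProducer uid so it
// adopts the legacy operator state on restore, clears it, and never writes
// it back. Records are forwarded unchanged.
public class LegacyStateClearingMap<T> extends RichMapFunction<T, T>
        implements CheckpointedFunction {

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListState<FlinkKafkaProducer.NextTransactionalIdHint> legacy =
                context.getOperatorStateStore().getUnionListState(
                        new ListStateDescriptor<>(
                                "next-transactional-id-hint-v2",
                                TypeInformation.of(FlinkKafkaProducer.NextTransactionalIdHint.class)));
        legacy.clear(); // drop the restored entries; nothing re-adds them
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // no-op: the cleared state stays empty in every subsequent snapshot
    }

    @Override
    public T map(T value) {
        return value;
    }
}
{code}
The map would carry the old sink's {{uid}} while {{KafkaSink}} gets a fresh one; once a savepoint has been taken with the state cleared, the extra operator could in principle be removed again.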
h4. Question
What would be the recommended approach to safely transition from
{{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state
and without requiring {{{}--allow-non-restored-state{}}}? Would introducing a
NOOP operator to clear the legacy state be a valid approach?
was:
While migrating from {{FlinkKafkaProducer}} to {{{}KafkaSink{}}}, we used the
same {{uid}} for both sinks to maintain continuity within our large distributed
system. Initially, this was not an issue, but after multiple job restarts, we
encountered an {*}Akka FrameSize exception{*}:
{code:java}
The rpc invocation size 16,234,343 exceeds the maximum akka framesize.{code}
This occurred in a simple stream setup with a source and sink function, with no
changing state. However, despite no explicit interaction with state,
*checkpoint sizes kept increasing* with each restart. Upon deserializing the
state, I found that the partition offsets in {{{}FlinkKafkaProducer{}}}’s
{{next-transactional-id-hint-v2}} were growing continuously.
The root cause appears to be that {{next-transactional-id-hint-v2}} is stored
as a {*}UnionListState{*}, meaning that upon each restart, the number of
partition offsets in the state is {*}multiplied by the parallelism{*}, as all
state is assigned to all operator subtasks.
This issue does not occur with {{FlinkKafkaProducer}} because it explicitly
calls {{{}clear(){}}}, whereas {{KafkaSink}} does not interact with this state.
h4. Workarounds Considered
* Setting a different {{uid}} for the two sinks avoids the issue but requires
{{{}--allow-non-restored-state{}}}, which is not viable as we {*}cannot afford
any data loss{*}.
* Restarting the job from scratch resolves the issue but is {*}not ideal{*}.
* Adding a *custom operator before {{KafkaSink}}* with the {{uid}} of
{{FlinkKafkaProducer}} that acts as a *NOOP* and clears the old state before
forwarding records.
h4. Question
What would be the recommended approach to safely transition from
{{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state
and without requiring {{{}--allow-non-restored-state{}}}? Would introducing a
NOOP operator to clear the legacy state be a valid approach?
> Checkpoint Growth & Akka FrameSize Exception when Migrating from
> FlinkKafkaProducer to KafkaSink
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-37323
> URL: https://issues.apache.org/jira/browse/FLINK-37323
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.3
> Reporter: Zimehr Abbasi
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)