[ 
https://issues.apache.org/jira/browse/FLINK-13537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-13537:
--------------------------------
    Description: 
The Kafka producer's transaction IDs are generated only once, when there is no 
previous state for that operator. If we restore and increase parallelism 
(scale-out), some operators may have no previous state and will therefore 
create new IDs. If we also reduce the poolSize, these new IDs may overlap with 
the old ones, which should never happen!
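
To make the overlap concrete, here is a minimal sketch. It assumes a simplified 
scheme in which each subtask owns the contiguous ID block 
subtaskIndex * poolSize + i; the class and method names are hypothetical and 
this is not the connector's actual code, which may add further offsets.

```java
import java.util.Set;
import java.util.TreeSet;

public class TransactionalIdOverlapSketch {

    // Assumed (simplified) scheme: subtask k owns IDs k*poolSize .. k*poolSize + poolSize - 1.
    static Set<Long> idsFor(int subtaskIndex, int poolSize) {
        Set<Long> ids = new TreeSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add((long) subtaskIndex * poolSize + i);
        }
        return ids;
    }

    // IDs that appear in both sets.
    static Set<Long> overlap(Set<Long> a, Set<Long> b) {
        Set<Long> result = new TreeSet<>(a);
        result.retainAll(b);
        return result;
    }

    public static void main(String[] args) {
        // Before the restore: parallelism 2, poolSize 5. These IDs live in state.
        Set<Long> restored = new TreeSet<>();
        restored.addAll(idsFor(0, 5));
        restored.addAll(idsFor(1, 5));

        // After the restore: parallelism 3, poolSize 3. Subtask 2 has no
        // previous state, so it generates fresh IDs with the new poolSize.
        Set<Long> fresh = idsFor(2, 3);

        System.out.println("restored IDs: " + restored);
        System.out.println("new IDs:      " + fresh);
        System.out.println("overlap:      " + overlap(restored, fresh));
    }
}
```

Under these assumptions, subtask 2 claims IDs 6 to 8, which subtask 1 already 
holds from the earlier run with the larger pool.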

On a side note, the current scheme also relies on the fact that the operator's 
list state distributes previous states during scale-out in such a way that only 
the operators with the highest subtask indices do not get a previous state. 
This is somewhat "guaranteed" by {{OperatorStateStore#getListState()}}, but I'm 
not sure whether we should actually rely on that there.
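
The distribution behaviour described above can be modelled roughly as follows. 
This sketch assumes an even split in which the first (entries % parallelism) 
subtasks each receive one extra entry; the names are hypothetical and the code 
only illustrates the assumption, it does not reproduce Flink's repartitioning 
logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateSplitSketch {

    // Assumed even-split redistribution: entries are handed out in order,
    // lower subtask indices first, so any empty slots end up at the top.
    static <T> List<List<T>> redistribute(List<T> entries, int newParallelism) {
        int base = entries.size() / newParallelism;
        int remainder = entries.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int next = 0;
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            int count = base + (subtask < remainder ? 1 : 0);
            result.add(new ArrayList<>(entries.subList(next, next + count)));
            next += count;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two state entries (one per old subtask) restored with parallelism 3.
        List<String> oldState = Arrays.asList("state-of-subtask-0", "state-of-subtask-1");
        List<List<String>> assignment = redistribute(oldState, 3);
        for (int subtask = 0; subtask < assignment.size(); subtask++) {
            System.out.println("subtask " + subtask + " -> " + assignment.get(subtask));
        }
    }
}
```

In this model only subtask 2 starts without state. If the distribution ever 
left a lower-indexed subtask empty instead, the ID scheme's assumption would no 
longer hold.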

  was:The Kafka producer's transaction IDs are only generated once when there 
was no previous state for that operator. In the case where we restore and 
increase parallelism (scale-out), some operators may not have previous state 
and create new IDs. Now, if we also reduce the poolSize, these new IDs may 
overlap with the old ones which should never happen!


> Changing Kafka producer pool size and scaling out may create overlapping 
> transaction IDs
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-13537
>                 URL: https://issues.apache.org/jira/browse/FLINK-13537
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.1, 1.9.0
>            Reporter: Nico Kruber
>            Priority: Major
>
> The Kafka producer's transaction IDs are generated only once, when there is 
> no previous state for that operator. If we restore and increase parallelism 
> (scale-out), some operators may have no previous state and will therefore 
> create new IDs. If we also reduce the poolSize, these new IDs may overlap 
> with the old ones, which should never happen!
>
> On a side note, the current scheme also relies on the fact that the 
> operator's list state distributes previous states during scale-out in such a 
> way that only the operators with the highest subtask indices do not get a 
> previous state. This is somewhat "guaranteed" by 
> {{OperatorStateStore#getListState()}}, but I'm not sure whether we should 
> actually rely on that there.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
