[
https://issues.apache.org/jira/browse/FLINK-13537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nico Kruber updated FLINK-13537:
--------------------------------
Description:
The Kafka producer's transaction IDs are generated only once, when there is no
previous state for that operator. If we restore and increase parallelism
(scale-out), some operators may not have previous state and will create
new IDs. If we also reduce the poolSize, these new IDs may overlap with
the old ones, which should never happen!
On a side note, the current scheme also relies on the fact that the operator's
list state distributes previous states during scale-out in such a way that only
the operators with the highest subtask indices do not get a previous state.
This is somewhat "guaranteed" by {{OperatorStateStore#getListState()}}, but I'm
not sure whether we should actually rely on that behavior.
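To make the overlap concrete, here is a minimal sketch. It assumes a simplified ID scheme in which a subtask without restored state claims the contiguous block starting at subtaskIndex * poolSize; this is an illustration, not necessarily the exact formula the connector uses. The class and method names (TransactionIdOverlapSketch, freshIds, overlap) are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

public class TransactionIdOverlapSketch {

    // ASSUMPTION: a subtask without restored state claims the contiguous block
    // [subtaskIndex * poolSize, (subtaskIndex + 1) * poolSize).
    public static Set<Long> freshIds(int subtaskIndex, int poolSize) {
        Set<Long> ids = new TreeSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add((long) subtaskIndex * poolSize + i);
        }
        return ids;
    }

    // Returns the IDs that a new, stateless subtask would generate even though
    // they are already owned by the restored state of the old job.
    public static Set<Long> overlap(int oldParallelism, int oldPoolSize,
                                    int newSubtaskIndex, int newPoolSize) {
        Set<Long> restored = new TreeSet<>();
        for (int s = 0; s < oldParallelism; s++) {
            restored.addAll(freshIds(s, oldPoolSize));
        }
        Set<Long> clash = freshIds(newSubtaskIndex, newPoolSize);
        clash.retainAll(restored);
        return clash;
    }

    public static void main(String[] args) {
        // Old job: parallelism 2, poolSize 5 -> IDs 0..9 live in state.
        // New job: parallelism 3, poolSize 3 -> subtask 2 starts without state.
        System.out.println(overlap(2, 5, 2, 3)); // prints [6, 7, 8]
        // With an unchanged poolSize the new subtask lands on 10..14.
        System.out.println(overlap(2, 5, 2, 5)); // prints []
    }
}
```

Under this assumed scheme, scaling out alone is harmless, but combining it with a smaller poolSize moves the new subtask's block into the range that restored subtasks still hold.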
was:The Kafka producer's transaction IDs are only generated once when there
was no previous state for that operator. In the case where we restore and
increase parallelism (scale-out), some operators may not have previous state
and create new IDs. Now, if we also reduce the poolSize, these new IDs may
overlap with the old ones which should never happen!
> Changing Kafka producer pool size and scaling out may create overlapping
> transaction IDs
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-13537
> URL: https://issues.apache.org/jira/browse/FLINK-13537
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.8.1, 1.9.0
> Reporter: Nico Kruber
> Priority: Major
>
> The Kafka producer's transaction IDs are generated only once, when there is
> no previous state for that operator. If we restore and increase parallelism
> (scale-out), some operators may not have previous state and will create
> new IDs. If we also reduce the poolSize, these new IDs may overlap with
> the old ones, which should never happen!
> On a side note, the current scheme also relies on the fact that the
> operator's list state distributes previous states during scale-out in such a
> way that only the operators with the highest subtask indices do not get a
> previous state. This is somewhat "guaranteed" by
> {{OperatorStateStore#getListState()}}, but I'm not sure whether we should
> actually rely on that behavior.