[ https://issues.apache.org/jira/browse/FLINK-13537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nico Kruber updated FLINK-13537:
--------------------------------
Description:
The Kafka producer's transaction IDs are generated only once, when there is no
previous state for that operator. When we restore and increase parallelism
(scale-out), some operators may not have previous state and create new IDs.
If we also reduce the {{poolSize}}, these new IDs may overlap with the old
ones, which should never happen! Similarly, a scale-in combined with an
increased {{poolSize}} could lead to the same thing.
An easy "fix" for this would be to forbid changing the {{poolSize}}. We could
potentially do a bit better by forbidding only those changes that can lead to
transaction ID overlaps, which we can identify from the formulae that
{{TransactionalIdsGenerator}} uses. This should probably be the first step,
which can also be back-ported to older Flink versions just in case.
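To illustrate the scale-out case, here is a minimal sketch of how such an overlap could arise. It is a simplified model, not the actual Flink code: it assumes each subtask derives its IDs as {{base + subtaskIndex * poolSize + i}} with a fixed base of 0, and that restored subtasks keep the IDs from their old state. The class and method names are hypothetical, and the scale-in case is not modelled.

```java
import java.util.Set;
import java.util.TreeSet;

public class TransactionalIdOverlapSketch {

    // Assumed formula (hypothetical model): base + subtaskIndex * poolSize + i
    static Set<Long> idsFor(long base, int subtaskIndex, int poolSize) {
        Set<Long> ids = new TreeSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(base + (long) subtaskIndex * poolSize + i);
        }
        return ids;
    }

    // IDs that newly added subtasks (no previous state) would share with
    // the IDs restored from the old job's state.
    static Set<Long> overlap(int oldParallelism, int oldPoolSize,
                             int newParallelism, int newPoolSize) {
        // IDs created by the old job; restored subtasks keep these.
        Set<Long> restored = new TreeSet<>();
        for (int s = 0; s < oldParallelism; s++) {
            restored.addAll(idsFor(0, s, oldPoolSize));
        }
        // Subtasks beyond the old parallelism generate fresh IDs
        // using the new pool size.
        Set<Long> fresh = new TreeSet<>();
        for (int s = oldParallelism; s < newParallelism; s++) {
            fresh.addAll(idsFor(0, s, newPoolSize));
        }
        fresh.retainAll(restored);
        return fresh;
    }

    public static void main(String[] args) {
        // Scale out 2 -> 3 and shrink poolSize 5 -> 2: IDs collide.
        System.out.println(overlap(2, 5, 3, 2)); // [4, 5]
        // Scale out 2 -> 3 with unchanged poolSize: no collision.
        System.out.println(overlap(2, 5, 3, 5)); // []
    }
}
```

Under these assumptions, a check of this kind at restore time could reject only the parameter changes that actually produce a non-empty overlap.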
----
On a side note, the current scheme also relies on the fact that the operator's
list state distributes previous states during scale-out in such a way that only
the operators with the highest subtask indices do not get a previous state.
This is somewhat "guaranteed" by {{OperatorStateStore#getListState()}}, but I'm
not sure whether we should actually rely on that there.
was:
The Kafka producer's transaction IDs are only generated once when there was no
previous state for that operator. In the case where we restore and increase
parallelism (scale-out), some operators may not have previous state and create
new IDs. Now, if we also reduce the poolSize, these new IDs may overlap with
the old ones which should never happen!
On a side note, the current scheme also relies on the fact, that the operator's
list state distributes previous states during scale-out in a fashion that only
the operators with the highest subtask indices do not get a previous state.
This is somewhat "guaranteed" by {{OperatorStateStore#getListState()}} but I'm
not sure whether we should actually rely on that there.
> Changing Kafka producer pool size and scaling out may create overlapping
> transaction IDs
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-13537
> URL: https://issues.apache.org/jira/browse/FLINK-13537
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.8.1, 1.9.0
> Reporter: Nico Kruber
> Priority: Major
>