Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5205#discussion_r159904126
--- Diff:
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
---
@@ -742,7 +742,7 @@ public void snapshotState(FunctionSnapshotContext
context) throws Exception {
// case we adjust nextFreeTransactionalId by the range
of transactionalIds that could be used for this
// scaling up.
if (getRuntimeContext().getNumberOfParallelSubtasks() >
nextTransactionalIdHint.lastParallelism) {
- nextFreeTransactionalId +=
getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
+ nextFreeTransactionalId += (long)
getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
--- End diff --
Good change, although that is rather theoretical bug. To trigger it there
would need to be more then 1_000_000 subtasks and more then 2000 parallel
ongoing checkpoints.
---