GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/5108
[FLINK-8181] [kafka] Make FlinkFixedPartitioner insensitive to topic rescaling
## What is the purpose of the change
This PR fixes a behavioral regression caused by
9ed9b68397b51bfd2b0f6e532212a82f771641bd.
Since that commit, the `FlinkFixedPartitioner` no longer keeps returning the
same target partition once a target topic is rescaled (i.e., once its number
of partitions changes).
With this PR, the `FlinkFixedPartitioner` keeps returning the same target
Kafka partition for a given target Kafka topic when that topic is rescaled.
The partitioner maintains a cache that remembers the target partition for
each target topic; the partition is determined when the partitioner first
sees the topic.
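A minimal, stand-alone Java sketch of the caching idea described above. The class name `CachingFixedPartitioner` and its simplified `open`/`partition` signatures are hypothetical and only loosely modeled on Flink's partitioner interface; this is not the code from the PR. Assuming the fixed partitioner's modulo assignment (`partitions[subtaskIndex % partitions.length]`), the sketch computes each topic's partition on first sight and afterwards serves it from a per-instance map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink's class): a "fixed" partitioner that
// remembers, per topic, the partition it picked when it first saw that topic.
class CachingFixedPartitioner {

    private int subtaskIndex = -1;

    // topic -> partition chosen on the topic's first appearance
    private final Map<String, Integer> cachedPartitions = new HashMap<>();

    void open(int subtaskIndex, int parallelism) {
        if (subtaskIndex < 0 || parallelism <= 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        this.subtaskIndex = subtaskIndex;
    }

    // partitions: the partition ids currently available for targetTopic
    int partition(String targetTopic, int[] partitions) {
        if (subtaskIndex < 0) {
            throw new IllegalStateException("open() must be called first");
        }
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("no partitions for topic " + targetTopic);
        }
        // Computed only once per topic; later calls ignore a changed partitions array.
        return cachedPartitions.computeIfAbsent(
                targetTopic, topic -> partitions[subtaskIndex % partitions.length]);
    }

    public static void main(String[] args) {
        CachingFixedPartitioner partitioner = new CachingFixedPartitioner();
        partitioner.open(3, 4); // subtask 3 of a sink with parallelism 4

        int before = partitioner.partition("orders", new int[] {0, 1});      // 3 % 2 -> partition 1
        int after = partitioner.partition("orders", new int[] {0, 1, 2, 3}); // cached -> still 1
        System.out.println(before + " " + after); // prints "1 1"
    }
}
```

Without the cache, the second call would return `3 % 4 = 3`, so records for `orders` would silently move to another partition after rescaling. The trade-off in this sketch is that an existing instance never starts writing to newly added partitions for a topic it has already seen.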
## Brief change log
- Make the `FlinkFixedPartitioner` insensitive to topic rescaling
- Add new test `FlinkFixedPartitionerTest#testIncreasingPartitions()`
- Minor hotfix: use a separate partitioner instance for each mock subtask in
all tests of `FlinkFixedPartitionerTest`
## Verifying this change
Covered by new test `FlinkFixedPartitionerTest#testIncreasingPartitions()`.
The test fails without the fix.
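A test in that spirit can be sketched as a plain Java check. Following the hotfix, each mock subtask gets its own partitioner instance, and the check is that every subtask keeps its target partition after the topic grows from 2 to 4 partitions. The nested `Partitioner` class and the `run` helper are hypothetical and minimal; this is not the actual `FlinkFixedPartitionerTest#testIncreasingPartitions()`. In a caching design like this, reusing one instance across mock subtasks would also share the cache, so the first subtask to see a topic would determine the cached partition for all the others.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical test-style sketch: one partitioner instance per mock subtask.
class PerSubtaskPartitionerCheck {

    // Minimal caching "fixed" partitioner (illustrative only, not Flink's class).
    static final class Partitioner {
        private final int subtaskIndex;
        private final Map<String, Integer> cache = new HashMap<>();

        Partitioner(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        int partition(String topic, int[] partitions) {
            // The partition is fixed the first time this instance sees the topic.
            return cache.computeIfAbsent(topic, t -> partitions[subtaskIndex % partitions.length]);
        }
    }

    // Returns, per subtask, the partition chosen before and after rescaling.
    static int[][] run(int parallelism, int[] before, int[] after) {
        int[][] result = new int[parallelism][2];
        for (int i = 0; i < parallelism; i++) {
            Partitioner p = new Partitioner(i); // separate instance per mock subtask
            result[i][0] = p.partition("test-topic", before);
            result[i][1] = p.partition("test-topic", after);
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] r = run(4, new int[] {0, 1}, new int[] {0, 1, 2, 3});
        for (int i = 0; i < r.length; i++) {
            if (r[i][0] != r[i][1]) {
                throw new AssertionError("subtask " + i + " changed partition");
            }
            System.out.println("subtask " + i + " -> partition " + r[i][0]);
        }
    }
}
```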
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): **yes**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-8181
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5108.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5108
----
commit 393ee95ce6b71f8ba29b8c62e8116d4ef429b6ac
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-01T05:20:59Z
[FLINK-8181] [kafka] Make FlinkFixedPartitioner insensitive to topic rescaling
With this commit, the FlinkFixedPartitioner keeps returning the same target
Kafka partition for a given target Kafka topic when that topic is rescaled.
A cache remembers the target partition for each target topic; the
partition is determined when the topic first appears.
commit 17770af77ba759f645a1e057424cacafa613651e
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-01T05:29:40Z
[hotfix] [kafka] Use separate Kafka partitioner instances for each mock subtask in FlinkFixedPartitionerTest
Prior to this commit, the unit tests in FlinkFixedPartitionerTest used
the same FlinkFixedPartitioner instance for different mock subtasks. In
reality, each sink subtask has its own partitioner instance.
This commit changes the tests to match that usage.
----