Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5108#discussion_r154293212
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value,
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
- return partitions[parallelInstanceId % partitions.length];
+ if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --
The full solution would be to allow FlinkKafkaPartitioner to be stateful
and participate in checkpointing.
That might mean passing in the runtime context in
FlinkKafkaPartitioner.open(), but that would be touching the user API.
We could also opt for a workaround to only register state for the
FlinkFixedPartitioner internally, and still restrict custom partitioners to be
stateless.
However, I think it would make more sense to avoid workarounds at this
stage.
---