Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5108#discussion_r154363656
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value,
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
- return partitions[parallelInstanceId % partitions.length];
+ if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --
It might be that the "fixed partitioner" is hard to implement or that its
semantics are somewhat strange. Not sure though. What's the default Kafka
behaviour? Maybe we should also have that as the default behaviour.
---