GitHub user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3766
One thing to be careful with, though:
Since we're now querying Kafka for partition metadata within the `invoke`
method, the query must be handled robustly so that it doesn't unexpectedly
lengthen checkpoint times by blocking the whole stream at the Kafka sink.
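A minimal sketch of one way to keep the metadata query from stalling `invoke`, assuming the lookup can be wrapped behind a small interface: the query runs on a separate thread and the caller stops waiting after a fixed timeout. `PartitionLookup`, `BoundedMetadataFetch`, and the timeout value are hypothetical names for illustration, not Flink or Kafka API. (The Kafka producer's own `partitionsFor` can also block while waiting for metadata, bounded by its `max.block.ms` setting.)

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical abstraction over "ask Kafka for the partition ids of a topic". */
interface PartitionLookup {
    int[] partitionsFor(String topic) throws Exception;
}

/** Hypothetical helper that caps how long a metadata lookup may block the calling thread. */
final class BoundedMetadataFetch implements AutoCloseable {
    private final PartitionLookup lookup;
    private final long timeoutMillis;
    // Lookups run on a separate daemon thread so the caller can stop waiting at the deadline.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "partition-metadata-fetch");
        t.setDaemon(true);
        return t;
    });

    BoundedMetadataFetch(PartitionLookup lookup, long timeoutMillis) {
        this.lookup = lookup;
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns the partitions, or empty if the lookup failed or did not finish in time. */
    Optional<int[]> fetch(String topic) {
        Callable<int[]> task = () -> lookup.partitionsFor(topic);
        Future<int[]> future = executor.submit(task);
        try {
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow lookup; the caller moves on
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty(); // the lookup itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return Optional.empty();
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

On an empty result the sink would fall back to whatever partitions it last cached instead of waiting. The single-thread executor runs at most one lookup at a time; a lookup that ignores interruption would delay later ones, which would then also time out rather than block `invoke`.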
Most notably, we need to consider the corner cases where Kafka isn't
cooperating nicely:
1. How should we handle arbitrarily long response times when fetching the
partition metadata?
2. How should we handle the case where some Kafka brokers are temporarily
unavailable, so the returned partition list is incomplete?
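For 2., a rough heuristic sketch, assuming a topic's partition ids are numbered `0..n-1` and that its partition count never shrinks (Kafka only supports adding partitions): a fetched list with gaps, or with fewer partitions than previously seen, is treated as incomplete and the last known-good value is kept. `PartitionCacheGuard` is a hypothetical name, and this check cannot catch every form of incomplete metadata.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical guard that only accepts partition lists that look complete. */
final class PartitionCacheGuard {
    private final Map<String, int[]> cache = new ConcurrentHashMap<>();

    /**
     * Heuristic (an assumption): ids must be exactly 0..n-1 with no gaps or duplicates,
     * and n must not be smaller than the previously accepted count.
     */
    static boolean looksComplete(int[] fetched, int[] previous) {
        if (fetched == null || fetched.length == 0) {
            return false;
        }
        int[] sorted = fetched.clone();
        Arrays.sort(sorted);
        for (int i = 0; i < sorted.length; i++) {
            if (sorted[i] != i) {
                return false; // gap or duplicate id
            }
        }
        return previous == null || sorted.length >= previous.length;
    }

    /** Stores the fetched list if it looks complete; returns the partitions to use (may be null). */
    int[] update(String topic, int[] fetched) {
        return cache.compute(topic, (t, previous) -> {
            if (looksComplete(fetched, previous)) {
                int[] sorted = fetched.clone();
                Arrays.sort(sorted);
                return sorted;
            }
            return previous; // keep the last known-good value
        });
    }
}
```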
For 2., I can also foresee a separate "partitions update thread" that
continuously refreshes the `Map<String, int[]>` cache at a fixed interval.
This could also evolve into a `FlinkKafkaPartitioner` that is given a
dynamically changing `int[] partitions` whenever its `partition` method is
invoked. Perhaps we shouldn't include that in this PR, as it's orthogonal to
the API change. Just some food for thought :)
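A minimal sketch of the "partitions update thread" idea, assuming a plain `Function<String, int[]>` stands in for the actual Kafka metadata query: a scheduled daemon thread refreshes the `Map<String, int[]>` cache at a fixed delay, and the record path does only a non-blocking map read before handing the current array to the partitioner. `PartitionRefresher` and `DynamicPartitioner` are hypothetical names; the real `FlinkKafkaPartitioner` signature may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical partitioner shape: the sink passes the current partition array on every call. */
interface DynamicPartitioner<T> {
    int partition(T record, int[] partitions);
}

/** Hypothetical background refresher for the per-topic partition cache. */
final class PartitionRefresher implements AutoCloseable {
    private final Map<String, int[]> cache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler;

    PartitionRefresher(Iterable<String> topics, Function<String, int[]> fetch, long intervalMillis) {
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "kafka-partitions-refresher");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(() -> {
            for (String topic : topics) {
                try {
                    int[] fresh = fetch.apply(topic);
                    if (fresh != null && fresh.length > 0) {
                        // Swap in a whole new array; readers never see a half-updated one.
                        cache.put(topic, fresh);
                    }
                } catch (RuntimeException e) {
                    // Keep the previous value; an uncaught exception would cancel future runs.
                }
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Non-blocking read for the record path; null until the first successful refresh. */
    int[] currentPartitions(String topic) {
        return cache.get(topic);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

For example, a hash-based partitioner could be written as `(record, parts) -> parts[Math.floorMod(record.hashCode(), parts.length)]` and would pick up newly added partitions after the next refresh. Replacing the whole array on each refresh, rather than mutating it in place, means `invoke` never observes a partially updated partition list.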