jeremy-degroot commented on code in PR #20:
URL:
https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1247944453
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+ this(props, context, kafkaSourceReaderMetrics, () -> null);
+ }
+
+ public KafkaPartitionSplitReader(
+ Properties props,
+ SourceReaderContext context,
+ KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+ Supplier<String> rackIdSupplier) {
this.subtaskId = context.getIndexOfSubtask();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
createConsumerClientId(props));
+ setConsumerClientRack(consumerProps, rackIdSupplier);
Review Comment:
I don't think it can be done in KafkaSourceReader either, since that doesn't
interact with the Supplier<KafkaPartitionSplitReader> at all. The alternative
seems to be to resolve it in KafkaSourceFetcherManager, since that extends
SingleThreadFetcherManager, where both Suppliers eventually run. But that would
involve a rather messy overriding of the createSplitFetcher() method from that
included class, and I don't think that's any better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]