jeremy-degroot commented on code in PR #20:
URL: 
https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1247944453


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
             Properties props,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+        this(props, context, kafkaSourceReaderMetrics, () -> null);
+    }
+
+    public KafkaPartitionSplitReader(
+            Properties props,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            Supplier<String> rackIdSupplier) {
         this.subtaskId = context.getIndexOfSubtask();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
         Properties consumerProps = new Properties();
         consumerProps.putAll(props);
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
createConsumerClientId(props));
+        setConsumerClientRack(consumerProps, rackIdSupplier);

Review Comment:
   I don't think it can be done in KafkaSourceReader either, since that doesn't 
interact with the Supplier<KafkaPartitionSplitReader> at all. The alternative 
seems to be to resolve it in KafkaSourceFetcherManager, since that extends 
SingleThreadFetcherManager, where both Suppliers eventually run. But that would 
involve a rather messy overriding of the createSplitFetcher() method from that 
included class, and I don't think that's any better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to