jeremy-degroot commented on code in PR #20:
URL: https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1160774759
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
             Properties props,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+        this(props, context, kafkaSourceReaderMetrics, () -> null);
+    }
+
+    public KafkaPartitionSplitReader(
+            Properties props,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            Supplier<String> rackIdSupplier) {
         this.subtaskId = context.getIndexOfSubtask();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
         Properties consumerProps = new Properties();
         consumerProps.putAll(props);
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
+        setConsumerClientRack(consumerProps, rackIdSupplier);
Review Comment:
I looked into this, and while it's certainly possible to do it that way, the
testing path is much more complex. Since the Supplier would have to be resolved
inside another Supplier<KafkaPartitionSplitReader> that is passed to the Reader,
it is difficult to test that the behavior is as expected along the actual
execution path. With the constructor parameter, we can call the
KafkaPartitionSplitReader constructor directly in a test and verify that the
rackIdSupplier is invoked, and then verify it does what we need by checking the
behavior of the helper method you noted further down.
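
For illustration, here is a rough sketch of the kind of direct check I have in
mind. The helper body and its null/empty handling are my assumption based on
the call site above (the diff only shows the call); "client.rack" is the key
behind Kafka's ConsumerConfig.CLIENT_RACK_CONFIG:

```java
import java.util.Properties;
import java.util.function.Supplier;

class RackIdSketch {

    // Hypothetical stand-in for the helper called from the new constructor:
    // copy the supplied rack id into the consumer config under "client.rack"
    // (i.e. ConsumerConfig.CLIENT_RACK_CONFIG) when it is non-null and non-empty.
    static void setConsumerClientRack(Properties consumerProps, Supplier<String> rackIdSupplier) {
        String rackId = rackIdSupplier.get();
        if (rackId != null && !rackId.isEmpty()) {
            consumerProps.setProperty("client.rack", rackId);
        }
    }

    public static void main(String[] args) {
        // Supplier provides a rack id -> the property is set.
        Properties withRack = new Properties();
        setConsumerClientRack(withRack, () -> "rack-a");
        System.out.println(withRack.getProperty("client.rack")); // rack-a

        // Default supplier (as in the delegating constructor) returns null -> nothing is set.
        Properties withoutRack = new Properties();
        setConsumerClientRack(withoutRack, () -> null);
        System.out.println(withoutRack.containsKey("client.rack")); // false
    }
}
```

Because the supplier is resolved right in the constructor, a unit test can pass
a recording supplier and assert both that it was called and that the property
ended up in the consumer config, without going through a
Supplier<KafkaPartitionSplitReader> indirection.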
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]