PatrickRen commented on code in PR #20475:
URL: https://github.com/apache/flink/pull/20475#discussion_r1011493529
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -156,6 +157,9 @@
         this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
         this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
         this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
+        this.kafkaProducerConfig.setProperty(
+                ProducerConfig.CLIENT_ID_CONFIG,
+                createProducerClientId(kafkaProducerConfig, sinkInitContext.getSubtaskId()));
Review Comment:
A single writer can hold multiple producers at the same time.
KafkaWriter keeps a pool of producers for reuse and creates a new producer
whenever the pool is empty. Since the client id here depends only on the
producer config and the subtask id, I think the producers in the pool will
collide because they all end up with the same client id.
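
One possible way to avoid the collision, sketched below, is to give each producer its own copy of the config and append a per-writer counter to the client id. This is only an illustration of the idea, not the change made in the PR: `ClientIdSketch`, `nextClientId`, `configForNewProducer`, and the `prefix-subtaskId-index` format are all hypothetical names and choices. The sketch uses plain `java.util.Properties` and the literal key `"client.id"` (the value of `ProducerConfig.CLIENT_ID_CONFIG`) so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

class ClientIdSketch {
    // Same string as ProducerConfig.CLIENT_ID_CONFIG in the Kafka client.
    static final String CLIENT_ID_CONFIG = "client.id";

    // Hypothetical per-writer counter, incremented for every producer created.
    private final AtomicInteger producerIndex = new AtomicInteger();

    // Hypothetical helper: prefix, subtask id, and a per-producer index.
    String nextClientId(String prefix, int subtaskId) {
        return prefix + "-" + subtaskId + "-" + producerIndex.getAndIncrement();
    }

    // Copies the shared config so that setting the client id for one
    // producer does not overwrite the id used by another producer.
    Properties configForNewProducer(Properties base, String prefix, int subtaskId) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty(CLIENT_ID_CONFIG, nextClientId(prefix, subtaskId));
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        ClientIdSketch writer = new ClientIdSketch();
        Properties first = writer.configForNewProducer(base, "kafka-sink", 0);
        Properties second = writer.configForNewProducer(base, "kafka-sink", 0);

        // Two producers in the same subtask now get different client ids.
        System.out.println(first.getProperty(CLIENT_ID_CONFIG));
        System.out.println(second.getProperty(CLIENT_ID_CONFIG));
    }
}
```

Copying the config per producer, rather than mutating the shared `kafkaProducerConfig` once in the constructor, is what keeps the ids distinct when the pool creates additional producers.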