fapaul commented on a change in pull request #18825:
URL: https://github.com/apache/flink/pull/18825#discussion_r819371754
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -259,6 +262,16 @@ private void abortCurrentProducer() {
return currentProducer;
}
+ @VisibleForTesting
+ KafkaRecordSerializationSchema<IN> getRecordSerializer() {
Review comment:
Do you really need to expose the recordSerializer and the sink context?
I only see them used by `testNumRecordsOutErrorsCounterMetric`, and I think
both objects are passed to the writer during construction, so they are already
accessible from the test class.
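A minimal sketch of what this suggestion could look like (all class and field names here are illustrative, not the real Flink types): because the test constructs the writer itself, it can simply hold on to the serializer instance it passes in, so no `@VisibleForTesting` getter is needed on the writer.

```java
// Hypothetical sketch of the reviewer's suggestion; names are illustrative,
// not the actual Flink classes.
class RecordSerializer {}

class Writer {
    private final RecordSerializer serializer;

    Writer(RecordSerializer serializer) {
        // The writer keeps the serializer private; no test-only getter.
        this.serializer = serializer;
    }
}

class WriterTest {
    // The test retains its own reference to the instance it handed over,
    // so it can make assertions about it without widening Writer's API.
    private final RecordSerializer serializer = new RecordSerializer();
    private final Writer writer = new Writer(serializer);

    RecordSerializer serializerUnderTest() {
        return serializer;
    }
}
```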
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -406,6 +419,7 @@ public WriterCallback(
@Override
public void onCompletion(RecordMetadata metadata, Exception exception)
{
if (exception != null) {
+ numRecordsOutErrorsCounter.inc();
Review comment:
I think it is not safe to call this here because it is invoked by the
Kafka producer thread. The counter should only be updated by the mailbox
thread. In general, I am a bit skeptical about introducing this change,
because the maximum number of errors is 1: the pipeline fails immediately
after reaching this point.
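One way to address the threading concern could be sketched as follows. This is a hypothetical illustration, not the actual KafkaWriter code: the producer callback, which runs on the Kafka client's I/O thread, enqueues the increment onto a single-threaded "mailbox" executor (a stand-in here for Flink's MailboxExecutor), so the counter is only ever touched by that one thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: defer the metric update from the Kafka callback
// thread to a single-threaded mailbox executor, so the plain (non-atomic)
// counter is mutated by exactly one thread.
class WriterCallbackSketch {
    // Stand-in for Flink's mailbox thread (assumption for this sketch).
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private long numRecordsOutErrors = 0; // mutated only on the mailbox thread

    // Invoked by the Kafka producer thread when a send completes.
    void onCompletion(Exception exception) {
        if (exception != null) {
            // Enqueue the increment instead of mutating shared state here.
            mailbox.execute(() -> numRecordsOutErrors++);
        }
    }

    // Read the counter on the mailbox thread to observe a consistent value.
    long errorCount() {
        try {
            return mailbox.submit(() -> numRecordsOutErrors).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

Because the mailbox executor is single-threaded, the enqueued increment and the later read are serialized, which is the property the review comment asks for.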
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]