AHeise commented on a change in pull request #16875:
URL: https://github.com/apache/flink/pull/16875#discussion_r692825902
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -389,4 +409,28 @@ private void initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
return Collections.emptyList();
}
}
+
+ private static long computeSendTime(Producer<?, ?> producer) {
+ final Metric sendTime =
+ MetricUtil.getKafkaMetric(
+ producer.metrics(), "producer-metrics", "request-latency-avg");
+ final Metric queueTime =
+ MetricUtil.getKafkaMetric(
+ producer.metrics(), "producer-metrics", "record-queue-time-avg");
+ return ((Number) sendTime.metricValue()).longValue()
+ + ((Number) queueTime.metricValue()).longValue();
+ }
+
+ private void registerMetricSync() {
+ if (closed) {
+ return;
+ }
+ timeService.registerProcessingTimer(
+ lastSync + METRIC_UPDATE_INTERVAL_MILLIS,
+ (time) -> {
+ MetricUtil.sync(byteOutMetric, numBytesOutCounter);
+ lastSync = time;
+ registerMetricSync();
+ });
Review comment:
This avoids reading the Kafka metric after the producer is closed (I'm not
sure what the semantics are in that case):
```suggestion
timeService.registerProcessingTimer(
lastSync + METRIC_UPDATE_INTERVAL_MILLIS,
(time) -> {
if (!closed) {
MetricUtil.sync(byteOutMetric, numBytesOutCounter);
lastSync = time;
registerMetricSync();
}
});
```
It might be easier to just use `System.currentTimeMillis()` instead of
`lastSync`, so that the timer is not triggered too often in case of overload?
But I'm not sure.
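
To illustrate the concern, here is a minimal, self-contained sketch rather than the actual `KafkaWriter` code. It assumes the timer callback receives the scheduled timestamp, not the actual firing time, so that `lastSync` can fall behind the wall clock after a stall. The class `TimerBaseSketch`, its methods, and the 500 ms interval are hypothetical stand-ins.

```java
/** Hypothetical simulation of the two rescheduling strategies; not part of KafkaWriter. */
public class TimerBaseSketch {
    // Stand-in for METRIC_UPDATE_INTERVAL_MILLIS; the real value is not shown in the diff.
    public static final long INTERVAL = 500L;

    /** Next timer when rescheduling relative to the previous scheduled timestamp. */
    public static long nextFromLastSync(long lastSync) {
        return lastSync + INTERVAL;
    }

    /** Next timer when rescheduling relative to the wall clock. */
    public static long nextFromNow(long now) {
        return now + INTERVAL;
    }

    /**
     * Counts how many timers are already overdue at time {@code now}, i.e. would fire
     * back to back once the task resumes. The first timer is always the one registered
     * before the stall, at lastSync + INTERVAL.
     */
    public static int overdueFirings(long lastSync, long now, boolean useWallClock) {
        int overdue = 0;
        long next = nextFromLastSync(lastSync);
        while (next <= now) {
            overdue++;
            // Assumption: the callback sees the scheduled timestamp `next`, not `now`.
            next = useWallClock ? nextFromNow(now) : nextFromLastSync(next);
        }
        return overdue;
    }

    public static void main(String[] args) {
        long lastSync = 0L;
        long now = 2_600L; // the task was stalled for about 2.6 s
        System.out.println(overdueFirings(lastSync, now, false)); // based on lastSync
        System.out.println(overdueFirings(lastSync, now, true));  // based on wall clock
    }
}
```

Under these assumptions, basing the next timer on `lastSync` replays every missed interval after a stall, while basing it on the wall clock fires once and then resumes the regular cadence.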