fapaul commented on a change in pull request #16875:
URL: https://github.com/apache/flink/pull/16875#discussion_r692878739
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -389,4 +409,28 @@ private void
initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
return Collections.emptyList();
}
}
+
+ private static long computeSendTime(Producer<?, ?> producer) {
+ final Metric sendTime =
+ MetricUtil.getKafkaMetric(
+ producer.metrics(), "producer-metrics",
"request-latency-avg");
+ final Metric queueTime =
+ MetricUtil.getKafkaMetric(
+ producer.metrics(), "producer-metrics",
"record-queue-time-avg");
+ return ((Number) sendTime.metricValue()).longValue()
+ + ((Number) queueTime.metricValue()).longValue();
+ }
+
+ private void registerMetricSync() {
+ if (closed) {
+ return;
+ }
+ timeService.registerProcessingTimer(
+ lastSync + METRIC_UPDATE_INTERVAL_MILLIS,
+ (time) -> {
+ MetricUtil.sync(byteOutMetric, numBytesOutCounter);
+ lastSync = time;
+ registerMetricSync();
+ });
Review comment:
> It might be easier to just use `System.currentTimeMillis()` instead of
`lastSync` to not trigger stuff to often in case of overload? But not sure.
I am not sure what the best way here is. With your suggestion we need to
call `System.currentTimeMillis()` for every registration. Currently is only
needed once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]