Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/4935#discussion_r148530767
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
---
@@ -245,6 +238,23 @@ public void run() {
                 if (records == null) {
                     try {
                         records = consumer.poll(pollTimeout);
+                        // register Kafka's very own metrics in Flink's metric reporters
+                        if (useMetrics && !records.isEmpty()) {
+                            // register Kafka metrics to Flink
+                            Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+                            if (metrics == null) {
+                                // MapR's Kafka implementation returns null here.
+                                log.info("Consumer implementation does not support metrics");
+                            } else {
+                                // we have Kafka metrics, register them
+                                for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
--- End diff --
Yes, I agree that this is not the best way to solve it. What do you think
about trying to register the Kafka metrics at the beginning of the job, for a
limited number of attempts that can be configured via `properties`? Once that
count is exceeded, we would no longer run the registration in the loop.
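
Roughly what I have in mind, as a minimal sketch rather than a patch. Everything named here is hypothetical: `BoundedMetricsRegistrar` and `maxAttempts` do not exist in Flink, the `Supplier` stands in for `consumer.metrics()`, and the plain `Map` stands in for Flink's metric group. How `maxAttempts` would be read from `properties` is left open.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink or Kafka.
final class BoundedMetricsRegistrar {
    // Assumed to be read from the consumer properties by the caller.
    private final int maxAttempts;
    private int attempts = 0;
    private boolean registered = false;

    BoundedMetricsRegistrar(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Tries to copy the metrics into the registry.
     * Returns true only on the call that actually registers them.
     */
    boolean tryRegister(Supplier<Map<String, Double>> metricsSource,
                        Map<String, Double> registry) {
        if (isDone()) {
            // Already registered, or the attempt budget is used up: do nothing.
            return false;
        }
        attempts++;
        Map<String, Double> metrics = metricsSource.get();
        if (metrics == null || metrics.isEmpty()) {
            // Nothing to register yet (or the implementation returns null).
            return false;
        }
        registry.putAll(metrics);
        registered = true;
        return true;
    }

    /** True once registration succeeded or the attempt budget is exhausted. */
    boolean isDone() {
        return registered || attempts >= maxAttempts;
    }
}
```

In the fetch loop this would be called once after each `poll` until `isDone()` returns true, so the per-record-batch cost goes away after at most `maxAttempts` iterations.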
---