Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4935#discussion_r148442489
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
---
@@ -245,6 +238,23 @@ public void run() {
if (records == null) {
try {
records =
consumer.poll(pollTimeout);
+ // register Kafka's very own
metrics in Flink's metric reporters
+ if (useMetrics &&
!records.isEmpty()) {
+ // register Kafka
metrics to Flink
+ Map<MetricName, ?
extends Metric> metrics = consumer.metrics();
+ if (metrics == null) {
+ // MapR's Kafka
implementation returns null here.
+
log.info("Consumer implementation does not support metrics");
--- End diff --
The log will be overloaded with these, if the MapR implementation is used
and metrics is turned on.
---