Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4935#discussion_r148442662
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
---
@@ -245,6 +238,23 @@ public void run() {
if (records == null) {
try {
records =
consumer.poll(pollTimeout);
+ // register Kafka's very own
metrics in Flink's metric reporters
+ if (useMetrics &&
!records.isEmpty()) {
+ // register Kafka
metrics to Flink
+ Map<MetricName, ?
extends Metric> metrics = consumer.metrics();
+ if (metrics == null) {
+ // MapR's Kafka
implementation returns null here.
+
log.info("Consumer implementation does not support metrics");
+ } else {
+ // we have
Kafka metrics, register them
+ for
(Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
--- End diff --
I'm really not sure about this.
This includes a loop through every consumer metric on every record poll.
AFAIK, the Kafka consumer contains at least 6~8 shipped metrics. That could
be harmful for the performance of the consumer.
Is there any way we can avoid that?
---