GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5336#discussion_r165021513
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
---
@@ -92,21 +93,19 @@ public Kafka09Fetcher(
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
- userCodeClassLoader,
+ userCodeClassLoader.getParent(),
+ consumerMetricGroup,
useMetrics);
this.deserializer = deserializer;
this.handover = new Handover();
- final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
- addOffsetStateGauge(kafkaMetricGroup);
-
this.consumerThread = new KafkaConsumerThread(
LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
- kafkaMetricGroup,
+ subtaskMetricGroup, // TODO: the thread should expose Kafka-shipped metrics through the consumer metric group, not subtask metric group
--- End diff --
So why aren't we passing the consumerMetricGroup to the KafkaConsumerThread here?
---