Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5335#discussion_r165022709
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
---
@@ -95,21 +95,19 @@ public Kafka09Fetcher(
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
- userCodeClassLoader,
+ userCodeClassLoader.getParent(),
+ consumerMetricGroup,
useMetrics);
this.deserializer = deserializer;
this.handover = new Handover();
- final MetricGroup kafkaMetricGroup =
metricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP);
- addOffsetStateGauge(kafkaMetricGroup);
-
this.consumerThread = new KafkaConsumerThread(
LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
- kafkaMetricGroup,
+ subtaskMetricGroup, // TODO: the thread should
expose Kafka-shipped metrics through the consumer metric group, not subtask
metric group
--- End diff --
so why aren't we passing the consumerMetricGroup here?
---