Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5336#discussion_r165021339
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
---
@@ -560,16 +585,11 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
/**
* Add current and committed offsets to metric group.
- *
- * @param metricGroup The metric group to use
*/
- protected void addOffsetStateGauge(MetricGroup metricGroup) {
- // add current offsets to gage
- MetricGroup currentOffsets =
metricGroup.addGroup("current-offsets");
- MetricGroup committedOffsets =
metricGroup.addGroup("committed-offsets");
- for (KafkaTopicPartitionState<KPH> ktp :
subscribedPartitionStates) {
- currentOffsets.gauge(ktp.getTopic() + "-" +
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
- committedOffsets.gauge(ktp.getTopic() + "-" +
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
+ protected void registerOffsetMetrics(List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {
--- End diff --
Could `registerOffsetMetrics` be made `private` instead of `protected`?
---