AHeise commented on a change in pull request #16838:
URL: https://github.com/apache/flink/pull/16838#discussion_r689667537
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -75,15 +81,25 @@
private final SimpleCollector<T> collector;
private final String groupId;
private final int subtaskId;
+
+ // For Kafka specific metrics
private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
+ // For FLIP-33 standard metrics
+ private final SourceReaderMetricGroup sourceReaderMetricGroup;
+ @Nullable private final Supplier<Long> bytesConsumedTotalSupplier;
Review comment:
I don't immediately see the value of using the `Supplier` indirection
over using `@Nullable Metric bytesConsumedMetric`. Having a nullable supplier
is kind of an anti-pattern as we are mixing functional-style with imperative
constructs (null).
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -414,6 +436,45 @@ private void maybeRegisterKafkaConsumerMetrics(
}
}
+ private Supplier<Long> getBytesConsumedTotalSupplier(KafkaConsumer<?, ?>
consumer) {
+ final String bytesConsumedTotalName = "bytes-consumed-total";
+ final Optional<Supplier<Long>> supplier =
+ consumer.metrics().entrySet().stream()
+ .filter(
+ e ->
+ e.getKey()
+ .group()
+ .equals(
+
KAFKA_METRIC_GROUP_CONSUMER_FETCH_MANAGER_METRICS)
+ && e.getKey()
+ .name()
+
.equals(KAFKA_METRIC_BYTES_CONSUMED_TOTAL))
+ .map(Map.Entry::getValue)
+ .findFirst()
Review comment:
I'd extract that to `Optional<Metric> findKafkaMetic(String group,
String name)`, then you can reuse it for the lag metric.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]