This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit e88f3ca6941a88952f210a8cdedef73394f45787 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Jul 10 11:14:54 2024 +0200 CAMEL-20956: move the metrics collection to a separate method This should help inlining the code --- .../camel/component/kafka/KafkaFetchRecords.java | 47 +++++++++++++--------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 18423905894..fc357f60c8a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -377,26 +377,8 @@ public class KafkaFetchRecords implements Runnable { // if dev-console is in use then a request to fetch the commit offsets can be requested on-demand // which must happen using this polling thread, so we use the commitRecordsRequested to trigger this - if (devConsoleEnabled && commitRecordsRequested.compareAndSet(true, false)) { - try { - Map<TopicPartition, OffsetAndMetadata> commits = consumer.committed(consumer.assignment()); - commitRecords.clear(); - for (var e : commits.entrySet()) { - KafkaTopicPosition p - = new KafkaTopicPosition( - e.getKey().topic(), e.getKey().partition(), e.getValue().offset(), - e.getValue().leaderEpoch().orElse(0)); - commitRecords.add(p); - } - CountDownLatch count = latch.get(); - if (count != null) { - count.countDown(); - } - } catch (Exception e) { - // ignore cannot get last commit details - LOG.debug("Cannot get last offset committed from Kafka brokers due to: {}. This exception is ignored.", - e.getMessage(), e); - } + if (devConsoleEnabled) { + collectCommitMetrics(); } ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); @@ -466,6 +448,31 @@ public class KafkaFetchRecords implements Runnable { } } + private void collectCommitMetrics() { + if (commitRecordsRequested.compareAndSet(true, false)) { + try { + Map<TopicPartition, OffsetAndMetadata> commits = consumer.committed(consumer.assignment()); + commitRecords.clear(); + for (var e : commits.entrySet()) { + KafkaTopicPosition p + = new KafkaTopicPosition( + e.getKey().topic(), e.getKey().partition(), e.getValue().offset(), + e.getValue().leaderEpoch().orElse(0)); + commitRecords.add(p); + } + CountDownLatch count = latch.get(); + if (count != null) { + count.countDown(); + } + } catch (Exception e) { + // ignore cannot get last commit details + LOG.debug( + "Cannot get last offset committed from Kafka brokers due to: {}. This exception is ignored.", + e.getMessage(), e); + } + } + } + private KafkaRecordProcessorFacade createRecordProcessor() { final KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration(); if (configuration.isBatching()) {
