This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e88f3ca6941a88952f210a8cdedef73394f45787
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Jul 10 11:14:54 2024 +0200

    CAMEL-20956: move the metrics collection to a separate method
    
    This should help the JIT with inlining the code
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 47 +++++++++++++---------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 18423905894..fc357f60c8a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -377,26 +377,8 @@ public class KafkaFetchRecords implements Runnable {
 
                 // if dev-console is in use then a request to fetch the commit 
offsets can be requested on-demand
                 // which must happen using this polling thread, so we use the 
commitRecordsRequested to trigger this
-                if (devConsoleEnabled && 
commitRecordsRequested.compareAndSet(true, false)) {
-                    try {
-                        Map<TopicPartition, OffsetAndMetadata> commits = 
consumer.committed(consumer.assignment());
-                        commitRecords.clear();
-                        for (var e : commits.entrySet()) {
-                            KafkaTopicPosition p
-                                    = new KafkaTopicPosition(
-                                            e.getKey().topic(), 
e.getKey().partition(), e.getValue().offset(),
-                                            
e.getValue().leaderEpoch().orElse(0));
-                            commitRecords.add(p);
-                        }
-                        CountDownLatch count = latch.get();
-                        if (count != null) {
-                            count.countDown();
-                        }
-                    } catch (Exception e) {
-                        // ignore cannot get last commit details
-                        LOG.debug("Cannot get last offset committed from Kafka 
brokers due to: {}. This exception is ignored.",
-                                e.getMessage(), e);
-                    }
+                if (devConsoleEnabled) {
+                    collectCommitMetrics();
                 }
 
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
@@ -466,6 +448,31 @@ public class KafkaFetchRecords implements Runnable {
         }
     }
 
+    private void collectCommitMetrics() {
+        if (commitRecordsRequested.compareAndSet(true, false)) {
+            try {
+                Map<TopicPartition, OffsetAndMetadata> commits = 
consumer.committed(consumer.assignment());
+                commitRecords.clear();
+                for (var e : commits.entrySet()) {
+                    KafkaTopicPosition p
+                            = new KafkaTopicPosition(
+                            e.getKey().topic(), e.getKey().partition(), 
e.getValue().offset(),
+                            e.getValue().leaderEpoch().orElse(0));
+                    commitRecords.add(p);
+                }
+                CountDownLatch count = latch.get();
+                if (count != null) {
+                    count.countDown();
+                }
+            } catch (Exception e) {
+                // ignore cannot get last commit details
+                LOG.debug(
+                        "Cannot get last offset committed from Kafka brokers 
due to: {}. This exception is ignored.",
+                        e.getMessage(), e);
+            }
+        }
+    }
+
     private KafkaRecordProcessorFacade createRecordProcessor() {
         final KafkaConfiguration configuration = 
kafkaConsumer.getEndpoint().getConfiguration();
         if (configuration.isBatching()) {

Reply via email to