This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bf6c9de6ee4fb03d3faab8ce8c7d44f705234b58
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Jul 10 11:43:31 2024 +0200

    CAMEL-20956: separate metrics collection concerns from record fetching concerns
---
 .../camel/component/kafka/KafkaDevConsole.java     |  44 ++++---
 .../camel/component/kafka/KafkaFetchRecords.java   | 111 ++++--------------
 .../devconsole/DefaultMetricsCollector.java        | 130 +++++++++++++++++++++
 .../devconsole/DevConsoleMetricsCollector.java     |  52 +++++++++
 .../consumer/devconsole/NoopMetricsCollector.java  |  69 +++++++++++
 5 files changed, 298 insertions(+), 108 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
index 1f80df5a8b6..b7f94dd12e3 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
@@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Route;
+import 
org.apache.camel.component.kafka.consumer.devconsole.DefaultMetricsCollector;
+import 
org.apache.camel.component.kafka.consumer.devconsole.DevConsoleMetricsCollector;
 import org.apache.camel.spi.annotations.DevConsole;
 import org.apache.camel.support.console.AbstractDevConsole;
 import org.apache.camel.util.StopWatch;
@@ -58,28 +60,29 @@ public class KafkaDevConsole extends AbstractDevConsole {
                 sb.append(String.format("\n    Route Id: %s", 
route.getRouteId()));
                 sb.append(String.format("\n    From: %s", 
route.getEndpoint().getEndpointUri()));
                 for (KafkaFetchRecords t : kc.tasks()) {
-                    sb.append(String.format("\n        Worked Thread: %s", 
t.getThreadId()));
+                    final DevConsoleMetricsCollector metricsCollector = 
t.getMetricsCollector();
+                    sb.append(String.format("\n        Worked Thread: %s", 
metricsCollector.getThreadId()));
                     sb.append(String.format("\n        Worker State: %s", 
t.getState()));
                     TaskHealthState hs = t.healthState();
                     if (!hs.isReady()) {
                         sb.append(String.format("\n        Worker Last Error: 
%s", hs.buildStateMessage()));
                     }
-                    KafkaFetchRecords.GroupMetadata meta = 
t.getGroupMetadata();
+                    DefaultMetricsCollector.GroupMetadata meta = 
metricsCollector.getGroupMetadata();
                     if (meta != null) {
                         sb.append(String.format("\n        Group Id: %s", 
meta.groupId()));
                         sb.append(String.format("\n        Group Instance Id: 
%s", meta.groupInstanceId()));
                         sb.append(String.format("\n        Member Id: %s", 
meta.memberId()));
                         sb.append(String.format("\n        Generation Id: %d", 
meta.generationId()));
                     }
-                    if (t.getLastRecord() != null) {
-                        sb.append(String.format("\n        Last Topic: %s", 
t.getLastRecord().topic()));
-                        sb.append(String.format("\n        Last Partition: 
%d", t.getLastRecord().partition()));
-                        sb.append(String.format("\n        Last Offset: %d", 
t.getLastRecord().offset()));
+                    if (metricsCollector.getLastRecord() != null) {
+                        sb.append(String.format("\n        Last Topic: %s", 
metricsCollector.getLastRecord().topic()));
+                        sb.append(String.format("\n        Last Partition: 
%d", metricsCollector.getLastRecord().partition()));
+                        sb.append(String.format("\n        Last Offset: %d", 
metricsCollector.getLastRecord().offset()));
                     }
                     if (committed) {
-                        List<KafkaFetchRecords.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, t);
+                        List<DefaultMetricsCollector.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, metricsCollector);
                         if (l != null) {
-                            for (KafkaFetchRecords.KafkaTopicPosition r : l) {
+                            for (DefaultMetricsCollector.KafkaTopicPosition r 
: l) {
                                 sb.append(String.format("\n        Commit 
Topic: %s", r.topic()));
                                 sb.append(String.format("\n        Commit 
Partition: %s", r.partition()));
                                 sb.append(String.format("\n        Commit 
Offset: %s", r.offset()));
@@ -99,14 +102,15 @@ public class KafkaDevConsole extends AbstractDevConsole {
         return sb.toString();
     }
 
-    private static List<KafkaFetchRecords.KafkaTopicPosition> 
fetchCommitOffsets(KafkaConsumer kc, KafkaFetchRecords task) {
+    private static List<DefaultMetricsCollector.KafkaTopicPosition> 
fetchCommitOffsets(
+            KafkaConsumer kc, DevConsoleMetricsCollector collector) {
         StopWatch watch = new StopWatch();
 
-        CountDownLatch latch = task.fetchCommitRecords();
+        CountDownLatch latch = collector.fetchCommitRecords();
         long timeout = 
Math.min(kc.getEndpoint().getConfiguration().getPollTimeoutMs(), 
COMMITTED_TIMEOUT);
         try {
             latch.await(timeout, TimeUnit.MILLISECONDS);
-            var answer = task.getCommitRecords();
+            var answer = collector.getCommitRecords();
             LOG.debug("Fetching commit offsets took: {} ms", watch.taken());
             return answer;
         } catch (Exception e) {
@@ -134,31 +138,33 @@ public class KafkaDevConsole extends AbstractDevConsole {
                 jo.put("workers", arr);
 
                 for (KafkaFetchRecords t : kc.tasks()) {
+                    final DevConsoleMetricsCollector metricsCollector = 
t.getMetricsCollector();
+
                     JsonObject wo = new JsonObject();
                     arr.add(wo);
-                    wo.put("threadId", t.getThreadId());
+                    wo.put("threadId", metricsCollector.getThreadId());
                     wo.put("state", t.getState());
                     TaskHealthState hs = t.healthState();
                     if (!hs.isReady()) {
                         wo.put("lastError", hs.buildStateMessage());
                     }
-                    KafkaFetchRecords.GroupMetadata meta = 
t.getGroupMetadata();
+                    DefaultMetricsCollector.GroupMetadata meta = 
metricsCollector.getGroupMetadata();
                     if (meta != null) {
                         wo.put("groupId", meta.groupId());
                         wo.put("groupInstanceId", meta.groupInstanceId());
                         wo.put("memberId", meta.memberId());
                         wo.put("generationId", meta.generationId());
                     }
-                    if (t.getLastRecord() != null) {
-                        wo.put("lastTopic", t.getLastRecord().topic());
-                        wo.put("lastPartition", t.getLastRecord().partition());
-                        wo.put("lastOffset", t.getLastRecord().offset());
+                    if (metricsCollector.getLastRecord() != null) {
+                        wo.put("lastTopic", 
metricsCollector.getLastRecord().topic());
+                        wo.put("lastPartition", 
metricsCollector.getLastRecord().partition());
+                        wo.put("lastOffset", 
metricsCollector.getLastRecord().offset());
                     }
                     if (committed) {
-                        List<KafkaFetchRecords.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, t);
+                        List<DefaultMetricsCollector.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, metricsCollector);
                         if (l != null) {
                             JsonArray ca = new JsonArray();
-                            for (KafkaFetchRecords.KafkaTopicPosition r : l) {
+                            for (DefaultMetricsCollector.KafkaTopicPosition r 
: l) {
                                 JsonObject cr = new JsonObject();
                                 cr.put("topic", r.topic());
                                 cr.put("partition", r.partition());
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index fc357f60c8a..3534fccf3f4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -17,21 +17,17 @@
 package org.apache.camel.component.kafka;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
+import 
org.apache.camel.component.kafka.consumer.devconsole.DefaultMetricsCollector;
+import 
org.apache.camel.component.kafka.consumer.devconsole.DevConsoleMetricsCollector;
+import 
org.apache.camel.component.kafka.consumer.devconsole.NoopMetricsCollector;
 import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
 import 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
@@ -52,7 +48,6 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -63,8 +58,6 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.rmi.registry.LocateRegistry.getRegistry;
-
 public class KafkaFetchRecords implements Runnable {
 
     /*
@@ -105,19 +98,7 @@ public class KafkaFetchRecords implements Runnable {
     private volatile boolean connected; // this is the state (connected or not)
     private volatile State state = State.RUNNING;
 
-    // dev-console records and state
-    record GroupMetadata(String groupId, String groupInstanceId, String 
memberId, int generationId) {
-    }
-
-    record KafkaTopicPosition(String topic, int partition, long offset, int 
epoch) {
-    }
-
-    private final boolean devConsoleEnabled;
-    private volatile GroupMetadata groupMetadata;
-    private volatile KafkaTopicPosition lastRecord;
-    private final List<KafkaTopicPosition> commitRecords = new ArrayList<>();
-    private final AtomicBoolean commitRecordsRequested = new AtomicBoolean();
-    private final AtomicReference<CountDownLatch> latch = new 
AtomicReference<>();
+    private final DevConsoleMetricsCollector metricsCollector;
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer,
                       BridgeExceptionHandlerToErrorHandler bridge, String 
topicName, Pattern topicPattern, String id,
@@ -129,7 +110,13 @@ public class KafkaFetchRecords implements Runnable {
         this.consumerListener = consumerListener;
         this.threadId = topicName + "-" + "Thread " + id;
         this.kafkaProps = kafkaProps;
-        this.devConsoleEnabled = 
kafkaConsumer.getEndpoint().getCamelContext().isDevConsole();
+        final boolean devConsoleEnabled = 
kafkaConsumer.getEndpoint().getCamelContext().isDevConsole();
+
+        if (devConsoleEnabled) {
+            metricsCollector = new DefaultMetricsCollector(threadId);
+        } else {
+            metricsCollector = new NoopMetricsCollector();
+        }
     }
 
     @Override
@@ -191,13 +178,8 @@ public class KafkaFetchRecords implements Runnable {
                 setConnected(true);
             }
 
-            if (devConsoleEnabled && isConnected()) {
-                // store metadata
-                ConsumerGroupMetadata meta = consumer.groupMetadata();
-                if (meta != null) {
-                    groupMetadata = new GroupMetadata(
-                            meta.groupId(), meta.groupInstanceId().orElse(""), 
meta.memberId(), meta.generationId());
-                }
+            if (isConnected()) {
+                metricsCollector.storeMetadata(consumer);
             }
 
             setLastError(null);
@@ -377,9 +359,7 @@ public class KafkaFetchRecords implements Runnable {
 
                 // if dev-console is in use then a request to fetch the commit 
offsets can be requested on-demand
                 // which must happen using this polling thread, so we use the 
commitRecordsRequested to trigger this
-                if (devConsoleEnabled) {
-                    collectCommitMetrics();
-                }
+                metricsCollector.collectCommitMetrics(consumer);
 
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -389,9 +369,8 @@ public class KafkaFetchRecords implements Runnable {
                 }
 
                 ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);
-                if (devConsoleEnabled && result != null && result.getTopic() 
!= null) {
-                    // dev-console uses information from last processed record
-                    lastRecord = new KafkaTopicPosition(result.getTopic(), 
result.getPartition(), result.getOffset(), 0);
+                if (result != null && result.getTopic() != null) {
+                    metricsCollector.storeLastRecord(result);
                 }
                 updateTaskState();
 
@@ -448,31 +427,6 @@ public class KafkaFetchRecords implements Runnable {
         }
     }
 
-    private void collectCommitMetrics() {
-        if (commitRecordsRequested.compareAndSet(true, false)) {
-            try {
-                Map<TopicPartition, OffsetAndMetadata> commits = 
consumer.committed(consumer.assignment());
-                commitRecords.clear();
-                for (var e : commits.entrySet()) {
-                    KafkaTopicPosition p
-                            = new KafkaTopicPosition(
-                            e.getKey().topic(), e.getKey().partition(), 
e.getValue().offset(),
-                            e.getValue().leaderEpoch().orElse(0));
-                    commitRecords.add(p);
-                }
-                CountDownLatch count = latch.get();
-                if (count != null) {
-                    count.countDown();
-                }
-            } catch (Exception e) {
-                // ignore cannot get last commit details
-                LOG.debug(
-                        "Cannot get last offset committed from Kafka brokers 
due to: {}. This exception is ignored.",
-                        e.getMessage(), e);
-            }
-        }
-    }
-
     private KafkaRecordProcessorFacade createRecordProcessor() {
         final KafkaConfiguration configuration = 
kafkaConsumer.getEndpoint().getConfiguration();
         if (configuration.isBatching()) {
@@ -701,36 +655,15 @@ public class KafkaFetchRecords implements Runnable {
         this.lastError = lastError;
     }
 
-    // dev console information
-    // ------------------------------------------------------------------------
-
-    GroupMetadata getGroupMetadata() {
-        return groupMetadata;
-    }
-
-    KafkaTopicPosition getLastRecord() {
-        return lastRecord;
-    }
-
-    String getThreadId() {
-        return threadId;
+    /**
+     * Gets the metrics collector for the dev console. Defaults to the {@link 
NoopMetricsCollector} unless the dev
+     * console is enabled
+     */
+    public DevConsoleMetricsCollector getMetricsCollector() {
+        return metricsCollector;
     }
 
     String getState() {
         return state.name();
     }
-
-    List<KafkaTopicPosition> getCommitRecords() {
-        return Collections.unmodifiableList(commitRecords);
-    }
-
-    CountDownLatch fetchCommitRecords() {
-        // use a latch to wait for commit records to be ready
-        // as the consumer thread must be calling Kafka brokers to get this 
information
-        // so this thread need to wait for that to be complete
-        CountDownLatch answer = new CountDownLatch(1);
-        latch.set(answer);
-        commitRecordsRequested.set(true);
-        return answer;
-    }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DefaultMetricsCollector.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DefaultMetricsCollector.java
new file mode 100644
index 00000000000..f00367bc9a0
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DefaultMetricsCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.devconsole;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The default collector if the dev console is enabled for Kafka.
+ * <p>
+ * The {@code store*} and {@code collect*} methods are meant to be called from the Kafka polling thread, while the
+ * getters and {@link #fetchCommitRecords()} are called from the dev console. All state shared between the two
+ * sides is published through volatile or atomic fields, and the commit records are exposed as an immutable
+ * snapshot that is replaced as a whole, so a reader never observes a partially updated list.
+ */
+public class DefaultMetricsCollector implements DevConsoleMetricsCollector {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class);
+
+    private final String threadId;
+
+    // dev-console records and state
+
+    private volatile GroupMetadata groupMetadata;
+    private volatile KafkaTopicPosition lastRecord;
+    // immutable snapshot, swapped atomically by the polling thread
+    private volatile List<KafkaTopicPosition> commitRecords = Collections.emptyList();
+    private final AtomicBoolean commitRecordsRequested = new AtomicBoolean();
+    private final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+
+    /**
+     * @param threadId the identifier of the worker thread whose metrics are collected
+     */
+    public DefaultMetricsCollector(String threadId) {
+        this.threadId = threadId;
+    }
+
+    /**
+     * Stores the consumer group metadata of the given consumer. A missing group instance id is stored as an empty
+     * string. Nothing is stored if the consumer reports no metadata.
+     */
+    @Override
+    public void storeMetadata(Consumer<?, ?> consumer) {
+        ConsumerGroupMetadata meta = consumer.groupMetadata();
+        if (meta != null) {
+            groupMetadata = new GroupMetadata(
+                    meta.groupId(), meta.groupInstanceId().orElse(""), meta.memberId(), meta.generationId());
+        }
+    }
+
+    /**
+     * Stores the position of the last processed record. The epoch is not available here and is recorded as 0.
+     */
+    @Override
+    public void storeLastRecord(ProcessingResult result) {
+        // dev-console uses information from last processed record
+        lastRecord = new KafkaTopicPosition(result.getTopic(), result.getPartition(), result.getOffset(), 0);
+    }
+
+    /**
+     * Refreshes the commit records if (and only if) a refresh was requested via {@link #fetchCommitRecords()}.
+     * <p>
+     * Partitions without a committed offset are skipped. Failures talking to the brokers are logged and ignored,
+     * leaving the previous snapshot in place. In every case the pending latch is released so that the requester
+     * does not have to wait for its timeout.
+     */
+    @Override
+    public void collectCommitMetrics(Consumer<?, ?> consumer) {
+        if (!commitRecordsRequested.compareAndSet(true, false)) {
+            return;
+        }
+
+        try {
+            Map<TopicPartition, OffsetAndMetadata> commits = consumer.committed(consumer.assignment());
+            List<KafkaTopicPosition> positions = new ArrayList<>(commits.size());
+            for (var e : commits.entrySet()) {
+                OffsetAndMetadata committed = e.getValue();
+                if (committed == null) {
+                    // the consumer reports null for partitions that have no committed offset yet
+                    continue;
+                }
+                positions.add(new KafkaTopicPosition(
+                        e.getKey().topic(), e.getKey().partition(), committed.offset(),
+                        committed.leaderEpoch().orElse(0)));
+            }
+            // publish the complete snapshot in a single volatile write
+            commitRecords = Collections.unmodifiableList(positions);
+        } catch (Exception e) {
+            // ignore cannot get last commit details
+            LOG.debug(
+                    "Cannot get last offset committed from Kafka brokers due to: {}. This exception is ignored.",
+                    e.getMessage(), e);
+        } finally {
+            // always wake up the requester, also on failure, so it is not blocked until its timeout
+            CountDownLatch count = latch.get();
+            if (count != null) {
+                count.countDown();
+            }
+        }
+    }
+
+    // dev console information
+    // ------------------------------------------------------------------------
+
+    @Override
+    public GroupMetadata getGroupMetadata() {
+        return groupMetadata;
+    }
+
+    @Override
+    public KafkaTopicPosition getLastRecord() {
+        return lastRecord;
+    }
+
+    @Override
+    public String getThreadId() {
+        return threadId;
+    }
+
+    /**
+     * @return an unmodifiable snapshot of the most recently collected commit records (empty until the first
+     *         successful collection)
+     */
+    @Override
+    public List<KafkaTopicPosition> getCommitRecords() {
+        return commitRecords;
+    }
+
+    /**
+     * Requests a refresh of the commit records on the next call to {@link #collectCommitMetrics(Consumer)}.
+     *
+     * @return a latch that is released once that call has completed
+     */
+    @Override
+    public CountDownLatch fetchCommitRecords() {
+        // use a latch to wait for commit records to be ready
+        // as the consumer thread must be calling Kafka brokers to get this information
+        // so this thread need to wait for that to be complete
+        CountDownLatch answer = new CountDownLatch(1);
+        // the latch must be visible before the request flag is raised
+        latch.set(answer);
+        commitRecordsRequested.set(true);
+        return answer;
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DevConsoleMetricsCollector.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DevConsoleMetricsCollector.java
new file mode 100644
index 00000000000..9a602532533
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DevConsoleMetricsCollector.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.devconsole;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * Collects metrics for the dev console.
+ * <p>
+ * The {@code store*} and {@code collect*} methods feed the collector, while the getters and
+ * {@link #fetchCommitRecords()} are used to read the collected data back.
+ */
+public interface DevConsoleMetricsCollector {
+
+    /**
+     * Stores the consumer group metadata exposed by the given consumer.
+     *
+     * @param consumer the Kafka consumer to read the group metadata from
+     */
+    void storeMetadata(Consumer<?, ?> consumer);
+
+    /**
+     * Stores the position of the last processed record.
+     *
+     * @param result the processing result carrying the topic, partition and offset
+     */
+    void storeLastRecord(ProcessingResult result);
+
+    /**
+     * Collects the committed offsets for the consumer, if the implementation decides a collection is due.
+     *
+     * @param consumer the Kafka consumer to query for committed offsets
+     */
+    void collectCommitMetrics(Consumer<?, ?> consumer);
+
+    /**
+     * @return the last stored group metadata, or {@code null} if none is available
+     */
+    GroupMetadata getGroupMetadata();
+
+    /**
+     * @return the position of the last processed record, or {@code null} if none is available
+     */
+    KafkaTopicPosition getLastRecord();
+
+    /**
+     * @return the identifier of the worker thread the metrics belong to
+     */
+    String getThreadId();
+
+    /**
+     * @return the most recently collected commit records
+     */
+    List<KafkaTopicPosition> getCommitRecords();
+
+    /**
+     * Requests that the commit records are refreshed.
+     *
+     * @return the latch to wait on before reading {@link #getCommitRecords()}
+     */
+    CountDownLatch fetchCommitRecords();
+
+    /**
+     * Consumer group details shown in the dev console.
+     */
+    record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) {
+    }
+
+    /**
+     * A position (offset) within a topic partition.
+     */
+    record KafkaTopicPosition(String topic, int partition, long offset, int epoch) {
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/NoopMetricsCollector.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/NoopMetricsCollector.java
new file mode 100644
index 00000000000..993fc0867f4
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/NoopMetricsCollector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.devconsole;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * A NO-OP collector that is used if the metrics collector is disabled. It stores nothing and reports no data.
+ */
+public class NoopMetricsCollector implements DevConsoleMetricsCollector {
+    // A latch with a count of zero never blocks and cannot change state, so one instance can be shared
+    private static final CountDownLatch RELEASED = new CountDownLatch(0);
+
+    @Override
+    public void storeMetadata(Consumer<?, ?> consumer) {
+        // NO-OP
+    }
+
+    @Override
+    public void storeLastRecord(ProcessingResult result) {
+        // NO-OP
+    }
+
+    @Override
+    public void collectCommitMetrics(Consumer<?, ?> consumer) {
+        // NO-OP
+    }
+
+    /**
+     * @return always {@code null}, as no metadata is collected
+     */
+    @Override
+    public GroupMetadata getGroupMetadata() {
+        return null;
+    }
+
+    /**
+     * @return always {@code null}, as no record position is collected
+     */
+    @Override
+    public KafkaTopicPosition getLastRecord() {
+        return null;
+    }
+
+    /**
+     * @return always an empty string
+     */
+    @Override
+    public String getThreadId() {
+        return "";
+    }
+
+    /**
+     * @return always an empty, immutable list
+     */
+    @Override
+    public List<KafkaTopicPosition> getCommitRecords() {
+        return List.of();
+    }
+
+    /**
+     * Returns an already released latch instead of {@code null}, so that callers which await the returned latch
+     * continue immediately rather than failing with a {@link NullPointerException}.
+     */
+    @Override
+    public CountDownLatch fetchCommitRecords() {
+        return RELEASED;
+    }
+}

Reply via email to