This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kafka-console
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d88937d3f01221a9f9d41339a32161e2db242f96
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jul 9 12:09:15 2024 +0200

    CAMEL-20956: camel-kafka - Add dev console
---
 components/camel-kafka/pom.xml                     |   4 +
 .../org/apache/camel/dev-console/kafka.json        |  15 +++
 .../services/org/apache/camel/dev-console/kafka    |   2 +
 .../org/apache/camel/dev-consoles.properties       |   7 ++
 .../camel/component/kafka/KafkaConsumer.java       |   5 +
 .../camel/component/kafka/KafkaDevConsole.java     | 117 +++++++++++++++++++++
 .../camel/component/kafka/KafkaFetchRecords.java   |  47 ++++++++-
 .../component/kafka/consumer/CommitManager.java    |   1 -
 .../support/KafkaRecordProcessorFacade.java        |   2 +-
 .../kafka/consumer/support/ProcessingResult.java   |  31 +++++-
 .../kafka/consumer/support/TopicHelper.java        |   3 +-
 .../batching/KafkaRecordBatchingProcessor.java     |   1 -
 .../support/classic/ClassicRebalanceListener.java  |   2 +-
 .../classic/PartitionAssignmentAdapter.java        |   3 +
 .../support/resume/ResumeRebalanceListener.java    |   2 +-
 .../streaming/KafkaRecordStreamingProcessor.java   |   7 +-
 .../subcription/DefaultSubscribeAdapter.java       |   2 +-
 .../support/subcription/SubscribeAdapter.java      |   2 +-
 18 files changed, 238 insertions(+), 15 deletions(-)

diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 55b8986d86a..e64ee2bd4e6 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -42,6 +42,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-health</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-console</artifactId>
+        </dependency>
 
         <!-- kafka java client -->
         <dependency>
diff --git a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json
new file mode 100644
index 00000000000..55e93212a3c
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json
@@ -0,0 +1,15 @@
+{
+  "console": {
+    "kind": "console",
+    "group": "camel",
+    "name": "kafka",
+    "title": "Kafka",
+    "description": "Apache Kafka",
+    "deprecated": false,
+    "javaType": "org.apache.camel.component.kafka.KafkaDevConsole",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-kafka",
+    "version": "4.7.0-SNAPSHOT"
+  }
+}
+
diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka
new file mode 100644
index 00000000000..975921645c6
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.kafka.KafkaDevConsole
diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
new file mode 100644
index 00000000000..3f7bbe40841
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+dev-consoles=kafka
+groupId=org.apache.camel
+artifactId=camel-kafka
+version=4.7.0-SNAPSHOT
+projectName=Camel :: Kafka
+projectDescription=Camel Kafka support
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index f0c75bd3eb6..9ea0e1502ca 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -256,6 +257,10 @@ public class KafkaConsumer extends DefaultConsumer
         return tasks.stream().allMatch(KafkaFetchRecords::isPaused);
     }
 
+    protected List<KafkaFetchRecords> tasks() {
+        return Collections.unmodifiableList(tasks);
+    }
+
     @Override
     public String adapterFactoryService() {
         return "kafka-adapter-factory";
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
new file mode 100644
index 00000000000..86631f915ac
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+
+@DevConsole(name = "kafka", displayName = "Kafka", description = "Apache Kafka")
+public class KafkaDevConsole extends AbstractDevConsole {
+
+    public KafkaDevConsole() {
+        super("camel", "kafka", "Kafka", "Apache Kafka");
+    }
+
+    @Override
+    protected String doCallText(Map<String, Object> options) {
+        StringBuilder sb = new StringBuilder();
+        for (Route route : getCamelContext().getRoutes()) {
+            if (route.getConsumer() instanceof KafkaConsumer kc) {
+                sb.append(String.format("\n    Route Id: %s", route.getRouteId()));
+                sb.append(String.format("\n    From: %s", route.getEndpoint().getEndpointUri()));
+                sb.append(
+                        String.format("\n    State: %s", getCamelContext().getRouteController().getRouteStatus(route.getId())));
+                sb.append(String.format("\n    Uptime: %s", route.getUptime()));
+                for (KafkaFetchRecords t : kc.tasks()) {
+                    sb.append(String.format("\n        Worker Thread: %s", t.getThreadId()));
+                    sb.append(String.format("\n        Worker State: %s", t.getState()));
+                    if (t.getLastError() != null) {
+                        sb.append(String.format("\n        Worker Last Error: %s", t.getLastError().getMessage()));
+                    }
+                    KafkaFetchRecords.GroupMetadata meta = t.getGroupMetadata();
+                    if (meta != null) {
+                        sb.append(String.format("\n        Group Id: %s", meta.groupId()));
+                        sb.append(String.format("\n        Group Instance Id: %s", meta.groupInstanceId()));
+                        sb.append(String.format("\n        Member Id: %s", meta.memberId()));
+                        sb.append(String.format("\n        Generation Id: %d", meta.generationId()));
+                    }
+                    if (t.getLastRecord() != null) {
+                        sb.append(String.format("\n        Last Topic: %s", t.getLastRecord().topic()));
+                        sb.append(String.format("\n        Last Partition: %d", t.getLastRecord().partition()));
+                        sb.append(String.format("\n        Last Offset: %d", t.getLastRecord().offset()));
+                    }
+                    }
+                }
+                sb.append("\n");
+            }
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    protected Map<String, Object> doCallJson(Map<String, Object> options) {
+        JsonObject root = new JsonObject();
+
+        List<JsonObject> list = new ArrayList<>();
+        root.put("kafkaConsumers", list);
+
+        for (Route route : getCamelContext().getRoutes()) {
+            if (route.getConsumer() instanceof KafkaConsumer kc) {
+                JsonObject jo = new JsonObject();
+                jo.put("routeId", route.getRouteId());
+                jo.put("uri", route.getEndpoint().getEndpointUri());
+                jo.put("state", getCamelContext().getRouteController().getRouteStatus(route.getId()));
+                jo.put("uptime", route.getUptime());
+
+                JsonArray arr = new JsonArray();
+                jo.put("workers", arr);
+
+                for (KafkaFetchRecords t : kc.tasks()) {
+                    JsonObject wo = new JsonObject();
+                    arr.add(wo);
+                    wo.put("threadId", t.getThreadId());
+                    wo.put("state", t.getState());
+                    if (t.getLastError() != null) {
+                        wo.put("lastError", t.getLastError().getMessage());
+                    }
+                    KafkaFetchRecords.GroupMetadata meta = t.getGroupMetadata();
+                    if (meta != null) {
+                        wo.put("groupId", meta.groupId());
+                        wo.put("groupInstanceId", meta.groupInstanceId());
+                        wo.put("memberId", meta.memberId());
+                        wo.put("generationId", meta.generationId());
+                    }
+                    if (t.getLastRecord() != null) {
+                        wo.put("lastTopic", t.getLastRecord().topic());
+                        wo.put("lastPartition", t.getLastRecord().partition());
+                        wo.put("lastOffset", t.getLastRecord().offset());
+                    }
+                }
+                list.add(jo);
+            }
+        }
+        return root;
+    }
+
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index a40a29ebbd6..9d64ed69dbc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -45,6 +45,8 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -92,12 +94,19 @@ public class KafkaFetchRecords implements Runnable {
 
     private volatile boolean terminated;
     private volatile long currentBackoffInterval;
-
     private volatile boolean reconnect; // The reconnect must be false at init (this is the policy whether to reconnect).
     private volatile boolean connected; // this is the state (connected or not)
-
     private volatile State state = State.RUNNING;
 
+    record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) {
+    }
+
+    record LastRecord(String topic, int partition, long offset) {
+    }
+
+    private volatile GroupMetadata groupMetadata;
+    private volatile LastRecord lastRecord;
+
     KafkaFetchRecords(KafkaConsumer kafkaConsumer,
                       BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id,
                       Properties kafkaProps, KafkaConsumerListener consumerListener) {
@@ -169,6 +178,15 @@ public class KafkaFetchRecords implements Runnable {
                 setConnected(true);
             }
 
+            if (isConnected()) {
+                // store metadata
+                ConsumerGroupMetadata meta = consumer.groupMetadata();
+                if (meta != null) {
+                    groupMetadata = new GroupMetadata(
+                            meta.groupId(), meta.groupInstanceId().orElse(""), meta.memberId(), meta.generationId());
+                }
+            }
+
             setLastError(null);
             startPolling();
         } while ((pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable());
@@ -351,6 +369,9 @@ public class KafkaFetchRecords implements Runnable {
                 }
 
                 ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
+                if (result != null && result.getTopic() != null) {
+                    lastRecord = new LastRecord(result.getTopic(), result.getPartition(), result.getOffset());
+                }
                 updateTaskState();
 
                 // when breakOnFirstError we want to unsubscribe from Kafka
@@ -494,7 +515,7 @@ public class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped();
     }
 
-    private boolean isReconnect() {
+    boolean isReconnect() {
         return reconnect;
     }
 
@@ -633,4 +654,24 @@ public class KafkaFetchRecords implements Runnable {
     private synchronized void setLastError(Exception lastError) {
         this.lastError = lastError;
     }
+
+    Exception getLastError() {
+        return lastError;
+    }
+
+    GroupMetadata getGroupMetadata() {
+        return groupMetadata;
+    }
+
+    public LastRecord getLastRecord() {
+        return lastRecord;
+    }
+
+    String getThreadId() {
+        return threadId;
+    }
+
+    String getState() {
+        return state.name();
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
index a8b8a776aa3..6e94d33ac56 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer;
 
 import org.apache.camel.Exchange;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 41a747b0ef9..89416f86c49 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -23,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
  * A processing facade that allows processing consumer records in different ways
  */
 public interface KafkaRecordProcessorFacade {
+
     /**
      * Sends a set of records polled from Kafka for processing
      *
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 4be6b8ba7eb..4979ff312c9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 /**
  * Holds the result of processing one or more consumer records
  */
 public final class ProcessingResult {
+
     private static final ProcessingResult UNPROCESSED_RESULT
             = new ProcessingResult(false, false);
 
     private final boolean breakOnErrorHit;
     private final boolean failed;
+    private final String topic;
+    private final int partition;
+    private final long offset;
 
     /**
      * Constructs a new processing result
@@ -34,8 +37,22 @@ public final class ProcessingResult {
      * @param failed          whether processing has failed
      */
     public ProcessingResult(boolean breakOnErrorHit, boolean failed) {
+        this(breakOnErrorHit, failed, null, 0, 0);
+    }
+
+    /**
+     * Constructs a new processing result
+     *
+     * @param breakOnErrorHit break on error hit setting
+     * @param failed          whether processing has failed
+     */
+    public ProcessingResult(boolean breakOnErrorHit, boolean failed, String topic, int partition, long offset) {
         this.breakOnErrorHit = breakOnErrorHit;
         this.failed = failed;
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
     }
 
     public boolean isBreakOnErrorHit() {
@@ -46,6 +63,18 @@ public final class ProcessingResult {
         return failed;
     }
 
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
     public static ProcessingResult newUnprocessed() {
         return UNPROCESSED_RESULT;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
index fc7c16d5d97..3e21afbf19a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import java.util.Collection;
@@ -23,8 +22,8 @@ import java.util.regex.Pattern;
 import org.apache.camel.component.kafka.consumer.support.subcription.TopicInfo;
 
 public final class TopicHelper {
-    private TopicHelper() {
 
+    private TopicHelper() {
     }
 
     public static String getPrintableTopic(Pattern topicPattern, Collection<String> topics) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index ed924dc78dd..3e10a65e0ea 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -147,7 +147,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
         // None of the states provided by the processing result are relevant for batch processing. We can simply return the
         // default state
         return ProcessingResult.newUnprocessed();
-
     }
 
     private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
index ba11d1a7657..2a8b3ca49c8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.classic;
 
 import java.util.Collection;
@@ -28,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClassicRebalanceListener implements ConsumerRebalanceListener {
+
     private static final Logger LOG = LoggerFactory.getLogger(ClassicRebalanceListener.class);
 
     private final String threadId;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
index 84f25bce7e7..634f59951ca 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
@@ -36,5 +36,8 @@ public interface PartitionAssignmentAdapter {
      */
     void setConsumer(Consumer<?, ?> consumer);
 
+    /**
+     * Callback for custom logic when partitions have been assigned.
+     */
     void handlePartitionAssignment();
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
index 41aa7de74a8..e8847c83238 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.resume;
 
 import java.util.Collection;
@@ -29,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ResumeRebalanceListener implements ConsumerRebalanceListener {
+
     private static final Logger LOG = LoggerFactory.getLogger(ResumeRebalanceListener.class);
 
     private final String threadId;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
index f9eabf13ce2..9b8be56a523 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
@@ -86,9 +86,12 @@ final class KafkaRecordStreamingProcessor extends KafkaRecordProcessor {
             final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
 
             boolean breakOnErrorExit = processException(exchange, topicPartition, consumerRecord, exceptionHandler);
-            result = new ProcessingResult(breakOnErrorExit, true);
+            result = new ProcessingResult(
+                    breakOnErrorExit, true, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
         } else {
-            result = new ProcessingResult(false, exchange.getException() != null);
+            result = new ProcessingResult(
+                    false, exchange.getException() != null, consumerRecord.topic(), consumerRecord.partition(),
+                    consumerRecord.offset());
         }
 
         if (!result.isBreakOnErrorHit()) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
index 07fb5f2ef30..ce4c78ed36d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.subcription;
 
 import org.apache.camel.component.kafka.consumer.support.TopicHelper;
@@ -24,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DefaultSubscribeAdapter implements SubscribeAdapter {
+
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSubscribeAdapter.class);
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
index ac257f535af..a7b3e4a94bb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.subcription;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -24,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
  * A pluggable adapter for handling custom subscriptions
  */
 public interface SubscribeAdapter {
+
     /**
      * Handle the subscription to a Kafka topic
      *

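For readers who want to try the new console, below is a minimal usage sketch (not part of the commit above). It assumes the standard DevConsole API from camel-console; the KafkaConsoleSketch class and its dump helper are hypothetical names used only for illustration.

import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaDevConsole;
import org.apache.camel.console.DevConsole;
import org.apache.camel.support.service.ServiceHelper;

public class KafkaConsoleSketch {

    // Hypothetical helper: bind the console to a running CamelContext and render its text output
    public static String dump(CamelContext context) {
        KafkaDevConsole console = new KafkaDevConsole();
        console.setCamelContext(context);      // the console walks the context's routes and Kafka consumers
        ServiceHelper.startService(console);   // AbstractDevConsole is a Camel Service
        try {
            // DevConsole.MediaType.JSON would return the JsonObject built in doCallJson instead
            Object out = console.call(DevConsole.MediaType.TEXT);
            return String.valueOf(out);
        } finally {
            ServiceHelper.stopService(console);
        }
    }
}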