This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kafka-console in repository https://gitbox.apache.org/repos/asf/camel.git
commit d88937d3f01221a9f9d41339a32161e2db242f96 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jul 9 12:09:15 2024 +0200 CAMEL-20956: camel-kafka - Add dev console --- components/camel-kafka/pom.xml | 4 + .../org/apache/camel/dev-console/kafka.json | 15 +++ .../services/org/apache/camel/dev-console/kafka | 2 + .../org/apache/camel/dev-consoles.properties | 7 ++ .../camel/component/kafka/KafkaConsumer.java | 5 + .../camel/component/kafka/KafkaDevConsole.java | 117 +++++++++++++++++++++ .../camel/component/kafka/KafkaFetchRecords.java | 47 ++++++++- .../component/kafka/consumer/CommitManager.java | 1 - .../support/KafkaRecordProcessorFacade.java | 2 +- .../kafka/consumer/support/ProcessingResult.java | 31 +++++- .../kafka/consumer/support/TopicHelper.java | 3 +- .../batching/KafkaRecordBatchingProcessor.java | 1 - .../support/classic/ClassicRebalanceListener.java | 2 +- .../classic/PartitionAssignmentAdapter.java | 3 + .../support/resume/ResumeRebalanceListener.java | 2 +- .../streaming/KafkaRecordStreamingProcessor.java | 7 +- .../subcription/DefaultSubscribeAdapter.java | 2 +- .../support/subcription/SubscribeAdapter.java | 2 +- 18 files changed, 238 insertions(+), 15 deletions(-) diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml index 55b8986d86a..e64ee2bd4e6 100644 --- a/components/camel-kafka/pom.xml +++ b/components/camel-kafka/pom.xml @@ -42,6 +42,10 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-health</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-console</artifactId> + </dependency> <!-- kafka java client --> <dependency> diff --git a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json new file mode 100644 index 00000000000..55e93212a3c --- /dev/null +++ b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "kafka", + "title": "Kafka", + "description": "Apache Kafka", + "deprecated": false, + "javaType": "org.apache.camel.component.kafka.KafkaDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-kafka", + "version": "4.7.0-SNAPSHOT" + } +} + diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka new file mode 100644 index 00000000000..975921645c6 --- /dev/null +++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.kafka.KafkaDevConsole diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties new file mode 100644 index 00000000000..3f7bbe40841 --- /dev/null +++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +dev-consoles=kafka +groupId=org.apache.camel +artifactId=camel-kafka +version=4.7.0-SNAPSHOT +projectName=Camel :: Kafka +projectDescription=Camel Kafka support diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index f0c75bd3eb6..9ea0e1502ca 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kafka; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -256,6 +257,10 @@ public class KafkaConsumer extends DefaultConsumer return tasks.stream().allMatch(KafkaFetchRecords::isPaused); } + protected List<KafkaFetchRecords> tasks() { + return Collections.unmodifiableList(tasks); + } + @Override public String adapterFactoryService() { return "kafka-adapter-factory"; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java new file mode 100644 index 00000000000..86631f915ac --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.camel.Route; +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; + +@DevConsole(name = "kafka", displayName = "Kafka", description = "Apache Kafka") +public class KafkaDevConsole extends AbstractDevConsole { + + public KafkaDevConsole() { + super("camel", "kafka", "Kafka", "Apache Kafka"); + } + + @Override + protected String doCallText(Map<String, Object> options) { + StringBuilder sb = new StringBuilder(); + for (Route route : getCamelContext().getRoutes()) { + if (route.getConsumer() instanceof KafkaConsumer kc) { + sb.append(String.format("\n Route Id: %s", route.getRouteId())); + sb.append(String.format("\n From: %s", route.getEndpoint().getEndpointUri())); + sb.append( + String.format("\n State: %s", getCamelContext().getRouteController().getRouteStatus(route.getId()))); + sb.append(String.format("\n Uptime: %s", route.getUptime())); + for (KafkaFetchRecords t : kc.tasks()) { + sb.append(String.format("\n Worked Thread: %s", t.getThreadId())); + sb.append(String.format("\n Worker State: %s", t.getState())); + if (t.getLastError() != null) { + sb.append(String.format("\n Worker Last Error: %s", t.getLastError().getMessage())); + } + KafkaFetchRecords.GroupMetadata meta = t.getGroupMetadata(); + if (meta != null) { + sb.append(String.format("\n Group Id: %s", meta.groupId())); + sb.append(String.format("\n Group Instance Id: %s", meta.groupInstanceId())); + sb.append(String.format("\n Member Id: %s", meta.memberId())); + sb.append(String.format("\n Generation Id: %d", meta.generationId())); + } + if (t.getLastRecord() != null) { + sb.append(String.format("\n Last Topic: %s", t.getLastRecord().topic())); + sb.append(String.format("\n Last Partition: %d", t.getLastRecord().partition())); + sb.append(String.format("\n Last Offset: %d", t.getLastRecord().offset())); + } + } + sb.append("\n"); + } + } + + return sb.toString(); + } + + @Override + protected Map<String, Object> doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + + List<JsonObject> list = new ArrayList<>(); + root.put("kafkaConsumers", list); + + for (Route route : getCamelContext().getRoutes()) { + if (route.getConsumer() instanceof KafkaConsumer kc) { + JsonObject jo = new JsonObject(); + jo.put("routeId", route.getRouteId()); + jo.put("uri", route.getEndpoint().getEndpointUri()); + jo.put("state", getCamelContext().getRouteController().getRouteStatus(route.getId())); + jo.put("uptime", route.getUptime()); + + JsonArray arr = new JsonArray(); + jo.put("workers", arr); + + for (KafkaFetchRecords t : kc.tasks()) { + JsonObject wo = new JsonObject(); + arr.add(wo); + wo.put("threadId", t.getThreadId()); + wo.put("state", t.getState()); + if (t.getLastError() != null) { + wo.put("lastError", t.getLastError().getMessage()); + } + KafkaFetchRecords.GroupMetadata meta = t.getGroupMetadata(); + if (meta != null) { + wo.put("groupId", meta.groupId()); + wo.put("groupInstanceId", meta.groupInstanceId()); + wo.put("memberId", meta.memberId()); + wo.put("generationId", meta.generationId()); + } + if (t.getLastRecord() != null) { + wo.put("lastTopic", t.getLastRecord().topic()); + wo.put("lastPartition", t.getLastRecord().partition()); + wo.put("lastOffset", t.getLastRecord().offset()); + } + } + list.add(jo); + } + } + return root; + } + +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index a40a29ebbd6..9d64ed69dbc 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -45,6 +45,8 @@ import org.apache.camel.util.IOHelper; import org.apache.camel.util.ReflectionHelper; import org.apache.camel.util.TimeUtils; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -92,12 +94,19 @@ public class KafkaFetchRecords implements Runnable { private volatile boolean terminated; private volatile long currentBackoffInterval; - private volatile boolean reconnect; // The reconnect must be false at init (this is the policy whether to reconnect). private volatile boolean connected; // this is the state (connected or not) - private volatile State state = State.RUNNING; + record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) { + } + + record LastRecord(String topic, int partition, long offset) { + } + + private volatile GroupMetadata groupMetadata; + private volatile LastRecord lastRecord; + KafkaFetchRecords(KafkaConsumer kafkaConsumer, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, Properties kafkaProps, KafkaConsumerListener consumerListener) { @@ -169,6 +178,15 @@ public class KafkaFetchRecords implements Runnable { setConnected(true); } + if (isConnected()) { + // store metadata + ConsumerGroupMetadata meta = consumer.groupMetadata(); + if (meta != null) { + groupMetadata = new GroupMetadata( + meta.groupId(), meta.groupInstanceId().orElse(""), meta.memberId(), meta.generationId()); + } + } + setLastError(null); startPolling(); } while ((pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable()); @@ -351,6 +369,9 @@ public class KafkaFetchRecords implements Runnable { } ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); + if (result != null && result.getTopic() != null) { + lastRecord = new LastRecord(result.getTopic(), result.getPartition(), result.getOffset()); + } updateTaskState(); // when breakOnFirstError we want to unsubscribe from Kafka @@ -494,7 +515,7 @@ public class KafkaFetchRecords implements Runnable { return kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped(); } - private boolean isReconnect() { + boolean isReconnect() { return reconnect; } @@ -633,4 +654,24 @@ public class KafkaFetchRecords implements Runnable { private synchronized void setLastError(Exception lastError) { this.lastError = lastError; } + + Exception getLastError() { + return lastError; + } + + GroupMetadata getGroupMetadata() { + return groupMetadata; + } + + public LastRecord getLastRecord() { + return lastRecord; + } + + String getThreadId() { + return threadId; + } + + String getState() { + return state.name(); + } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java index a8b8a776aa3..6e94d33ac56 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer; import org.apache.camel.Exchange; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java index 41a747b0ef9..89416f86c49 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -23,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; * A processing facade that allows processing consumer records in different ways */ public interface KafkaRecordProcessorFacade { + /** * Sends a set of records polled from Kafka for processing * diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java index 4be6b8ba7eb..4979ff312c9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java @@ -14,18 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support; /** * Holds the result of processing one or more consumer records */ public final class ProcessingResult { + private static final ProcessingResult UNPROCESSED_RESULT = new ProcessingResult(false, false); private final boolean breakOnErrorHit; private final boolean failed; + private final String topic; + private final int partition; + private final long offset; /** * Constructs a new processing result @@ -34,8 +37,22 @@ public final class ProcessingResult { * @param failed whether processing has failed */ public ProcessingResult(boolean breakOnErrorHit, boolean failed) { + this(breakOnErrorHit, failed, null, 0, 0); + } + + /** + * Constructs a new processing result + * + * @param breakOnErrorHit break on error hit setting + * @param failed whether processing has failed + */ + public ProcessingResult(boolean breakOnErrorHit, boolean failed, String topic, int partition, long offset) { this.breakOnErrorHit = breakOnErrorHit; this.failed = failed; + this.topic = topic; + ; + this.partition = partition; + this.offset = offset; } public boolean isBreakOnErrorHit() { @@ -46,6 +63,18 @@ public final class ProcessingResult { return failed; } + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + public long getOffset() { + return offset; + } + public static ProcessingResult newUnprocessed() { return UNPROCESSED_RESULT; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java index fc7c16d5d97..3e21afbf19a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support; import java.util.Collection; @@ -23,8 +22,8 @@ import java.util.regex.Pattern; import org.apache.camel.component.kafka.consumer.support.subcription.TopicInfo; public final class TopicHelper { - private TopicHelper() { + private TopicHelper() { } public static String getPrintableTopic(Pattern topicPattern, Collection<String> topics) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index ed924dc78dd..3e10a65e0ea 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -147,7 +147,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { // None of the states provided by the processing result are relevant for batch processing. We can simply return the // default state return ProcessingResult.newUnprocessed(); - } private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java index ba11d1a7657..2a8b3ca49c8 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support.classic; import java.util.Collection; @@ -28,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClassicRebalanceListener implements ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(ClassicRebalanceListener.class); private final String threadId; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java index 84f25bce7e7..634f59951ca 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java @@ -36,5 +36,8 @@ public interface PartitionAssignmentAdapter { */ void setConsumer(Consumer<?, ?> consumer); + /** + * Callback for custom logic when partitions has been assigned. + */ void handlePartitionAssignment(); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java index 41aa7de74a8..e8847c83238 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support.resume; import java.util.Collection; @@ -29,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ResumeRebalanceListener implements ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(ResumeRebalanceListener.class); private final String threadId; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java index f9eabf13ce2..9b8be56a523 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java @@ -86,9 +86,12 @@ final class KafkaRecordStreamingProcessor extends KafkaRecordProcessor { final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler(); boolean breakOnErrorExit = processException(exchange, topicPartition, consumerRecord, exceptionHandler); - result = new ProcessingResult(breakOnErrorExit, true); + result = new ProcessingResult( + breakOnErrorExit, true, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()); } else { - result = new ProcessingResult(false, exchange.getException() != null); + result = new ProcessingResult( + false, exchange.getException() != null, consumerRecord.topic(), consumerRecord.partition(), + consumerRecord.offset()); } if (!result.isBreakOnErrorHit()) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java index 07fb5f2ef30..ce4c78ed36d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support.subcription; import org.apache.camel.component.kafka.consumer.support.TopicHelper; @@ -24,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DefaultSubscribeAdapter implements SubscribeAdapter { + private static final Logger LOG = LoggerFactory.getLogger(DefaultSubscribeAdapter.class); @Override diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java index ac257f535af..a7b3e4a94bb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support.subcription; import org.apache.kafka.clients.consumer.Consumer; @@ -24,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; * A pluggable adapter for handling custom subscriptions */ public interface SubscribeAdapter { + /** * Handle the subscription to a Kafka topic *
