This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f86fcb8234a CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs 
(#24096)
2f86fcb8234a is described below

commit 2f86fcb8234a13c00a45832b377f1752af1a2772
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 18 11:03:03 2026 +0200

    CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs (#24096)
    
    CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs and Java-only 
markers
    
    Signed-off-by: Claus Ibsen <[email protected]>
    Co-authored-by: Claude <[email protected]>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 608 ++++++++++++++++-----
 1 file changed, 467 insertions(+), 141 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cbc11a29d825..e4a2d1e3e3a5 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -83,14 +83,23 @@ How Camel handles a message that results in an exception 
can be altered using th
 Instead of continuing to poll the next message, Camel will instead commit the 
offset so that the message that caused the exception will be retried.
 This is similar to the *RETRY* polling strategy above.
 
+You can configure this on the component level, either programmatically in 
Java, or via configuration properties:
+
+._Java-only: programmatic component configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
 kafka.setBreakOnFirstError(true);
-...
 camelContext.addComponent("kafka", kafka);
 ----
 
+Or using configuration properties:
+
+[source,properties]
+----
+camel.component.kafka.break-on-first-error=true
+----
+
 It is recommended that you read the section below "Using manual commit with 
Kafka consumer" to understand how `breakOnFirstError`
 will work based on the `CommitManager` that is configured.
 
@@ -122,23 +131,18 @@ To use, this repository must be placed in the Camel 
registry, either manually or
 
 Sample usage is as follows:
 
+NOTE: The `KafkaIdempotentRepository` bean must be registered in the Camel 
registry before it can be referenced by the route.
+
+._Java-only: registering the bean_
 [source,java]
 ----
 KafkaIdempotentRepository kafkaIdempotentRepository = new 
KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
 
 SimpleRegistry registry = new SimpleRegistry();
-registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be 
registered in the registry, to enable access to the CamelContext
-CamelContext context = new CamelContext(registry);
-
-// later in RouteBuilder...
-from("direct:performInsert")
-    .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo")
-        // once-only insert into the database
-    .end()
+registry.put("insertDbIdemRepo", kafkaIdempotentRepository);
 ----
 
-In XML:
-
+._XML: registering the bean_
 [source,xml]
 ----
 <!-- simple -->
@@ -166,18 +170,63 @@ In XML:
 </bean>
 ----
 
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("direct:performInsert")
+    .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo")
+        .to("sql:INSERT INTO ...")
+    .end();
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="direct:performInsert"/>
+  <idempotentConsumer idempotentRepository="insertDbIdemRepo">
+    <header>id</header>
+    <to uri="sql:INSERT INTO ..."/>
+  </idempotentConsumer>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: direct:performInsert
+      steps:
+        - idempotentConsumer:
+            idempotentRepository: "#insertDbIdemRepo"
+            expression:
+              header: id
+            steps:
+              - to:
+                  uri: "sql:INSERT INTO ..."
+----
+====
+
 There are 3 alternatives to choose from when using idempotency with numeric 
identifiers. The first one is to use the static method `numericHeader` method 
from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the 
conversion for you:
 
+._Java-only: numericHeader helper method_
 [source,java]
 ----
 from("direct:performInsert")
     
.idempotentConsumer(numericHeader("id")).idempotentRepository("insertDbIdemRepo")
-        // once-only insert into the database
-    .end()
+        .to("sql:INSERT INTO ...")
+    .end();
 ----
 
 Alternatively, it is possible to use a custom serializer configured via the 
route URL to perform the conversion:
 
+._Java-only: custom header deserializer class_
 [source,java]
 ----
 public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
@@ -198,18 +247,18 @@ public class CustomHeaderDeserializer extends 
DefaultKafkaHeaderDeserializer {
 
 Lastly, it is also possible to do so in a processor:
 
+._Java-only: processor for header type conversion_
 [source,java]
 ----
-from(from).routeId("foo")
+from("kafka:my-topic")
     .process(exchange -> {
         byte[] id = exchange.getIn().getHeader("id", byte[].class);
-
         BigInteger bi = new BigInteger(id);
         exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
     })
     .idempotentConsumer(header("id"))
     .idempotentRepository("kafkaIdempotentRepository")
-    .to(to);
+    .to("direct:process");
 ----
 
 === Manual commits with the Kafka consumer
@@ -220,27 +269,38 @@ In case you want to force manual commits, you can use 
`KafkaManualCommit` API fr
 This requires turning on manual commits by either setting the option 
`allowManualCommit` to `true` on the `KafkaComponent`
 or on the endpoint, for example:
 
+._Java-only: programmatic component configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
 kafka.setAutoCommitEnable(false);
 kafka.setAllowManualCommit(true);
-// ...
 camelContext.addComponent("kafka", kafka);
 ----
 
+Or using configuration properties:
+
+[source,properties]
+----
+camel.component.kafka.auto-commit-enable=false
+camel.component.kafka.allow-manual-commit=true
+----
+
 By default, it uses the `NoopCommitManager` behind the scenes. To commit an 
offset, you will
-require you to use the `KafkaManualCommit` from Java code such as a Camel 
`Processor`:
+need to use the `KafkaManualCommit` from Java code such as a Camel `Processor`:
 
+._Java-only: manual commit processor_
 [source,java]
 ----
 public void process(Exchange exchange) {
     KafkaManualCommit manual =
-        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        exchange.getIn().getHeader("CamelKafkaManualCommit", 
KafkaManualCommit.class);
     manual.commit();
 }
 ----
 
+TIP: The header name `CamelKafkaManualCommit` is also available as the 
constant `KafkaConstants.MANUAL_COMMIT`.
+
 The `KafkaManualCommit` will force a synchronous commit which will block until 
the commit is acknowledged on Kafka, or if it fails an exception is thrown.
 You can use an asynchronous commit as well by configuring the 
`KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory` 
implementation.
 
@@ -252,6 +312,7 @@ on the `KafkaComponent` that creates instances of your 
custom implementation.
 When configuring a consumer to use manual commit and a specific 
`CommitManager` it is important to understand how these influence the behavior
 of `breakOnFirstError`
 
+._Java-only: programmatic component configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -259,10 +320,19 @@ kafka.setAutoCommitEnable(false);
 kafka.setAllowManualCommit(true);
 kafka.setBreakOnFirstError(true);
 kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory());
-...
 camelContext.addComponent("kafka", kafka);
 ----
 
+Or using configuration properties:
+
+[source,properties]
+----
+camel.component.kafka.auto-commit-enable=false
+camel.component.kafka.allow-manual-commit=true
+camel.component.kafka.break-on-first-error=true
+camel.component.kafka.kafka-manual-commit-factory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory
+----
+
 When the `CommitManager` is left to the default `NoopCommitManager` then 
`breakOnFirstError` will not automatically commit the offset so that the
 message with an error is retried. The consumer must manage that in the route 
using `KafkaManualCommit`.
 
@@ -281,13 +351,14 @@ exception with the message `KafkaConsumer is not safe for 
multi-threaded access`
 The Kafka component supports pausable consumers. This type of consumer can 
pause consuming data based on
 conditions external to the component itself, such as an external system being 
unavailable or other transient conditions.
 
+._Java-only: pausable consumer with lambda_
 [source,java]
 ----
 from("kafka:topic")
-    .pausable(new KafkaConsumerListener(), () -> canContinue()) // the 
pausable check gets called if the exchange fails to be processed ...
+    .pausable(new KafkaConsumerListener(), () -> canContinue())
     .routeId("pausable-route")
-    .process(this::process) // Kafka consumer will be paused if this one 
throws an exception ...
-    .to("some:destination"); // or this one
+    .process(this::process)
+    .to("some:destination");
 ----
 
 In this example, consuming messages can pause (by calling the Kafka's Consumer 
pause method) if the result from `canContinue` is false.
@@ -312,24 +383,84 @@ The following header value types are supported: `String`, 
`Integer`, `Long`, `Do
 Note: all headers propagated *from* kafka *to* camel exchange will contain 
`byte[]` value by default.
 To override default functionality, these uri parameters can be set: 
`headerDeserializer` for `from` route and `headerSerializer` for `to` route. 
For example:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("kafka:my_topic?headerDeserializer=#myDeserializer")
-...
-.to("kafka:my_topic?headerSerializer=#mySerializer")
+    .to("kafka:my_topic?headerSerializer=#mySerializer");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my_topic?headerDeserializer=#myDeserializer"/>
+  <to uri="kafka:my_topic?headerSerializer=#mySerializer"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my_topic
+      parameters:
+        headerDeserializer: "#myDeserializer"
+      steps:
+        - to:
+            uri: kafka:my_topic
+            parameters:
+              headerSerializer: "#mySerializer"
+----
+====
+
 By default, all headers are being filtered by `KafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` 
prefixes.
 Default strategy can be overridden by using `headerFilterStrategy` uri 
parameter in both `to` and `from` routes:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("kafka:my_topic?headerFilterStrategy=#myStrategy")
-...
-.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
+    .to("kafka:my_topic?headerFilterStrategy=#myStrategy");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+  <to uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my_topic
+      parameters:
+        headerFilterStrategy: "#myStrategy"
+      steps:
+        - to:
+            uri: kafka:my_topic
+            parameters:
+              headerFilterStrategy: "#myStrategy"
+----
+====
+
 `myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be 
placed in the Camel registry, either manually or by registration as a bean in 
Spring, as it is `CamelContext` aware.
 
 === Kafka Transaction
@@ -383,6 +514,7 @@ If both `transacted=true` and `transactionalId` are 
present, the latter takes pr
 
 Configure the 'krb5.conf' file directly through the API:
 
+._Java-only: static Kerberos configuration_
 [source,java]
 ----
 static {
@@ -390,6 +522,13 @@ static {
 }
 ----
 
+Alternatively, you can set the JVM system property:
+
+[source,properties]
+----
+java.security.krb5.conf=/path/to/config/file
+----
+
 === Authentication to Kafka
 
 Kafka supports several ways to authenticate the clients to the server, 
including plain text, PKI (certificates) over TLS, you can refer to the 
https://kafka.apache.org/documentation/#security_sasl[Kafka documentation] for 
a detailed view of the supported mechanisms. The kafka authentication and 
authorization is based on JAAS, so you must use a JAAS Login Module 
implementation on the client side.
@@ -464,16 +603,45 @@ camel.component.kafka.sasl-password=mypassword
 
 *OAuth Authentication Example*
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
-from("kafka:my-topic?brokers=localhost:9092" +
-     "&saslAuthType=OAUTH" +
-     "&oauthClientId=my-client" +
-     "&oauthClientSecret=my-secret" +
-     "&oauthTokenEndpointUri=https://auth.example.com/oauth/token";)
+from("kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token";)
     .to("log:received");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:my-topic?brokers=localhost:9092&amp;saslAuthType=OAUTH&amp;oauthClientId=my-client&amp;oauthClientSecret=my-secret&amp;oauthTokenEndpointUri=https://auth.example.com/oauth/token"/>
+  <to uri="log:received"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my-topic
+      parameters:
+        brokers: "localhost:9092"
+        saslAuthType: OAUTH
+        oauthClientId: my-client
+        oauthClientSecret: my-secret
+        oauthTokenEndpointUri: "https://auth.example.com/oauth/token";
+      steps:
+        - to:
+            uri: log:received
+----
+====
+
 *AWS MSK IAM Authentication*
 
 When using AWS MSK with IAM authentication, ensure the `aws-msk-iam-auth` 
library is on the classpath:
@@ -600,26 +768,23 @@ By default, Camel uses automatic commits when using batch 
processing. In this ca
 In case of failures, the records will not be processed.
 
 The code below provides an example of this approach:
+
+._Java-only: batch processing with inline processor_
 [source,java]
 ----
-public void configure() {
-    
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e
 -> {
-        // The received records are stored as exchanges in a list. This gets 
the list of those exchanges
+from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest")
+    .process(e -> {
         final List<?> exchanges = e.getMessage().getBody(List.class);
-
-        // Ensure we are actually receiving what we are asking for
         if (exchanges == null || exchanges.isEmpty()) {
             return;
         }
-
-        // The records from the batch are stored in a list of exchanges in the 
original exchange. To process, we iterate over that list
         for (Object obj : exchanges) {
             if (obj instanceof Exchange exchange) {
                 LOG.info("Processing exchange with body {}", 
exchange.getMessage().getBody(String.class));
             }
         }
-    }).to(KafkaTestUtil.MOCK_RESULT);
-}
+    })
+    .to("mock:result");
 ----
 
 ===== Handling Errors with Automatic Commits
@@ -630,44 +795,31 @@ It is recommended to implement appropriate error handling 
mechanisms and pattern
 
 The code below provides an example of handling errors with automatic commits:
 
+._Java-only: error handling with batch processing_
 [source,java]
 ----
-public void configure() {
-    /*
-     We want to use continued here, so that Camel auto-commits the batch even 
though part of it has failed. In a
-     production scenario, applications should probably send these records to a 
separate topic or fix the condition
-     that lead to the failure
-     */
-    onException(IllegalArgumentException.class).process(exchange -> {
-        LOG.warn("Failed to process batch {}", 
exchange.getMessage().getBody());
-        LOG.warn("Failed to process due to {}", 
exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
-    }).continued(true);
-
-    
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e
 -> {
-        // The received records are stored as exchanges in a list. This gets 
the list of those exchanges
-        final List<?> exchanges = e.getMessage().getBody(List.class);
+onException(IllegalArgumentException.class).process(exchange -> {
+    LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
+    LOG.warn("Failed to process due to {}", 
exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
+}).continued(true);
 
-        // Ensure we are actually receiving what we are asking for
+from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest")
+    .process(e -> {
+        final List<?> exchanges = e.getMessage().getBody(List.class);
         if (exchanges == null || exchanges.isEmpty()) {
             return;
         }
-
-        // The records from the batch are stored in a list of exchanges in the 
original exchange.
-        int i = 0;
         for (Object o : exchanges) {
             if (o instanceof Exchange exchange) {
-                i++;
                 LOG.info("Processing exchange with body {}", 
exchange.getMessage().getBody(String.class));
-
-                if (i == 4) {
-                    throw new IllegalArgumentException("Failed to process 
record");
-                }
             }
         }
-    }).to(KafkaTestUtil.MOCK_RESULT);
-}
+    })
+    .to("mock:result");
 ----
 
+TIP: In a production scenario, applications should send failed records to a 
separate topic (dead-letter queue) or fix the condition that led to the failure.
+
 ===== Break on First Error in Batching Mode
 
 The `breakOnFirstError` option is also supported in batching mode, providing 
the same error handling behavior as in streaming mode but applied to batch 
processing.
@@ -775,7 +927,7 @@ onException(Exception.class)
     .process(exchange -> {
         // Commit manually when error occurs
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
         manual.commit();
     });
 
@@ -790,7 +942,7 @@ 
from("kafka:topic?groupId=myGroup&batching=true&breakOnFirstError=true&autoCommi
     .process(exchange -> {
         // Manual commit on successful processing
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
         manual.commit();
     })
     .to("mock:result");
@@ -870,7 +1022,7 @@ public class ErrorCommitProcessor implements Processor {
         LOG.warn("Error occurred, performing manual commit before 
reconnection");
 
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
 
         if (manual != null) {
             manual.commit();
@@ -888,7 +1040,7 @@ public class SuccessCommitProcessor implements Processor {
         LOG.debug("Batch processed successfully, performing manual commit");
 
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
 
         if (manual != null) {
             manual.commit();
@@ -905,33 +1057,25 @@ When working with batch processing with manual commits, 
it's up to the applicati
 
 The code below provides an example of this approach:
 
+._Java-only: batch processing with manual commits_
 [source,java]
 ----
-public void configure() {
-    
from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
     .process(e -> {
-        // The received records are stored as exchanges in a list. This gets 
the list of those exchanges
         final List<?> exchanges = e.getMessage().getBody(List.class);
-
-        // Ensure we are actually receiving what we are asking for
         if (exchanges == null || exchanges.isEmpty()) {
             return;
         }
-
-        /*
-        Every exchange in that list should contain a reference to the manual 
commit object. We use the reference
-        for the last exchange in the list to commit the whole batch
-         */
+        // Use the last exchange in the list to commit the whole batch
         final Object tmp = exchanges.getLast();
         if (tmp instanceof Exchange exchange) {
             KafkaManualCommit manual =
-                    
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+                    exchange.getMessage().getHeader("CamelKafkaManualCommit", 
KafkaManualCommit.class);
             LOG.debug("Performing manual commit");
             manual.commit();
             LOG.debug("Done performing manual commit");
         }
     });
-}
 ----
 
 ==== Dealing with long polling timeouts
@@ -942,14 +1086,17 @@ To properly do so, first make sure to have a max polling 
interval that is higher
 
 Then, increase the shutdown timeout to ensure that committing, closing and 
other Kafka operations are not abruptly aborted. For instance:
 
+._Java-only: programmatic shutdown timeout configuration_
 [source,java]
 ----
-public void configure() {
-    // Note that this can be configured in other ways
-    getCamelContext().getShutdownStrategy().setTimeout(10000);
+getCamelContext().getShutdownStrategy().setTimeout(10000);
+----
 
-    // route setup ...
-}
+Or using configuration properties:
+
+[source,properties]
+----
+camel.main.shutdown-timeout=10000
 ----
 
 === Custom Subscription Adapters
@@ -957,8 +1104,8 @@ public void configure() {
 Applications with complex subscription logic may provide a custom bean to 
handle the subscription process. To so, it is
 necessary to implement the interface `SubscribeAdapter`.
 
+._Java-only: custom subscribe adapter class_
 [source,java]
-.Example subscriber adapter that subscribes to a set of Kafka topics or 
patterns
 ----
 public class CustomSubscribeAdapter implements SubscribeAdapter {
     @Override
@@ -972,12 +1119,12 @@ public class CustomSubscribeAdapter implements 
SubscribeAdapter {
 }
 ----
 
-Then, it is necessary to add it as named bean instance to the registry:
+Then, it is necessary to add it as a named bean instance to the registry:
 
+._Java-only: registering the bean_
 [source,java]
-.Add to registry example
 ----
-context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new 
CustomSubscribeAdapter());
+context.getRegistry().bind("subscribeAdapter", new CustomSubscribeAdapter());
 ----
 
 === Interoperability
@@ -1002,25 +1149,90 @@ To utilize this solution, you need to modify the route 
URI on the consumer end o
 `headerDeserializer` option.
 For example:
 
+[tabs]
+====
+Java::
++
 [source,java]
-.Route snippet
 ----
 
from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
-    .to("...");
+    .to("direct:process");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"/>
+  <to uri="direct:process"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:topic
+      parameters:
+        headerDeserializer: 
"#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"
+      steps:
+        - to:
+            uri: direct:process
+----
+====
+
 === Producer Performance
 
 If the producer is performing too slowly for your needs, you may want to 
aggregate the exchanges before sending.
 
+[tabs]
+====
+Java::
++
 [source,java]
-.Route snippet
 ----
-from("source")
-    // .other route stuff
+from("direct:start")
     .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
-    .to("kafka:topic");
+    .to("kafka:my-topic");
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="direct:start"/>
+  <aggregate aggregationStrategy="#groupedExchange">
+    <correlationExpression>
+      <constant>true</constant>
+    </correlationExpression>
+    <to uri="kafka:my-topic"/>
+  </aggregate>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: direct:start
+      steps:
+        - aggregate:
+            aggregationStrategy: "#groupedExchange"
+            correlationExpression:
+              constant: "true"
+            steps:
+              - to:
+                  uri: kafka:my-topic
 ----
+====
+
+TIP: In XML and YAML, register the `GroupedExchangeAggregationStrategy` as a 
bean named `groupedExchange` in the registry.
 
 The reason for this is related to how the producer handles the two different 
cases:
 
@@ -1183,68 +1395,162 @@ To keep the offsets, the component needs a 
`StateRepository` implementation such
 This bean should be available in the registry.
 Here how to use it :
 
+NOTE: The `FileStateRepository` bean must be registered in the Camel registry 
before it can be referenced by the route.
+
+._Java-only: registering the bean_
 [source,java]
 ----
-// Create the repository in which the Kafka offsets will be persisted
 FileStateRepository repository = FileStateRepository.fileStateRepository(new 
File("/path/to/repo.dat"));
 
 // Bind this repository into the Camel registry
 Registry registry = createCamelRegistry();
 registry.bind("offsetRepo", repository);
+----
 
-// Configure the camel context
-DefaultCamelContext camelContext = new DefaultCamelContext(registry);
-camelContext.addRoutes(new RouteBuilder() {
-    @Override
-    public void configure() throws Exception {
-        fromF("kafka:%s?brokers=localhost:{{kafkaPort}}" +
-                     // Set up the topic and broker address
-                     "&groupId=A" +
-                     // The consumer processor group ID
-                     "&autoOffsetReset=earliest" +
-                     // Ask to start from the beginning if we have unknown 
offset
-                     "&offsetRepository=#offsetRepo", TOPIC)
-                     // Keep the offsets in the previously configured 
repository
-                .to("mock:result");
-    }
-});
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("kafka:my-topic?brokers=localhost:9092&groupId=A&autoOffsetReset=earliest&offsetRepository=#offsetRepo")
+    .to("mock:result");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:my-topic?brokers=localhost:9092&amp;groupId=A&amp;autoOffsetReset=earliest&amp;offsetRepository=#offsetRepo"/>
+  <to uri="mock:result"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my-topic
+      parameters:
+        brokers: "localhost:9092"
+        groupId: A
+        autoOffsetReset: earliest
+        offsetRepository: "#offsetRepo"
+      steps:
+        - to:
+            uri: mock:result
+----
+====
+
 
 === Producing messages to Kafka
 
 Here is the minimal route you need to produce messages to Kafka.
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
-    .setBody(constant("Message from Camel"))          // Message to send
-    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+    .setBody(constant("Message from Camel"))
+    .setHeader("CamelKafkaKey", constant("Camel"))
     .to("kafka:test?brokers=localhost:9092");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="direct:start"/>
+  <setBody>
+    <constant>Message from Camel</constant>
+  </setBody>
+  <setHeader name="CamelKafkaKey">
+    <constant>Camel</constant>
+  </setHeader>
+  <to uri="kafka:test?brokers=localhost:9092"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: direct:start
+      steps:
+        - setBody:
+            constant: "Message from Camel"
+        - setHeader:
+            name: CamelKafkaKey
+            constant: "Camel"
+        - to:
+            uri: kafka:test
+            parameters:
+              brokers: "localhost:9092"
+----
+====
+
+TIP: In Java, you can use the constant `KafkaConstants.KEY` instead of the 
string `"CamelKafkaKey"`.
+
 === SSL configuration
 
 You have two different ways to configure the SSL communication on the Kafka 
component.
 
 The first way is through the many SSL endpoint parameters:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
-from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
-             "&groupId=A" +
-             "&sslKeystoreLocation=/path/to/keystore.jks" +
-             "&sslKeystorePassword=changeit" +
-             "&sslKeyPassword=changeit" +
-             "&securityProtocol=SSL")
-        .to("mock:result");
+from("kafka:my-topic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL")
+    .to("mock:result");
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:my-topic?brokers=localhost:9092&amp;groupId=A&amp;sslKeystoreLocation=/path/to/keystore.jks&amp;sslKeystorePassword=changeit&amp;sslKeyPassword=changeit&amp;securityProtocol=SSL"/>
+  <to uri="mock:result"/>
+</route>
 ----
 
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my-topic
+      parameters:
+        brokers: "localhost:9092"
+        groupId: A
+        sslKeystoreLocation: "/path/to/keystore.jks"
+        sslKeystorePassword: changeit
+        sslKeyPassword: changeit
+        securityProtocol: SSL
+      steps:
+        - to:
+            uri: mock:result
+----
+====
+
 The second way is to use the `sslContextParameters` endpoint parameter:
 
+._Java-only: SSLContextParameters bean setup_
 [source,java]
 ----
-// Configure the SSLContextParameters object
 KeyStoreParameters ksp = new KeyStoreParameters();
 ksp.setResource("/path/to/keystore.jks");
 ksp.setPassword("changeit");
@@ -1254,26 +1560,46 @@ kmp.setKeyPassword("changeit");
 SSLContextParameters scp = new SSLContextParameters();
 scp.setKeyManagers(kmp);
 
-// Bind this SSLContextParameters into the Camel registry
 Registry registry = createCamelRegistry();
 registry.bind("ssl", scp);
+----
 
-// Configure the camel context
-DefaultCamelContext camelContext = new DefaultCamelContext(registry);
-camelContext.addRoutes(new RouteBuilder() {
-    @Override
-    public void configure() throws Exception {
-        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
-                     // Set up the topic and broker address
-                     "&groupId=A" +
-                     // The consumer processor group ID
-                     "&sslContextParameters=#ssl" +
-                     // The security protocol
-                     "&securityProtocol=SSL)
-                     // Reference the SSL configuration
-                .to("mock:result");
-    }
-});
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("kafka:my-topic?brokers=localhost:9092&groupId=A&sslContextParameters=#ssl&securityProtocol=SSL")
+    .to("mock:result");
+----
+
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:my-topic?brokers=localhost:9092&amp;groupId=A&amp;sslContextParameters=#ssl&amp;securityProtocol=SSL"/>
+  <to uri="mock:result"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my-topic
+      parameters:
+        brokers: "localhost:9092"
+        groupId: A
+        sslContextParameters: "#ssl"
+        securityProtocol: SSL
+      steps:
+        - to:
+            uri: mock:result
 ----
+====
 
 include::spring-boot:partial$starter.adoc[]

Reply via email to