This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-17792/doc-message-headers
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d8ab4e7cef556eec8c9c3687a7356ffe28535cd4
Author: Nicolas Filotto <[email protected]>
AuthorDate: Wed Mar 30 19:10:43 2022 +0200

    CAMEL-17792: Add doc about the message headers of camel-kafka
---
 .../org/apache/camel/component/kafka/kafka.json    | 15 ++++++++
 .../camel-kafka/src/main/docs/kafka-component.adoc | 42 +++-------------------
 .../camel/component/kafka/KafkaConstants.java      | 30 +++++++++++++++-
 .../camel/component/kafka/KafkaEndpoint.java       |  2 +-
 4 files changed, 49 insertions(+), 40 deletions(-)

diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index f3ed143..9ca1624 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -129,6 +129,21 @@
     "sslTruststoreType": { "kind": "property", "displayName": "Ssl Truststore 
Type", "group": "security", "label": "common,security", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "JKS", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The file format of the 
trust store file. Default value is JKS." },
     "useGlobalSslContextParameters": { "kind": "property", "displayName": "Use 
Global Ssl Context Parameters", "group": "security", "label": "security", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Enable usage of global SSL context parameters." }
   },
+  "headers": {
+    "kafka.PARTITION_KEY": { "kind": "header", "displayName": "", "group": 
"producer", "label": "producer", "required": false, "javaType": "Integer", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Explicitly specify the partition" },
+    "kafka.PARTITION": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "Integer", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The partition where the message was stored" },
+    "kafka.KEY": { "kind": "header", "displayName": "", "group": "common", 
"label": "", "required": true, "javaType": "Object", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": 
"*Producer:* The key of the message in order to ensure that all related message 
goes in the same partition. *Consumer:* The key of the message if configured" },
+    "kafka.TOPIC": { "kind": "header", "displayName": "", "group": "consumer", 
"label": "consumer", "required": false, "javaType": "String", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "The topic from where the message originated" },
+    "kafka.OVERRIDE_TOPIC": { "kind": "header", "displayName": "", "group": 
"producer", "label": "producer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The topic to which send the message (override and takes 
precedence), and the header is not preserved." },
+    "kafka.OFFSET": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "Long", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The offset of the message" },
+    "kafka.HEADERS": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.kafka.common.header.Headers", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
record headers" },
+    "kafka.LAST_RECORD_BEFORE_COMMIT": { "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Whether or not it's the last record before 
commit (only available if `autoCommitEnable` endpoint parameter is `false`)" },
+    "kafka.LAST_POLL_RECORD": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "Boolean", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Indicates the last record within the current poll 
request (only available if `autoCommitEnable` endpoint parameter is `false` or 
`allowManualCommit` is `true`)" },
+    "kafka.TIMESTAMP": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "Long", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The timestamp of the message" },
+    "kafka.OVERRIDE_TIMESTAMP": { "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The ProducerRecord also has an associated 
timestamp. If the user did provide a timestamp, the producer will stamp the  
record with the provided timestamp and the header is not preserved." },
+    "org.apache.kafka.clients.producer.RecordMetadata": { "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The metadata (only 
configured if `recordMetadata` endpoint parameter is `true`)" },
+    "CamelKafkaManualCommit": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "Can be used for forcing manual offset commit when using Kafka 
consumer." }
+  },
   "properties": {
     "topic": { "kind": "path", "displayName": "Topic", "group": "common", 
"label": "common", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Name of the topic to use. On the consumer you 
can use comma to separate multiple topics. A producer can only send a me [...]
     "additionalProperties": { "kind": "parameter", "displayName": "Additional 
Properties", "group": "common", "label": "common", "required": false, "type": 
"object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", 
"prefix": "additionalProperties.", "multiValue": true, "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets additional  [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 7bbc937..c462106 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -55,46 +55,12 @@ For more information about Producer/Consumer configuration:
 
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
 
http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]
 
-== Message headers
-
-=== Consumer headers
-
-The following headers are available when consuming messages from Kafka.
-[width="100%",cols="2m,2m,1m,5",options="header"]
-|===
-| Header constant                          | Header value                      
| Type    | Description
-| KafkaConstants.TOPIC                     | "kafka.TOPIC"                     
| String  | The topic from where the message originated
-| KafkaConstants.PARTITION                 | "kafka.PARTITION"                 
| Integer | The partition where the message was stored
-| KafkaConstants.OFFSET                    | "kafka.OFFSET"                    
| Long    | The offset of the message
-| KafkaConstants.KEY                       | "kafka.KEY"                       
| Object  | The key of the message if configured
-| KafkaConstants.HEADERS                   | "kafka.HEADERS"                   
| org.apache.kafka.common.header.Headers  | The record headers
-| KafkaConstants.LAST_RECORD_BEFORE_COMMIT | "kafka.LAST_RECORD_BEFORE_COMMIT" 
| Boolean | Whether or not it's the last record before commit (only available 
if `autoCommitEnable` endpoint parameter is `false`)
-| KafkaConstants.LAST_POLL_RECORD | "kafka.LAST_POLL_RECORD" | Boolean | 
Indicates the last record within the current poll request (only available if 
`autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is 
`true`)
-| KafkaConstants.MANUAL_COMMIT             | "CamelKafkaManualCommit"          
| KafkaManualCommit | Can be used for forcing manual offset commit when using 
Kafka consumer. |
-|===
-
-=== Producer headers
-
-Before sending a message to Kafka you can configure the following headers.
-[width="100%",cols="2m,2m,1m,5",options="header"]
-|===
-| Header constant              | Header value          | Type    | Description
-| KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* 
The key of the message in order to ensure that all related message goes in the 
same partition
-| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String  | The topic 
to which send the message (override and takes precedence), and the header is 
not preserved.
-| KafkaConstants.OVERRIDE_TIMESTAMP | "kafka.OVERRIDE_TIMESTAMP" | Long | The 
ProducerRecord also has an associated timestamp. If the user did provide a 
timestamp, the producer will stamp the  record with the provided timestamp and 
the header is not preserved. 
-| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly 
specify the partition
-|===
+// component headers: START
+include::partial$component-endpoint-headers.adoc[]
+// component headers: END
 
 If you want to send a message to a dynamic topic then use 
`KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
-that are not send along the message, as its removed in the producer.
-
-After the message is sent to Kafka, the following headers are available
-[width="100%",cols="2m,2m,1m,5",options="header"]
-|===
-| Header constant                 | Header value                               
        | Type                 | Description
-| KafkaConstants.KAFKA_RECORDMETA | 
"org.apache.kafka.clients.producer.RecordMetadata" | List<RecordMetadata> | The 
metadata (only configured if `recordMetadata` endpoint parameter is `true`
-|===
-
+that are not send along the message, as it's removed in the producer.
 
 == Consumer error handling
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index edc2dcb..ba6e0ef 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -16,18 +16,42 @@
  */
 package org.apache.camel.component.kafka;
 
+import org.apache.camel.spi.Metadata;
+
 public final class KafkaConstants {
 
+    @Metadata(label = "producer", description = "Explicitly specify the 
partition", javaType = "Integer")
     public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
+    @Metadata(label = "consumer", description = "The partition where the 
message was stored", javaType = "Integer")
     public static final String PARTITION = "kafka.PARTITION";
+    @Metadata(description = "*Producer:* The key of the message in order to 
ensure that all related message goes in the same partition. "
+                            +
+                            "*Consumer:* The key of the message if configured",
+              javaType = "Object", required = true)
     public static final String KEY = "kafka.KEY";
+    @Metadata(label = "consumer", description = "The topic from where the 
message originated", javaType = "String")
     public static final String TOPIC = "kafka.TOPIC";
+    @Metadata(label = "producer",
+              description = "The topic to which send the message (override and 
takes precedence), and the header is not preserved.",
+              javaType = "String")
     public static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
+    @Metadata(label = "consumer", description = "The offset of the message", 
javaType = "Long")
     public static final String OFFSET = "kafka.OFFSET";
+    @Metadata(label = "consumer", description = "The record headers", javaType 
= "org.apache.kafka.common.header.Headers")
     public static final String HEADERS = "kafka.HEADERS";
+    @Metadata(label = "consumer",
+              description = "Whether or not it's the last record before commit 
(only available if `autoCommitEnable` endpoint parameter is `false`)",
+              javaType = "Boolean")
     public static final String LAST_RECORD_BEFORE_COMMIT = 
"kafka.LAST_RECORD_BEFORE_COMMIT";
+    @Metadata(label = "consumer", description = "Indicates the last record 
within the current poll request " +
+                                                "(only available if 
`autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is 
`true`)",
+              javaType = "Boolean")
     public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
+    @Metadata(label = "consumer", description = "The timestamp of the 
message", javaType = "Long")
     public static final String TIMESTAMP = "kafka.TIMESTAMP";
+    @Metadata(label = "producer", description = "The ProducerRecord also has 
an associated timestamp. " +
+                                                "If the user did provide a 
timestamp, the producer will stamp the  record with the provided timestamp and 
the header is not preserved.",
+              javaType = "Long")
     public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP";
 
     @Deprecated
@@ -39,8 +63,12 @@ public final class KafkaConstants {
     public static final String KAFKA_DEFAULT_DESERIALIZER = 
"org.apache.kafka.common.serialization.StringDeserializer";
     public static final String KAFKA_DEFAULT_PARTITIONER = 
"org.apache.kafka.clients.producer.internals.DefaultPartitioner";
     public static final String PARTITIONER_RANGE_ASSIGNOR = 
"org.apache.kafka.clients.consumer.RangeAssignor";
+    @Metadata(label = "producer",
+              description = "The metadata (only configured if `recordMetadata` 
endpoint parameter is `true`)",
+              javaType = "List<RecordMetadata>")
     public static final String KAFKA_RECORDMETA = 
"org.apache.kafka.clients.producer.RecordMetadata";
-
+    @Metadata(label = "consumer", description = "Can be used for forcing 
manual offset commit when using Kafka consumer.",
+              javaType = 
"org.apache.camel.component.kafka.consumer.KafkaManualCommit")
     public static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
 
     private KafkaConstants() {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1821084..e28309b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
  * Sent and receive messages to/from an Apache Kafka broker.
  */
 @UriEndpoint(firstVersion = "2.13.0", scheme = "kafka", title = "Kafka", 
syntax = "kafka:topic",
-             category = { Category.MESSAGING })
+             category = { Category.MESSAGING }, headersClass = 
KafkaConstants.class)
 public class KafkaEndpoint extends DefaultEndpoint implements 
MultipleConsumersSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaEndpoint.class);

Reply via email to