This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-17792/doc-message-headers in repository https://gitbox.apache.org/repos/asf/camel.git
commit d8ab4e7cef556eec8c9c3687a7356ffe28535cd4 Author: Nicolas Filotto <[email protected]> AuthorDate: Wed Mar 30 19:10:43 2022 +0200 CAMEL-17792: Add doc about the message headers of camel-kafka --- .../org/apache/camel/component/kafka/kafka.json | 15 ++++++++ .../camel-kafka/src/main/docs/kafka-component.adoc | 42 +++------------------- .../camel/component/kafka/KafkaConstants.java | 30 +++++++++++++++- .../camel/component/kafka/KafkaEndpoint.java | 2 +- 4 files changed, 49 insertions(+), 40 deletions(-) diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index f3ed143..9ca1624 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -129,6 +129,21 @@ "sslTruststoreType": { "kind": "property", "displayName": "Ssl Truststore Type", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "JKS", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The file format of the trust store file. Default value is JKS." }, "useGlobalSslContextParameters": { "kind": "property", "displayName": "Use Global Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable usage of global SSL context parameters." 
} }, + "headers": { + "kafka.PARTITION_KEY": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Explicitly specify the partition" }, + "kafka.PARTITION": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The partition where the message was stored" }, + "kafka.KEY": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": true, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "*Producer:* The key of the message in order to ensure that all related message goes in the same partition. *Consumer:* The key of the message if configured" }, + "kafka.TOPIC": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The topic from where the message originated" }, + "kafka.OVERRIDE_TOPIC": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The topic to which send the message (override and takes precedence), and the header is not preserved." 
}, + "kafka.OFFSET": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The offset of the message" }, + "kafka.HEADERS": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.kafka.common.header.Headers", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The record headers" }, + "kafka.LAST_RECORD_BEFORE_COMMIT": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether or not it's the last record before commit (only available if `autoCommitEnable` endpoint parameter is `false`)" }, + "kafka.LAST_POLL_RECORD": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Indicates the last record within the current poll request (only available if `autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is `true`)" }, + "kafka.TIMESTAMP": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message" }, + "kafka.OVERRIDE_TIMESTAMP": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. 
If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved." }, + "org.apache.kafka.clients.producer.RecordMetadata": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The metadata (only configured if `recordMetadata` endpoint parameter is `true`)" }, + "CamelKafkaManualCommit": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used for forcing manual offset commit when using Kafka consumer." } + }, "properties": { "topic": { "kind": "path", "displayName": "Topic", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a me [...] "additionalProperties": { "kind": "parameter", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets additional [...] 
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 7bbc937..c462106 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -55,46 +55,12 @@ For more information about Producer/Consumer configuration: http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs] -== Message headers - -=== Consumer headers - -The following headers are available when consuming messages from Kafka. -[width="100%",cols="2m,2m,1m,5",options="header"] -|=== -| Header constant | Header value | Type | Description -| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic from where the message originated -| KafkaConstants.PARTITION | "kafka.PARTITION" | Integer | The partition where the message was stored -| KafkaConstants.OFFSET | "kafka.OFFSET" | Long | The offset of the message -| KafkaConstants.KEY | "kafka.KEY" | Object | The key of the message if configured -| KafkaConstants.HEADERS | "kafka.HEADERS" | org.apache.kafka.common.header.Headers | The record headers -| KafkaConstants.LAST_RECORD_BEFORE_COMMIT | "kafka.LAST_RECORD_BEFORE_COMMIT" | Boolean | Whether or not it's the last record before commit (only available if `autoCommitEnable` endpoint parameter is `false`) -| KafkaConstants.LAST_POLL_RECORD | "kafka.LAST_POLL_RECORD" | Boolean | Indicates the last record within the current poll request (only available if `autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is `true`) -| KafkaConstants.MANUAL_COMMIT | "CamelKafkaManualCommit" | KafkaManualCommit | Can be used for forcing manual offset commit when using Kafka consumer. 
| -|=== - -=== Producer headers - -Before sending a message to Kafka you can configure the following headers. -[width="100%",cols="2m,2m,1m,5",options="header"] -|=== -| Header constant | Header value | Type | Description -| KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition -| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which send the message (override and takes precedence), and the header is not preserved. -| KafkaConstants.OVERRIDE_TIMESTAMP | "kafka.OVERRIDE_TIMESTAMP" | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved. -| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition -|=== +// component headers: START +include::partial$component-endpoint-headers.adoc[] +// component headers: END If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it's used as a one-time header -that are not send along the message, as its removed in the producer. - -After the message is sent to Kafka, the following headers are available -[width="100%",cols="2m,2m,1m,5",options="header"] -|=== -| Header constant | Header value | Type | Description -| KafkaConstants.KAFKA_RECORDMETA | "org.apache.kafka.clients.producer.RecordMetadata" | List<RecordMetadata> | The metadata (only configured if `recordMetadata` endpoint parameter is `true` -|=== - +that is not sent along with the message, as it's removed in the producer.
== Consumer error handling diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index edc2dcb..ba6e0ef 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -16,18 +16,42 @@ */ package org.apache.camel.component.kafka; +import org.apache.camel.spi.Metadata; + public final class KafkaConstants { + @Metadata(label = "producer", description = "Explicitly specify the partition", javaType = "Integer") public static final String PARTITION_KEY = "kafka.PARTITION_KEY"; + @Metadata(label = "consumer", description = "The partition where the message was stored", javaType = "Integer") public static final String PARTITION = "kafka.PARTITION"; + @Metadata(description = "*Producer:* The key of the message in order to ensure that all related message goes in the same partition. 
" + + + "*Consumer:* The key of the message if configured", + javaType = "Object", required = true) public static final String KEY = "kafka.KEY"; + @Metadata(label = "consumer", description = "The topic from where the message originated", javaType = "String") public static final String TOPIC = "kafka.TOPIC"; + @Metadata(label = "producer", + description = "The topic to which send the message (override and takes precedence), and the header is not preserved.", + javaType = "String") public static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC"; + @Metadata(label = "consumer", description = "The offset of the message", javaType = "Long") public static final String OFFSET = "kafka.OFFSET"; + @Metadata(label = "consumer", description = "The record headers", javaType = "org.apache.kafka.common.header.Headers") public static final String HEADERS = "kafka.HEADERS"; + @Metadata(label = "consumer", + description = "Whether or not it's the last record before commit (only available if `autoCommitEnable` endpoint parameter is `false`)", + javaType = "Boolean") public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT"; + @Metadata(label = "consumer", description = "Indicates the last record within the current poll request " + + "(only available if `autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is `true`)", + javaType = "Boolean") public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD"; + @Metadata(label = "consumer", description = "The timestamp of the message", javaType = "Long") public static final String TIMESTAMP = "kafka.TIMESTAMP"; + @Metadata(label = "producer", description = "The ProducerRecord also has an associated timestamp. 
" + + "If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", + javaType = "Long") public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP"; @Deprecated @@ -39,8 +63,12 @@ public final class KafkaConstants { public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner"; public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor"; + @Metadata(label = "producer", + description = "The metadata (only configured if `recordMetadata` endpoint parameter is `true`)", + javaType = "List<RecordMetadata>") public static final String KAFKA_RECORDMETA = "org.apache.kafka.clients.producer.RecordMetadata"; - + @Metadata(label = "consumer", description = "Can be used for forcing manual offset commit when using Kafka consumer.", + javaType = "org.apache.camel.component.kafka.consumer.KafkaManualCommit") public static final String MANUAL_COMMIT = "CamelKafkaManualCommit"; private KafkaConstants() { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 1821084..e28309b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; * Sent and receive messages to/from an Apache Kafka broker. 
*/ @UriEndpoint(firstVersion = "2.13.0", scheme = "kafka", title = "Kafka", syntax = "kafka:topic", - category = { Category.MESSAGING }) + category = { Category.MESSAGING }, headersClass = KafkaConstants.class) public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { private static final Logger LOG = LoggerFactory.getLogger(KafkaEndpoint.class);
