This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-17792/doc-message-headers in repository https://gitbox.apache.org/repos/asf/camel.git
commit 664837158dd3d07a852c013462b4b5617fd17ee9 Author: Nicolas Filotto <[email protected]> AuthorDate: Wed Apr 6 12:31:22 2022 +0200 CAMEL-17792: Add doc about the message headers of camel-vertx-kafka-component --- .../camel/component/vertx/kafka/vertx-kafka.json | 11 ++++++ .../src/main/docs/vertx-kafka-component.adoc | 44 ++-------------------- .../component/vertx/kafka/VertxKafkaConstants.java | 24 ++++++++++++ .../component/vertx/kafka/VertxKafkaEndpoint.java | 2 +- 4 files changed, 40 insertions(+), 41 deletions(-) diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json index d08839a2769..c090329ca26 100644 --- a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json +++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json @@ -129,6 +129,17 @@ "sslTruststorePassword": { "kind": "property", "displayName": "Ssl Truststore Password", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The password for the trust store file. If a password is not set, t [...] 
"sslTruststoreType": { "kind": "property", "displayName": "Ssl Truststore Type", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "JKS", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The file format of the trust store file." } }, + "headers": { + "CamelVertxKafkaPartitionId": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Producer: Explicitly specify the partition identifier, for example partition 0. This will trigger the component to produce all the massages to the specified partition. Consumer: The partition identifier where the message were consumed from." }, + "CamelVertxKafkaMessageKey": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Producer: Explicitly specify the message key, if partition ID is not specified, this will trigger the messages to go into the same partition. Consumer: The message key." }, + "CamelVertxKafkaTopic": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Producer: Explicitly specify the topic to where produce the messages, this will be preserved in case of header aggregation. Consumer: The topic from where the message originated." 
}, + "CamelVertxKafkaRecordMetadata": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "List<RecordMetadata>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Produced record metadata." }, + "CamelVertxKafkaOffset": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The offset of the message in Kafka topic." }, + "CamelVertxKafkaHeaders": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "List<KafkaHeader>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The record Kafka headers." }, + "CamelVertxKafkaTimestamp": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of this record." }, + "CamelVertxKafkaOverrideTimestamp": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved." 
}, + "CamelVertxKafkaOverrideTopic": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Explicitly specify the topic to where produce the messages, this will not be preserved in case of header aggregation and it will take precedence over CamelVertxKafkaTopic." } + }, "properties": { "topic": { "kind": "path", "displayName": "Topic", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Name of the topic to use. On the consumer you can use comma to separate multiple topics. A pr [...] "additionalProperties": { "kind": "parameter", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "descri [...] 
diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc index 7c3518df2e3..c9f9cbc96d8 100644 --- a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc +++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc @@ -71,7 +71,6 @@ include::partial$component-endpoint-options.adoc[] // endpoint options: END - For more information about Producer/Consumer configuration: http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] @@ -85,47 +84,12 @@ This allows camel route to consume and produce events asynchronously without blo == Usage -=== Message headers set by the component consumer -The following headers are available when consuming messages from Kafka. - -[width="100%",cols="10%,10%,10%,70%",options="header",] -|======================================================================= -|Header |Variable Name |Type |Description - -|`CamelVertxKafkaPartitionId`| `VertxKafkaConstants.PARTITION_ID`|`Integer`| The partition identifier where the message were consumed from. -|`CamelVertxKafkaMessageKey`| `VertxKafkaConstants.MESSAGE_KEY`|`String`| The message key. -|`CamelVertxKafkaTopic`| `VertxKafkaConstants.TOPIC`|`String`| The topic from where the message originated. -|`CamelVertxKafkaOffset`| `VertxKafkaConstants.OFFSET`|`Long`| The offset of the message in Kafka topic. -|`CamelVertxKafkaHeaders`| `VertxKafkaConstants.HEADERS`|`List<io.vertx.kafka.client.producer.KafkaHeader>`| The record Kafka headers. -|`CamelVertxKafkaTimestamp`| `VertxKafkaConstants.TIMESTAMP`|`Long`| The timestamp of this record. 
-|======================================================================= - -=== Message headers evaluated by the component producer -Before sending a message to Kafka you can configure the following headers. - -[width="100%",cols="10%,10%,10%,70%",options="header",] -|======================================================================= -|Header |Variable Name |Type |Description - -|`CamelVertxKafkaPartitionId`| `VertxKafkaConstants.PARTITION_ID`|`Integer`| Explicitly specify the partition identifier, for example partition `0`. This will trigger the component to produce all the massages to the specified partition. -|`CamelVertxKafkaMessageKey`| `VertxKafkaConstants.MESSAGE_KEY`|`String`| Explicitly specify the message key, if partition ID is not specified, this will trigger the messages to go into the same partition. -|`CamelVertxKafkaTopic`| `VertxKafkaConstants.TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will be *preserved* in case of header aggregation. -|`CamelVertxKafkaOverrideTopic`| `VertxKafkaConstants.OVERRIDE_TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will *not be preserved* in case of header aggregation and it will take *precedence* over `CamelVertxKafkaTopic`. -| `CamelVertxKafkaOverrideTimestamp` | `VertxKafkaConstants.OVERRIDE_TIMESTAMP` | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved. -|======================================================================= +// component headers: START +include::partial$component-endpoint-headers.adoc[] +// component headers: END If you want to send a message to a dynamic topic then use `VertxKafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header -that are not send along the message, as its removed in the producer. 
- -=== Message headers set by the component producer -After the message is sent to Kafka, the following headers are available - -[width="100%",cols="10%,10%,10%,70%",options="header",] -|======================================================================= -|Header |Variable Name |Type |Description - -|`CamelVertxKafkaRecordMetadata`| `VertxKafkaConstants.RECORD_METADATA`|`List<io.vertx.kafka.client.producer.RecordMetadata>`| Produced record metadata. -|======================================================================= +that is not sent along with the message, as it's removed in the producer. === Message body type Currently, the component supports the following value serializers for the body message on the producer side: diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java index 30a3da08f80..689baa828b1 100644 --- a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java +++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java @@ -16,21 +16,45 @@ */ package org.apache.camel.component.vertx.kafka; +import org.apache.camel.spi.Metadata; + public final class VertxKafkaConstants { private static final String HEADER_PREFIX = "CamelVertxKafka"; // common headers, set by the consumer and evaluated by the producer + @Metadata(description = "*Producer:* Explicitly specify the partition identifier, for example partition `0`. 
" + + "This will trigger the component to produce all the massages to the specified partition.\n" + + "*Consumer:* The partition identifier where the message were consumed from.", + javaType = "Integer") public static final String PARTITION_ID = HEADER_PREFIX + "PartitionId"; + @Metadata(description = "*Producer:* Explicitly specify the message key, if partition ID is not specified, " + + "this will trigger the messages to go into the same partition.\n" + + "*Consumer:* The message key.", + javaType = "String") public static final String MESSAGE_KEY = HEADER_PREFIX + "MessageKey"; + @Metadata(description = "*Producer:* Explicitly specify the topic to where produce the messages, this will be *preserved* in case of header aggregation.\n" + + + "*Consumer:* The topic from where the message originated.", + javaType = "String") public static final String TOPIC = HEADER_PREFIX + "Topic"; // headers set by the producer only + @Metadata(label = "producer", description = "Produced record metadata.", javaType = "List<RecordMetadata>") public static final String RECORD_METADATA = HEADER_PREFIX + "RecordMetadata"; // headers set by the consumer only + @Metadata(label = "consumer", description = "The offset of the message in Kafka topic.", javaType = "Long") public static final String OFFSET = HEADER_PREFIX + "Offset"; + @Metadata(label = "consumer", description = "The record Kafka headers.", javaType = "List<KafkaHeader>") public static final String HEADERS = HEADER_PREFIX + "Headers"; + @Metadata(label = "consumer", description = "The timestamp of this record.", javaType = "Long") public static final String TIMESTAMP = HEADER_PREFIX + "Timestamp"; + @Metadata(label = "producer", description = "The ProducerRecord also has an associated timestamp. 
" + + "If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.", + javaType = "Long") public static final String OVERRIDE_TIMESTAMP = HEADER_PREFIX + "OverrideTimestamp"; public static final String MANUAL_COMMIT = HEADER_PREFIX + "ManualCommit"; // headers evaluated by the producer only + @Metadata(label = "producer", description = "Explicitly specify the topic to where produce the messages," + + " this will *not be preserved* in case of header aggregation and it will take *precedence* over `CamelVertxKafkaTopic`.", + javaType = "String") public static final String OVERRIDE_TOPIC = HEADER_PREFIX + "OverrideTopic"; private VertxKafkaConstants() { diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpoint.java b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpoint.java index 8b909a87d0c..de279dd9d71 100644 --- a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpoint.java +++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpoint.java @@ -31,7 +31,7 @@ import org.apache.camel.support.DefaultEndpoint; * Sent and receive messages to/from an Apache Kafka broker using vert.x Kafka client */ @UriEndpoint(firstVersion = "3.7.0", scheme = "vertx-kafka", title = "Vert.x Kafka", syntax = "vertx-kafka:topic", - category = { Category.MESSAGING }) + category = { Category.MESSAGING }, headersClass = VertxKafkaConstants.class) public class VertxKafkaEndpoint extends DefaultEndpoint { @UriParam
