This is an automated email from the ASF dual-hosted git repository.
oscerd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f8914a219ad4 CAMEL-23584: camel-kafka - align Exchange header constant
names with Camel naming convention
f8914a219ad4 is described below
commit f8914a219ad473811c01562b5b85e83ba583b583
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri May 29 07:00:34 2026 +0200
CAMEL-23584: camel-kafka - align Exchange header constant names with Camel
naming convention
Renames the Exchange header string values in KafkaConstants from the
non-Camel-prefixed "kafka.*" namespace to the project-wide "CamelKafka*"
PascalCase convention, completing the camel-kafka sub-task under the
CAMEL-23577 umbrella header-naming-convention sweep. The Java field names
are unchanged, so routes using the symbolic constants are unaffected;
routes hard-coding the literal "kafka.*" header strings must move to the
new "CamelKafka*" values, documented as a potential breaking change in the
4.21 upgrade guide. Also keeps the camel-tracing / camel-telemetry
KafkaSpanDecorator copies and the bundled transform classes in sync, and
regenerates the catalog, endpoint DSL, and important-headers artifacts.
Tracker: CAMEL-23577
Closes #23602
---
.../org/apache/camel/catalog/components/kafka.json | 24 +++---
.../camel/catalog/main/important-headers.json | 8 +-
.../org/apache/camel/component/kafka/kafka.json | 24 +++---
.../camel-kafka/src/main/docs/kafka-component.adoc | 24 +++---
.../camel/component/kafka/KafkaConstants.java | 24 +++---
.../component/kafka/KafkaHeaderDeserializer.java | 2 +-
.../kafka/transform/MessageTimestampRouter.java | 5 +-
.../component/kafka/transform/RegexRouter.java | 5 +-
.../component/kafka/transform/TimestampRouter.java | 5 +-
.../component/kafka/transform/ValueToKey.java | 3 +-
.../kafka/integration/KafkaConsumerFullIT.java | 2 +-
.../component/kafka/transform/RegexRouterTest.java | 5 +-
.../kafka/clients/producer/KafkaProducerTest.java | 9 ++-
.../apache/camel/opentelemetry2/SpanKindTest.java | 6 +-
.../opentelemetry2/mock/MockKafkaProducer.java | 6 +-
.../telemetry/decorators/KafkaSpanDecorator.java | 10 +--
.../tracing/decorators/KafkaSpanDecorator.java | 10 +--
.../apache/camel/util/ImportantHeaderUtils.java | 8 +-
.../ROOT/pages/camel-4x-upgrade-guide-4_21.adoc | 92 ++++++++++++++++++++++
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 49 ++++++------
20 files changed, 209 insertions(+), 112 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 3cde0c9ee7c8..37f24b4c2b1c 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -155,18 +155,18 @@
"useGlobalSslContextParameters": { "index": 128, "kind": "property",
"displayName": "Use Global Ssl Context Parameters", "group": "security",
"label": "security", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Enable usage of global SSL context
parameters." }
},
"headers": {
- "kafka.PARTITION_KEY": { "index": 0, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Explicitly specify the partition",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY"
},
- "kafka.PARTITION": { "index": 1, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The partition where the
message was stored", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
- "kafka.KEY": { "index": 2, "kind": "header", "displayName": "", "group":
"common", "label": "", "required": true, "javaType": "Object", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false, "important":
true, "description": "Producer: The key of the message in order to ensure that
all related message goes in the same partition. Consumer: The key of the
message if configured", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
- "kafka.TOPIC": { "index": 3, "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "important": true, "description": "The topic from where the message
originated", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
- "kafka.OVERRIDE_TOPIC": { "index": 4, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The topic to which send the message (override
and takes precedence), and the header is not preserved.", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
- "kafka.OFFSET": { "index": 5, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The offset of the message",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
- "kafka.HEADERS": { "index": 6, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"org.apache.kafka.common.header.Headers", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
record headers", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
- "kafka.LAST_RECORD_BEFORE_COMMIT": { "index": 7, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Whether or not it's the last record
before commit (only available if autoCommitEnable endpoint parameter is
false)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
- "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Indicates the last record within the current
poll request (only available if autoCommitEnable endpoint parameter is false or
allowManualCommit is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_RECORD" },
- "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The timestamp of the message", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
- "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ProducerRecord also has an
associated timestamp. If the user did provide a timestamp, the producer will
stamp the record with the provided timestamp and the header is not preserved.",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TIMESTAMP" },
- "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The metadata (only
configured if recordMetadata endpoint parameter is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
+ "CamelKafkaPartitionKey": { "index": 0, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Explicitly specify the partition",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY"
},
+ "CamelKafkaPartition": { "index": 1, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The partition where the
message was stored", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
+ "CamelKafkaKey": { "index": 2, "kind": "header", "displayName": "",
"group": "common", "label": "", "required": true, "javaType": "Object",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "important": true, "description": "Producer: The key of the message in
order to ensure that all related message goes in the same partition. Consumer:
The key of the message if configured", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
+ "CamelKafkaTopic": { "index": 3, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The topic from where the
message originated", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
+ "CamelKafkaOverrideTopic": { "index": 4, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The topic to which send the message (override
and takes precedence), and the header is not preserved.", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
+ "CamelKafkaOffset": { "index": 5, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The offset of the message",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
+ "CamelKafkaHeaders": { "index": 6, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"org.apache.kafka.common.header.Headers", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
record headers", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
+ "CamelKafkaLastRecordBeforeCommit": { "index": 7, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Whether or not it's the last record
before commit (only available if autoCommitEnable endpoint parameter is
false)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
+ "CamelKafkaLastPollRecord": { "index": 8, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Indicates the last record within the current
poll request (only available if autoCommitEnable endpoint parameter is false or
allowManualCommit is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_RECORD" },
+ "CamelKafkaTimestamp": { "index": 9, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The timestamp of the message", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
+ "CamelKafkaOverrideTimestamp": { "index": 10, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ProducerRecord also has an
associated timestamp. If the user did provide a timestamp, the producer will
stamp the record with the provided timestamp and the header is not preserved.",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TIMESTAMP" },
+ "CamelKafkaRecordMeta": { "index": 11, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The metadata (only
configured if recordMetadata endpoint parameter is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
"CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false,
"description": "Can be used for forcing manual offset commit when using Kafka
consumer.", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
},
"properties": {
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
index e88ef5eb65d7..d5eefe63ecb6 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
@@ -9,6 +9,10 @@
"CamelFtpReplyString",
"CamelHttpResponseCode",
"CamelHttpResponseText",
+ "CamelKafkaKey",
+ "CamelKafkaOffset",
+ "CamelKafkaPartition",
+ "CamelKafkaTopic",
"CamelMqttTopic",
"CamelNatsDeliveryCounter",
"CamelNatsSID",
@@ -23,9 +27,5 @@
"CamelSqlUpdateCount",
"CamelSshExitValue",
"Content-Type",
- "kafka.KEY",
- "kafka.OFFSET",
- "kafka.PARTITION",
- "kafka.TOPIC",
"websocket.eventType"
]
diff --git
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
index 3cde0c9ee7c8..37f24b4c2b1c 100644
---
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
@@ -155,18 +155,18 @@
"useGlobalSslContextParameters": { "index": 128, "kind": "property",
"displayName": "Use Global Ssl Context Parameters", "group": "security",
"label": "security", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Enable usage of global SSL context
parameters." }
},
"headers": {
- "kafka.PARTITION_KEY": { "index": 0, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Explicitly specify the partition",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY"
},
- "kafka.PARTITION": { "index": 1, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The partition where the
message was stored", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
- "kafka.KEY": { "index": 2, "kind": "header", "displayName": "", "group":
"common", "label": "", "required": true, "javaType": "Object", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false, "important":
true, "description": "Producer: The key of the message in order to ensure that
all related message goes in the same partition. Consumer: The key of the
message if configured", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
- "kafka.TOPIC": { "index": 3, "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "important": true, "description": "The topic from where the message
originated", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
- "kafka.OVERRIDE_TOPIC": { "index": 4, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The topic to which send the message (override
and takes precedence), and the header is not preserved.", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
- "kafka.OFFSET": { "index": 5, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The offset of the message",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
- "kafka.HEADERS": { "index": 6, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"org.apache.kafka.common.header.Headers", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
record headers", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
- "kafka.LAST_RECORD_BEFORE_COMMIT": { "index": 7, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Whether or not it's the last record
before commit (only available if autoCommitEnable endpoint parameter is
false)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
- "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Indicates the last record within the current
poll request (only available if autoCommitEnable endpoint parameter is false or
allowManualCommit is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_RECORD" },
- "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The timestamp of the message", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
- "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ProducerRecord also has an
associated timestamp. If the user did provide a timestamp, the producer will
stamp the record with the provided timestamp and the header is not preserved.",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TIMESTAMP" },
- "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The metadata (only
configured if recordMetadata endpoint parameter is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
+ "CamelKafkaPartitionKey": { "index": 0, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Explicitly specify the partition",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY"
},
+ "CamelKafkaPartition": { "index": 1, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The partition where the
message was stored", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
+ "CamelKafkaKey": { "index": 2, "kind": "header", "displayName": "",
"group": "common", "label": "", "required": true, "javaType": "Object",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "important": true, "description": "Producer: The key of the message in
order to ensure that all related message goes in the same partition. Consumer:
The key of the message if configured", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
+ "CamelKafkaTopic": { "index": 3, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The topic from where the
message originated", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
+ "CamelKafkaOverrideTopic": { "index": 4, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The topic to which send the message (override
and takes precedence), and the header is not preserved.", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
+ "CamelKafkaOffset": { "index": 5, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "important": true, "description": "The offset of the message",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
+ "CamelKafkaHeaders": { "index": 6, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"org.apache.kafka.common.header.Headers", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
record headers", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
+ "CamelKafkaLastRecordBeforeCommit": { "index": 7, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Whether or not it's the last record
before commit (only available if autoCommitEnable endpoint parameter is
false)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
+ "CamelKafkaLastPollRecord": { "index": 8, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Indicates the last record within the current
poll request (only available if autoCommitEnable endpoint parameter is false or
allowManualCommit is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_RECORD" },
+ "CamelKafkaTimestamp": { "index": 9, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The timestamp of the message", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
+ "CamelKafkaOverrideTimestamp": { "index": 10, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ProducerRecord also has an
associated timestamp. If the user did provide a timestamp, the producer will
stamp the record with the provided timestamp and the header is not preserved.",
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TIMESTAMP" },
+ "CamelKafkaRecordMeta": { "index": 11, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The metadata (only
configured if recordMetadata endpoint parameter is true)", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
"CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false,
"description": "Can be used for forcing manual offset commit when using Kafka
consumer.", "constantName":
"org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
},
"properties": {
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c9a43fbc8338..5bfe3fce1016 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -942,10 +942,10 @@ Here is the minimal route you need to read messages from
Kafka.
----
from("kafka:test?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
- .log(" on the topic ${headers[kafka.TOPIC]}")
- .log(" on the partition ${headers[kafka.PARTITION]}")
- .log(" with the offset ${headers[kafka.OFFSET]}")
- .log(" with the key ${headers[kafka.KEY]}")
+ .log(" on the topic ${headers[CamelKafkaTopic]}")
+ .log(" on the partition ${headers[CamelKafkaPartition]}")
+ .log(" with the offset ${headers[CamelKafkaOffset]}")
+ .log(" with the key ${headers[CamelKafkaKey]}")
----
If you need to consume messages from multiple topics, you can use a comma
separated list of topic names.
@@ -954,10 +954,10 @@ If you need to consume messages from multiple topics, you
can use a comma separa
----
from("kafka:test,test1,test2?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
- .log(" on the topic ${headers[kafka.TOPIC]}")
- .log(" on the partition ${headers[kafka.PARTITION]}")
- .log(" with the offset ${headers[kafka.OFFSET]}")
- .log(" with the key ${headers[kafka.KEY]}")
+ .log(" on the topic ${headers[CamelKafkaTopic]}")
+ .log(" on the partition ${headers[CamelKafkaPartition]}")
+ .log(" with the offset ${headers[CamelKafkaOffset]}")
+ .log(" with the key ${headers[CamelKafkaKey]}")
----
It's also possible to subscribe to multiple topics giving a pattern as the
topic name and using the `topicIsPattern` option.
@@ -966,10 +966,10 @@ It's also possible to subscribe to multiple topics giving
a pattern as the topic
----
from("kafka:test.*?brokers=localhost:9092&topicIsPattern=true")
.log("Message received from Kafka : ${body}")
- .log(" on the topic ${headers[kafka.TOPIC]}")
- .log(" on the partition ${headers[kafka.PARTITION]}")
- .log(" with the offset ${headers[kafka.OFFSET]}")
- .log(" with the key ${headers[kafka.KEY]}")
+ .log(" on the topic ${headers[CamelKafkaTopic]}")
+ .log(" on the partition ${headers[CamelKafkaPartition]}")
+ .log(" with the offset ${headers[CamelKafkaOffset]}")
+ .log(" with the key ${headers[CamelKafkaKey]}")
----
When consuming messages from Kafka, you can use your own offset management and
not delegate this management to Kafka.
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 2737ff371a46..13e3307d65b7 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -21,39 +21,39 @@ import org.apache.camel.spi.Metadata;
public final class KafkaConstants {
@Metadata(label = "producer", description = "Explicitly specify the
partition", javaType = "Integer")
- public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
+ public static final String PARTITION_KEY = "CamelKafkaPartitionKey";
@Metadata(label = "consumer", description = "The partition where the
message was stored", javaType = "Integer",
important = true)
- public static final String PARTITION = "kafka.PARTITION";
+ public static final String PARTITION = "CamelKafkaPartition";
@Metadata(description = "Producer: The key of the message in order to
ensure that all related message goes in the same partition. "
+ "Consumer: The key of the message if configured",
javaType = "Object", required = true, important = true)
- public static final String KEY = "kafka.KEY";
+ public static final String KEY = "CamelKafkaKey";
@Metadata(label = "consumer", description = "The topic from where the
message originated", javaType = "String",
important = true)
- public static final String TOPIC = "kafka.TOPIC";
+ public static final String TOPIC = "CamelKafkaTopic";
@Metadata(label = "producer",
description = "The topic to which send the message (override and
takes precedence), and the header is not preserved.",
javaType = "String")
- public static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
+ public static final String OVERRIDE_TOPIC = "CamelKafkaOverrideTopic";
@Metadata(label = "consumer", description = "The offset of the message",
javaType = "Long", important = true)
- public static final String OFFSET = "kafka.OFFSET";
+ public static final String OFFSET = "CamelKafkaOffset";
@Metadata(label = "consumer", description = "The record headers", javaType
= "org.apache.kafka.common.header.Headers")
- public static final String HEADERS = "kafka.HEADERS";
+ public static final String HEADERS = "CamelKafkaHeaders";
@Metadata(label = "consumer",
description = "Whether or not it's the last record before commit
(only available if `autoCommitEnable` endpoint parameter is `false`)",
javaType = "Boolean")
- public static final String LAST_RECORD_BEFORE_COMMIT =
"kafka.LAST_RECORD_BEFORE_COMMIT";
+ public static final String LAST_RECORD_BEFORE_COMMIT =
"CamelKafkaLastRecordBeforeCommit";
@Metadata(label = "consumer", description = "Indicates the last record
within the current poll request " +
"(only available if
`autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is
`true`)",
javaType = "Boolean")
- public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
+ public static final String LAST_POLL_RECORD = "CamelKafkaLastPollRecord";
@Metadata(label = "consumer", description = "The timestamp of the
message", javaType = "Long")
- public static final String TIMESTAMP = "kafka.TIMESTAMP";
+ public static final String TIMESTAMP = "CamelKafkaTimestamp";
@Metadata(label = "producer", description = "The ProducerRecord also has
an associated timestamp. " +
"If the user did provide a
timestamp, the producer will stamp the record with the provided timestamp and
the header is not preserved.",
javaType = "Long")
- public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP";
+ public static final String OVERRIDE_TIMESTAMP =
"CamelKafkaOverrideTimestamp";
@Deprecated
public static final String KAFKA_DEFAULT_ENCODER =
"kafka.serializer.DefaultEncoder";
@@ -66,7 +66,7 @@ public final class KafkaConstants {
@Metadata(label = "producer",
description = "The metadata (only configured if `recordMetadata`
endpoint parameter is `true`)",
javaType = "List<RecordMetadata>")
- public static final String KAFKA_RECORD_META = "kafka.RECORD_META";
+ public static final String KAFKA_RECORD_META = "CamelKafkaRecordMeta";
@Metadata(label = "consumer", description = "Can be used for forcing
manual offset commit when using Kafka consumer.",
javaType =
"org.apache.camel.component.kafka.consumer.KafkaManualCommit")
public static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
index bbe8eeb32623..e758d38dea14 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
@@ -85,7 +85,7 @@ public class KafkaHeaderDeserializer implements Processor {
* Exclude special Kafka headers from auto deserialization.
*/
private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
- return !entry.getKey().equals("kafka.HEADERS") &&
!entry.getKey().equals("CamelKafkaManualCommit");
+ return !entry.getKey().equals(KafkaConstants.HEADERS) &&
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
}
public void setEnabled(String enabled) {
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
index 0ad6718b1183..9009188d9a20 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class MessageTimestampRouter {
@@ -54,7 +55,7 @@ public class MessageTimestampRouter {
}
Object rawTimestamp = null;
- String topicName = ex.getMessage().getHeader("kafka.TOPIC",
String.class);
+ String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
for (String key : splittedKeys) {
if (ObjectHelper.isNotEmpty(key)) {
rawTimestamp = body.get(key);
@@ -82,7 +83,7 @@ public class MessageTimestampRouter {
replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
}
- ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
+ ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
index 582b9761bf63..f72893f5f5e0 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
@@ -21,6 +21,7 @@ import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class RegexRouter {
@@ -28,12 +29,12 @@ public class RegexRouter {
public void process(
@ExchangeProperty("regex") String regex,
@ExchangeProperty("replacement") String replacement, Exchange ex) {
Pattern regexPattern = Pattern.compile(regex);
- String topicName = ex.getMessage().getHeader("kafka.TOPIC",
String.class);
+ String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
if (ObjectHelper.isNotEmpty(topicName)) {
final Matcher matcher = regexPattern.matcher(topicName);
if (matcher.matches()) {
final String topicUpdated = matcher.replaceFirst(replacement);
- ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC",
topicUpdated);
+ ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
topicUpdated);
}
}
String ceType = ex.getMessage().getHeader("ce-type", String.class);
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
index 8987568a455d..5c8e4fe42ea8 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class TimestampRouter {
@@ -40,7 +41,7 @@ public class TimestampRouter {
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
Long timestamp = null;
- String topicName = ex.getMessage().getHeader("kafka.TOPIC",
String.class);
+ String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
if (rawTimestamp instanceof Long longValue) {
timestamp = longValue;
@@ -61,7 +62,7 @@ public class TimestampRouter {
replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
}
- ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
+ ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
index 9adf1067c1a8..b2fc89b9a2c8 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class ValueToKey {
@@ -47,7 +48,7 @@ public class ValueToKey {
}
}
- ex.getMessage().setHeader("kafka.KEY", key);
+ ex.getMessage().setHeader(KafkaConstants.KEY, key);
}
boolean filterNames(String fieldName, List<String> splittedFields) {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 26a50f7782b6..9a195bb2a8aa 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -161,7 +161,7 @@ public class KafkaConsumerFullIT extends
BaseKafkaTestSupport {
to.assertIsSatisfied(3000);
Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
- assertTrue(headers.containsKey(KafkaConstants.TOPIC), "Should receive
KafkaEndpoint populated kafka.TOPIC header");
+ assertTrue(headers.containsKey(KafkaConstants.TOPIC), "Should receive
KafkaEndpoint populated CamelKafkaTopic header");
assertEquals(TOPIC, headers.get(KafkaConstants.TOPIC), "Topic name
received");
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
index 84d859a81508..473bfa05b776 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka.transform;
import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Assertions;
@@ -41,10 +42,10 @@ class RegexRouterTest {
void shouldReplaceFieldToPlainJson() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setHeader("kafka.TOPIC", topic);
+ exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic);
processor.process(".*ll.*", "newTopic", exchange);
- Assertions.assertEquals("newTopic",
exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC"));
+ Assertions.assertEquals("newTopic",
exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC));
}
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index d66338148810..1cea7b40ff6b 100644
---
a/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.kafka.KafkaProducer;
import org.apache.camel.impl.DefaultCamelContext;
@@ -56,8 +57,8 @@ public class KafkaProducerTest {
camelProducer.setKafkaProducer(kafkaProducer);
when(exchange.getIn()).thenReturn(message);
when(exchange.getContext()).thenReturn(context);
- when(message.getHeader("kafka.PARTITION_KEY",
Integer.class)).thenReturn(0);
- when(message.getHeader("kafka.KEY")).thenReturn("key");
+ when(message.getHeader(KafkaConstants.PARTITION_KEY,
Integer.class)).thenReturn(0);
+ when(message.getHeader(KafkaConstants.KEY)).thenReturn("key");
}
@AfterEach
@@ -67,9 +68,9 @@ public class KafkaProducerTest {
@Test
public void testSendOverrideTopic() throws Exception {
-
when(message.removeHeader("kafka.OVERRIDE_TOPIC")).thenReturn("overridden-topic");
+
when(message.removeHeader(KafkaConstants.OVERRIDE_TOPIC)).thenReturn("overridden-topic");
camelProducer.process(exchange);
- when(message.removeHeader("kafka.OVERRIDE_TOPIC")).thenReturn(new
TextNode("overridden-topic-jackson"));
+
when(message.removeHeader(KafkaConstants.OVERRIDE_TOPIC)).thenReturn(new
TextNode("overridden-topic-jackson"));
camelProducer.process(exchange);
List<ProducerRecord<Object, Object>> records = kafkaProducer.history();
assertThat(records.get(0).topic(), Is.is("overridden-topic"));
diff --git
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
index 7746b7515bd3..604016b0351c 100644
---
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
+++
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
@@ -105,9 +105,9 @@ public class SpanKindTest extends
OpenTelemetryTracerTestSupport {
// Send with Kafka headers that would normally be set before/during
sending
template.sendBodyAndHeaders("direct:kafkaProducer", "test message",
- Map.of("kafka.KEY", "test-key",
- "kafka.PARTITION", 0,
- "kafka.OFFSET", "12345"));
+ Map.of("CamelKafkaKey", "test-key",
+ "CamelKafkaPartition", 0,
+ "CamelKafkaOffset", "12345"));
mockEndpoint.assertIsSatisfied();
diff --git
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
index 16baf5c1baa1..04a8f135081a 100644
---
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
+++
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
@@ -33,9 +33,9 @@ class MockKafkaProducer extends DefaultProducer {
public void process(Exchange exchange) throws Exception {
// Simulate Kafka response with partition, offset, and key
// These headers would normally be set by the real Kafka producer
- exchange.getMessage().setHeader("kafka.PARTITION", 0);
- exchange.getMessage().setHeader("kafka.OFFSET", "12345");
- exchange.getMessage().setHeader("kafka.KEY", "test-key");
+ exchange.getMessage().setHeader("CamelKafkaPartition", 0);
+ exchange.getMessage().setHeader("CamelKafkaOffset", "12345");
+ exchange.getMessage().setHeader("CamelKafkaKey", "test-key");
exchange.getMessage().setBody("Kafka Response: " +
exchange.getIn().getBody());
}
}
diff --git
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
index c7905527fdfb..2a798e40f9f3 100644
---
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
+++
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
@@ -30,11 +30,11 @@ public class KafkaSpanDecorator extends
AbstractMessagingSpanDecorator {
/**
* Constants copied from {@link
org.apache.camel.component.kafka.KafkaConstants}
*/
- protected static final String PARTITION_KEY = "kafka.PARTITION_KEY";
- protected static final String PARTITION = "kafka.PARTITION";
- protected static final String KEY = "kafka.KEY";
- protected static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
- protected static final String OFFSET = "kafka.OFFSET";
+ protected static final String PARTITION_KEY = "CamelKafkaPartitionKey";
+ protected static final String PARTITION = "CamelKafkaPartition";
+ protected static final String KEY = "CamelKafkaKey";
+ protected static final String OVERRIDE_TOPIC = "CamelKafkaOverrideTopic";
+ protected static final String OFFSET = "CamelKafkaOffset";
@Override
public String getComponent() {
diff --git
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
index e72f62fa9424..de2db9687d8c 100644
---
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
+++
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
@@ -31,11 +31,11 @@ public class KafkaSpanDecorator extends
AbstractMessagingSpanDecorator {
/**
* Constants copied from {@link
org.apache.camel.component.kafka.KafkaConstants}
*/
- protected static final String PARTITION_KEY = "kafka.PARTITION_KEY";
- protected static final String PARTITION = "kafka.PARTITION";
- protected static final String KEY = "kafka.KEY";
- protected static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
- protected static final String OFFSET = "kafka.OFFSET";
+ protected static final String PARTITION_KEY = "CamelKafkaPartitionKey";
+ protected static final String PARTITION = "CamelKafkaPartition";
+ protected static final String KEY = "CamelKafkaKey";
+ protected static final String OVERRIDE_TOPIC = "CamelKafkaOverrideTopic";
+ protected static final String OFFSET = "CamelKafkaOffset";
@Override
public String getComponent() {
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
index e3c243efd5a1..14f22f6bd80e 100644
---
a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
+++
b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
@@ -37,6 +37,10 @@ public final class ImportantHeaderUtils {
"CamelFtpReplyString",
"CamelHttpResponseCode",
"CamelHttpResponseText",
+ "CamelKafkaKey",
+ "CamelKafkaOffset",
+ "CamelKafkaPartition",
+ "CamelKafkaTopic",
"CamelMqttTopic",
"CamelNatsDeliveryCounter",
"CamelNatsSID",
@@ -51,10 +55,6 @@ public final class ImportantHeaderUtils {
"CamelSqlUpdateCount",
"CamelSshExitValue",
"Content-Type",
- "kafka.KEY",
- "kafka.OFFSET",
- "kafka.PARTITION",
- "kafka.TOPIC",
"websocket.eventType"
// IMPORTANT-HEADER-KEYS: END
)));
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
index 17122c8aa8b4..a75f995c5473 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
@@ -1486,6 +1486,98 @@ Routes that set the header by its literal string value
(for example
As a consequence, the generated Endpoint DSL header accessor `await()` on
`MiloClientHeaderNameBuilder` has been renamed to `miloAwait()`.
+=== camel-kafka - potential breaking change
+
+The Exchange header constants in `KafkaConstants` used header values in the
+lowercase / dotted `kafka.*` namespace, outside the `Camel` namespace, and
+were therefore not filtered by the default `HeaderFilterStrategy` on
+upstream HTTP / REST consumers. They have been renamed to follow the Camel
+naming convention used across the rest of the component catalog. The Java
+field names are unchanged; only the header string values have changed:
+
+[options="header"]
+|===
+| Constant | Previous value | New value
+| `KafkaConstants.PARTITION_KEY` | `kafka.PARTITION_KEY` |
`CamelKafkaPartitionKey`
+| `KafkaConstants.PARTITION` | `kafka.PARTITION` | `CamelKafkaPartition`
+| `KafkaConstants.KEY` | `kafka.KEY` | `CamelKafkaKey`
+| `KafkaConstants.TOPIC` | `kafka.TOPIC` | `CamelKafkaTopic`
+| `KafkaConstants.OVERRIDE_TOPIC` | `kafka.OVERRIDE_TOPIC` |
`CamelKafkaOverrideTopic`
+| `KafkaConstants.OFFSET` | `kafka.OFFSET` | `CamelKafkaOffset`
+| `KafkaConstants.HEADERS` | `kafka.HEADERS` | `CamelKafkaHeaders`
+| `KafkaConstants.LAST_RECORD_BEFORE_COMMIT` |
`kafka.LAST_RECORD_BEFORE_COMMIT` | `CamelKafkaLastRecordBeforeCommit`
+| `KafkaConstants.LAST_POLL_RECORD` | `kafka.LAST_POLL_RECORD` |
`CamelKafkaLastPollRecord`
+| `KafkaConstants.TIMESTAMP` | `kafka.TIMESTAMP` | `CamelKafkaTimestamp`
+| `KafkaConstants.OVERRIDE_TIMESTAMP` | `kafka.OVERRIDE_TIMESTAMP` |
`CamelKafkaOverrideTimestamp`
+| `KafkaConstants.KAFKA_RECORD_META` | `kafka.RECORD_META` |
`CamelKafkaRecordMeta`
+|===
+
+`KafkaConstants.MANUAL_COMMIT` was already `Camel`-prefixed
+(`CamelKafkaManualCommit`) and is unchanged. The non-header constants in the
+class (`KAFKA_DEFAULT_ENCODER`, `KAFKA_STRING_ENCODER`,
+`KAFKA_DEFAULT_SERIALIZER`, `KAFKA_DEFAULT_DESERIALIZER`,
+`PARTITIONER_RANGE_ASSIGNOR`, `KAFKA_SUBSCRIBE_ADAPTER`) are not Exchange
+headers and are unchanged.
+
+`KafkaConstants.OVERRIDE_TOPIC` and `KafkaConstants.OVERRIDE_TIMESTAMP` are
+producer-read headers consulted by `KafkaProducer.evaluateTopic()` and the
+timestamp override path. The rename also closes the cross-transport
+propagation gap where these headers, in their previous `kafka.*` form,
+would pass through `HttpHeaderFilterStrategy` (which only filters the
+`Camel` prefix) on an upstream HTTP / REST consumer wired to a Kafka
+producer in the same route. With the new `CamelKafka*` values, the default
+filter strategy strips them at the transport boundary.
+
+Routes that reference the constants symbolically (for example
+`setHeader(KafkaConstants.OVERRIDE_TOPIC, ...)` or
+`header(KafkaConstants.TOPIC)` in the Java DSL) continue to work
+without changes. Routes that set or read the headers by their literal
+string values (for example `setHeader("kafka.OVERRIDE_TOPIC", ...)` or
+Simple expressions such as `${headers[kafka.TOPIC]}`) must be updated to
+use the new values:
+
+[source,java]
+----
+// before
+from("platform-http:/api/events")
+ .setHeader("kafka.OVERRIDE_TOPIC", constant("events.topic"))
+ .to("kafka:default?brokers=localhost:9092");
+
+// after
+from("platform-http:/api/events")
+ .setHeader(KafkaConstants.OVERRIDE_TOPIC, constant("events.topic"))
+ .to("kafka:default?brokers=localhost:9092");
+----
+
+A route that bridges an untrusted HTTP / REST consumer to a Kafka producer
+should continue to apply the standard transport-boundary mitigation,
+which now also covers the Kafka headers:
+
+[source,java]
+----
+from("platform-http:/api/events")
+ .removeHeaders("Camel*")
+ .to("kafka:default?brokers=localhost:9092");
+----
+
+The bundled Kafka Connect-style transformers under
+`org.apache.camel.component.kafka.transform` (`RegexRouter`,
+`TimestampRouter`, `MessageTimestampRouter`, `ValueToKey`) have been
+updated to use the renamed `KafkaConstants` values internally and are
+unaffected at the call site.
+
+The local copies of these constants in
+`org.apache.camel.tracing.decorators.KafkaSpanDecorator` (deprecated since
+4.19.0) and `org.apache.camel.telemetry.decorators.KafkaSpanDecorator` have
+been kept in sync with the new values so that Kafka span tagging continues
+to work for routes that have migrated to the new header names.
+
+The generated Endpoint DSL header accessors on
+`KafkaEndpointBuilderFactory$KafkaHeaderNameBuilder` (for example
+`kafkaOverrideTopic()`, `kafkaTopic()`, `kafkaPartitionKey()`) keep their
+method names; only the returned string value reflects the new
+`CamelKafka*` convention.
+
=== Jackson dataformat documentation pages renamed
The Jackson 2.x and Jackson 3.x lines ship the same dataformat names
(`jackson`, `jacksonXml`,
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 9dc46314a149..843c2d5e97e3 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -5737,10 +5737,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: producer
*
- * @return the name of the header {@code kafka.PARTITION_KEY}.
+ * @return the name of the header {@code KafkaPartitionKey}.
*/
public String kafkaPartitionKey() {
- return "kafka.PARTITION_KEY";
+ return "CamelKafkaPartitionKey";
}
/**
* The partition where the message was stored.
@@ -5749,10 +5749,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code kafka.PARTITION}.
+ * @return the name of the header {@code KafkaPartition}.
*/
public String kafkaPartition() {
- return "kafka.PARTITION";
+ return "CamelKafkaPartition";
}
/**
* Producer: The key of the message in order to ensure that all related
@@ -5764,10 +5764,10 @@ public interface KafkaEndpointBuilderFactory {
* Required: true
* Group: common
*
- * @return the name of the header {@code kafka.KEY}.
+ * @return the name of the header {@code KafkaKey}.
*/
public String kafkaKey() {
- return "kafka.KEY";
+ return "CamelKafkaKey";
}
/**
* The topic from where the message originated.
@@ -5776,10 +5776,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code kafka.TOPIC}.
+ * @return the name of the header {@code KafkaTopic}.
*/
public String kafkaTopic() {
- return "kafka.TOPIC";
+ return "CamelKafkaTopic";
}
/**
* The topic to which send the message (override and takes precedence),
@@ -5789,10 +5789,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: producer
*
- * @return the name of the header {@code kafka.OVERRIDE_TOPIC}.
+ * @return the name of the header {@code KafkaOverrideTopic}.
*/
public String kafkaOverrideTopic() {
- return "kafka.OVERRIDE_TOPIC";
+ return "CamelKafkaOverrideTopic";
}
/**
* The offset of the message.
@@ -5801,10 +5801,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code kafka.OFFSET}.
+ * @return the name of the header {@code KafkaOffset}.
*/
public String kafkaOffset() {
- return "kafka.OFFSET";
+ return "CamelKafkaOffset";
}
/**
* The record headers.
@@ -5813,10 +5813,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code kafka.HEADERS}.
+ * @return the name of the header {@code KafkaHeaders}.
*/
public String kafkaHeaders() {
- return "kafka.HEADERS";
+ return "CamelKafkaHeaders";
}
/**
* Whether or not it's the last record before commit (only available if
@@ -5826,11 +5826,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code
- * kafka.LAST_RECORD_BEFORE_COMMIT}.
+ * @return the name of the header {@code KafkaLastRecordBeforeCommit}.
*/
public String kafkaLastRecordBeforeCommit() {
- return "kafka.LAST_RECORD_BEFORE_COMMIT";
+ return "CamelKafkaLastRecordBeforeCommit";
}
/**
* Indicates the last record within the current poll request (only
@@ -5841,10 +5840,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code kafka.LAST_POLL_RECORD}.
+ * @return the name of the header {@code KafkaLastPollRecord}.
*/
public String kafkaLastPollRecord() {
- return "kafka.LAST_POLL_RECORD";
+ return "CamelKafkaLastPollRecord";
}
/**
* The timestamp of the message.
@@ -5853,10 +5852,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: consumer
*
- * @return the name of the header {@code kafka.TIMESTAMP}.
+ * @return the name of the header {@code KafkaTimestamp}.
*/
public String kafkaTimestamp() {
- return "kafka.TIMESTAMP";
+ return "CamelKafkaTimestamp";
}
/**
* The ProducerRecord also has an associated timestamp. If the user did
@@ -5867,10 +5866,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: producer
*
- * @return the name of the header {@code kafka.OVERRIDE_TIMESTAMP}.
+ * @return the name of the header {@code KafkaOverrideTimestamp}.
*/
public String kafkaOverrideTimestamp() {
- return "kafka.OVERRIDE_TIMESTAMP";
+ return "CamelKafkaOverrideTimestamp";
}
/**
* The metadata (only configured if recordMetadata endpoint parameter
is
@@ -5880,10 +5879,10 @@ public interface KafkaEndpointBuilderFactory {
*
* Group: producer
*
- * @return the name of the header {@code kafka.RECORD_META}.
+ * @return the name of the header {@code KafkaRecordMeta}.
*/
public String kafkaRecordMeta() {
- return "kafka.RECORD_META";
+ return "CamelKafkaRecordMeta";
}
/**
* Can be used for forcing manual offset commit when using Kafka