This is an automated email from the ASF dual-hosted git repository.

oscerd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f8914a219ad4 CAMEL-23584: camel-kafka - align Exchange header constant 
names with Camel naming convention
f8914a219ad4 is described below

commit f8914a219ad473811c01562b5b85e83ba583b583
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri May 29 07:00:34 2026 +0200

    CAMEL-23584: camel-kafka - align Exchange header constant names with Camel 
naming convention
    
    Renames the Exchange header string values in KafkaConstants from the
    non-Camel-prefixed "kafka.*" namespace to the project-wide "CamelKafka*"
    PascalCase convention, completing the camel-kafka sub-task under the
    CAMEL-23577 umbrella header-naming-convention sweep. The Java field names
    are unchanged, so routes using the symbolic constants are unaffected;
    routes hard-coding the literal "kafka.*" header strings must move to the
    new "CamelKafka*" values, documented as a potential breaking change in the
    4.21 upgrade guide. Also keeps the camel-tracing / camel-telemetry
    KafkaSpanDecorator copies and the bundled transform classes in sync, and
    regenerates the catalog, endpoint DSL, and important-headers artifacts.
    
    Tracker: CAMEL-23577
    
    Closes #23602
---
 .../org/apache/camel/catalog/components/kafka.json | 24 +++---
 .../camel/catalog/main/important-headers.json      |  8 +-
 .../org/apache/camel/component/kafka/kafka.json    | 24 +++---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 24 +++---
 .../camel/component/kafka/KafkaConstants.java      | 24 +++---
 .../component/kafka/KafkaHeaderDeserializer.java   |  2 +-
 .../kafka/transform/MessageTimestampRouter.java    |  5 +-
 .../component/kafka/transform/RegexRouter.java     |  5 +-
 .../component/kafka/transform/TimestampRouter.java |  5 +-
 .../component/kafka/transform/ValueToKey.java      |  3 +-
 .../kafka/integration/KafkaConsumerFullIT.java     |  2 +-
 .../component/kafka/transform/RegexRouterTest.java |  5 +-
 .../kafka/clients/producer/KafkaProducerTest.java  |  9 ++-
 .../apache/camel/opentelemetry2/SpanKindTest.java  |  6 +-
 .../opentelemetry2/mock/MockKafkaProducer.java     |  6 +-
 .../telemetry/decorators/KafkaSpanDecorator.java   | 10 +--
 .../tracing/decorators/KafkaSpanDecorator.java     | 10 +--
 .../apache/camel/util/ImportantHeaderUtils.java    |  8 +-
 .../ROOT/pages/camel-4x-upgrade-guide-4_21.adoc    | 92 ++++++++++++++++++++++
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  | 49 ++++++------
 20 files changed, 209 insertions(+), 112 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 3cde0c9ee7c8..37f24b4c2b1c 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -155,18 +155,18 @@
     "useGlobalSslContextParameters": { "index": 128, "kind": "property", 
"displayName": "Use Global Ssl Context Parameters", "group": "security", 
"label": "security", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Enable usage of global SSL context 
parameters." }
   },
   "headers": {
-    "kafka.PARTITION_KEY": { "index": 0, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Explicitly specify the partition", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY" 
},
-    "kafka.PARTITION": { "index": 1, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The partition where the 
message was stored", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
-    "kafka.KEY": { "index": 2, "kind": "header", "displayName": "", "group": 
"common", "label": "", "required": true, "javaType": "Object", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, "important": 
true, "description": "Producer: The key of the message in order to ensure that 
all related message goes in the same partition. Consumer: The key of the 
message if configured", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
-    "kafka.TOPIC": { "index": 3, "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "important": true, "description": "The topic from where the message 
originated", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
-    "kafka.OVERRIDE_TOPIC": { "index": 4, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The topic to which send the message (override 
and takes precedence), and the header is not preserved.", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
-    "kafka.OFFSET": { "index": 5, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The offset of the message", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
-    "kafka.HEADERS": { "index": 6, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.kafka.common.header.Headers", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
record headers", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
-    "kafka.LAST_RECORD_BEFORE_COMMIT": { "index": 7, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Whether or not it's the last record 
before commit (only available if autoCommitEnable endpoint parameter is 
false)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
-    "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": 
"", "group": "consumer", "label": "consumer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Indicates the last record within the current 
poll request (only available if autoCommitEnable endpoint parameter is false or 
allowManualCommit is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...]
-    "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
-    "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The ProducerRecord also has an 
associated timestamp. If the user did provide a timestamp, the producer will 
stamp the record with the provided timestamp and the header is not preserved.", 
"constantName": "org.apache.camel.componen [...]
-    "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The metadata (only 
configured if recordMetadata endpoint parameter is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
+    "CamelKafkaPartitionKey": { "index": 0, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Explicitly specify the partition", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY" 
},
+    "CamelKafkaPartition": { "index": 1, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The partition where the 
message was stored", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
+    "CamelKafkaKey": { "index": 2, "kind": "header", "displayName": "", 
"group": "common", "label": "", "required": true, "javaType": "Object", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "important": true, "description": "Producer: The key of the message in 
order to ensure that all related message goes in the same partition. Consumer: 
The key of the message if configured", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
+    "CamelKafkaTopic": { "index": 3, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The topic from where the 
message originated", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
+    "CamelKafkaOverrideTopic": { "index": 4, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The topic to which send the message (override 
and takes precedence), and the header is not preserved.", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
+    "CamelKafkaOffset": { "index": 5, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The offset of the message", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
+    "CamelKafkaHeaders": { "index": 6, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.kafka.common.header.Headers", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
record headers", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
+    "CamelKafkaLastRecordBeforeCommit": { "index": 7, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Whether or not it's the last record 
before commit (only available if autoCommitEnable endpoint parameter is 
false)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
+    "CamelKafkaLastPollRecord": { "index": 8, "kind": "header", "displayName": 
"", "group": "consumer", "label": "consumer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Indicates the last record within the current 
poll request (only available if autoCommitEnable endpoint parameter is false or 
allowManualCommit is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL [...]
+    "CamelKafkaTimestamp": { "index": 9, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
+    "CamelKafkaOverrideTimestamp": { "index": 10, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The ProducerRecord also has an 
associated timestamp. If the user did provide a timestamp, the producer will 
stamp the record with the provided timestamp and the header is not preserved.", 
"constantName": "org.apache.camel.compo [...]
+    "CamelKafkaRecordMeta": { "index": 11, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The metadata (only 
configured if recordMetadata endpoint parameter is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
     "CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": 
"", "group": "consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "Can be used for forcing manual offset commit when using Kafka 
consumer.", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
   },
   "properties": {
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
index e88ef5eb65d7..d5eefe63ecb6 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
@@ -9,6 +9,10 @@
   "CamelFtpReplyString",
   "CamelHttpResponseCode",
   "CamelHttpResponseText",
+  "CamelKafkaKey",
+  "CamelKafkaOffset",
+  "CamelKafkaPartition",
+  "CamelKafkaTopic",
   "CamelMqttTopic",
   "CamelNatsDeliveryCounter",
   "CamelNatsSID",
@@ -23,9 +27,5 @@
   "CamelSqlUpdateCount",
   "CamelSshExitValue",
   "Content-Type",
-  "kafka.KEY",
-  "kafka.OFFSET",
-  "kafka.PARTITION",
-  "kafka.TOPIC",
   "websocket.eventType"
 ]
diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
index 3cde0c9ee7c8..37f24b4c2b1c 100644
--- 
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
@@ -155,18 +155,18 @@
     "useGlobalSslContextParameters": { "index": 128, "kind": "property", 
"displayName": "Use Global Ssl Context Parameters", "group": "security", 
"label": "security", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Enable usage of global SSL context 
parameters." }
   },
   "headers": {
-    "kafka.PARTITION_KEY": { "index": 0, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Explicitly specify the partition", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY" 
},
-    "kafka.PARTITION": { "index": 1, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The partition where the 
message was stored", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
-    "kafka.KEY": { "index": 2, "kind": "header", "displayName": "", "group": 
"common", "label": "", "required": true, "javaType": "Object", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, "important": 
true, "description": "Producer: The key of the message in order to ensure that 
all related message goes in the same partition. Consumer: The key of the 
message if configured", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
-    "kafka.TOPIC": { "index": 3, "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "important": true, "description": "The topic from where the message 
originated", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
-    "kafka.OVERRIDE_TOPIC": { "index": 4, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The topic to which send the message (override 
and takes precedence), and the header is not preserved.", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
-    "kafka.OFFSET": { "index": 5, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The offset of the message", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
-    "kafka.HEADERS": { "index": 6, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.kafka.common.header.Headers", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
record headers", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
-    "kafka.LAST_RECORD_BEFORE_COMMIT": { "index": 7, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Whether or not it's the last record 
before commit (only available if autoCommitEnable endpoint parameter is 
false)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
-    "kafka.LAST_POLL_RECORD": { "index": 8, "kind": "header", "displayName": 
"", "group": "consumer", "label": "consumer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Indicates the last record within the current 
poll request (only available if autoCommitEnable endpoint parameter is false or 
allowManualCommit is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL_R [...]
-    "kafka.TIMESTAMP": { "index": 9, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
-    "kafka.OVERRIDE_TIMESTAMP": { "index": 10, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The ProducerRecord also has an 
associated timestamp. If the user did provide a timestamp, the producer will 
stamp the record with the provided timestamp and the header is not preserved.", 
"constantName": "org.apache.camel.componen [...]
-    "kafka.RECORD_META": { "index": 11, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The metadata (only 
configured if recordMetadata endpoint parameter is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
+    "CamelKafkaPartitionKey": { "index": 0, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Explicitly specify the partition", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#PARTITION_KEY" 
},
+    "CamelKafkaPartition": { "index": 1, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The partition where the 
message was stored", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#PARTITION" },
+    "CamelKafkaKey": { "index": 2, "kind": "header", "displayName": "", 
"group": "common", "label": "", "required": true, "javaType": "Object", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "important": true, "description": "Producer: The key of the message in 
order to ensure that all related message goes in the same partition. Consumer: 
The key of the message if configured", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KEY" },
+    "CamelKafkaTopic": { "index": 3, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The topic from where the 
message originated", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TOPIC" },
+    "CamelKafkaOverrideTopic": { "index": 4, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The topic to which send the message (override 
and takes precedence), and the header is not preserved.", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#OVERRIDE_TOPIC" },
+    "CamelKafkaOffset": { "index": 5, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "important": true, "description": "The offset of the message", 
"constantName": "org.apache.camel.component.kafka.KafkaConstants#OFFSET" },
+    "CamelKafkaHeaders": { "index": 6, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.kafka.common.header.Headers", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
record headers", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#HEADERS" },
+    "CamelKafkaLastRecordBeforeCommit": { "index": 7, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Whether or not it's the last record 
before commit (only available if autoCommitEnable endpoint parameter is 
false)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_RECORD_BEFORE_COMMIT" },
+    "CamelKafkaLastPollRecord": { "index": 8, "kind": "header", "displayName": 
"", "group": "consumer", "label": "consumer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Indicates the last record within the current 
poll request (only available if autoCommitEnable endpoint parameter is false or 
allowManualCommit is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#LAST_POLL [...]
+    "CamelKafkaTimestamp": { "index": 9, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#TIMESTAMP" },
+    "CamelKafkaOverrideTimestamp": { "index": 10, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The ProducerRecord also has an 
associated timestamp. If the user did provide a timestamp, the producer will 
stamp the record with the provided timestamp and the header is not preserved.", 
"constantName": "org.apache.camel.compo [...]
+    "CamelKafkaRecordMeta": { "index": 11, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"List<RecordMetadata>", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The metadata (only 
configured if recordMetadata endpoint parameter is true)", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#KAFKA_RECORD_META" },
     "CamelKafkaManualCommit": { "index": 12, "kind": "header", "displayName": 
"", "group": "consumer", "label": "consumer", "required": false, "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommit", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "Can be used for forcing manual offset commit when using Kafka 
consumer.", "constantName": 
"org.apache.camel.component.kafka.KafkaConstants#MANUAL_COMMIT" }
   },
   "properties": {
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c9a43fbc8338..5bfe3fce1016 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -942,10 +942,10 @@ Here is the minimal route you need to read messages from 
Kafka.
 ----
 from("kafka:test?brokers=localhost:9092")
     .log("Message received from Kafka : ${body}")
-    .log("    on the topic ${headers[kafka.TOPIC]}")
-    .log("    on the partition ${headers[kafka.PARTITION]}")
-    .log("    with the offset ${headers[kafka.OFFSET]}")
-    .log("    with the key ${headers[kafka.KEY]}")
+    .log("    on the topic ${headers[CamelKafkaTopic]}")
+    .log("    on the partition ${headers[CamelKafkaPartition]}")
+    .log("    with the offset ${headers[CamelKafkaOffset]}")
+    .log("    with the key ${headers[CamelKafkaKey]}")
 ----
 
 If you need to consume messages from multiple topics, you can use a comma 
separated list of topic names.
@@ -954,10 +954,10 @@ If you need to consume messages from multiple topics, you 
can use a comma separa
 ----
 from("kafka:test,test1,test2?brokers=localhost:9092")
     .log("Message received from Kafka : ${body}")
-    .log("    on the topic ${headers[kafka.TOPIC]}")
-    .log("    on the partition ${headers[kafka.PARTITION]}")
-    .log("    with the offset ${headers[kafka.OFFSET]}")
-    .log("    with the key ${headers[kafka.KEY]}")
+    .log("    on the topic ${headers[CamelKafkaTopic]}")
+    .log("    on the partition ${headers[CamelKafkaPartition]}")
+    .log("    with the offset ${headers[CamelKafkaOffset]}")
+    .log("    with the key ${headers[CamelKafkaKey]}")
 ----
 
 It's also possible to subscribe to multiple topics giving a pattern as the 
topic name and using the `topicIsPattern` option.
@@ -966,10 +966,10 @@ It's also possible to subscribe to multiple topics giving 
a pattern as the topic
 ----
 from("kafka:test.*?brokers=localhost:9092&topicIsPattern=true")
     .log("Message received from Kafka : ${body}")
-    .log("    on the topic ${headers[kafka.TOPIC]}")
-    .log("    on the partition ${headers[kafka.PARTITION]}")
-    .log("    with the offset ${headers[kafka.OFFSET]}")
-    .log("    with the key ${headers[kafka.KEY]}")
+    .log("    on the topic ${headers[CamelKafkaTopic]}")
+    .log("    on the partition ${headers[CamelKafkaPartition]}")
+    .log("    with the offset ${headers[CamelKafkaOffset]}")
+    .log("    with the key ${headers[CamelKafkaKey]}")
 ----
 
 When consuming messages from Kafka, you can use your own offset management and 
not delegate this management to Kafka.
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 2737ff371a46..13e3307d65b7 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -21,39 +21,39 @@ import org.apache.camel.spi.Metadata;
 public final class KafkaConstants {
 
     @Metadata(label = "producer", description = "Explicitly specify the 
partition", javaType = "Integer")
-    public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
+    public static final String PARTITION_KEY = "CamelKafkaPartitionKey";
     @Metadata(label = "consumer", description = "The partition where the 
message was stored", javaType = "Integer",
               important = true)
-    public static final String PARTITION = "kafka.PARTITION";
+    public static final String PARTITION = "CamelKafkaPartition";
     @Metadata(description = "Producer: The key of the message in order to 
ensure that all related message goes in the same partition. "
                             + "Consumer: The key of the message if configured",
               javaType = "Object", required = true, important = true)
-    public static final String KEY = "kafka.KEY";
+    public static final String KEY = "CamelKafkaKey";
     @Metadata(label = "consumer", description = "The topic from where the 
message originated", javaType = "String",
               important = true)
-    public static final String TOPIC = "kafka.TOPIC";
+    public static final String TOPIC = "CamelKafkaTopic";
     @Metadata(label = "producer",
               description = "The topic to which send the message (override and 
takes precedence), and the header is not preserved.",
               javaType = "String")
-    public static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
+    public static final String OVERRIDE_TOPIC = "CamelKafkaOverrideTopic";
     @Metadata(label = "consumer", description = "The offset of the message", 
javaType = "Long", important = true)
-    public static final String OFFSET = "kafka.OFFSET";
+    public static final String OFFSET = "CamelKafkaOffset";
     @Metadata(label = "consumer", description = "The record headers", javaType 
= "org.apache.kafka.common.header.Headers")
-    public static final String HEADERS = "kafka.HEADERS";
+    public static final String HEADERS = "CamelKafkaHeaders";
     @Metadata(label = "consumer",
               description = "Whether or not it's the last record before commit 
(only available if `autoCommitEnable` endpoint parameter is `false`)",
               javaType = "Boolean")
-    public static final String LAST_RECORD_BEFORE_COMMIT = 
"kafka.LAST_RECORD_BEFORE_COMMIT";
+    public static final String LAST_RECORD_BEFORE_COMMIT = 
"CamelKafkaLastRecordBeforeCommit";
     @Metadata(label = "consumer", description = "Indicates the last record 
within the current poll request " +
                                                 "(only available if 
`autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is 
`true`)",
               javaType = "Boolean")
-    public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
+    public static final String LAST_POLL_RECORD = "CamelKafkaLastPollRecord";
     @Metadata(label = "consumer", description = "The timestamp of the 
message", javaType = "Long")
-    public static final String TIMESTAMP = "kafka.TIMESTAMP";
+    public static final String TIMESTAMP = "CamelKafkaTimestamp";
     @Metadata(label = "producer", description = "The ProducerRecord also has 
an associated timestamp. " +
                                                 "If the user did provide a 
timestamp, the producer will stamp the  record with the provided timestamp and 
the header is not preserved.",
               javaType = "Long")
-    public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP";
+    public static final String OVERRIDE_TIMESTAMP = 
"CamelKafkaOverrideTimestamp";
 
     @Deprecated
     public static final String KAFKA_DEFAULT_ENCODER = 
"kafka.serializer.DefaultEncoder";
@@ -66,7 +66,7 @@ public final class KafkaConstants {
     @Metadata(label = "producer",
               description = "The metadata (only configured if `recordMetadata` 
endpoint parameter is `true`)",
               javaType = "List<RecordMetadata>")
-    public static final String KAFKA_RECORD_META = "kafka.RECORD_META";
+    public static final String KAFKA_RECORD_META = "CamelKafkaRecordMeta";
     @Metadata(label = "consumer", description = "Can be used for forcing 
manual offset commit when using Kafka consumer.",
               javaType = 
"org.apache.camel.component.kafka.consumer.KafkaManualCommit")
     public static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
index bbe8eeb32623..e758d38dea14 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderDeserializer.java
@@ -85,7 +85,7 @@ public class KafkaHeaderDeserializer implements Processor {
      * Exclude special Kafka headers from auto deserialization.
      */
     private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
-        return !entry.getKey().equals("kafka.HEADERS") && 
!entry.getKey().equals("CamelKafkaManualCommit");
+        return !entry.getKey().equals(KafkaConstants.HEADERS) && 
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
     }
 
     public void setEnabled(String enabled) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
index 0ad6718b1183..9009188d9a20 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/MessageTimestampRouter.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class MessageTimestampRouter {
@@ -54,7 +55,7 @@ public class MessageTimestampRouter {
         }
 
         Object rawTimestamp = null;
-        String topicName = ex.getMessage().getHeader("kafka.TOPIC", 
String.class);
+        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
         for (String key : splittedKeys) {
             if (ObjectHelper.isNotEmpty(key)) {
                 rawTimestamp = body.get(key);
@@ -82,7 +83,7 @@ public class MessageTimestampRouter {
                 replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
                 updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
             }
-            ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
+            ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
updatedTopic);
         }
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
index 582b9761bf63..f72893f5f5e0 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/RegexRouter.java
@@ -21,6 +21,7 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class RegexRouter {
@@ -28,12 +29,12 @@ public class RegexRouter {
     public void process(
             @ExchangeProperty("regex") String regex, 
@ExchangeProperty("replacement") String replacement, Exchange ex) {
         Pattern regexPattern = Pattern.compile(regex);
-        String topicName = ex.getMessage().getHeader("kafka.TOPIC", 
String.class);
+        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
         if (ObjectHelper.isNotEmpty(topicName)) {
             final Matcher matcher = regexPattern.matcher(topicName);
             if (matcher.matches()) {
                 final String topicUpdated = matcher.replaceFirst(replacement);
-                ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", 
topicUpdated);
+                ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
topicUpdated);
             }
         }
         String ceType = ex.getMessage().getHeader("ce-type", String.class);
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
index 8987568a455d..5c8e4fe42ea8 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/TimestampRouter.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class TimestampRouter {
@@ -40,7 +41,7 @@ public class TimestampRouter {
         fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
 
         Long timestamp = null;
-        String topicName = ex.getMessage().getHeader("kafka.TOPIC", 
String.class);
+        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
         Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
         if (rawTimestamp instanceof Long longValue) {
             timestamp = longValue;
@@ -61,7 +62,7 @@ public class TimestampRouter {
                 replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
                 updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
             }
-            ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
+            ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
updatedTopic);
         }
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
index 9adf1067c1a8..b2fc89b9a2c8 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/transform/ValueToKey.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
 import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class ValueToKey {
@@ -47,7 +48,7 @@ public class ValueToKey {
             }
         }
 
-        ex.getMessage().setHeader("kafka.KEY", key);
+        ex.getMessage().setHeader(KafkaConstants.KEY, key);
     }
 
     boolean filterNames(String fieldName, List<String> splittedFields) {
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 26a50f7782b6..9a195bb2a8aa 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -161,7 +161,7 @@ public class KafkaConsumerFullIT extends 
BaseKafkaTestSupport {
         to.assertIsSatisfied(3000);
 
         Map<String, Object> headers = 
to.getExchanges().get(0).getIn().getHeaders();
-        assertTrue(headers.containsKey(KafkaConstants.TOPIC), "Should receive 
KafkaEndpoint populated kafka.TOPIC header");
+        assertTrue(headers.containsKey(KafkaConstants.TOPIC), "Should receive 
KafkaEndpoint populated CamelKafkaTopic header");
         assertEquals(TOPIC, headers.get(KafkaConstants.TOPIC), "Topic name 
received");
     }
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
index 84d859a81508..473bfa05b776 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/transform/RegexRouterTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka.transform;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
@@ -41,10 +42,10 @@ class RegexRouterTest {
     void shouldReplaceFieldToPlainJson() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setHeader("kafka.TOPIC", topic);
+        exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic);
 
         processor.process(".*ll.*", "newTopic", exchange);
 
-        Assertions.assertEquals("newTopic", 
exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC"));
+        Assertions.assertEquals("newTopic", 
exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC));
     }
 }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index d66338148810..1cea7b40ff6b 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.component.kafka.KafkaProducer;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -56,8 +57,8 @@ public class KafkaProducerTest {
         camelProducer.setKafkaProducer(kafkaProducer);
         when(exchange.getIn()).thenReturn(message);
         when(exchange.getContext()).thenReturn(context);
-        when(message.getHeader("kafka.PARTITION_KEY", 
Integer.class)).thenReturn(0);
-        when(message.getHeader("kafka.KEY")).thenReturn("key");
+        when(message.getHeader(KafkaConstants.PARTITION_KEY, 
Integer.class)).thenReturn(0);
+        when(message.getHeader(KafkaConstants.KEY)).thenReturn("key");
     }
 
     @AfterEach
@@ -67,9 +68,9 @@ public class KafkaProducerTest {
 
     @Test
     public void testSendOverrideTopic() throws Exception {
-        
when(message.removeHeader("kafka.OVERRIDE_TOPIC")).thenReturn("overridden-topic");
+        
when(message.removeHeader(KafkaConstants.OVERRIDE_TOPIC)).thenReturn("overridden-topic");
         camelProducer.process(exchange);
-        when(message.removeHeader("kafka.OVERRIDE_TOPIC")).thenReturn(new 
TextNode("overridden-topic-jackson"));
+        
when(message.removeHeader(KafkaConstants.OVERRIDE_TOPIC)).thenReturn(new 
TextNode("overridden-topic-jackson"));
         camelProducer.process(exchange);
         List<ProducerRecord<Object, Object>> records = kafkaProducer.history();
         assertThat(records.get(0).topic(), Is.is("overridden-topic"));
diff --git 
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
 
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
index 7746b7515bd3..604016b0351c 100644
--- 
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
+++ 
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanKindTest.java
@@ -105,9 +105,9 @@ public class SpanKindTest extends 
OpenTelemetryTracerTestSupport {
 
         // Send with Kafka headers that would normally be set before/during 
sending
         template.sendBodyAndHeaders("direct:kafkaProducer", "test message",
-                Map.of("kafka.KEY", "test-key",
-                        "kafka.PARTITION", 0,
-                        "kafka.OFFSET", "12345"));
+                Map.of("CamelKafkaKey", "test-key",
+                        "CamelKafkaPartition", 0,
+                        "CamelKafkaOffset", "12345"));
 
         mockEndpoint.assertIsSatisfied();
 
diff --git 
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
 
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
index 16baf5c1baa1..04a8f135081a 100644
--- 
a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
+++ 
b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/mock/MockKafkaProducer.java
@@ -33,9 +33,9 @@ class MockKafkaProducer extends DefaultProducer {
     public void process(Exchange exchange) throws Exception {
         // Simulate Kafka response with partition, offset, and key
         // These headers would normally be set by the real Kafka producer
-        exchange.getMessage().setHeader("kafka.PARTITION", 0);
-        exchange.getMessage().setHeader("kafka.OFFSET", "12345");
-        exchange.getMessage().setHeader("kafka.KEY", "test-key");
+        exchange.getMessage().setHeader("CamelKafkaPartition", 0);
+        exchange.getMessage().setHeader("CamelKafkaOffset", "12345");
+        exchange.getMessage().setHeader("CamelKafkaKey", "test-key");
         exchange.getMessage().setBody("Kafka Response: " + 
exchange.getIn().getBody());
     }
 }
diff --git 
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
 
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
index c7905527fdfb..2a798e40f9f3 100644
--- 
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
+++ 
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/KafkaSpanDecorator.java
@@ -30,11 +30,11 @@ public class KafkaSpanDecorator extends 
AbstractMessagingSpanDecorator {
     /**
      * Constants copied from {@link 
org.apache.camel.component.kafka.KafkaConstants}
      */
-    protected static final String PARTITION_KEY = "kafka.PARTITION_KEY";
-    protected static final String PARTITION = "kafka.PARTITION";
-    protected static final String KEY = "kafka.KEY";
-    protected static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
-    protected static final String OFFSET = "kafka.OFFSET";
+    protected static final String PARTITION_KEY = "CamelKafkaPartitionKey";
+    protected static final String PARTITION = "CamelKafkaPartition";
+    protected static final String KEY = "CamelKafkaKey";
+    protected static final String OVERRIDE_TOPIC = "CamelKafkaOverrideTopic";
+    protected static final String OFFSET = "CamelKafkaOffset";
 
     @Override
     public String getComponent() {
diff --git 
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
index e72f62fa9424..de2db9687d8c 100644
--- 
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
+++ 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/KafkaSpanDecorator.java
@@ -31,11 +31,11 @@ public class KafkaSpanDecorator extends 
AbstractMessagingSpanDecorator {
     /**
      * Constants copied from {@link 
org.apache.camel.component.kafka.KafkaConstants}
      */
-    protected static final String PARTITION_KEY = "kafka.PARTITION_KEY";
-    protected static final String PARTITION = "kafka.PARTITION";
-    protected static final String KEY = "kafka.KEY";
-    protected static final String OVERRIDE_TOPIC = "kafka.OVERRIDE_TOPIC";
-    protected static final String OFFSET = "kafka.OFFSET";
+    protected static final String PARTITION_KEY = "CamelKafkaPartitionKey";
+    protected static final String PARTITION = "CamelKafkaPartition";
+    protected static final String KEY = "CamelKafkaKey";
+    protected static final String OVERRIDE_TOPIC = "CamelKafkaOverrideTopic";
+    protected static final String OFFSET = "CamelKafkaOffset";
 
     @Override
     public String getComponent() {
diff --git 
a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java 
b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
index e3c243efd5a1..14f22f6bd80e 100644
--- 
a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
+++ 
b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
@@ -37,6 +37,10 @@ public final class ImportantHeaderUtils {
                     "CamelFtpReplyString",
                     "CamelHttpResponseCode",
                     "CamelHttpResponseText",
+                    "CamelKafkaKey",
+                    "CamelKafkaOffset",
+                    "CamelKafkaPartition",
+                    "CamelKafkaTopic",
                     "CamelMqttTopic",
                     "CamelNatsDeliveryCounter",
                     "CamelNatsSID",
@@ -51,10 +55,6 @@ public final class ImportantHeaderUtils {
                     "CamelSqlUpdateCount",
                     "CamelSshExitValue",
                     "Content-Type",
-                    "kafka.KEY",
-                    "kafka.OFFSET",
-                    "kafka.PARTITION",
-                    "kafka.TOPIC",
                     "websocket.eventType"
             // IMPORTANT-HEADER-KEYS: END
             )));
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
index 17122c8aa8b4..a75f995c5473 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc
@@ -1486,6 +1486,98 @@ Routes that set the header by its literal string value 
(for example
 As a consequence, the generated Endpoint DSL header accessor `await()` on
 `MiloClientHeaderNameBuilder` has been renamed to `miloAwait()`.
 
+=== camel-kafka - potential breaking change
+
+The Exchange header constants in `KafkaConstants` used header values in the
+lowercase / dotted `kafka.*` namespace, outside the `Camel` namespace, and
+were therefore not filtered by the default `HeaderFilterStrategy` on
+upstream HTTP / REST consumers. They have been renamed to follow the Camel
+naming convention used across the rest of the component catalog. The Java
+field names are unchanged; only the header string values have changed:
+
+[options="header"]
+|===
+| Constant | Previous value | New value
+| `KafkaConstants.PARTITION_KEY` | `kafka.PARTITION_KEY` | 
`CamelKafkaPartitionKey`
+| `KafkaConstants.PARTITION` | `kafka.PARTITION` | `CamelKafkaPartition`
+| `KafkaConstants.KEY` | `kafka.KEY` | `CamelKafkaKey`
+| `KafkaConstants.TOPIC` | `kafka.TOPIC` | `CamelKafkaTopic`
+| `KafkaConstants.OVERRIDE_TOPIC` | `kafka.OVERRIDE_TOPIC` | 
`CamelKafkaOverrideTopic`
+| `KafkaConstants.OFFSET` | `kafka.OFFSET` | `CamelKafkaOffset`
+| `KafkaConstants.HEADERS` | `kafka.HEADERS` | `CamelKafkaHeaders`
+| `KafkaConstants.LAST_RECORD_BEFORE_COMMIT` | 
`kafka.LAST_RECORD_BEFORE_COMMIT` | `CamelKafkaLastRecordBeforeCommit`
+| `KafkaConstants.LAST_POLL_RECORD` | `kafka.LAST_POLL_RECORD` | 
`CamelKafkaLastPollRecord`
+| `KafkaConstants.TIMESTAMP` | `kafka.TIMESTAMP` | `CamelKafkaTimestamp`
+| `KafkaConstants.OVERRIDE_TIMESTAMP` | `kafka.OVERRIDE_TIMESTAMP` | 
`CamelKafkaOverrideTimestamp`
+| `KafkaConstants.KAFKA_RECORD_META` | `kafka.RECORD_META` | 
`CamelKafkaRecordMeta`
+|===
+
+`KafkaConstants.MANUAL_COMMIT` was already `Camel`-prefixed
+(`CamelKafkaManualCommit`) and is unchanged. The non-header constants in the
+class (`KAFKA_DEFAULT_ENCODER`, `KAFKA_STRING_ENCODER`,
+`KAFKA_DEFAULT_SERIALIZER`, `KAFKA_DEFAULT_DESERIALIZER`,
+`PARTITIONER_RANGE_ASSIGNOR`, `KAFKA_SUBSCRIBE_ADAPTER`) are not Exchange
+headers and are unchanged.
+
+`KafkaConstants.OVERRIDE_TOPIC` and `KafkaConstants.OVERRIDE_TIMESTAMP` are
+producer-read headers consulted by `KafkaProducer.evaluateTopic()` and the
+timestamp override path. The rename also closes the cross-transport
+propagation gap where these headers, in their previous `kafka.*` form,
+would pass through `HttpHeaderFilterStrategy` (which only filters the
+`Camel` prefix) on an upstream HTTP / REST consumer wired to a Kafka
+producer in the same route. With the new `CamelKafka*` values, the default
+filter strategy strips them at the transport boundary.
+
+Routes that reference the constants symbolically (for example
+`setHeader(KafkaConstants.OVERRIDE_TOPIC, ...)` or
+`${headers[KafkaConstants.TOPIC]}` via the symbolic name) continue to work
+without changes. Routes that set or read the headers by their literal
+string values (for example `setHeader("kafka.OVERRIDE_TOPIC", ...)` or
+Simple expressions such as `${headers[kafka.TOPIC]}`) must be updated to
+use the new values:
+
+[source,java]
+----
+// before
+from("platform-http:/api/events")
+    .setHeader("kafka.OVERRIDE_TOPIC", constant("events.topic"))
+    .to("kafka:default?brokers=localhost:9092");
+
+// after
+from("platform-http:/api/events")
+    .setHeader(KafkaConstants.OVERRIDE_TOPIC, constant("events.topic"))
+    .to("kafka:default?brokers=localhost:9092");
+----
+
+A route that bridges an untrusted HTTP / REST consumer to a Kafka producer
+should also continue to apply the standard transport-boundary mitigation,
+which now also covers the kafka headers:
+
+[source,java]
+----
+from("platform-http:/api/events")
+    .removeHeaders("Camel*")
+    .to("kafka:default?brokers=localhost:9092");
+----
+
+The bundled Kafka Connect-style transformers under
+`org.apache.camel.component.kafka.transform` (`RegexRouter`,
+`TimestampRouter`, `MessageTimestampRouter`, `ValueToKey`) have been
+updated to use the renamed `KafkaConstants` values internally and are
+unaffected at the call site.
+
+The local copies of these constants in
+`org.apache.camel.tracing.decorators.KafkaSpanDecorator` (deprecated since
+4.19.0) and `org.apache.camel.telemetry.decorators.KafkaSpanDecorator` have
+been kept in sync with the new values so that Kafka span tagging continues
+to work for routes that have migrated to the new header names.
+
+The generated Endpoint DSL header accessors on
+`KafkaEndpointBuilderFactory$KafkaHeaderNameBuilder` (for example
+`kafkaOverrideTopic()`, `kafkaTopic()`, `kafkaPartitionKey()`) keep their
+method names; only the returned string value reflects the new
+`CamelKafka*` convention.
+
 === Jackson dataformat documentation pages renamed
 
 The Jackson 2.x and Jackson 3.x lines ship the same dataformat names 
(`jackson`, `jacksonXml`,
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 9dc46314a149..843c2d5e97e3 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -5737,10 +5737,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: producer
          * 
-         * @return the name of the header {@code kafka.PARTITION_KEY}.
+         * @return the name of the header {@code KafkaPartitionKey}.
          */
         public String kafkaPartitionKey() {
-            return "kafka.PARTITION_KEY";
+            return "CamelKafkaPartitionKey";
         }
         /**
          * The partition where the message was stored.
@@ -5749,10 +5749,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code kafka.PARTITION}.
+         * @return the name of the header {@code KafkaPartition}.
          */
         public String kafkaPartition() {
-            return "kafka.PARTITION";
+            return "CamelKafkaPartition";
         }
         /**
          * Producer: The key of the message in order to ensure that all related
@@ -5764,10 +5764,10 @@ public interface KafkaEndpointBuilderFactory {
          * Required: true
          * Group: common
          * 
-         * @return the name of the header {@code kafka.KEY}.
+         * @return the name of the header {@code KafkaKey}.
          */
         public String kafkaKey() {
-            return "kafka.KEY";
+            return "CamelKafkaKey";
         }
         /**
          * The topic from where the message originated.
@@ -5776,10 +5776,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code kafka.TOPIC}.
+         * @return the name of the header {@code KafkaTopic}.
          */
         public String kafkaTopic() {
-            return "kafka.TOPIC";
+            return "CamelKafkaTopic";
         }
         /**
          * The topic to which send the message (override and takes precedence),
@@ -5789,10 +5789,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: producer
          * 
-         * @return the name of the header {@code kafka.OVERRIDE_TOPIC}.
+         * @return the name of the header {@code KafkaOverrideTopic}.
          */
         public String kafkaOverrideTopic() {
-            return "kafka.OVERRIDE_TOPIC";
+            return "CamelKafkaOverrideTopic";
         }
         /**
          * The offset of the message.
@@ -5801,10 +5801,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code kafka.OFFSET}.
+         * @return the name of the header {@code KafkaOffset}.
          */
         public String kafkaOffset() {
-            return "kafka.OFFSET";
+            return "CamelKafkaOffset";
         }
         /**
          * The record headers.
@@ -5813,10 +5813,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code kafka.HEADERS}.
+         * @return the name of the header {@code KafkaHeaders}.
          */
         public String kafkaHeaders() {
-            return "kafka.HEADERS";
+            return "CamelKafkaHeaders";
         }
         /**
          * Whether or not it's the last record before commit (only available if
@@ -5826,11 +5826,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code
-         * kafka.LAST_RECORD_BEFORE_COMMIT}.
+         * @return the name of the header {@code KafkaLastRecordBeforeCommit}.
          */
         public String kafkaLastRecordBeforeCommit() {
-            return "kafka.LAST_RECORD_BEFORE_COMMIT";
+            return "CamelKafkaLastRecordBeforeCommit";
         }
         /**
          * Indicates the last record within the current poll request (only
@@ -5841,10 +5840,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code kafka.LAST_POLL_RECORD}.
+         * @return the name of the header {@code KafkaLastPollRecord}.
          */
         public String kafkaLastPollRecord() {
-            return "kafka.LAST_POLL_RECORD";
+            return "CamelKafkaLastPollRecord";
         }
         /**
          * The timestamp of the message.
@@ -5853,10 +5852,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: consumer
          * 
-         * @return the name of the header {@code kafka.TIMESTAMP}.
+         * @return the name of the header {@code KafkaTimestamp}.
          */
         public String kafkaTimestamp() {
-            return "kafka.TIMESTAMP";
+            return "CamelKafkaTimestamp";
         }
         /**
          * The ProducerRecord also has an associated timestamp. If the user did
@@ -5867,10 +5866,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: producer
          * 
-         * @return the name of the header {@code kafka.OVERRIDE_TIMESTAMP}.
+         * @return the name of the header {@code KafkaOverrideTimestamp}.
          */
         public String kafkaOverrideTimestamp() {
-            return "kafka.OVERRIDE_TIMESTAMP";
+            return "CamelKafkaOverrideTimestamp";
         }
         /**
          * The metadata (only configured if recordMetadata endpoint parameter 
is
@@ -5880,10 +5879,10 @@ public interface KafkaEndpointBuilderFactory {
          * 
          * Group: producer
          * 
-         * @return the name of the header {@code kafka.RECORD_META}.
+         * @return the name of the header {@code KafkaRecordMeta}.
          */
         public String kafkaRecordMeta() {
-            return "kafka.RECORD_META";
+            return "CamelKafkaRecordMeta";
         }
         /**
          * Can be used for forcing manual offset commit when using Kafka

Reply via email to