This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 93d71b6e80358b964b7c88a8b0769268f997095b
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jan 2 10:39:59 2023 +0100

    CAMEL-18841: camel-kafka: producer idempotence is not enabled by default
---
 .../org/apache/camel/catalog/components/kafka.json |  12 +--
 .../org/apache/camel/component/kafka/kafka.json    |  12 +--
 .../camel/component/kafka/KafkaConfiguration.java  |  55 +++++++----
 .../camel/component/kafka/KafkaComponentTest.java  |   1 -
 .../ROOT/pages/camel-3x-upgrade-guide-3_20.adoc    |   9 ++
 .../dsl/KafkaComponentBuilderFactory.java          |  71 ++++++++------
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  | 106 +++++++++++++--------
 7 files changed, 168 insertions(+), 98 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 0be3446cdd2..32f7963ec71 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -71,7 +71,7 @@
     "compressionCodec": { "kind": "property", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to spe [...]
     "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by this [...]
     "deliveryTimeoutMs": { "kind": "property", "displayName": "Delivery 
Timeout Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "120000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "An upper bound on the 
time to report success or failure after a call to send() [...]
-    "enableIdempotence": { "kind": "property", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If set to 'true' the producer will ensure that 
exactly one copy of each message is written i [...]
+    "enableIdempotence": { "kind": "property", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "When set to 'true', the producer will ensure 
that exactly one copy of each message is written [...]
     "headerSerializer": { "kind": "property", "displayName": "Header 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderSerializer to 
serialize kafka headers  [...]
     "key": { "kind": "property", "displayName": "Key", "group": "producer", 
"label": "producer", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The record key (or null 
if no key is specified). If this option has been configured then it take 
precedence over header KafkaConstants#KEY" },
     "keySerializer": { "kind": "property", "displayName": "Key Serializer", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for keys (defaults t [...]
@@ -91,9 +91,9 @@
     "receiveBufferBytes": { "kind": "property", "displayName": "Receive Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "65536", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The size of the TCP receive buffer (SO_RCVBUF) 
to use when reading data." },
     "reconnectBackoffMs": { "kind": "property", "displayName": "Reconnect 
Backoff Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "50", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The amount of time to 
wait before attempting to reconnect to a given host. This  [...]
     "recordMetadata": { "kind": "property", "displayName": "Record Metadata", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the producer should store the 
RecordMetadata results from sending to Kafka. The results are [...]
-    "requestRequiredAcks": { "kind": "property", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", 
"all" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer req 
[...]
+    "requestRequiredAcks": { "kind": "property", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", 
"1" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "all", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer r 
[...]
     "requestTimeoutMs": { "kind": "property", "displayName": "Request Timeout 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "30000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The amount of time the broker will wait trying 
to meet the request.required.acks  [...]
-    "retries": { "kind": "property", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "0", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a pote [...]
+    "retries": { "kind": "property", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
err [...]
     "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", 
"group": "producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a n [...]
     "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
     "valueSerializer": { "kind": "property", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
@@ -193,7 +193,7 @@
     "compressionCodec": { "kind": "parameter", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to sp [...]
     "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by thi [...]
     "deliveryTimeoutMs": { "kind": "parameter", "displayName": "Delivery 
Timeout Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "120000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "An upper bound on the 
time to report success or failure after a call to send( [...]
-    "enableIdempotence": { "kind": "parameter", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If set to 'true' the producer will ensure that 
exactly one copy of each message is written  [...]
+    "enableIdempotence": { "kind": "parameter", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "When set to 'true', the producer will ensure 
that exactly one copy of each message is writte [...]
     "headerSerializer": { "kind": "parameter", "displayName": "Header 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderSerializer to 
serialize kafka headers [...]
     "key": { "kind": "parameter", "displayName": "Key", "group": "producer", 
"label": "producer", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The record key (or null 
if no key is specified). If this option has been configured then it take 
precedence over header KafkaConstants#KEY" },
     "keySerializer": { "kind": "parameter", "displayName": "Key Serializer", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for keys (defaults  [...]
@@ -212,9 +212,9 @@
     "receiveBufferBytes": { "kind": "parameter", "displayName": "Receive 
Buffer Bytes", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "65536", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The size of the TCP 
receive buffer (SO_RCVBUF) to use when reading data." },
     "reconnectBackoffMs": { "kind": "parameter", "displayName": "Reconnect 
Backoff Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "50", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The amount of time to 
wait before attempting to reconnect to a given host. This [...]
     "recordMetadata": { "kind": "parameter", "displayName": "Record Metadata", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the producer should store the 
RecordMetadata results from sending to Kafka. The results ar [...]
-    "requestRequiredAcks": { "kind": "parameter", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", 
"all" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer re 
[...]
+    "requestRequiredAcks": { "kind": "parameter", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", 
"1" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "all", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer  
[...]
     "requestTimeoutMs": { "kind": "parameter", "displayName": "Request Timeout 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "30000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The amount of time the broker will wait trying 
to meet the request.required.acks [...]
-    "retries": { "kind": "parameter", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "0", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a pot [...]
+    "retries": { "kind": "parameter", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
er [...]
     "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a  [...]
     "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
     "valueSerializer": { "kind": "parameter", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 0be3446cdd2..32f7963ec71 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -71,7 +71,7 @@
     "compressionCodec": { "kind": "property", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to spe [...]
     "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by this [...]
     "deliveryTimeoutMs": { "kind": "property", "displayName": "Delivery 
Timeout Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "120000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "An upper bound on the 
time to report success or failure after a call to send() [...]
-    "enableIdempotence": { "kind": "property", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If set to 'true' the producer will ensure that 
exactly one copy of each message is written i [...]
+    "enableIdempotence": { "kind": "property", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "When set to 'true', the producer will ensure 
that exactly one copy of each message is written [...]
     "headerSerializer": { "kind": "property", "displayName": "Header 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderSerializer to 
serialize kafka headers  [...]
     "key": { "kind": "property", "displayName": "Key", "group": "producer", 
"label": "producer", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The record key (or null 
if no key is specified). If this option has been configured then it take 
precedence over header KafkaConstants#KEY" },
     "keySerializer": { "kind": "property", "displayName": "Key Serializer", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for keys (defaults t [...]
@@ -91,9 +91,9 @@
     "receiveBufferBytes": { "kind": "property", "displayName": "Receive Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "65536", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The size of the TCP receive buffer (SO_RCVBUF) 
to use when reading data." },
     "reconnectBackoffMs": { "kind": "property", "displayName": "Reconnect 
Backoff Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "50", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The amount of time to 
wait before attempting to reconnect to a given host. This  [...]
     "recordMetadata": { "kind": "property", "displayName": "Record Metadata", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the producer should store the 
RecordMetadata results from sending to Kafka. The results are [...]
-    "requestRequiredAcks": { "kind": "property", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", 
"all" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer req 
[...]
+    "requestRequiredAcks": { "kind": "property", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", 
"1" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "all", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer r 
[...]
     "requestTimeoutMs": { "kind": "property", "displayName": "Request Timeout 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "30000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The amount of time the broker will wait trying 
to meet the request.required.acks  [...]
-    "retries": { "kind": "property", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "0", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a pote [...]
+    "retries": { "kind": "property", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
err [...]
     "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", 
"group": "producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a n [...]
     "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
     "valueSerializer": { "kind": "property", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
@@ -193,7 +193,7 @@
     "compressionCodec": { "kind": "parameter", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to sp [...]
     "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by thi [...]
     "deliveryTimeoutMs": { "kind": "parameter", "displayName": "Delivery 
Timeout Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "120000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "An upper bound on the 
time to report success or failure after a call to send( [...]
-    "enableIdempotence": { "kind": "parameter", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If set to 'true' the producer will ensure that 
exactly one copy of each message is written  [...]
+    "enableIdempotence": { "kind": "parameter", "displayName": "Enable 
Idempotence", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "When set to 'true', the producer will ensure 
that exactly one copy of each message is writte [...]
     "headerSerializer": { "kind": "parameter", "displayName": "Header 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderSerializer to 
serialize kafka headers [...]
     "key": { "kind": "parameter", "displayName": "Key", "group": "producer", 
"label": "producer", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The record key (or null 
if no key is specified). If this option has been configured then it take 
precedence over header KafkaConstants#KEY" },
     "keySerializer": { "kind": "parameter", "displayName": "Key Serializer", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for keys (defaults  [...]
@@ -212,9 +212,9 @@
     "receiveBufferBytes": { "kind": "parameter", "displayName": "Receive 
Buffer Bytes", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "65536", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The size of the TCP 
receive buffer (SO_RCVBUF) to use when reading data." },
     "reconnectBackoffMs": { "kind": "parameter", "displayName": "Reconnect 
Backoff Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "50", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The amount of time to 
wait before attempting to reconnect to a given host. This [...]
     "recordMetadata": { "kind": "parameter", "displayName": "Record Metadata", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the producer should store the 
RecordMetadata results from sending to Kafka. The results ar [...]
-    "requestRequiredAcks": { "kind": "parameter", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", 
"all" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer re 
[...]
+    "requestRequiredAcks": { "kind": "parameter", "displayName": "Request 
Required Acks", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", 
"1" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "all", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of acknowledgments the producer  
[...]
     "requestTimeoutMs": { "kind": "parameter", "displayName": "Request Timeout 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "30000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The amount of time the broker will wait trying 
to meet the request.required.acks [...]
-    "retries": { "kind": "parameter", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "0", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a pot [...]
+    "retries": { "kind": "parameter", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
er [...]
     "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a  [...]
     "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
     "valueSerializer": { "kind": "parameter", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index e62bf74497f..5de11fced2c 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -173,8 +173,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private String key;
     @UriParam(label = "producer")
     private Integer partitionKey;
-    @UriParam(label = "producer", enums = "-1,0,1,all", defaultValue = "1")
-    private String requestRequiredAcks = "1";
+    @UriParam(label = "producer", enums = "all,-1,0,1", defaultValue = "all")
+    private String requestRequiredAcks = "all";
     // buffer.memory
     @UriParam(label = "producer", defaultValue = "33554432")
     private Integer bufferMemorySize = 33554432;
@@ -182,8 +182,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     @UriParam(label = "producer", defaultValue = "none", enums = 
"none,gzip,snappy,lz4,zstd")
     private String compressionCodec = "none";
     // retries
-    @UriParam(label = "producer", defaultValue = "0")
-    private Integer retries = 0;
+    @UriParam(label = "producer")
+    private Integer retries;
     // use individual headers if exchange.body contains Iterable or similar of 
Message or Exchange
     @UriParam(label = "producer", defaultValue = "false")
     private boolean batchWithIndividualHeaders;
@@ -236,7 +236,7 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private Integer reconnectBackoffMs = 50;
     // enable.idempotence
     // reconnect.backoff.ms
-    @UriParam(label = "producer", defaultValue = "false")
+    @UriParam(label = "producer", defaultValue = "true")
     private boolean enableIdempotence;
     @UriParam(label = "producer", description = "To use a custom 
KafkaHeaderSerializer to serialize kafka headers values")
     private KafkaHeaderSerializer headerSerializer = new 
DefaultKafkaHeaderSerializer();
@@ -1302,16 +1302,19 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
 
     /**
      * The number of acknowledgments the producer requires the leader to have 
received before considering a request
-     * complete. This controls the durability of records that are sent. The 
following settings are common: acks=0 If set
-     * to zero then the producer will not wait for any acknowledgment from the 
server at all. The record will be
-     * immediately added to the socket buffer and considered sent. No 
guarantee can be made that the server has received
-     * the record in this case, and the retries configuration will not take 
effect (as the client won't generally know
-     * of any failures). The offset given back for each record will always be 
set to -1. acks=1 This will mean the
-     * leader will write the record to its local log but will respond without 
awaiting full acknowledgement from all
-     * followers. In this case should the leader fail immediately after 
acknowledging the record but before the
+     * complete. This controls the durability of records that are sent. The 
following settings are allowed:
+     *
+     * acks=0 If set to zero then the producer will not wait for any 
acknowledgment from the server at all. The record
+     * will be immediately added to the socket buffer and considered sent. No 
guarantee can be made that the server has
+     * received the record in this case, and the retries configuration will 
not take effect (as the client won't
+     * generally know of any failures). The offset given back for each record 
will always be set to -1. acks=1 This will
+     * mean the leader will write the record to its local log but will respond 
without awaiting full acknowledgement
+     * from all followers. In this case should the leader fail immediately 
after acknowledging the record but before the
      * followers have replicated it then the record will be lost. acks=all 
This means the leader will wait for the full
      * set of in-sync replicas to acknowledge the record. This guarantees that 
the record will not be lost as long as at
-     * least one in-sync replica remains alive. This is the strongest 
available guarantee.
+     * least one in-sync replica remains alive. This is the strongest 
available guarantee. This is equivalent to the
+     * acks=-1 setting. Note that enabling idempotence requires this config 
value to be 'all'. If conflicting
+     * configurations are set and idempotence is not explicitly enabled, 
idempotence is disabled.
      */
     public void setRequestRequiredAcks(String requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
@@ -1324,9 +1327,16 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     /**
      * Setting a value greater than zero will cause the client to resend any 
record whose send fails with a potentially
      * transient error. Note that this retry is no different than if the 
client resent the record upon receiving the
-     * error. Allowing retries will potentially change the ordering of records 
because if two records are sent to a
-     * single partition, and the first fails and is retried but the second 
succeeds, then the second record may appear
-     * first.
+     * error. Produce requests will be failed before the number of retries has 
been exhausted if the timeout configured
+     * by delivery.timeout.ms expires first before successful acknowledgement. 
Users should generally prefer to leave
+     * this config unset and instead use delivery.timeout.ms to control retry 
behavior.
+     *
+     * Enabling idempotence requires this config value to be greater than 0. 
If conflicting configurations are set and
+     * idempotence is not explicitly enabled, idempotence is disabled.
+     *
+     * Allowing retries while setting enable.idempotence to false and 
max.in.flight.requests.per.connection to greater than 1 will
+     * potentially change the ordering of records because if two batches are 
sent to a single partition, and the first
+     * fails and is retried but the second succeeds, then the records in the 
second batch may appear first.
      */
     public void setRetries(Integer retries) {
         this.retries = retries;
@@ -1725,10 +1735,15 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     }
 
     /**
-     * If set to 'true' the producer will ensure that exactly one copy of each 
message is written in the stream. If
-     * 'false', producer retries may write duplicates of the retried message 
in the stream. If set to true this option
-     * will require max.in.flight.requests.per.connection to be set to 1 and 
retries cannot be zero and additionally
-     * acks must be set to 'all'.
+     * When set to 'true', the producer will ensure that exactly one copy of 
each message is written in the stream. If
+     * 'false', producer retries due to broker failures, etc., may write 
duplicates of the retried message in the
+     * stream. Note that enabling idempotence requires 
max.in.flight.requests.per.connection to be less than or equal to
+     * 5 (with message ordering preserved for any allowable value), retries to 
be greater than 0, and acks must be
+     * 'all'.
+     *
+     * Idempotence is enabled by default if no conflicting configurations are 
set. If conflicting configurations are set
+     * and idempotence is not explicitly enabled, idempotence is disabled. If 
idempotence is explicitly enabled and
+     * conflicting configurations are set, a ConfigException is thrown.
      */
     public void setEnableIdempotence(boolean enableIdempotence) {
         this.enableIdempotence = enableIdempotence;
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 61b0232c077..f01da289f0c 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -170,7 +170,6 @@ public class KafkaComponentTest extends CamelTestSupport {
         props.put(ProducerConfig.ACKS_CONFIG, "1");
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
-        props.put(ProducerConfig.RETRIES_CONFIG, "0");
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
         props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000");
         props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc
index ff2a2b19c19..92b27f3a866 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc
@@ -46,6 +46,15 @@ The following 4 advanced options in fhir data format has 
been made available for
 Previously they were only possible to configure by manually creating a bean 
instance of the model,
 and configure them via Java code or classic Spring XML `<bean>` style.
 
+=== camel-kafka
+
+The following options have changed default values, as they were mistakenly out of 
sync with Apache Kafka.
+This caused the camel-kafka producer to not enable idempotence out of the box, 
which the plain Apache Kafka client otherwise does:
+
+- enableIdempotence: changed from `false` to `true`
+- requestRequiredAcks: changed from `1` to `all`
+- retries: changed from `0` to _unset_
+
 == Upgrading Camel 3.19 to 3.20
 
 === camel-api
diff --git 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index cf603033f8d..a8b0cd17663 100644
--- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -992,16 +992,21 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
-         * If set to 'true' the producer will ensure that exactly one copy of
+         * When set to 'true', the producer will ensure that exactly one copy 
of
          * each message is written in the stream. If 'false', producer retries
-         * may write duplicates of the retried message in the stream. If set to
-         * true this option will require max.in.flight.requests.per.connection
-         * to be set to 1 and retries cannot be zero and additionally acks must
-         * be set to 'all'.
+         * due to broker failures, etc., may write duplicates of the retried
+         * message in the stream. Note that enabling idempotence requires
+         * max.in.flight.requests.per.connection to be less than or equal to 5
+         * (with message ordering preserved for any allowable value), retries 
to
+         * be greater than 0, and acks must be 'all'. Idempotence is enabled by
+         * default if no conflicting configurations are set. If conflicting
+         * configurations are set and idempotence is not explicitly enabled,
+         * idempotence is disabled. If idempotence is explicitly enabled and
+         * conflicting configurations are set, a ConfigException is thrown.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
-         * Default: false
+         * Default: true
          * Group: producer
          * 
          * @param enableIdempotence the value to set
@@ -1385,25 +1390,29 @@ public interface KafkaComponentBuilderFactory {
          * The number of acknowledgments the producer requires the leader to
          * have received before considering a request complete. This controls
          * the durability of records that are sent. The following settings are
-         * common: acks=0 If set to zero then the producer will not wait for 
any
-         * acknowledgment from the server at all. The record will be 
immediately
-         * added to the socket buffer and considered sent. No guarantee can be
-         * made that the server has received the record in this case, and the
-         * retries configuration will not take effect (as the client won't
-         * generally know of any failures). The offset given back for each
-         * record will always be set to -1. acks=1 This will mean the leader
-         * will write the record to its local log but will respond without
-         * awaiting full acknowledgement from all followers. In this case 
should
-         * the leader fail immediately after acknowledging the record but 
before
-         * the followers have replicated it then the record will be lost.
-         * acks=all This means the leader will wait for the full set of in-sync
-         * replicas to acknowledge the record. This guarantees that the record
-         * will not be lost as long as at least one in-sync replica remains
-         * alive. This is the strongest available guarantee.
+         * allowed: acks=0 If set to zero then the producer will not wait for
+         * any acknowledgment from the server at all. The record will be
+         * immediately added to the socket buffer and considered sent. No
+         * guarantee can be made that the server has received the record in 
this
+         * case, and the retries configuration will not take effect (as the
+         * client won't generally know of any failures). The offset given back
+         * for each record will always be set to -1. acks=1 This will mean the
+         * leader will write the record to its local log but will respond
+         * without awaiting full acknowledgement from all followers. In this
+         * case should the leader fail immediately after acknowledging the
+         * record but before the followers have replicated it then the record
+         * will be lost. acks=all This means the leader will wait for the full
+         * set of in-sync replicas to acknowledge the record. This guarantees
+         * that the record will not be lost as long as at least one in-sync
+         * replica remains alive. This is the strongest available guarantee.
+         * This is equivalent to the acks=-1 setting. Note that enabling
+         * idempotence requires this config value to be 'all'. If conflicting
+         * configurations are set and idempotence is not explicitly enabled,
+         * idempotence is disabled.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
-         * Default: 1
+         * Default: all
          * Group: producer
          * 
          * @param requestRequiredAcks the value to set
@@ -1436,14 +1445,22 @@ public interface KafkaComponentBuilderFactory {
          * Setting a value greater than zero will cause the client to resend 
any
          * record whose send fails with a potentially transient error. Note 
that
          * this retry is no different than if the client resent the record upon
-         * receiving the error. Allowing retries will potentially change the
-         * ordering of records because if two records are sent to a single
-         * partition, and the first fails and is retried but the second
-         * succeeds, then the second record may appear first.
+         * receiving the error. Produce requests will be failed before the
+         * number of retries has been exhausted if the timeout configured by
+         * delivery.timeout.ms expires first before successful acknowledgement.
+         * Users should generally prefer to leave this config unset and instead
+         * use delivery.timeout.ms to control retry behavior. Enabling
+         * idempotence requires this config value to be greater than 0. If
+         * conflicting configurations are set and idempotence is not explicitly
+         * enabled, idempotence is disabled. Allowing retries while setting
+         * enable.idempotence to false and 
max.in.flight.requests.per.connection
+         * to greater than 1 will potentially change the ordering of records because if two
+         * batches are sent to a single partition, and the first fails and is
+         * retried but the second succeeds, then the records in the second 
batch
+         * may appear first.
          * 
          * The option is a: &lt;code&gt;java.lang.Integer&lt;/code&gt; type.
          * 
-         * Default: 0
          * Group: producer
          * 
          * @param retries the value to set
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index c3e44fe98fc..8a27a3f3692 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -2431,16 +2431,21 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * If set to 'true' the producer will ensure that exactly one copy of
+         * When set to 'true', the producer will ensure that exactly one copy 
of
          * each message is written in the stream. If 'false', producer retries
-         * may write duplicates of the retried message in the stream. If set to
-         * true this option will require max.in.flight.requests.per.connection
-         * to be set to 1 and retries cannot be zero and additionally acks must
-         * be set to 'all'.
+         * due to broker failures, etc., may write duplicates of the retried
+         * message in the stream. Note that enabling idempotence requires
+         * max.in.flight.requests.per.connection to be less than or equal to 5
+         * (with message ordering preserved for any allowable value), retries 
to
+         * be greater than 0, and acks must be 'all'. Idempotence is enabled by
+         * default if no conflicting configurations are set. If conflicting
+         * configurations are set and idempotence is not explicitly enabled,
+         * idempotence is disabled. If idempotence is explicitly enabled and
+         * conflicting configurations are set, a ConfigException is thrown.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
-         * Default: false
+         * Default: true
          * Group: producer
          * 
          * @param enableIdempotence the value to set
@@ -2452,17 +2457,22 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * If set to 'true' the producer will ensure that exactly one copy of
+         * When set to 'true', the producer will ensure that exactly one copy 
of
          * each message is written in the stream. If 'false', producer retries
-         * may write duplicates of the retried message in the stream. If set to
-         * true this option will require max.in.flight.requests.per.connection
-         * to be set to 1 and retries cannot be zero and additionally acks must
-         * be set to 'all'.
+         * due to broker failures, etc., may write duplicates of the retried
+         * message in the stream. Note that enabling idempotence requires
+         * max.in.flight.requests.per.connection to be less than or equal to 5
+         * (with message ordering preserved for any allowable value), retries 
to
+         * be greater than 0, and acks must be 'all'. Idempotence is enabled by
+         * default if no conflicting configurations are set. If conflicting
+         * configurations are set and idempotence is not explicitly enabled,
+         * idempotence is disabled. If idempotence is explicitly enabled and
+         * conflicting configurations are set, a ConfigException is thrown.
          * 
          * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
          * type.
          * 
-         * Default: false
+         * Default: true
          * Group: producer
          * 
          * @param enableIdempotence the value to set
@@ -3111,25 +3121,29 @@ public interface KafkaEndpointBuilderFactory {
          * The number of acknowledgments the producer requires the leader to
          * have received before considering a request complete. This controls
          * the durability of records that are sent. The following settings are
-         * common: acks=0 If set to zero then the producer will not wait for any
-         * acknowledgment from the server at all. The record will be immediately
-         * added to the socket buffer and considered sent. No guarantee can be
-         * made that the server has received the record in this case, and the
-         * retries configuration will not take effect (as the client won't
-         * generally know of any failures). The offset given back for each
-         * record will always be set to -1. acks=1 This will mean the leader
-         * will write the record to its local log but will respond without
-         * awaiting full acknowledgement from all followers. In this case should
-         * the leader fail immediately after acknowledging the record but before
-         * the followers have replicated it then the record will be lost.
-         * acks=all This means the leader will wait for the full set of in-sync
-         * replicas to acknowledge the record. This guarantees that the record
-         * will not be lost as long as at least one in-sync replica remains
-         * alive. This is the strongest available guarantee.
+         * allowed: acks=0 If set to zero then the producer will not wait for
+         * any acknowledgment from the server at all. The record will be
+         * immediately added to the socket buffer and considered sent. No
+         * guarantee can be made that the server has received the record in this
+         * case, and the retries configuration will not take effect (as the
+         * client won't generally know of any failures). The offset given back
+         * for each record will always be set to -1. acks=1 This will mean the
+         * leader will write the record to its local log but will respond
+         * without awaiting full acknowledgement from all followers. In this
+         * case should the leader fail immediately after acknowledging the
+         * record but before the followers have replicated it then the record
+         * will be lost. acks=all This means the leader will wait for the full
+         * set of in-sync replicas to acknowledge the record. This guarantees
+         * that the record will not be lost as long as at least one in-sync
+         * replica remains alive. This is the strongest available guarantee.
+         * This is equivalent to the acks=-1 setting. Note that enabling
+         * idempotence requires this config value to be 'all'. If conflicting
+         * configurations are set and idempotence is not explicitly enabled,
+         * idempotence is disabled.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
-         * Default: 1
+         * Default: all
          * Group: producer
          * 
          * @param requestRequiredAcks the value to set
@@ -3181,14 +3195,22 @@ public interface KafkaEndpointBuilderFactory {
          * Setting a value greater than zero will cause the client to resend any
          * record whose send fails with a potentially transient error. Note that
          * this retry is no different than if the client resent the record upon
-         * receiving the error. Allowing retries will potentially change the
-         * ordering of records because if two records are sent to a single
-         * partition, and the first fails and is retried but the second
-         * succeeds, then the second record may appear first.
+         * receiving the error. Produce requests will be failed before the
+         * number of retries has been exhausted if the timeout configured by
+         * delivery.timeout.ms expires first before successful acknowledgement.
+         * Users should generally prefer to leave this config unset and instead
+         * use delivery.timeout.ms to control retry behavior. Enabling
+         * idempotence requires this config value to be greater than 0. If
+         * conflicting configurations are set and idempotence is not explicitly
+         * enabled, idempotence is disabled. Allowing retries while setting
+         * enable.idempotence to false and max.in.flight.requests.per.connection
+         * to greater than 1 will potentially change the ordering of records
+         * because if two batches are sent to a single partition, and the first
+         * fails and is retried but the second succeeds, then the records in the
+         * second batch may appear first.
          * 
          * The option is a: &lt;code&gt;java.lang.Integer&lt;/code&gt; type.
          * 
-         * Default: 0
          * Group: producer
          * 
          * @param retries the value to set
@@ -3202,15 +3224,23 @@ public interface KafkaEndpointBuilderFactory {
          * Setting a value greater than zero will cause the client to resend any
          * record whose send fails with a potentially transient error. Note that
          * this retry is no different than if the client resent the record upon
-         * receiving the error. Allowing retries will potentially change the
-         * ordering of records because if two records are sent to a single
-         * partition, and the first fails and is retried but the second
-         * succeeds, then the second record may appear first.
+         * receiving the error. Produce requests will be failed before the
+         * number of retries has been exhausted if the timeout configured by
+         * delivery.timeout.ms expires first before successful acknowledgement.
+         * Users should generally prefer to leave this config unset and instead
+         * use delivery.timeout.ms to control retry behavior. Enabling
+         * idempotence requires this config value to be greater than 0. If
+         * conflicting configurations are set and idempotence is not explicitly
+         * enabled, idempotence is disabled. Allowing retries while setting
+         * enable.idempotence to false and max.in.flight.requests.per.connection
+         * to greater than 1 will potentially change the ordering of records
+         * because if two batches are sent to a single partition, and the first
+         * fails and is retried but the second succeeds, then the records in the
+         * second batch may appear first.
          * 
          * The option will be converted to a
          * &lt;code&gt;java.lang.Integer&lt;/code&gt; type.
          * 
-         * Default: 0
          * Group: producer
          * 
          * @param retries the value to set

Reply via email to