This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 93d71b6e80358b964b7c88a8b0769268f997095b Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jan 2 10:39:59 2023 +0100 CAMEL-18841: camel-kafka: producer idempotence is not enabled by default --- .../org/apache/camel/catalog/components/kafka.json | 12 +-- .../org/apache/camel/component/kafka/kafka.json | 12 +-- .../camel/component/kafka/KafkaConfiguration.java | 55 +++++++---- .../camel/component/kafka/KafkaComponentTest.java | 1 - .../ROOT/pages/camel-3x-upgrade-guide-3_20.adoc | 9 ++ .../dsl/KafkaComponentBuilderFactory.java | 71 ++++++++------ .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 106 +++++++++++++-------- 7 files changed, 168 insertions(+), 98 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index 0be3446cdd2..32f7963ec71 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -71,7 +71,7 @@ "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to spe [...] "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by this [...] "deliveryTimeoutMs": { "kind": "property", "displayName": "Delivery Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "120000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "An upper bound on the time to report success or failure after a call to send() [...] - "enableIdempotence": { "kind": "property", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If set to 'true' the producer will ensure that exactly one copy of each message is written i [...] + "enableIdempotence": { "kind": "property", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "When set to 'true', the producer will ensure that exactly one copy of each message is written [...] "headerSerializer": { "kind": "property", "displayName": "Header Serializer", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderSerializer to serialize kafka headers [...] "key": { "kind": "property", "displayName": "Key", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The record key (or null if no key is specified). If this option has been configured then it take precedence over header KafkaConstants#KEY" }, "keySerializer": { "kind": "property", "displayName": "Key Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for keys (defaults t [...] @@ -91,9 +91,9 @@ "receiveBufferBytes": { "kind": "property", "displayName": "Receive Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "65536", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data." }, "reconnectBackoffMs": { "kind": "property", "displayName": "Reconnect Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "50", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time to wait before attempting to reconnect to a given host. This [...] "recordMetadata": { "kind": "property", "displayName": "Record Metadata", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the producer should store the RecordMetadata results from sending to Kafka. The results are [...] - "requestRequiredAcks": { "kind": "property", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", "all" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer req [...] + "requestRequiredAcks": { "kind": "property", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", "1" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "all", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer r [...] "requestTimeoutMs": { "kind": "property", "displayName": "Request Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time the broker will wait trying to meet the request.required.acks [...] - "retries": { "kind": "property", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a pote [...] + "retries": { "kind": "property", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient err [...] "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a n [...] "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, "valueSerializer": { "kind": "property", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, @@ -193,7 +193,7 @@ "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to sp [...] "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by thi [...] "deliveryTimeoutMs": { "kind": "parameter", "displayName": "Delivery Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "120000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "An upper bound on the time to report success or failure after a call to send( [...] - "enableIdempotence": { "kind": "parameter", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If set to 'true' the producer will ensure that exactly one copy of each message is written [...] + "enableIdempotence": { "kind": "parameter", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "When set to 'true', the producer will ensure that exactly one copy of each message is writte [...] "headerSerializer": { "kind": "parameter", "displayName": "Header Serializer", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderSerializer to serialize kafka headers [...] "key": { "kind": "parameter", "displayName": "Key", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The record key (or null if no key is specified). If this option has been configured then it take precedence over header KafkaConstants#KEY" }, "keySerializer": { "kind": "parameter", "displayName": "Key Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for keys (defaults [...] @@ -212,9 +212,9 @@ "receiveBufferBytes": { "kind": "parameter", "displayName": "Receive Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "65536", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data." }, "reconnectBackoffMs": { "kind": "parameter", "displayName": "Reconnect Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "50", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time to wait before attempting to reconnect to a given host. This [...] "recordMetadata": { "kind": "parameter", "displayName": "Record Metadata", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the producer should store the RecordMetadata results from sending to Kafka. The results ar [...] - "requestRequiredAcks": { "kind": "parameter", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", "all" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer re [...] + "requestRequiredAcks": { "kind": "parameter", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", "1" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "all", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer [...] "requestTimeoutMs": { "kind": "parameter", "displayName": "Request Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time the broker will wait trying to meet the request.required.acks [...] - "retries": { "kind": "parameter", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a pot [...] + "retries": { "kind": "parameter", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient er [...] "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a [...] "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, "valueSerializer": { "kind": "parameter", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 0be3446cdd2..32f7963ec71 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -71,7 +71,7 @@ "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to spe [...] "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by this [...] "deliveryTimeoutMs": { "kind": "property", "displayName": "Delivery Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "120000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "An upper bound on the time to report success or failure after a call to send() [...] - "enableIdempotence": { "kind": "property", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If set to 'true' the producer will ensure that exactly one copy of each message is written i [...] + "enableIdempotence": { "kind": "property", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "When set to 'true', the producer will ensure that exactly one copy of each message is written [...] "headerSerializer": { "kind": "property", "displayName": "Header Serializer", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderSerializer to serialize kafka headers [...] "key": { "kind": "property", "displayName": "Key", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The record key (or null if no key is specified). If this option has been configured then it take precedence over header KafkaConstants#KEY" }, "keySerializer": { "kind": "property", "displayName": "Key Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for keys (defaults t [...] @@ -91,9 +91,9 @@ "receiveBufferBytes": { "kind": "property", "displayName": "Receive Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "65536", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data." }, "reconnectBackoffMs": { "kind": "property", "displayName": "Reconnect Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "50", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time to wait before attempting to reconnect to a given host. This [...] "recordMetadata": { "kind": "property", "displayName": "Record Metadata", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the producer should store the RecordMetadata results from sending to Kafka. The results are [...] - "requestRequiredAcks": { "kind": "property", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", "all" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer req [...] + "requestRequiredAcks": { "kind": "property", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", "1" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "all", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer r [...] "requestTimeoutMs": { "kind": "property", "displayName": "Request Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time the broker will wait trying to meet the request.required.acks [...] - "retries": { "kind": "property", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a pote [...] + "retries": { "kind": "property", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient err [...] "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a n [...] "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, "valueSerializer": { "kind": "property", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, @@ -193,7 +193,7 @@ "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4", "zstd" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to sp [...] "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by thi [...] "deliveryTimeoutMs": { "kind": "parameter", "displayName": "Delivery Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "120000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "An upper bound on the time to report success or failure after a call to send( [...] - "enableIdempotence": { "kind": "parameter", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If set to 'true' the producer will ensure that exactly one copy of each message is written [...] + "enableIdempotence": { "kind": "parameter", "displayName": "Enable Idempotence", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "When set to 'true', the producer will ensure that exactly one copy of each message is writte [...] "headerSerializer": { "kind": "parameter", "displayName": "Header Serializer", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderSerializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderSerializer to serialize kafka headers [...] "key": { "kind": "parameter", "displayName": "Key", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The record key (or null if no key is specified). If this option has been configured then it take precedence over header KafkaConstants#KEY" }, "keySerializer": { "kind": "parameter", "displayName": "Key Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for keys (defaults [...] @@ -212,9 +212,9 @@ "receiveBufferBytes": { "kind": "parameter", "displayName": "Receive Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "65536", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data." }, "reconnectBackoffMs": { "kind": "parameter", "displayName": "Reconnect Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "50", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time to wait before attempting to reconnect to a given host. This [...] "recordMetadata": { "kind": "parameter", "displayName": "Record Metadata", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the producer should store the RecordMetadata results from sending to Kafka. The results ar [...] - "requestRequiredAcks": { "kind": "parameter", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "-1", "0", "1", "all" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer re [...] + "requestRequiredAcks": { "kind": "parameter", "displayName": "Request Required Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", "1" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "all", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer [...] "requestTimeoutMs": { "kind": "parameter", "displayName": "Request Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The amount of time the broker will wait trying to meet the request.required.acks [...] - "retries": { "kind": "parameter", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "0", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a pot [...] + "retries": { "kind": "parameter", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient er [...] "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a [...] "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, "valueSerializer": { "kind": "parameter", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index e62bf74497f..5de11fced2c 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -173,8 +173,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware private String key; @UriParam(label = "producer") private Integer partitionKey; - @UriParam(label = "producer", enums = "-1,0,1,all", defaultValue = "1") - private String requestRequiredAcks = "1"; + @UriParam(label = "producer", enums = "all,-1,0,1", defaultValue = "all") + private String requestRequiredAcks = "all"; // buffer.memory @UriParam(label = "producer", defaultValue = "33554432") private Integer bufferMemorySize = 33554432; @@ -182,8 +182,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4,zstd") private String compressionCodec = "none"; // retries - @UriParam(label = "producer", defaultValue = "0") - private Integer retries = 0; + @UriParam(label = "producer") + private Integer retries; // use individual headers if exchange.body contains Iterable or similar of Message or Exchange @UriParam(label = "producer", defaultValue = "false") private boolean batchWithIndividualHeaders; @@ -236,7 +236,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware private Integer reconnectBackoffMs = 50; // enable.idempotence // reconnect.backoff.ms - @UriParam(label = "producer", defaultValue = "false") + @UriParam(label = "producer", defaultValue = "true") private boolean enableIdempotence; @UriParam(label = "producer", description = "To use a custom KafkaHeaderSerializer to serialize kafka headers values") private KafkaHeaderSerializer headerSerializer = new DefaultKafkaHeaderSerializer(); @@ -1302,16 +1302,19 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware /** * The number of acknowledgments the producer requires the leader to have received before considering a request - * complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set - * to zero then the producer will not wait for any acknowledgment from the server at all. The record will be - * immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received - * the record in this case, and the retries configuration will not take effect (as the client won't generally know - * of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the - * leader will write the record to its local log but will respond without awaiting full acknowledgement from all - * followers. In this case should the leader fail immediately after acknowledging the record but before the + * complete. This controls the durability of records that are sent. The following settings are allowed: + * + * acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record + * will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has + * received the record in this case, and the retries configuration will not take effect (as the client won't + * generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will + * mean the leader will write the record to its local log but will respond without awaiting full acknowledgement + * from all followers. In this case should the leader fail immediately after acknowledging the record but before the * followers have replicated it then the record will be lost. acks=all This means the leader will wait for the full * set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at - * least one in-sync replica remains alive. This is the strongest available guarantee. + * least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the + * acks=-1 setting. Note that enabling idempotence requires this config value to be 'all'. If conflicting + * configurations are set and idempotence is not explicitly enabled, idempotence is disabled. */ public void setRequestRequiredAcks(String requestRequiredAcks) { this.requestRequiredAcks = requestRequiredAcks; @@ -1324,9 +1327,16 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware /** * Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially * transient error. Note that this retry is no different than if the client resent the record upon receiving the - * error. Allowing retries will potentially change the ordering of records because if two records are sent to a - * single partition, and the first fails and is retried but the second succeeds, then the second record may appear - * first. + * error. Produce requests will be failed before the number of retries has been exhausted if the timeout configured + * by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave + * this config unset and instead use delivery.timeout.ms to control retry behavior. + * + * Enabling idempotence requires this config value to be greater than 0. If conflicting configurations are set and + * idempotence is not explicitly enabled, idempotence is disabled. + * + * Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to 1 will + * potentially change the ordering of records because if two batches are sent to a single partition, and the first + * fails and is retried but the second succeeds, then the records in the second batch may appear first. */ public void setRetries(Integer retries) { this.retries = retries; @@ -1725,10 +1735,15 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware } /** - * If set to 'true' the producer will ensure that exactly one copy of each message is written in the stream. If - * 'false', producer retries may write duplicates of the retried message in the stream. If set to true this option - * will require max.in.flight.requests.per.connection to be set to 1 and retries cannot be zero and additionally - * acks must be set to 'all'. + * When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If + * 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the + * stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to + * 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be + * 'all'. + * + * Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set + * and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and + * conflicting configurations are set, a ConfigException is thrown. */ public void setEnableIdempotence(boolean enableIdempotence) { this.enableIdempotence = enableIdempotence; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index 61b0232c077..f01da289f0c 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -170,7 +170,6 @@ public class KafkaComponentTest extends CamelTestSupport { props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); - props.put(ProducerConfig.RETRIES_CONFIG, "0"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000"); props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc index ff2a2b19c19..92b27f3a866 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_20.adoc @@ -46,6 +46,15 @@ The following 4 advanced options in fhir data format has been made available for Previously they were only possible to configure by manually creating a bean instance of the model, and configure them via Java code or classic Spring XML `<bean>` style. +=== camel-kafka + +The following options has changed default value as they were mistakenly out of sync with Apache Kafka. +This caused camel-kafka producer to not enable idempotence out of the box, that otherwise plain Apache Kafka client would do: + +- enableIdempotence: changed from `false` to `true` +- requestRequiredAcks: changed from `1` to `all` +- retries: changed from `0` to _unset_ + == Upgrading Camel 3.19 to 3.20 === camel-api diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index cf603033f8d..a8b0cd17663 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -992,16 +992,21 @@ public interface KafkaComponentBuilderFactory { return this; } /** - * If set to 'true' the producer will ensure that exactly one copy of + * When set to 'true', the producer will ensure that exactly one copy of * each message is written in the stream. If 'false', producer retries - * may write duplicates of the retried message in the stream. If set to - * true this option will require max.in.flight.requests.per.connection - * to be set to 1 and retries cannot be zero and additionally acks must - * be set to 'all'. + * due to broker failures, etc., may write duplicates of the retried + * message in the stream. Note that enabling idempotence requires + * max.in.flight.requests.per.connection to be less than or equal to 5 + * (with message ordering preserved for any allowable value), retries to + * be greater than 0, and acks must be 'all'. Idempotence is enabled by + * default if no conflicting configurations are set. If conflicting + * configurations are set and idempotence is not explicitly enabled, + * idempotence is disabled. If idempotence is explicitly enabled and + * conflicting configurations are set, a ConfigException is thrown. * * The option is a: <code>boolean</code> type. * - * Default: false + * Default: true * Group: producer * * @param enableIdempotence the value to set @@ -1385,25 +1390,29 @@ public interface KafkaComponentBuilderFactory { * The number of acknowledgments the producer requires the leader to * have received before considering a request complete. This controls * the durability of records that are sent. The following settings are - * common: acks=0 If set to zero then the producer will not wait for any - * acknowledgment from the server at all. The record will be immediately - * added to the socket buffer and considered sent. No guarantee can be - * made that the server has received the record in this case, and the - * retries configuration will not take effect (as the client won't - * generally know of any failures). The offset given back for each - * record will always be set to -1. acks=1 This will mean the leader - * will write the record to its local log but will respond without - * awaiting full acknowledgement from all followers. In this case should - * the leader fail immediately after acknowledging the record but before - * the followers have replicated it then the record will be lost. - * acks=all This means the leader will wait for the full set of in-sync - * replicas to acknowledge the record. This guarantees that the record - * will not be lost as long as at least one in-sync replica remains - * alive. This is the strongest available guarantee. + * allowed: acks=0 If set to zero then the producer will not wait for + * any acknowledgment from the server at all. The record will be + * immediately added to the socket buffer and considered sent. No + * guarantee can be made that the server has received the record in this + * case, and the retries configuration will not take effect (as the + * client won't generally know of any failures). The offset given back + * for each record will always be set to -1. acks=1 This will mean the + * leader will write the record to its local log but will respond + * without awaiting full acknowledgement from all followers. In this + * case should the leader fail immediately after acknowledging the + * record but before the followers have replicated it then the record + * will be lost. acks=all This means the leader will wait for the full + * set of in-sync replicas to acknowledge the record. This guarantees + * that the record will not be lost as long as at least one in-sync + * replica remains alive. This is the strongest available guarantee. + * This is equivalent to the acks=-1 setting. Note that enabling + * idempotence requires this config value to be 'all'. If conflicting + * configurations are set and idempotence is not explicitly enabled, + * idempotence is disabled. * * The option is a: <code>java.lang.String</code> type. * - * Default: 1 + * Default: all * Group: producer * * @param requestRequiredAcks the value to set @@ -1436,14 +1445,22 @@ public interface KafkaComponentBuilderFactory { * Setting a value greater than zero will cause the client to resend any * record whose send fails with a potentially transient error. Note that * this retry is no different than if the client resent the record upon - * receiving the error. Allowing retries will potentially change the - * ordering of records because if two records are sent to a single - * partition, and the first fails and is retried but the second - * succeeds, then the second record may appear first. + * receiving the error. Produce requests will be failed before the + * number of retries has been exhausted if the timeout configured by + * delivery.timeout.ms expires first before successful acknowledgement. + * Users should generally prefer to leave this config unset and instead + * use delivery.timeout.ms to control retry behavior. Enabling + * idempotence requires this config value to be greater than 0. If + * conflicting configurations are set and idempotence is not explicitly + * enabled, idempotence is disabled. Allowing retries while setting + * enable.idempotence to false and max.in.flight.requests.per.connection + * to 1 will potentially change the ordering of records because if two + * batches are sent to a single partition, and the first fails and is + * retried but the second succeeds, then the records in the second batch + * may appear first. * * The option is a: <code>java.lang.Integer</code> type. * - * Default: 0 * Group: producer * * @param retries the value to set diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index c3e44fe98fc..8a27a3f3692 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -2431,16 +2431,21 @@ public interface KafkaEndpointBuilderFactory { return this; } /** - * If set to 'true' the producer will ensure that exactly one copy of + * When set to 'true', the producer will ensure that exactly one copy of * each message is written in the stream. If 'false', producer retries - * may write duplicates of the retried message in the stream. If set to - * true this option will require max.in.flight.requests.per.connection - * to be set to 1 and retries cannot be zero and additionally acks must - * be set to 'all'. + * due to broker failures, etc., may write duplicates of the retried + * message in the stream. Note that enabling idempotence requires + * max.in.flight.requests.per.connection to be less than or equal to 5 + * (with message ordering preserved for any allowable value), retries to + * be greater than 0, and acks must be 'all'. Idempotence is enabled by + * default if no conflicting configurations are set. If conflicting + * configurations are set and idempotence is not explicitly enabled, + * idempotence is disabled. If idempotence is explicitly enabled and + * conflicting configurations are set, a ConfigException is thrown. * * The option is a: <code>boolean</code> type. * - * Default: false + * Default: true * Group: producer * * @param enableIdempotence the value to set @@ -2452,17 +2457,22 @@ public interface KafkaEndpointBuilderFactory { return this; } /** - * If set to 'true' the producer will ensure that exactly one copy of + * When set to 'true', the producer will ensure that exactly one copy of * each message is written in the stream. If 'false', producer retries - * may write duplicates of the retried message in the stream. If set to - * true this option will require max.in.flight.requests.per.connection - * to be set to 1 and retries cannot be zero and additionally acks must - * be set to 'all'. + * due to broker failures, etc., may write duplicates of the retried + * message in the stream. Note that enabling idempotence requires + * max.in.flight.requests.per.connection to be less than or equal to 5 + * (with message ordering preserved for any allowable value), retries to + * be greater than 0, and acks must be 'all'. Idempotence is enabled by + * default if no conflicting configurations are set. If conflicting + * configurations are set and idempotence is not explicitly enabled, + * idempotence is disabled. If idempotence is explicitly enabled and + * conflicting configurations are set, a ConfigException is thrown. * * The option will be converted to a <code>boolean</code> * type. * - * Default: false + * Default: true * Group: producer * * @param enableIdempotence the value to set @@ -3111,25 +3121,29 @@ public interface KafkaEndpointBuilderFactory { * The number of acknowledgments the producer requires the leader to * have received before considering a request complete. This controls * the durability of records that are sent. The following settings are - * common: acks=0 If set to zero then the producer will not wait for any - * acknowledgment from the server at all. The record will be immediately - * added to the socket buffer and considered sent. No guarantee can be - * made that the server has received the record in this case, and the - * retries configuration will not take effect (as the client won't - * generally know of any failures). The offset given back for each - * record will always be set to -1. acks=1 This will mean the leader - * will write the record to its local log but will respond without - * awaiting full acknowledgement from all followers. In this case should - * the leader fail immediately after acknowledging the record but before - * the followers have replicated it then the record will be lost. - * acks=all This means the leader will wait for the full set of in-sync - * replicas to acknowledge the record. This guarantees that the record - * will not be lost as long as at least one in-sync replica remains - * alive. This is the strongest available guarantee. + * allowed: acks=0 If set to zero then the producer will not wait for + * any acknowledgment from the server at all. The record will be + * immediately added to the socket buffer and considered sent. No + * guarantee can be made that the server has received the record in this + * case, and the retries configuration will not take effect (as the + * client won't generally know of any failures). The offset given back + * for each record will always be set to -1. acks=1 This will mean the + * leader will write the record to its local log but will respond + * without awaiting full acknowledgement from all followers. In this + * case should the leader fail immediately after acknowledging the + * record but before the followers have replicated it then the record + * will be lost. acks=all This means the leader will wait for the full + * set of in-sync replicas to acknowledge the record. This guarantees + * that the record will not be lost as long as at least one in-sync + * replica remains alive. This is the strongest available guarantee. + * This is equivalent to the acks=-1 setting. Note that enabling + * idempotence requires this config value to be 'all'. If conflicting + * configurations are set and idempotence is not explicitly enabled, + * idempotence is disabled. * * The option is a: <code>java.lang.String</code> type. * - * Default: 1 + * Default: all * Group: producer * * @param requestRequiredAcks the value to set @@ -3181,14 +3195,22 @@ public interface KafkaEndpointBuilderFactory { * Setting a value greater than zero will cause the client to resend any * record whose send fails with a potentially transient error. Note that * this retry is no different than if the client resent the record upon - * receiving the error. Allowing retries will potentially change the - * ordering of records because if two records are sent to a single - * partition, and the first fails and is retried but the second - * succeeds, then the second record may appear first. + * receiving the error. Produce requests will be failed before the + * number of retries has been exhausted if the timeout configured by + * delivery.timeout.ms expires first before successful acknowledgement. + * Users should generally prefer to leave this config unset and instead + * use delivery.timeout.ms to control retry behavior. Enabling + * idempotence requires this config value to be greater than 0. If + * conflicting configurations are set and idempotence is not explicitly + * enabled, idempotence is disabled. Allowing retries while setting + * enable.idempotence to false and max.in.flight.requests.per.connection + * to 1 will potentially change the ordering of records because if two + * batches are sent to a single partition, and the first fails and is + * retried but the second succeeds, then the records in the second batch + * may appear first. * * The option is a: <code>java.lang.Integer</code> type. * - * Default: 0 * Group: producer * * @param retries the value to set @@ -3202,15 +3224,23 @@ public interface KafkaEndpointBuilderFactory { * Setting a value greater than zero will cause the client to resend any * record whose send fails with a potentially transient error. Note that * this retry is no different than if the client resent the record upon - * receiving the error. Allowing retries will potentially change the - * ordering of records because if two records are sent to a single - * partition, and the first fails and is retried but the second - * succeeds, then the second record may appear first. + * receiving the error. Produce requests will be failed before the + * number of retries has been exhausted if the timeout configured by + * delivery.timeout.ms expires first before successful acknowledgement. + * Users should generally prefer to leave this config unset and instead + * use delivery.timeout.ms to control retry behavior. Enabling + * idempotence requires this config value to be greater than 0. If + * conflicting configurations are set and idempotence is not explicitly + * enabled, idempotence is disabled. Allowing retries while setting + * enable.idempotence to false and max.in.flight.requests.per.connection + * to 1 will potentially change the ordering of records because if two + * batches are sent to a single partition, and the first fails and is + * retried but the second succeeds, then the records in the second batch + * may appear first. * * The option will be converted to a * <code>java.lang.Integer</code> type. * - * Default: 0 * Group: producer * * @param retries the value to set
