Repository: kafka
Updated Branches:
  refs/heads/trunk 34e379f10 -> c1fdf575d


KAFKA-5033; Set default retries for the idempotent producer to be infinite

Author: Apurva Mehta <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3091 from apurvam/KAFKA-5033-bump-retries-for-idempotent-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1fdf575
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1fdf575
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1fdf575

Branch: refs/heads/trunk
Commit: c1fdf575deed0398e1692be16cacf24308afd5d5
Parents: 34e379f
Author: Apurva Mehta <[email protected]>
Authored: Thu May 18 13:06:56 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Thu May 18 13:07:06 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/producer/KafkaProducer.java   | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fdf575/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index aeef92f..71fb077 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -374,8 +374,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
             userConfiguredRetries = true;
         }
         if (idempotenceEnabled && !userConfiguredRetries) {
-            log.info("Overriding the default retries config to " + 3 + " since 
the idempotent producer is enabled.");
-            return 3;
+            // We recommend setting infinite retries when the idempotent 
producer is enabled, so it makes sense to make
+            // this the default.
+            log.info("Overriding the default retries config to the recommended 
value of {} since the idempotent " +
+                    "producer is enabled.", Integer.MAX_VALUE);
+            return Integer.MAX_VALUE;
         }
         if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) 
== 0) {
             throw new ConfigException("Must set " + 
ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent 
producer.");
@@ -389,7 +392,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             userConfiguredInflights = true;
         }
         if (idempotenceEnabled && !userConfiguredInflights) {
-            log.info("Overriding the default " + 
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 since 
idempontence is enabled.");
+            log.info("Overriding the default {} to 1 since idempontence is 
enabled.", ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
             return 1;
         }
         if (idempotenceEnabled && 
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
@@ -407,13 +410,13 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         }
 
         if (idempotenceEnabled && !userConfiguredAcks) {
-            log.info("Overriding the default " + ProducerConfig.ACKS_CONFIG + 
" to all since idempotence is enabled");
+            log.info("Overriding the default {} to all since idempotence is 
enabled.", ProducerConfig.ACKS_CONFIG);
             return -1;
         }
 
         if (idempotenceEnabled && acks != -1) {
             throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG 
+ " to all in order to use the idempotent " +
-                    "producer. Otherwise we cannot guarantee idempotence");
+                    "producer. Otherwise we cannot guarantee idempotence.");
         }
         return acks;
     }

Reply via email to