Repository: kafka Updated Branches: refs/heads/trunk 34e379f10 -> c1fdf575d
KAFKA-5033; Set default retries for the idempotent producer to be infinite Author: Apurva Mehta <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3091 from apurvam/KAFKA-5033-bump-retries-for-idempotent-producer Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1fdf575 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1fdf575 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1fdf575 Branch: refs/heads/trunk Commit: c1fdf575deed0398e1692be16cacf24308afd5d5 Parents: 34e379f Author: Apurva Mehta <[email protected]> Authored: Thu May 18 13:06:56 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Thu May 18 13:07:06 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/clients/producer/KafkaProducer.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fdf575/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index aeef92f..71fb077 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -374,8 +374,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> { userConfiguredRetries = true; } if (idempotenceEnabled && !userConfiguredRetries) { - log.info("Overriding the default retries config to " + 3 + " since the idempotent producer is enabled."); - return 3; + // We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make + // this the default. + log.info("Overriding the default retries config to the recommended value of {} since the idempotent " + + "producer is enabled.", Integer.MAX_VALUE); + return Integer.MAX_VALUE; } if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) { throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer."); @@ -389,7 +392,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { userConfiguredInflights = true; } if (idempotenceEnabled && !userConfiguredInflights) { - log.info("Overriding the default " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 since idempontence is enabled."); + log.info("Overriding the default {} to 1 since idempontence is enabled.", ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); return 1; } if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) { @@ -407,13 +410,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } if (idempotenceEnabled && !userConfiguredAcks) { - log.info("Overriding the default " + ProducerConfig.ACKS_CONFIG + " to all since idempotence is enabled"); + log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG); return -1; } if (idempotenceEnabled && acks != -1) { throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " + - "producer. Otherwise we cannot guarantee idempotence"); + "producer. Otherwise we cannot guarantee idempotence."); } return acks; }
