Repository: kafka Updated Branches: refs/heads/trunk 2554a8dd4 -> 3bcadbfb4
KAFKA-3353; Remove deprecated producer configs These configs have been deprecated since 0.9.0.0: block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #2987 from ijuma/kafka-3353-remove-deprecated-producer-configs Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bcadbfb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bcadbfb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bcadbfb Branch: refs/heads/trunk Commit: 3bcadbfb474f6caccc939fb3775a6f969d136af7 Parents: 2554a8d Author: Ismael Juma <[email protected]> Authored: Mon May 8 10:00:04 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Mon May 8 10:00:04 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 45 +------------------- .../kafka/clients/producer/ProducerConfig.java | 43 ------------------- docs/upgrade.html | 2 + 3 files changed, 4 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 286387b..f812389 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -270,8 +270,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = 
CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs); - this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs); + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.transactionManager = configureTransactionState(config); int retries = configureRetries(config, transactionManager != null); int maxInflightRequests = configureInflightRequests(config, transactionManager != null); @@ -335,47 +335,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer); } - private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) { - /* check for user defined settings. - * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. - * This should be removed with release 0.9 when the deprecated configs are removed. - */ - if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { - log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); - if (blockOnBufferFull) { - return Long.MAX_VALUE; - } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. 
" + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - } else { - return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); - } - } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - } else { - return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); - } - } - - private static int configureRequestTimeout(ProducerConfig config, Map<String, Object> userProvidedConfigs) { - /* check for user defined settings. - * If the TIME_OUT config is set use that for request timeout. - * This should be removed with release 0.9 - */ - if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. 
Please use " + - ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - return config.getInt(ProducerConfig.TIMEOUT_CONFIG); - } else { - return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - } - } - private static TransactionManager configureTransactionState(ProducerConfig config) { TransactionManager transactionManager = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 4bceb95..12e8c64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig { /** <code>bootstrap.servers</code> */ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; - /** <code>metadata.fetch.timeout.ms</code> */ - /** - * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG} - */ - @Deprecated - public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; - private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers " - + "host the topic's partitions. 
This config specifies the maximum time, in milliseconds, for this fetch " - + "to succeed before throwing an exception back to the client."; - /** <code>metadata.max.age.ms</code> */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; @@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig { + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."; - /** <code>timeout.ms</code> */ - - /** - * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG} - */ - @Deprecated - public static final String TIMEOUT_CONFIG = "timeout.ms"; - private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " - + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the " - + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout " - + "is measured on the server side and does not include the network latency of the request."; - /** <code>linger.ms</code> */ public static final String LINGER_MS_CONFIG = "linger.ms"; private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " @@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig { + "These methods can be blocked either because the buffer is full or metadata unavailable." 
+ "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout."; - /** <code>block.on.buffer.full</code> */ - /** - * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}. - */ - @Deprecated - public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; - private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. " - + "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the <code>" + MAX_BLOCK_MS_CONFIG + "</code> " - + "value to block, after which it will throw a TimeoutException. Setting this property to true will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE. " - + "<em>Also if this property is set to true, parameter <code>" + METADATA_FETCH_TIMEOUT_CONFIG + "</code> is no longer honored.</em>" - + "<p>This parameter is deprecated and will be removed in a future release. " - + "Parameter <code>" + MAX_BLOCK_MS_CONFIG + "</code> should be used instead."; - /** <code>buffer.memory</code> */ public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. 
If records are " @@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) - .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) @@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) - .define(METADATA_FETCH_TIMEOUT_CONFIG, - Type.LONG, - 60 * 1000, - atLeast(0), - Importance.LOW, - METADATA_FETCH_TIMEOUT_DOC) .define(MAX_BLOCK_MS_CONFIG, Type.LONG, 60 * 1000, http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index 227c728..116d2ff 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -55,6 +55,8 @@ <ul> <li>Unclean leader election is now disabled by default. The new default favors durability over availability. 
Users who wish to retain the previous behavior should set the broker config <code>unclean.leader.election.enable</code> to <code>true</code>.</li> + <li>Producer configs <code>block.on.buffer.full</code>, <code>metadata.fetch.timeout.ms</code> and <code>timeout.ms</code> have been + removed. They were initially deprecated in Kafka 0.9.0.0.</li> <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
