Repository: kafka
Updated Branches:
  refs/heads/trunk 2554a8dd4 -> 3bcadbfb4


KAFKA-3353; Remove deprecated producer configs

These configs have been deprecated since 0.9.0.0:
block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms

Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #2987 from ijuma/kafka-3353-remove-deprecated-producer-configs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bcadbfb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bcadbfb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bcadbfb

Branch: refs/heads/trunk
Commit: 3bcadbfb474f6caccc939fb3775a6f969d136af7
Parents: 2554a8d
Author: Ismael Juma <[email protected]>
Authored: Mon May 8 10:00:04 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Mon May 8 10:00:04 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 45 +-------------------
 .../kafka/clients/producer/ProducerConfig.java  | 43 -------------------
 docs/upgrade.html                               |  2 +
 3 files changed, 4 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 286387b..f812389 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -270,8 +270,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.totalMemorySize = 
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = 
CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
 
-            this.maxBlockTimeMs = configureMaxBlockTime(config, 
userProvidedConfigs);
-            this.requestTimeoutMs = configureRequestTimeout(config, 
userProvidedConfigs);
+            this.maxBlockTimeMs = 
config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            this.requestTimeoutMs = 
config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.transactionManager = configureTransactionState(config);
             int retries = configureRetries(config, transactionManager != null);
             int maxInflightRequests = configureInflightRequests(config, 
transactionManager != null);
@@ -335,47 +335,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         return serializer instanceof ExtendedSerializer ? 
(ExtendedSerializer<T>) serializer : new 
ExtendedSerializer.Wrapper<>(serializer);
     }
 
-    private static long configureMaxBlockTime(ProducerConfig config, 
Map<String, Object> userProvidedConfigs) {
-        /* check for user defined settings.
-         * If the BLOCK_ON_BUFFER_FULL is set to true, we do not honor 
METADATA_FETCH_TIMEOUT_CONFIG.
-         * This should be removed with release 0.9 when the deprecated configs 
are removed.
-         */
-        if 
(userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
-            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is 
deprecated and will be removed soon. " +
-                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            boolean blockOnBufferFull = 
config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
-            if (blockOnBufferFull) {
-                return Long.MAX_VALUE;
-            } else if 
(userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) 
{
-                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " 
config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                return 
config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-            } else {
-                return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            }
-        } else if 
(userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) 
{
-            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config 
is deprecated and will be removed soon. " +
-                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            return 
config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-        } else {
-            return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-        }
-    }
-
-    private static int configureRequestTimeout(ProducerConfig config, 
Map<String, Object> userProvidedConfigs) {
-        /* check for user defined settings.
-         * If the TIME_OUT config is set use that for request timeout.
-         * This should be removed with release 0.9
-         */
-        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
-            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated 
and will be removed soon. Please use " +
-                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            return config.getInt(ProducerConfig.TIMEOUT_CONFIG);
-        } else {
-            return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        }
-    }
-
     private static TransactionManager configureTransactionState(ProducerConfig 
config) {
 
         TransactionManager transactionManager = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 4bceb95..12e8c64 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>bootstrap.servers</code> */
     public static final String BOOTSTRAP_SERVERS_CONFIG = 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
-    /** <code>metadata.fetch.timeout.ms</code> */
-    /**
-     * @deprecated This config will be removed in a future release. Please use 
{@link #MAX_BLOCK_MS_CONFIG}
-     */
-    @Deprecated
-    public static final String METADATA_FETCH_TIMEOUT_CONFIG = 
"metadata.fetch.timeout.ms";
-    private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time 
data is sent to a topic we must fetch metadata about that topic to know which 
servers "
-                                                             + "host the 
topic's partitions. This config specifies the maximum time, in milliseconds, 
for this fetch "
-                                                             + "to succeed 
before throwing an exception back to the client.";
-
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = 
CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
     private static final String METADATA_MAX_AGE_DOC = 
CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig {
                                            + " acknowledge the record. This 
guarantees that the record will not be lost as long as at least one in-sync 
replica"
                                            + " remains alive. This is the 
strongest available guarantee. This is equivalent to the acks=-1 setting.";
 
-    /** <code>timeout.ms</code> */
-
-    /**
-     * @deprecated This config will be removed in a future release. Please use 
{@link #REQUEST_TIMEOUT_MS_CONFIG}
-     */
-    @Deprecated
-    public static final String TIMEOUT_CONFIG = "timeout.ms";
-    private static final String TIMEOUT_DOC = "The configuration controls the 
maximum amount of time the server will wait for acknowledgments from followers 
to "
-                                              + "meet the acknowledgment 
requirements the producer has specified with the <code>acks</code> 
configuration. If the "
-                                              + "requested number of 
acknowledgments are not met when the timeout elapses an error will be returned. 
This timeout "
-                                              + "is measured on the server 
side and does not include the network latency of the request.";
-
     /** <code>linger.ms</code> */
     public static final String LINGER_MS_CONFIG = "linger.ms";
     private static final String LINGER_MS_DOC = "The producer groups together 
any records that arrive in between request transmissions into a single batched 
request. "
@@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig {
                                                     + "These methods can be 
blocked either because the buffer is full or metadata unavailable."
                                                     + "Blocking in the 
user-supplied serializers or partitioner will not be counted against this 
timeout.";
 
-    /** <code>block.on.buffer.full</code> */
-    /**
-     * @deprecated This config will be removed in a future release. Please use 
{@link #MAX_BLOCK_MS_CONFIG}.
-     */
-    @Deprecated
-    public static final String BLOCK_ON_BUFFER_FULL_CONFIG = 
"block.on.buffer.full";
-    private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory 
buffer is exhausted we must either stop accepting new records (block) or throw 
errors. "
-                                                           + "By default this 
setting is false and the producer will no longer throw a BufferExhaustException 
but instead will use the <code>" + MAX_BLOCK_MS_CONFIG + "</code> "
-                                                           + "value to block, 
after which it will throw a TimeoutException. Setting this property to true 
will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE. "
-                                                           + "<em>Also if this 
property is set to true, parameter <code>" + METADATA_FETCH_TIMEOUT_CONFIG + 
"</code> is no longer honored.</em>"
-                                                           + "<p>This 
parameter is deprecated and will be removed in a future release. "
-                                                           + "Parameter 
<code>" + MAX_BLOCK_MS_CONFIG + "</code> should be used instead.";
-
     /** <code>buffer.memory</code> */
     public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
     private static final String BUFFER_MEMORY_DOC = "The total bytes of memory 
the producer can use to buffer records waiting to be sent to the server. If 
records are "
@@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
-                                .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, 
atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, 
atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", 
Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 
1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
@@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
-                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, 
Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, 
Type.LONG, 50L, atLeast(0L), Importance.LOW, 
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, 
Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 
100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
-                                .define(METADATA_FETCH_TIMEOUT_CONFIG,
-                                        Type.LONG,
-                                        60 * 1000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        METADATA_FETCH_TIMEOUT_DOC)
                                 .define(MAX_BLOCK_MS_CONFIG,
                                         Type.LONG,
                                         60 * 1000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 227c728..116d2ff 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,8 @@
 <ul>
     <li>Unclean leader election is now disabled by default. The new default 
favors durability over availability. Users who wish
         to retain the previous behavior should set the broker config 
<code>unclean.leader.election.enable</code> to <code>true</code>.</li>
+    <li>Producer configs <code>block.on.buffer.full</code>, 
<code>metadata.fetch.timeout.ms</code> and <code>timeout.ms</code> have been
+        removed. They were initially deprecated in Kafka 0.9.0.0.</li>
     <li>The <code>offsets.topic.replication.factor</code> broker config is now 
enforced upon auto topic creation. Internal
         auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE 
error until the cluster size meets this
         replication factor requirement.</li>

Reply via email to