This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a15387f  KAFKA-9274: Revert deprecation of `retries` for producer and admin clients (#9333)
a15387f is described below

commit a15387f34d142684859c2a57fcbef25edcdce25a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Sep 30 12:13:34 2020 -0700

    KAFKA-9274: Revert deprecation of `retries` for producer and admin clients (#9333)
    
    Reviewer: John Roesler <[email protected]>
---
 .../apache/kafka/clients/CommonClientConfigs.java  | 10 ++--------
 .../kafka/clients/admin/AdminClientConfig.java     |  6 ------
 .../kafka/clients/admin/KafkaAdminClient.java      |  1 -
 .../kafka/clients/producer/KafkaProducer.java      |  1 -
 .../kafka/clients/producer/ProducerConfig.java     | 13 ++++--------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 +++++-----------------
 .../kafka/clients/producer/KafkaProducerTest.java  |  1 -
 .../connect/storage/KafkaStatusBackingStore.java   |  1 -
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  4 +---
 .../server/DynamicBrokerReconfigurationTest.scala  |  1 -
 .../kafka/admin/ConsumerGroupCommandTest.scala     |  2 --
 .../unit/kafka/server/LogDirFailureTest.scala      |  3 ---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 --
 docs/streams/upgrade-guide.html                    |  6 +-----
 docs/upgrade.html                                  |  5 ++---
 .../kafka/log4jappender/KafkaLog4jAppender.java    |  1 -
 .../apache/kafka/streams/StreamsConfigTest.java    |  3 ---
 .../tests/StreamsBrokerDownResilienceTest.java     |  2 --
 .../kafka/streams/tests/StreamsOptimizedTest.java  |  1 -
 .../streams/tests/StreamsStandByReplicaTest.java   |  2 --
 .../org/apache/kafka/tools/VerifiableProducer.java |  1 -
 21 files changed, 15 insertions(+), 74 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 754d273..1930cf3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -78,9 +78,9 @@ public class CommonClientConfigs {
     public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
     public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";
 
-    @Deprecated
     public static final String RETRIES_CONFIG = "retries";
-    public static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.";
+    public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
+        " It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.";
 
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
@@ -194,10 +194,4 @@ public class CommonClientConfigs {
                 CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
                 ClientDnsLookup.USE_ALL_DNS_IPS);
     }
-
-    public static void warnIfDeprecatedRetriesValue(AbstractConfig config) {
-        if (config.originals().containsKey(RETRIES_CONFIG)) {
-            log.warn("Configuration '{}' is deprecated and will be removed in 
future version.", RETRIES_CONFIG);
-        }
-    }
 }
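
For illustration only (not part of the commit): a minimal sketch of the configuration style the reworded RETRIES_DOC recommends, leaving `retries` at MAX_VALUE and bounding retrying by time instead. The broker address and timeout value below are placeholders.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RetriesViaTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // As RETRIES_DOC now suggests: keep retries at zero or MAX_VALUE and
            // let the time-based bound decide when to give up.
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // illustrative value

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // producer.send(...) retries transient failures until the delivery timeout expires
            }
        }
    }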
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index c775faa..814f6e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -113,11 +113,6 @@ public class AdminClientConfig extends AbstractConfig {
     private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
     private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
 
-    /**
-     * <code>retries</code>
-     * @deprecated since 2.7
-     */
-    @Deprecated
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
 
@@ -227,7 +222,6 @@ public class AdminClientConfig extends AbstractConfig {
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
-        CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3f39bbe..00ad9c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -549,7 +549,6 @@ public class KafkaAdminClient extends AdminClient {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
 
-    @SuppressWarnings("deprecation")
     private KafkaAdminClient(AdminClientConfig config,
                              String clientId,
                              Time time,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 13af2f1..12ecc5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -441,7 +441,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     // visible for testing
-    @SuppressWarnings("deprecation")
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
         int maxInflightRequests = configureInflightRequests(producerConfig);
         int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 3e399c0..2be8ced 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -126,7 +126,7 @@ public class ProducerConfig extends AbstractConfig {
             + "after a call to <code>send()</code> returns. This limits the 
total time that a record will be delayed "
             + "prior to sending, the time to await acknowledgement from the 
broker (if expected), and the time allowed "
             + "for retriable send failures. The producer may report failure to 
send a record earlier than this config if "
-            + "either an unrecoverable error is encountered, the retries have 
been exhausted (deprecated), "
+            + "either an unrecoverable error is encountered, the retries have 
been exhausted, "
             + "or the record is added to a batch which reached an earlier 
delivery expiration deadline. "
             + "The value of this config should be greater than or equal to the 
sum of <code>" + REQUEST_TIMEOUT_MS_CONFIG + "</code> "
             + "and <code>" + LINGER_MS_CONFIG + "</code>.";
@@ -203,13 +203,9 @@ public class ProducerConfig extends AbstractConfig {
                                                                             + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
                                                                             + " message re-ordering due to retries (i.e., if retries are enabled).";
 
-    /**
-     * <code>retries</code>
-     * @deprecated since 2.7
-     */
-    @Deprecated
+    /** <code>retries</code> */
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
-    private static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
+    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
             + " Note that this retry is no different than if the client resent the record upon receiving the error."
             + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
             + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
@@ -251,7 +247,7 @@ public class ProducerConfig extends AbstractConfig {
     public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
                                                         + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
                                                         + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5, "
-                                                        + "<code>" + RETRIES_CONFIG + "</code> (deprecated) to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
+                                                        + "<code>" + RETRIES_CONFIG + "</code> to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
                                                         + "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, "
                                                         + "a <code>ConfigException</code> will be thrown.";
 
@@ -439,7 +435,6 @@ public class ProducerConfig extends AbstractConfig {
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
-        CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideEnableIdempotence(refinedConfigs);
         maybeOverrideClientId(refinedConfigs);
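
A side note for readers (illustration only, not part of the commit): a sketch of a producer configuration that satisfies the three idempotence constraints the doc string above names.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    public class IdempotenceConstraintsExample {
        public static Properties idempotentProps() {
            Properties props = new Properties();
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            // The three requirements from ENABLE_IDEMPOTENCE_DOC:
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must be <= 5
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);        // must be > 0
            props.put(ProducerConfig.ACKS_CONFIG, "all");                       // must be 'all'
            return props;
        }
    }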
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a4ae6f4..bf273d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -970,7 +970,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testMetadataRetries() throws Exception {
         // We should continue retrying on metadata update failures in spite of retry configuration
@@ -1970,8 +1969,7 @@ public class KafkaAdminClientTest {
     @Test
     public void testDescribeCluster() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
-                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
-                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100")) {
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Prepare the metadata response used for the first describe cluster
@@ -2010,8 +2008,7 @@ public class KafkaAdminClientTest {
     @Test
     public void testListConsumerGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
-                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
-                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500")) {
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Empty metadata response should be retried
@@ -2123,8 +2120,7 @@ public class KafkaAdminClientTest {
         final Time time = new MockTime();
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
-                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0",
-                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "0")) {
+                AdminClientConfig.RETRIES_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Empty metadata causes the request to fail since we have no list of brokers
@@ -2137,7 +2133,6 @@ public class KafkaAdminClientTest {
                             Collections.emptyList()));
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            time.sleep(1L);
             TestUtils.assertFutureError(result.all(), KafkaException.class);
         }
     }
@@ -2210,7 +2205,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testOffsetCommitNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2282,7 +2276,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testDescribeConsumerGroupNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2583,7 +2576,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testListConsumerGroupOffsetsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2695,7 +2687,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testDeleteConsumerGroupsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2859,7 +2850,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -3144,7 +3134,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    @Deprecated
     @Test
     public void testRemoveMembersFromGroupNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -4465,8 +4454,7 @@ public class KafkaAdminClientTest {
         long defaultApiTimeout = 60000;
         MockTime time = new MockTime();
 
-        try (AdminClientUnitTestEnv env = mockClientEnv(time,
-            AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeout))) {
+        try (AdminClientUnitTestEnv env = mockClientEnv(time, AdminClientConfig.RETRIES_CONFIG, "0")) {
 
             // Provide only one prepared response from node 1
             env.kafkaClient().prepareResponseFrom(
@@ -4668,8 +4656,7 @@ public class KafkaAdminClientTest {
         long defaultApiTimeout = 60000;
         MockTime time = new MockTime();
 
-        try (AdminClientUnitTestEnv env = mockClientEnv(time,
-            AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeout))) {
+        try (AdminClientUnitTestEnv env = mockClientEnv(time, AdminClientConfig.RETRIES_CONFIG, "0")) {
 
             env.kafkaClient().prepareResponseFrom(
                 prepareDescribeLogDirsResponse(Errors.NONE, "/data"),
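
For context (illustration only, not part of the commit): outside the test harness, an admin client can combine the restored `retries` bound with a time bound, roughly like this. Broker address and values are placeholders.

    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class AdminRetriesExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(AdminClientConfig.RETRIES_CONFIG, 2);                     // bound by attempts
            props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60_000); // and by time
            try (Admin admin = Admin.create(props)) {
                System.out.println(admin.describeCluster().nodes().get());
            }
        }
    }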
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index de0208b..68667b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -139,7 +139,6 @@ public class KafkaProducerTest {
             Collections.emptySet());
     private final int defaultMetadataIdleMs = 5 * 60 * 1000;
 
-    @SuppressWarnings("deprecation")
     @Test
     public void testOverwriteAcksAndRetriesForIdempotentProducers() {
         Properties props = new Properties();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index c4f9e90..5d6057d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -148,7 +148,6 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         this.statusTopic = statusTopic;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public void configure(final WorkerConfig config) {
         this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 9be01fe..d1e034a 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 object ConsoleProducer {
@@ -79,7 +78,6 @@ object ConsoleProducer {
     props
   }
 
-  @nowarn("cat=deprecation")
   def producerProps(config: ProducerConfig): Properties = {
     val props =
       if (config.options.has(config.producerConfigOpt))
@@ -148,7 +146,7 @@ object ConsoleProducer {
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(200)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "(Deprecated) Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message.")
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message.")
       .withRequiredArg
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(3)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e6f1bd0..2c390c4 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1665,7 +1665,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
     def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = { _deliveryTimeoutMs= timeoutMs; this }
 
-    @nowarn("cat=deprecation")
     override def build(): KafkaProducer[String, String] = {
       val producerProps = propsOverride
       producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 1056ee8..3a64052 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.{After, Before}
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -84,7 +83,6 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
     new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
   }
 
-  @nowarn("cat=deprecation")
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
     val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6b120be..a60e57a 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -33,7 +33,6 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{Before, Test}
 import org.scalatest.Assertions.fail
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -107,7 +106,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
     testProduceAfterLogDirFailureOnLeader(Checkpoint)
   }
 
-  @nowarn("cat=deprecation")
   @Test
   def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
@@ -139,7 +137,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }
   }
 
-  @nowarn("cat=deprecation")
   def testProduceErrorsFromLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
     // Disable retries to allow exception to bubble up for validation
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d9ade95..c13f033 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -64,7 +64,6 @@ import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
 import org.scalatest.Assertions.fail
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Map, Seq, mutable}
@@ -585,7 +584,6 @@ object TestUtils extends Logging {
   /**
    * Create a (new) producer with a few pre-configured properties.
    */
-  @nowarn("cat=deprecation")
   def createProducer[K, V](brokerList: String,
                            acks: Int = -1,
                            maxBlockMs: Long = 60 * 1000L,
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 461304b..699a8ab 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -101,14 +101,10 @@
         The old variable is deprecated. Note, that the parameter name itself is not affected.
         (Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
     </p>
-
     <p>
         The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
         Kafka Streams' runtime ignores <code>retries</code> if set, however, it would still forward the parameter
-        to its internal clients. Note though, that <code>retries</code> is deprecated for the producer and admin client, too.
-        Thus, instead of setting <code>retries</code>, you should configure the corresponding client timeouts, namely
-        <code>delivery.timeout.ms</code> and <code>max.block.ms</code> for the producer and
-        <code>default.api.timeout.ms</code> for the admin client.
+        to its internal clients.
     </p>
     <p>
         We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4d515bb..bc53fc2 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -21,10 +21,9 @@
 
 <h5><a id="upgrade_270_notable" href="#upgrade_270_notable">Notable changes in 2.7.0</a></h5>
 <ul>
-    <li>The configuration parameter <code>retries</code> is deprecated for the producer, admin, and Kafka Streams clients
+    <li>The configuration parameter <code>retries</code> is deprecated for the Kafka Streams client
         via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>.
-        You should use the producer's <code>delivery.timeout.ms</code> and <code>max.block.ms</code>, admin's
-        <code>default.api.timeout.ms</code>, and Kafka Streams' new <code>task.timeout.ms</code> parameters instead.
+        You should use the new <code>task.timeout.ms</code> parameters instead.
         Note that parameter <code>retry.backoff.ms</code> is not impacted by this change.
     </li>
     <li>Altering non-reconfigurable configs of existent listeners causes <code>InvalidRequestException</code>.
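
For Kafka Streams users following this note (illustration only, not part of the commit): a minimal sketch of the replacement configuration. The application id and broker address are placeholders, and the config is set via its literal name `task.timeout.ms` (the KIP-572 parameter referenced above).

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class TaskTimeoutExample {
        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Instead of retries, bound task-level retrying by time (KIP-572).
            props.put("task.timeout.ms", 300_000L); // illustrative value
            return props;
        }
    }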
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 97a1ec9..23272a2 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -272,7 +272,6 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.sslEngineFactoryClass = sslEngineFactoryClass;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public void activateOptions() {
         // check for config parameter validity
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 66703a4..387b56e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -693,21 +693,18 @@ public class StreamsConfigTest {
         assertThat(producerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), is(nullValue()));
     }
 
-    @Deprecated
     @Test
     public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
     }
 
-    @Deprecated
     @Test
     public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
         shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
     }
 
-    @Deprecated
     private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
         final int numberOfRetries = 42;
         props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index c1fef62..828bf9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -45,7 +45,6 @@ public class StreamsBrokerDownResilienceTest {
 
     private static final String SINK_TOPIC = "streamsResilienceSink";
 
-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     public static void main(final String[] args) throws IOException {
         if (args.length < 2) {
             System.err.println("StreamsBrokerDownResilienceTest are expecting 
two parameters: propFile, additionalConfigs; but only see " + args.length + " 
parameter");
@@ -123,7 +122,6 @@ public class StreamsBrokerDownResilienceTest {
         });
     }
 
-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     private static boolean confirmCorrectConfigs(final Properties properties) {
         return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
                properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index 882978b..da063c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -49,7 +49,6 @@ import static java.time.Duration.ofMillis;
 
 public class StreamsOptimizedTest {
 
-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     public static void main(final String[] args) throws Exception {
         if (args.length < 1) {
             System.err.println("StreamsOptimizedTest requires one argument 
(properties-file) but no provided: ");
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 4da2b2d..9b94b01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -43,7 +43,6 @@ import java.util.Set;
 
 public class StreamsStandByReplicaTest {
 
-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     public static void main(final String[] args) throws IOException {
         if (args.length < 2) {
             System.err.println("StreamsStandByReplicaTest are expecting two 
parameters: " +
@@ -159,7 +158,6 @@ public class StreamsStandByReplicaTest {
         streams.close(Duration.ofSeconds(10));
     }
 
-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     private static boolean confirmCorrectConfigs(final Properties properties) {
         return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
                properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 3862255..ee863d4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -217,7 +217,6 @@ public class VerifiableProducer implements AutoCloseable {
     }
 
     /** Construct a VerifiableProducer object from command-line arguments. */
-    @SuppressWarnings("deprecation")
     public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
         Namespace res = parser.parseArgs(args);
 

Reply via email to