This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1547204baa2 KAFKA-18914 Migrate ConsumerRebootstrapTest to use new 
test infra (#19154)
1547204baa2 is described below

commit 1547204baa2f4cf6ee55b0d7ab1b508a9e6d09e0
Author: ClarkChen <[email protected]>
AuthorDate: Wed Mar 26 01:53:42 2025 +0800

    KAFKA-18914 Migrate ConsumerRebootstrapTest to use new test infra (#19154)
    
    Migrate ConsumerRebootstrapTest to the new test infra and remove the old
    Scala test.
    
    The PR changed four things.
    * Migrated `ConsumerRebootstrapTest` to new test infra and removed the
    old Scala test.
    * Updated the original test case to cover rebootstrap scenarios.
    * Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
    the `client-integration-tests` module.
    * Removed `RebootstrapTest.scala`.
    
    Default `ConsumerRebootstrap` config:
    > properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
    > properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "300000");
    > properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000");
    > properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "30000");
    > properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
    > properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000L");
    
    The test case for the consumer with rebootstrap enabled:
    ![Screenshot 2025-03-22 at 9 48 13 PM](https://github.com/user-attachments/assets/8470549f-a24c-43fa-ae44-789cbf422a63)
    
    
    The test case for the consumer with rebootstrap disabled:
    ![Screenshot 2025-03-22 at 9 47 22 PM](https://github.com/user-attachments/assets/0a183464-6a74-449f-8e71-d641a6ea5bb1)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/ClientRebootstrapTest.java       | 133 ++++++++++++++++++-
 .../kafka/api/ConsumerRebootstrapTest.scala        | 146 ---------------------
 .../integration/kafka/api/RebootstrapTest.scala    |  69 ----------
 3 files changed, 129 insertions(+), 219 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
index 081995a6734..2f11e13377d 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
@@ -17,9 +17,14 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
@@ -38,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ClientRebootstrapTest {
     private static final String TOPIC = "topic";
+    private static final int PARTITIONS = 1;
     private static final int REPLICAS = 2;
 
     @ClusterTest(
@@ -55,7 +61,7 @@ public class ClientRebootstrapTest {
         clusterInstance.shutdownBroker(broker0);
 
         try (var admin = clusterInstance.admin()) {
-            admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 
REPLICAS)));
+            admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) 
REPLICAS)));
 
             // Only the broker 1 is available for the admin client during the 
bootstrap.
             assertDoesNotThrow(() -> admin.listTopics().names().get(timeout, 
TimeUnit.SECONDS).contains(TOPIC));
@@ -84,7 +90,7 @@ public class ClientRebootstrapTest {
         clusterInstance.shutdownBroker(broker0);
 
         var admin = 
clusterInstance.admin(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
 "none"));
-        admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
+        admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) 
REPLICAS)));
 
         // Only the broker 1 is available for the admin client during the 
bootstrap.
         assertDoesNotThrow(() -> admin.listTopics().names().get(60, 
TimeUnit.SECONDS).contains(TOPIC));
@@ -109,7 +115,7 @@ public class ClientRebootstrapTest {
     )
     public void testProducerRebootstrap(ClusterInstance clusterInstance) 
throws ExecutionException, InterruptedException {
         try (var admin = clusterInstance.admin()) {
-            admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 
REPLICAS)));
+            admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) 
REPLICAS)));
         }
 
         var broker0 = 0;
@@ -144,7 +150,7 @@ public class ClientRebootstrapTest {
     )
     public void testProducerRebootstrapDisabled(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
         try (var admin = clusterInstance.admin()) {
-            admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 
REPLICAS)));
+            admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) 
REPLICAS)));
         }
 
         var broker0 = 0;
@@ -168,4 +174,123 @@ public class ClientRebootstrapTest {
         // Since the brokers cached during the bootstrap are offline, the 
producer needs to wait the default timeout for other threads.
         producer.close(Duration.ZERO);
     }
+
+    public void consumerRebootstrap(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        clusterInstance.createTopic(TOPIC, PARTITIONS, (short) REPLICAS);
+
+        var broker0 = 0;
+        var broker1 = 1;
+        var partitions = List.of(new TopicPartition(TOPIC, 0));
+
+        try (var producer = 
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+            var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
"value 0".getBytes())).get();
+            assertEquals(0, recordMetadata.offset());
+        }
+
+        clusterInstance.shutdownBroker(broker0);
+
+        try (var consumer = 
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name))) {
+            // Only the server 1 is available for the consumer during the 
bootstrap.
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll 
data.");
+
+            // Bring back the server 0 and shut down 1.
+            clusterInstance.shutdownBroker(broker1);
+            clusterInstance.startBroker(broker0);
+
+            try (var producer = 
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+                var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
"value 1".getBytes())).get();
+                assertEquals(1, recordMetadata.offset());
+            }
+
+            // The server 1 originally cached during the bootstrap, is offline.
+            // However, the server 0 from the bootstrap list is online.
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll 
data.");
+        }
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+        })
+    public void testClassicConsumerRebootstrap(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException {
+        consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+        })
+    public void testConsumerRebootstrap(ClusterInstance clusterInstance) 
throws InterruptedException, ExecutionException {
+        consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
+    public void consumerRebootstrapDisabled(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        clusterInstance.createTopic(TOPIC, PARTITIONS, (short) REPLICAS);
+
+        var broker0 = 0;
+        var broker1 = 1;
+        var tp = new TopicPartition(TOPIC, 0);
+
+        try (var producer = 
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+            var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
"value 0".getBytes())).get();
+            assertEquals(0, recordMetadata.offset());
+        }
+
+        clusterInstance.shutdownBroker(broker0);
+
+        try (var consumer = clusterInstance.consumer(Map.of(
+            CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none",
+            ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name)
+        )) {
+            // Only the server 1 is available for the consumer during the 
bootstrap.
+            consumer.assign(List.of(tp));
+            consumer.seekToBeginning(List.of(tp));
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll 
data.");
+
+            // Bring back the server 0 and shut down 1.
+            clusterInstance.shutdownBroker(broker1);
+            clusterInstance.startBroker(broker0);
+
+            try (var producer = 
clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
+                var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
"value 1".getBytes())).get();
+                assertEquals(1, recordMetadata.offset());
+            }
+
+            // The server 1 originally cached during the bootstrap, is offline.
+            // However, the server 0 from the bootstrap list is online.
+            assertEquals(0, consumer.poll(Duration.ofMillis(100)).count());
+        }
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+        }
+    )
+    public void testClassicConsumerRebootstrapDisabled(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException {
+        consumerRebootstrapDisabled(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+        }
+    )
+    public void testConsumerRebootstrapDisabled(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException {
+        consumerRebootstrapDisabled(clusterInstance, GroupProtocol.CONSUMER);
+    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
deleted file mode 100644
index ea9345f7265..00000000000
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import kafka.api.ConsumerRebootstrapTest._
-import 
kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersAll
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
-import org.junit.jupiter.api.Disabled
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
-
-import java.time.Duration
-import java.util.{Collections, stream}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.TimeoutException
-
-class ConsumerRebootstrapTest extends RebootstrapTest {
-  @ParameterizedTest(name = RebootstrapTestName)
-  @MethodSource(Array("rebootstrapTestParams"))
-  def testRebootstrap(quorum: String, groupProtocol: String, 
useRebootstrapTriggerMs: Boolean): Unit = {
-    sendRecords(10, 0)
-
-    TestUtils.waitUntilTrue(
-      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == 
server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
-      "Timeout waiting for records to be replicated"
-    )
-
-    server1.shutdown()
-    server1.awaitShutdown()
-
-    val consumer = createConsumer(configOverrides = 
clientOverrides(useRebootstrapTriggerMs))
-
-    // Only the server 0 is available for the consumer during the bootstrap.
-    consumer.assign(Collections.singleton(tp))
-
-    consumeAndVerifyRecords(consumer, 10, 0)
-
-    // Bring back the server 1 and shut down 0.
-    server1.startup()
-
-    TestUtils.waitUntilTrue(
-      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == 
server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
-      "Timeout waiting for records to be replicated"
-    )
-
-    server0.shutdown()
-    server0.awaitShutdown()
-    sendRecords(10, 10)
-
-    // The server 0, originally cached during the bootstrap, is offline.
-    // However, the server 1 from the bootstrap list is online.
-    // Should be able to consume records.
-    consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10, 
startingTimestamp = 10)
-
-    // Bring back the server 0 and shut down 1.
-    server0.startup()
-
-    TestUtils.waitUntilTrue(
-      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == 
server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
-      "Timeout waiting for records to be replicated"
-    )
-
-    server1.shutdown()
-    server1.awaitShutdown()
-    sendRecords(10, 20)
-
-    // The same situation, but the server 1 has gone and server 0 is back.
-    consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, 
startingTimestamp = 20)
-  }
-
-  @Disabled
-  @ParameterizedTest(name = RebootstrapTestName)
-  @MethodSource(Array("rebootstrapTestParams"))
-  def testRebootstrapDisabled(quorum: String, groupProtocol: String, 
useRebootstrapTriggerMs: Boolean): Unit = {
-    server1.shutdown()
-    server1.awaitShutdown()
-
-    val configOverrides = clientOverrides(useRebootstrapTriggerMs)
-    configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, 
"none")
-    if (useRebootstrapTriggerMs)
-      
configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
 "1000")
-
-    val producer = createProducer(configOverrides = configOverrides)
-    val consumer = createConsumer(configOverrides = configOverrides)
-    val adminClient = createAdminClient(configOverrides = configOverrides)
-
-    // Only the server 0 is available during the bootstrap.
-    val recordMetadata0 = producer.send(new ProducerRecord(topic, part, 0L, 
"key 0".getBytes, "value 0".getBytes)).get(15, TimeUnit.SECONDS)
-    assertEquals(0, recordMetadata0.offset())
-    adminClient.listTopics().names().get(15, TimeUnit.SECONDS)
-    consumer.assign(Collections.singleton(tp))
-    consumeAndVerifyRecords(consumer, 1, 0)
-
-    server0.shutdown()
-    server0.awaitShutdown()
-    server1.startup()
-
-    assertThrows(classOf[TimeoutException], () => producer.send(new 
ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get(5, 
TimeUnit.SECONDS))
-    assertThrows(classOf[TimeoutException], () => 
adminClient.listTopics().names().get(5, TimeUnit.SECONDS))
-
-    val producer2 = createProducer(configOverrides = configOverrides)
-    producer2.send(new ProducerRecord(topic, part, 1L, "key 1".getBytes, 
"value 1".getBytes)).get(15, TimeUnit.SECONDS)
-    assertEquals(0, consumer.poll(Duration.ofSeconds(5)).count)
-  }
-
-  private def sendRecords(numRecords: Int, from: Int): Unit = {
-    val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
-    (from until (numRecords + from)).foreach { i =>
-      val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, 
s"key $i".getBytes, s"value $i".getBytes)
-      producer.send(record)
-    }
-    producer.flush()
-    producer.close()
-  }
-}
-
-object ConsumerRebootstrapTest {
-
-  final val RebootstrapTestName = 
s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
-  def rebootstrapTestParams: stream.Stream[Arguments] = {
-    getTestQuorumAndGroupProtocolParametersAll
-      .flatMap { baseArgs =>
-        stream.Stream.of(
-          Arguments.of((baseArgs.get :+ true):_*),
-          Arguments.of((baseArgs.get :+ false):_*)
-        )
-      }
-  }
-}
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
deleted file mode 100644
index 2d84284cd6b..00000000000
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import kafka.server.{KafkaBroker, KafkaConfig}
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
-
-import java.util.Properties
-
-abstract class RebootstrapTest extends AbstractConsumerTest {
-  override def brokerCount: Int = 2
-
-  def server0: KafkaBroker = serverForId(0).get
-  def server1: KafkaBroker = serverForId(1).get
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.doSetup(testInfo, createOffsetsTopic = true)
-
-    // Enable unclean leader election for the test topic
-    val topicProps = new Properties
-    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
-
-    // create the test topic with all the brokers as replicas
-    createTopic(topic, 2, brokerCount, adminClientConfig = 
this.adminClientConfig, topicConfig = topicProps)
-  }
-
-  override def generateConfigs: Seq[KafkaConfig] = {
-    val overridingProps = new Properties()
-    
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 brokerCount.toString)
-
-    // In this test, fixed ports are necessary, because brokers must have the
-    // same port after the restart.
-    FixedPortTestUtils.createBrokerConfigs(brokerCount, 
enableControlledShutdown = false)
-      .map(KafkaConfig.fromProps(_, overridingProps))
-  }
-
-  def clientOverrides(useRebootstrapTriggerMs: Boolean): Properties = {
-    val overrides = new Properties()
-    if (useRebootstrapTriggerMs) {
-      
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
 "5000")
-    } else {
-      
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
 "3600000")
-      
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
"5000")
-      
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
 "5000")
-      overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
-      overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, 
"1000")
-    }
-    overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, 
"rebootstrap")
-    overrides
-  }
-}

Reply via email to