This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fef9aebb191 KAFKA-18276 Migrate ProducerRebootstrapTest to new test 
infra (#19046)
fef9aebb191 is described below

commit fef9aebb1915fec79644fa9ec78191f88ea49877
Author: ClarkChen <[email protected]>
AuthorDate: Mon Mar 24 01:09:17 2025 +0800

    KAFKA-18276 Migrate ProducerRebootstrapTest to new test infra (#19046)
    
    The PR changed three things.
    * Migrated `ProducerRebootstrapTest` to new test infra and removed the
    old Scala test.
    * Updated the original test case to cover rebootstrap scenarios.
    * Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in
    the `client-integration-tests` module.
    
    Default `ProducerRebootstrap` config:
    > properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
    "rebootstrap");
    
    
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
    "300000");
    
    
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
    "10000");
    
    
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
    "30000");
    properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
    properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
    "1000L");
    
    The test case for the producer with rebootstrap enabled
    <img width="1549" alt="Screenshot 2025-03-17 at 10 46 03 PM"
    
src="https://github.com/user-attachments/assets/547840a6-d79d-4db4-98c0-9b05ed04cf60"
    />
    
    The test case for the producer with rebootstrap disabled
    <img width="1552" alt="Screenshot 2025-03-17 at 10 46 47 PM"
    
src="https://github.com/user-attachments/assets/2248e809-d9d5-4f3b-a24f-ba1aa0fef728"
    />
    
    Reviewers: TengYao Chi <[email protected]>, Ken Huang
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/ClientRebootstrapTest.java       | 74 ++++++++++++++++++++++
 .../kafka/api/ProducerRebootstrapTest.scala        | 56 ----------------
 2 files changed, 74 insertions(+), 56 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
index 9c21c09f50c..081995a6734 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
@@ -26,10 +28,12 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ClientRebootstrapTest {
@@ -94,4 +98,74 @@ public class ClientRebootstrapTest {
         // Since the brokers cached during the bootstrap are offline, the 
admin client needs to wait the default timeout for other threads.
         admin.close(Duration.ZERO);
     }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+        }
+    )
+    public void testProducerRebootstrap(ClusterInstance clusterInstance) 
throws ExecutionException, InterruptedException {
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 
REPLICAS)));
+        }
+
+        var broker0 = 0;
+        var broker1 = 1;
+
+        // It's ok to shut the leader down, cause the reelection is small 
enough to the producer timeout.
+        clusterInstance.shutdownBroker(broker0);
+
+        try (var producer = clusterInstance.producer()) {
+            // Only the broker 1 is available for the producer during the 
bootstrap.
+            var recordMetadata0 = producer.send(new ProducerRecord<>(TOPIC, 
"value 0".getBytes())).get();
+            assertEquals(0, recordMetadata0.offset());
+
+            clusterInstance.shutdownBroker(broker1);
+            clusterInstance.startBroker(broker0);
+
+            // Current broker 1 is offline.
+            // However, the broker 0 from the bootstrap list is online.
+            // Should be able to produce records.
+            var recordMetadata1 = producer.send(new ProducerRecord<>(TOPIC, 
"value 1".getBytes())).get();
+            assertEquals(0, recordMetadata1.offset());
+        }
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+        }
+    )
+    public void testProducerRebootstrapDisabled(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 
REPLICAS)));
+        }
+
+        var broker0 = 0;
+        var broker1 = 1;
+
+        // It's ok to shut the leader down, cause the reelection is small 
enough to the producer timeout.
+        clusterInstance.shutdownBroker(broker0);
+
+        var producer = 
clusterInstance.producer(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
 "none"));
+
+        // Only the broker 1 is available for the producer during the 
bootstrap.
+        var recordMetadata0 = producer.send(new ProducerRecord<>(TOPIC, "value 
0".getBytes())).get();
+        assertEquals(0, recordMetadata0.offset());
+
+        clusterInstance.shutdownBroker(broker1);
+        clusterInstance.startBroker(broker0);
+
+        // The broker 1, originally cached during the bootstrap, is offline.
+        // As a result, the producer will throw a TimeoutException when trying 
to send a message.
+        assertThrows(TimeoutException.class, () -> producer.send(new 
ProducerRecord<>(TOPIC, "value 1".getBytes())).get(5, TimeUnit.SECONDS));
+        // Since the brokers cached during the bootstrap are offline, the 
producer needs to wait the default timeout for other threads.
+        producer.close(Duration.ZERO);
+    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
deleted file mode 100644
index aa8f46d7997..00000000000
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-class ProducerRebootstrapTest extends RebootstrapTest {
-  @ParameterizedTest(name = 
"{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
-  @ValueSource(booleans = Array(false, true))
-  def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
-    // It's ok to shut the leader down, cause the reelection is small enough 
to the producer timeout.
-    server1.shutdown()
-    server1.awaitShutdown()
-
-    val producer = createProducer(configOverrides = 
clientOverrides(useRebootstrapTriggerMs))
-
-    // Only the server 0 is available for the producer during the bootstrap.
-    val recordMetadata0 = producer.send(new ProducerRecord(topic, part, "key 
0".getBytes, "value 0".getBytes)).get()
-    assertEquals(0, recordMetadata0.offset())
-
-    server0.shutdown()
-    server0.awaitShutdown()
-    server1.startup()
-
-    // The server 0, originally cached during the bootstrap, is offline.
-    // However, the server 1 from the bootstrap list is online.
-    // Should be able to produce records.
-    val recordMetadata1 = producer.send(new ProducerRecord(topic, part, "key 
1".getBytes, "value 1".getBytes)).get()
-    assertEquals(0, recordMetadata1.offset())
-
-    server1.shutdown()
-    server1.awaitShutdown()
-    server0.startup()
-
-    // The same situation, but the server 1 has gone and server 0 is back.
-    val recordMetadata2 = producer.send(new ProducerRecord(topic, part, "key 
1".getBytes, "value 1".getBytes)).get()
-    assertEquals(1, recordMetadata2.offset())
-  }
-}

Reply via email to