This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fef9aebb191 KAFKA-18276 Migrate ProducerRebootstrapTest to new test
infra (#19046)
fef9aebb191 is described below
commit fef9aebb1915fec79644fa9ec78191f88ea49877
Author: ClarkChen <[email protected]>
AuthorDate: Mon Mar 24 01:09:17 2025 +0800
KAFKA-18276 Migrate ProducerRebootstrapTest to new test infra (#19046)
This PR changes three things:
* Migrated `ProducerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
Default `ProducerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
The test case for the producer with enabled rebootstrap
<img width="1549" alt="Screenshot 2025-03-17 at 10 46 03 PM"
src="https://github.com/user-attachments/assets/547840a6-d79d-4db4-98c0-9b05ed04cf60"
/>
The test case for the producer with disabled rebootstrap
<img width="1552" alt="Screenshot 2025-03-17 at 10 46 47 PM"
src="https://github.com/user-attachments/assets/2248e809-d9d5-4f3b-a24f-ba1aa0fef728"
/>
Reviewers: TengYao Chi <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/ClientRebootstrapTest.java | 74 ++++++++++++++++++++++
.../kafka/api/ProducerRebootstrapTest.scala | 56 ----------------
2 files changed, 74 insertions(+), 56 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
index 9c21c09f50c..081995a6734 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
@@ -26,10 +28,12 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ClientRebootstrapTest {
@@ -94,4 +98,74 @@ public class ClientRebootstrapTest {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+ }
+ )
+ public void testProducerRebootstrap(ClusterInstance clusterInstance)
throws ExecutionException, InterruptedException {
+ try (var admin = clusterInstance.admin()) {
+ admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short)
REPLICAS)));
+ }
+
+ var broker0 = 0;
+ var broker1 = 1;
+
+ // It's ok to shut the leader down, cause the reelection is small
enough to the producer timeout.
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ // Only the broker 1 is available for the producer during the
bootstrap.
+ var recordMetadata0 = producer.send(new ProducerRecord<>(TOPIC,
"value 0".getBytes())).get();
+ assertEquals(0, recordMetadata0.offset());
+
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ // Current broker 1 is offline.
+ // However, the broker 0 from the bootstrap list is online.
+ // Should be able to produce records.
+ var recordMetadata1 = producer.send(new ProducerRecord<>(TOPIC,
"value 1".getBytes())).get();
+ assertEquals(0, recordMetadata1.offset());
+ }
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
+ }
+ )
+ public void testProducerRebootstrapDisabled(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ try (var admin = clusterInstance.admin()) {
+ admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short)
REPLICAS)));
+ }
+
+ var broker0 = 0;
+ var broker1 = 1;
+
+ // It's ok to shut the leader down, cause the reelection is small
enough to the producer timeout.
+ clusterInstance.shutdownBroker(broker0);
+
+ var producer =
clusterInstance.producer(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"none"));
+
+ // Only the broker 1 is available for the producer during the
bootstrap.
+ var recordMetadata0 = producer.send(new ProducerRecord<>(TOPIC, "value
0".getBytes())).get();
+ assertEquals(0, recordMetadata0.offset());
+
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ // The broker 1, originally cached during the bootstrap, is offline.
+ // As a result, the producer will throw a TimeoutException when trying
to send a message.
+ assertThrows(TimeoutException.class, () -> producer.send(new
ProducerRecord<>(TOPIC, "value 1".getBytes())).get(5, TimeUnit.SECONDS));
+ // Since the brokers cached during the bootstrap are offline, the
producer needs to wait the default timeout for other threads.
+ producer.close(Duration.ZERO);
+ }
}
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
deleted file mode 100644
index aa8f46d7997..00000000000
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-class ProducerRebootstrapTest extends RebootstrapTest {
- @ParameterizedTest(name =
"{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
- @ValueSource(booleans = Array(false, true))
- def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
- // It's ok to shut the leader down, cause the reelection is small enough
to the producer timeout.
- server1.shutdown()
- server1.awaitShutdown()
-
- val producer = createProducer(configOverrides =
clientOverrides(useRebootstrapTriggerMs))
-
- // Only the server 0 is available for the producer during the bootstrap.
- val recordMetadata0 = producer.send(new ProducerRecord(topic, part, "key
0".getBytes, "value 0".getBytes)).get()
- assertEquals(0, recordMetadata0.offset())
-
- server0.shutdown()
- server0.awaitShutdown()
- server1.startup()
-
- // The server 0, originally cached during the bootstrap, is offline.
- // However, the server 1 from the bootstrap list is online.
- // Should be able to produce records.
- val recordMetadata1 = producer.send(new ProducerRecord(topic, part, "key
1".getBytes, "value 1".getBytes)).get()
- assertEquals(0, recordMetadata1.offset())
-
- server1.shutdown()
- server1.awaitShutdown()
- server0.startup()
-
- // The same situation, but the server 1 has gone and server 0 is back.
- val recordMetadata2 = producer.send(new ProducerRecord(topic, part, "key
1".getBytes, "value 1".getBytes)).get()
- assertEquals(1, recordMetadata2.offset())
- }
-}