This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e828767062f KAFKA-18790 Fix testCustomQuotaCallback (#18906)
e828767062f is described below

commit e828767062feb163ea0506ac8c56dcf6f33268b4
Author: Ming-Yen Chung <mingyen...@gmail.com>
AuthorDate: Sat Feb 15 03:07:59 2025 +0800

    KAFKA-18790 Fix testCustomQuotaCallback (#18906)
    
    Frequently updating the trust store can cause unexpected termination of the 
AsyncConsumer background thread.
    
    1. To resolve this issue, reuse the same AdminClient instead of recreating 
it.
    2. Add error logging when resource initialization fails for the consumer 
network thread.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../consumer/internals/ConsumerNetworkThread.java  |  2 ++
 .../kafka/api/CustomQuotaCallbackTest.scala        | 24 +++++++++-------------
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 0e7b58acc21..a48289919b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -109,6 +109,8 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
                     log.error("Unexpected error caught in consumer network 
thread", e);
                 }
             }
+        } catch (final Throwable e) {
+            log.error("Failed to initialize resources for consumer network 
thread", e);
         } finally {
             cleanup();
         }
diff --git 
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala 
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 3d09c667371..006a5d085c5 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -40,9 +40,7 @@ import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.{lang, util}
-import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
-import scala.util.Using
 
 class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
@@ -57,8 +55,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness 
with SaslSetup {
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
   override protected val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
   override protected val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  private val adminClients = new ArrayBuffer[Admin]()
   private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _
+  private var admin: Admin = _
 
   val defaultRequestQuota = 1000
   val defaultProduceQuota = 2000 * 1000 * 1000
@@ -82,7 +80,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness 
with SaslSetup {
 
   @AfterEach
   override def tearDown(): Unit = {
-    adminClients.foreach(_.close())
+    if (admin != null) admin.close()
     GroupedUserQuotaCallback.tearDown()
     super.tearDown()
     closeSasl()
@@ -196,13 +194,11 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
     topic: String, 
     listenerName: ListenerName = listenerName
   ): Unit = {
-    Using.resource(createAdminClient()) { admin =>
-      TestUtils.deleteTopicWithAdmin(
-        admin = admin,
-        topic = topic,
-        brokers = aliveBrokers,
-        controllers = controllerServers)
-    }
+    TestUtils.deleteTopicWithAdmin(
+      admin = createAdminClient(),
+      topic = topic,
+      brokers = aliveBrokers,
+      controllers = controllerServers)
   }
 
   private def createTopic(topic: String, numPartitions: Int, leader: Int): 
Unit = {
@@ -218,6 +214,7 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
   }
 
   private def createAdminClient(): Admin = {
+    if (admin != null) return admin
     val config = new util.HashMap[String, Object]
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, 
Object]].forEach { (key, value) =>
@@ -225,9 +222,8 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
     }
     config.put(SaslConfigs.SASL_JAAS_CONFIG,
       JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, 
JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString)
-    val adminClient = Admin.create(config)
-    adminClients += adminClient
-    adminClient
+    admin = Admin.create(config)
+    admin
   }
 
   private def produceWithoutThrottle(topic: String, numRecords: Int): Unit = {

Reply via email to