This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8500e4d8af KAFKA-19849 Move ThrottledChannelExpirationTest to server 
module (#21328)
b8500e4d8af is described below

commit b8500e4d8af08be5c8fe55cef65085cbcccaba81
Author: Lan Ding <[email protected]>
AuthorDate: Tue Jan 20 13:50:37 2026 +0800

    KAFKA-19849 Move ThrottledChannelExpirationTest to server module (#21328)
    
    JIRA link:
    [KAFKA-19849](https://issues.apache.org/jira/browse/KAFKA-19849).
    Rewrite `ThrottledChannelExpirationTest` in Java and migrate it from
    `kafka.server` (core) to the `server` module
    (`org.apache.kafka.server.quota`).
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../server/ThrottledChannelExpirationTest.scala    | 85 ----------------------
 ...st.java => ThrottledChannelExpirationTest.java} | 47 +++++++++++-
 2 files changed, 45 insertions(+), 87 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
deleted file mode 100644
index 2e2b32ee5ba..00000000000
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-
-import java.util.Collections
-import java.util.concurrent.DelayQueue
-import org.apache.kafka.common.metrics.MetricConfig
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.{ClientQuotaManager, QuotaType, 
ThrottleCallback, ThrottledChannel}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
-
-class ThrottledChannelExpirationTest {
-  private val time = new MockTime
-  private var numCallbacksForStartThrottling: Int = 0
-  private var numCallbacksForEndThrottling: Int = 0
-  private val metrics = new org.apache.kafka.common.metrics.Metrics(new 
MetricConfig(),
-                                                                    
Collections.emptyList(),
-                                                                    time)
-  private val callback = new ThrottleCallback {
-    override def startThrottling(): Unit = {
-      numCallbacksForStartThrottling += 1
-    }
-
-    override def endThrottling(): Unit = {
-      numCallbacksForEndThrottling += 1
-    }
-  }
-
-  @BeforeEach
-  def beforeMethod(): Unit = {
-    numCallbacksForStartThrottling = 0
-    numCallbacksForEndThrottling = 0
-  }
-
-  @Test
-  def testCallbackInvocationAfterExpiration(): Unit = {
-    val clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(), 
metrics, QuotaType.PRODUCE, time, "")
-
-    val delayQueue = new DelayQueue[ThrottledChannel]()
-    val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
-    try {
-      // Add 4 elements to the queue out of order. Add 2 elements with the 
same expire timestamp.
-      val channel1 = new ThrottledChannel(time, 10, callback)
-      val channel2 = new ThrottledChannel(time, 30, callback)
-      val channel3 = new ThrottledChannel(time, 30, callback)
-      val channel4 = new ThrottledChannel(time, 20, callback)
-      delayQueue.add(channel1)
-      delayQueue.add(channel2)
-      delayQueue.add(channel3)
-      delayQueue.add(channel4)
-      assertEquals(4, numCallbacksForStartThrottling)
-
-      for (itr <- 1 to 3) {
-        time.sleep(10)
-        reaper.doWork()
-        assertEquals(itr, numCallbacksForEndThrottling)
-      }
-      reaper.doWork()
-      assertEquals(4, numCallbacksForEndThrottling)
-      assertEquals(0, delayQueue.size())
-      reaper.doWork()
-      assertEquals(4, numCallbacksForEndThrottling)
-    } finally {
-      clientMetrics.shutdown()
-    }
-  }
-}
diff --git 
a/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelTest.java 
b/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelExpirationTest.java
similarity index 50%
rename from 
server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelTest.java
rename to 
server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelExpirationTest.java
index 64f37bdc378..8766fcdbba0 100644
--- 
a/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelExpirationTest.java
@@ -14,29 +14,72 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.server.quota;
 
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class ThrottledChannelTest {
-
+public class ThrottledChannelExpirationTest {
     private final MockTime time = new MockTime();
+    private final Metrics metrics = new Metrics(new MetricConfig(), List.of(), 
time);
+    private int numCallbacksForStartThrottling = 0;
+    private int numCallbacksForEndThrottling = 0;
     private final ThrottleCallback callback = new ThrottleCallback() {
         @Override
         public void startThrottling() {
+            numCallbacksForStartThrottling++;
         }
 
         @Override
         public void endThrottling() {
+            numCallbacksForEndThrottling++;
         }
     };
 
+    @Test
+    public void testCallbackInvocationAfterExpiration() {
+        ClientQuotaManager clientMetrics = new ClientQuotaManager(new 
ClientQuotaManagerConfig(), metrics, QuotaType.PRODUCE, time, "");
+
+        DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+        var reaper = clientMetrics.new ThrottledChannelReaper(delayQueue, "");
+        try {
+            // Add 4 elements to the queue out of order. Add 2 elements with 
the same expire timestamp.
+            ThrottledChannel channel1 = new ThrottledChannel(time, 10, 
callback);
+            ThrottledChannel channel2 = new ThrottledChannel(time, 30, 
callback);
+            ThrottledChannel channel3 = new ThrottledChannel(time, 30, 
callback);
+            ThrottledChannel channel4 = new ThrottledChannel(time, 20, 
callback);
+            delayQueue.add(channel1);
+            delayQueue.add(channel2);
+            delayQueue.add(channel3);
+            delayQueue.add(channel4);
+            assertEquals(4, numCallbacksForStartThrottling);
+
+            for (int i = 1; i <= 3; i++) {
+                time.sleep(10);
+                reaper.doWork();
+                assertEquals(i, numCallbacksForEndThrottling);
+            }
+            reaper.doWork();
+            assertEquals(4, numCallbacksForEndThrottling);
+            assertEquals(0, delayQueue.size());
+            reaper.doWork();
+            assertEquals(4, numCallbacksForEndThrottling);
+        } finally {
+            clientMetrics.shutdown();
+        }
+    }
+
     @Test
     public void testThrottledChannelDelay() {
         ThrottledChannel channel1 = new ThrottledChannel(time, 10, callback);

Reply via email to