This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8500e4d8af KAFKA-19849 Move ThrottledChannelExpirationTest to server module (#21328)
b8500e4d8af is described below
commit b8500e4d8af08be5c8fe55cef65085cbcccaba81
Author: Lan Ding <[email protected]>
AuthorDate: Tue Jan 20 13:50:37 2026 +0800
KAFKA-19849 Move ThrottledChannelExpirationTest to server module (#21328)
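JIRA link:
[KAFKA-19849](https://issues.apache.org/jira/browse/KAFKA-19849)
Rewrite `ThrottledChannelExpirationTest` in Java and migrate it from the
`core` module (package `kafka.server`) to the `server` module (package
`org.apache.kafka.server.quota`), merging it with the existing
`ThrottledChannelTest`.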
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../server/ThrottledChannelExpirationTest.scala | 85 ----------------------
...st.java => ThrottledChannelExpirationTest.java} | 47 +++++++++++-
2 files changed, 45 insertions(+), 87 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
deleted file mode 100644
index 2e2b32ee5ba..00000000000
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-
-import java.util.Collections
-import java.util.concurrent.DelayQueue
-import org.apache.kafka.common.metrics.MetricConfig
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.{ClientQuotaManager, QuotaType, ThrottleCallback, ThrottledChannel}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test}
-
-class ThrottledChannelExpirationTest {
- private val time = new MockTime
- private var numCallbacksForStartThrottling: Int = 0
- private var numCallbacksForEndThrottling: Int = 0
- private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
- Collections.emptyList(),
- time)
- private val callback = new ThrottleCallback {
- override def startThrottling(): Unit = {
- numCallbacksForStartThrottling += 1
- }
-
- override def endThrottling(): Unit = {
- numCallbacksForEndThrottling += 1
- }
- }
-
- @BeforeEach
- def beforeMethod(): Unit = {
- numCallbacksForStartThrottling = 0
- numCallbacksForEndThrottling = 0
- }
-
- @Test
- def testCallbackInvocationAfterExpiration(): Unit = {
- val clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(), metrics, QuotaType.PRODUCE, time, "")
-
- val delayQueue = new DelayQueue[ThrottledChannel]()
- val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
- try {
- // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp.
- val channel1 = new ThrottledChannel(time, 10, callback)
- val channel2 = new ThrottledChannel(time, 30, callback)
- val channel3 = new ThrottledChannel(time, 30, callback)
- val channel4 = new ThrottledChannel(time, 20, callback)
- delayQueue.add(channel1)
- delayQueue.add(channel2)
- delayQueue.add(channel3)
- delayQueue.add(channel4)
- assertEquals(4, numCallbacksForStartThrottling)
-
- for (itr <- 1 to 3) {
- time.sleep(10)
- reaper.doWork()
- assertEquals(itr, numCallbacksForEndThrottling)
- }
- reaper.doWork()
- assertEquals(4, numCallbacksForEndThrottling)
- assertEquals(0, delayQueue.size())
- reaper.doWork()
- assertEquals(4, numCallbacksForEndThrottling)
- } finally {
- clientMetrics.shutdown()
- }
- }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelTest.java b/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelExpirationTest.java
similarity index 50%
rename from server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelTest.java
rename to server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelExpirationTest.java
index 64f37bdc378..8766fcdbba0 100644
--- a/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelTest.java
+++ b/server/src/test/java/org/apache/kafka/server/quota/ThrottledChannelExpirationTest.java
@@ -14,29 +14,72 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.server.quota;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.junit.jupiter.api.Test;
+import java.util.List;
+import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class ThrottledChannelTest {
-
+public class ThrottledChannelExpirationTest {
private final MockTime time = new MockTime();
+ private final Metrics metrics = new Metrics(new MetricConfig(), List.of(), time);
+ private int numCallbacksForStartThrottling = 0;
+ private int numCallbacksForEndThrottling = 0;
private final ThrottleCallback callback = new ThrottleCallback() {
@Override
public void startThrottling() {
+ numCallbacksForStartThrottling++;
}
@Override
public void endThrottling() {
+ numCallbacksForEndThrottling++;
}
};
+ @Test
+ public void testCallbackInvocationAfterExpiration() {
+ ClientQuotaManager clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(), metrics, QuotaType.PRODUCE, time, "");
+
+ DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+ var reaper = clientMetrics.new ThrottledChannelReaper(delayQueue, "");
+ try {
+ // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp.
+ ThrottledChannel channel1 = new ThrottledChannel(time, 10, callback);
+ ThrottledChannel channel2 = new ThrottledChannel(time, 30, callback);
+ ThrottledChannel channel3 = new ThrottledChannel(time, 30, callback);
+ ThrottledChannel channel4 = new ThrottledChannel(time, 20, callback);
+ delayQueue.add(channel1);
+ delayQueue.add(channel2);
+ delayQueue.add(channel3);
+ delayQueue.add(channel4);
+ assertEquals(4, numCallbacksForStartThrottling);
+
+ for (int i = 1; i <= 3; i++) {
+ time.sleep(10);
+ reaper.doWork();
+ assertEquals(i, numCallbacksForEndThrottling);
+ }
+ reaper.doWork();
+ assertEquals(4, numCallbacksForEndThrottling);
+ assertEquals(0, delayQueue.size());
+ reaper.doWork();
+ assertEquals(4, numCallbacksForEndThrottling);
+ } finally {
+ clientMetrics.shutdown();
+ }
+ }
+
@Test
public void testThrottledChannelDelay() {
ThrottledChannel channel1 = new ThrottledChannel(time, 10, callback);