This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd1f02b2bec MINOR: Move MockTimer to server-common (#13954)
bd1f02b2bec is described below
commit bd1f02b2beca69a0937bb3df6f1e0ebcc3d9bfeb
Author: David Jacot <[email protected]>
AuthorDate: Thu Jul 6 14:56:05 2023 +0200
MINOR: Move MockTimer to server-common (#13954)
This patch rewrites MockTimer in Java and moves it from core to
server-common. This continues the work started in
https://github.com/apache/kafka/pull/13820.
Reviewers: Divij Vaidya <[email protected]>
---
checkstyle/import-control-server-common.xml | 8 +-
.../AbstractCoordinatorConcurrencyTest.scala | 2 +-
.../coordinator/group/GroupCoordinatorTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../scala/unit/kafka/utils/timer/MockTimer.scala | 72 -----------------
.../apache/kafka/server/util/timer/MockTimer.java | 89 ++++++++++++++++++++++
6 files changed, 98 insertions(+), 77 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 350f2820968..2e65ec601a3 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -82,13 +82,17 @@
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>
- <!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
- <!-- API but are still relatively common -->
<subpackage name="util">
+ <!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
+ <!-- API but are still relatively common -->
<allow class="org.apache.kafka.clients.ClientRequest" />
<allow class="org.apache.kafka.clients.ClientResponse" />
<allow class="org.apache.kafka.clients.KafkaClient" />
<allow class="org.apache.kafka.clients.RequestCompletionHandler" />
+
+ <subpackage name="timer">
+ <allow class="org.apache.kafka.server.util.MockTime" />
+ </subpackage>
</subpackage>
</subpackage>
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 23961be7fb8..255b8dbb866 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -25,12 +25,12 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.log.UnifiedLog
import kafka.server._
import kafka.utils._
-import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index ee11fbd8a3d..787e76d6aef 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -21,7 +21,6 @@ import java.util.{Optional, OptionalInt}
import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils._
-import kafka.utils.timer.MockTimer
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
@@ -37,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.AppendOrigin
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index bcb7cfb327f..e8cad3e942e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,7 +32,6 @@ import kafka.log._
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender
-import kafka.utils.timer.MockTimer
import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
@@ -71,6 +70,7 @@ import kafka.log.remote.RemoteLogManager
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.server.util.timer.MockTimer
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
deleted file mode 100644
index cee7dc097f1..00000000000
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.utils.timer
-
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry}
-
-import scala.collection.mutable
-
-class MockTimer(val time: MockTime = new MockTime) extends Timer {
-
- private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new Ordering[TimerTaskEntry] {
- override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = java.lang.Long.compare(x.expirationMs, y.expirationMs)
- }.reverse)
-
- def add(timerTask: TimerTask): Unit = {
- if (timerTask.delayMs <= 0)
- timerTask.run()
- else {
- taskQueue synchronized {
- taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds))
- }
- }
- }
-
- def advanceClock(timeoutMs: Long): Boolean = {
- time.sleep(timeoutMs)
-
- var executed = false
- val now = time.milliseconds
-
- var hasMore = true
- while (hasMore) {
- hasMore = false
- val head = taskQueue synchronized {
- if (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) {
- val entry = Some(taskQueue.dequeue())
- hasMore = taskQueue.nonEmpty
- entry
- } else
- None
- }
- head.foreach { taskEntry =>
- if (!taskEntry.cancelled) {
- val task = taskEntry.timerTask
- task.run()
- executed = true
- }
- }
- }
- executed
- }
-
- def size: Int = taskQueue.size
-
- override def close(): Unit = {}
-
-}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
new file mode 100644
index 00000000000..460fd56690f
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.server.util.MockTime;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+public class MockTimer implements Timer {
+ private final MockTime time;
+ private final PriorityQueue<TimerTaskEntry> taskQueue = new PriorityQueue<>(
+ Comparator.comparingLong(entry -> entry.expirationMs)
+ );
+
+ public MockTimer() {
+ this(new MockTime());
+ }
+
+ public MockTimer(MockTime time) {
+ this.time = time;
+ }
+
+ @Override
+ public void add(TimerTask timerTask) {
+ if (timerTask.delayMs <= 0) {
+ timerTask.run();
+ } else {
+ synchronized (taskQueue) {
+ taskQueue.add(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds()));
+ }
+ }
+ }
+
+ @Override
+ public boolean advanceClock(long timeoutMs) throws InterruptedException {
+ time.sleep(timeoutMs);
+
+ final long now = time.milliseconds();
+ boolean executed = false;
+ boolean hasMore = true;
+
+ while (hasMore) {
+ hasMore = false;
+ TimerTaskEntry taskEntry = null;
+
+ synchronized (taskQueue) {
+ if (!taskQueue.isEmpty() && now > taskQueue.peek().expirationMs) {
+ taskEntry = taskQueue.poll();
+ hasMore = !taskQueue.isEmpty();
+ }
+ }
+
+ if (taskEntry != null) {
+ if (!taskEntry.cancelled()) {
+ taskEntry.timerTask.run();
+ executed = true;
+ }
+ }
+ }
+
+ return executed;
+ }
+
+ public MockTime time() {
+ return time;
+ }
+
+ public int size() {
+ return taskQueue.size();
+ }
+
+ @Override
+ public void close() throws Exception {}
+}