This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd1f02b2bec MINOR: Move MockTimer to server-common (#13954)
bd1f02b2bec is described below

commit bd1f02b2beca69a0937bb3df6f1e0ebcc3d9bfeb
Author: David Jacot <[email protected]>
AuthorDate: Thu Jul 6 14:56:05 2023 +0200

    MINOR: Move MockTimer to server-common (#13954)
    
    This patch rewrites MockTimer in Java and moves it from core to server-common.
    This continues the work started in https://github.com/apache/kafka/pull/13820.
    
    Reviewers: Divij Vaidya <[email protected]>
---
 checkstyle/import-control-server-common.xml        |  8 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |  2 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  2 +-
 .../scala/unit/kafka/utils/timer/MockTimer.scala   | 72 -----------------
 .../apache/kafka/server/util/timer/MockTimer.java  | 89 ++++++++++++++++++++++
 6 files changed, 98 insertions(+), 77 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 350f2820968..2e65ec601a3 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -82,13 +82,17 @@
             <allow pkg="org.apache.kafka.server.authorizer" />
         </subpackage>
 
-        <!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
-        <!-- API but are still relatively common -->
         <subpackage name="util">
+            <!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
+            <!-- API but are still relatively common -->
             <allow class="org.apache.kafka.clients.ClientRequest" />
             <allow class="org.apache.kafka.clients.ClientResponse" />
             <allow class="org.apache.kafka.clients.KafkaClient" />
             <allow class="org.apache.kafka.clients.RequestCompletionHandler" />
+
+            <subpackage name="timer">
+                <allow class="org.apache.kafka.server.util.MockTime" />
+            </subpackage>
         </subpackage>
     </subpackage>
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 23961be7fb8..255b8dbb866 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -25,12 +25,12 @@ import 
kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.log.UnifiedLog
 import kafka.server._
 import kafka.utils._
-import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordConversionStats}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index ee11fbd8a3d..787e76d6aef 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -21,7 +21,6 @@ import java.util.{Optional, OptionalInt}
 import kafka.common.OffsetAndMetadata
 import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, 
ReplicaManager, RequestLocal}
 import kafka.utils._
-import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
@@ -37,6 +36,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.AppendOrigin
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index bcb7cfb327f..e8cad3e942e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,7 +32,6 @@ import kafka.log._
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.utils.timer.MockTimer
 import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
KafkaStorageException}
@@ -71,6 +70,7 @@ import kafka.log.remote.RemoteLogManager
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
 AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}
 import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
+import org.apache.kafka.server.util.timer.MockTimer
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala 
b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
deleted file mode 100644
index cee7dc097f1..00000000000
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.utils.timer
-
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry}
-
-import scala.collection.mutable
-
-class MockTimer(val time: MockTime = new MockTime) extends Timer {
-
-  private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new 
Ordering[TimerTaskEntry] {
-    override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = 
java.lang.Long.compare(x.expirationMs, y.expirationMs)
-  }.reverse)
-
-  def add(timerTask: TimerTask): Unit = {
-    if (timerTask.delayMs <= 0)
-      timerTask.run()
-    else {
-      taskQueue synchronized {
-        taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + 
time.milliseconds))
-      }
-    }
-  }
-
-  def advanceClock(timeoutMs: Long): Boolean = {
-    time.sleep(timeoutMs)
-
-    var executed = false
-    val now = time.milliseconds
-
-    var hasMore = true
-    while (hasMore) {
-      hasMore = false
-      val head = taskQueue synchronized {
-        if (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) {
-          val entry = Some(taskQueue.dequeue())
-          hasMore = taskQueue.nonEmpty
-          entry
-        } else
-          None
-      }
-      head.foreach { taskEntry =>
-        if (!taskEntry.cancelled) {
-          val task = taskEntry.timerTask
-          task.run()
-          executed = true
-        }
-      }
-    }
-    executed
-  }
-
-  def size: Int = taskQueue.size
-
-  override def close(): Unit = {}
-
-}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
new file mode 100644
index 00000000000..460fd56690f
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.server.util.MockTime;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+public class MockTimer implements Timer {
+    private final MockTime time;
+    private final PriorityQueue<TimerTaskEntry> taskQueue = new 
PriorityQueue<>(
+        Comparator.comparingLong(entry -> entry.expirationMs)
+    );
+
+    public MockTimer() {
+        this(new MockTime());
+    }
+
+    public MockTimer(MockTime time) {
+        this.time = time;
+    }
+
+    @Override
+    public void add(TimerTask timerTask) {
+        if (timerTask.delayMs <= 0) {
+            timerTask.run();
+        } else {
+            synchronized (taskQueue) {
+                taskQueue.add(new TimerTaskEntry(timerTask, timerTask.delayMs 
+ time.milliseconds()));
+            }
+        }
+    }
+
+    @Override
+    public boolean advanceClock(long timeoutMs) throws InterruptedException {
+        time.sleep(timeoutMs);
+
+        final long now = time.milliseconds();
+        boolean executed = false;
+        boolean hasMore = true;
+
+        while (hasMore) {
+            hasMore = false;
+            TimerTaskEntry taskEntry = null;
+
+            synchronized (taskQueue) {
+                if (!taskQueue.isEmpty() && now > 
taskQueue.peek().expirationMs) {
+                    taskEntry = taskQueue.poll();
+                    hasMore = !taskQueue.isEmpty();
+                }
+            }
+
+            if (taskEntry != null) {
+                if (!taskEntry.cancelled()) {
+                    taskEntry.timerTask.run();
+                    executed = true;
+                }
+            }
+        }
+
+        return executed;
+    }
+
+    public MockTime time() {
+        return time;
+    }
+
+    public int size() {
+        return taskQueue.size();
+    }
+
+    @Override
+    public void close() throws Exception {}
+}

Reply via email to