This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12a60b8cd99 KAFKA-17878 Move ActionQueue to server module (#17602)
12a60b8cd99 is described below

commit 12a60b8cd9989a3f128b26c854a547f0c4dd155b
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Oct 28 17:35:23 2024 +0500

    KAFKA-17878 Move ActionQueue to server module (#17602)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../server/builders/ReplicaManagerBuilder.java     |  2 +-
 .../java/kafka/server/share/DelayedShareFetch.java |  9 +--
 .../group/CoordinatorPartitionWriter.scala         |  7 ++-
 core/src/main/scala/kafka/server/ActionQueue.scala | 66 ----------------------
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  4 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |  1 +
 .../coordinator/group/GroupCoordinatorTest.scala   |  3 +-
 .../java/org/apache/kafka/server/ActionQueue.java  | 33 +++++++++++
 .../apache/kafka/server/DelayedActionQueue.java    | 51 +++++++++++++++++
 10 files changed, 98 insertions(+), 80 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index beb0a25ffed..79dd542f943 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -21,7 +21,6 @@ import kafka.log.LogManager;
 import kafka.log.remote.RemoteLogManager;
 import kafka.server.AddPartitionsToTxnManager;
 import kafka.server.AlterPartitionManager;
-import kafka.server.DelayedActionQueue;
 import kafka.server.DelayedDeleteRecords;
 import kafka.server.DelayedElectLeader;
 import kafka.server.DelayedFetch;
@@ -38,6 +37,7 @@ import kafka.zk.KafkaZkClient;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.DelayedActionQueue;
 import org.apache.kafka.server.common.DirectoryEventHandler;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 73fd699cf31..732381a1976 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -135,12 +135,9 @@ public class DelayedShareFetch extends DelayedOperation {
             // then we should check if there is a pending share fetch request for the topic-partition and complete it.
             // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
             // we directly call delayedShareFetchPurgatory.checkAndComplete
-            replicaManager.addToActionQueue(() -> {
-                topicPartitionData.keySet().forEach(topicIdPartition ->
-                    replicaManager.completeDelayedShareFetchRequest(
-                        new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())));
-                return BoxedUnit.UNIT;
-            });
+            replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition ->
+                replicaManager.completeDelayedShareFetchRequest(
+                    new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
         }
     }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 0e62168877e..79e0cbe630d 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -17,12 +17,13 @@
 package kafka.coordinator.group
 
 import kafka.cluster.PartitionListener
-import kafka.server.{ActionQueue, ReplicaManager, defaultError, genericError}
+import kafka.server.{ReplicaManager, defaultError, genericError}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
+import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
 
@@ -63,8 +64,8 @@ class CoordinatorPartitionWriter(
   // We use an action queue which directly executes actions. This is possible
   // here because we don't hold any conflicting locks.
   private val directActionQueue = new ActionQueue {
-    override def add(action: () => Unit): Unit = {
-      action()
+    override def add(action: Runnable): Unit = {
+      action.run()
     }
 
     override def tryCompleteActions(): Unit = {}
diff --git a/core/src/main/scala/kafka/server/ActionQueue.scala b/core/src/main/scala/kafka/server/ActionQueue.scala
deleted file mode 100644
index d0791a98633..00000000000
--- a/core/src/main/scala/kafka/server/ActionQueue.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import kafka.utils.Logging
-
-/**
- * The action queue is used to collect actions which need to be executed later.
- */
-trait ActionQueue {
-
-  /**
-   * add action to this queue.
-   * @param action action
-   */
-  def add(action: () => Unit): Unit
-
-  /**
-   * try to complete all delayed actions
-   */
-  def tryCompleteActions(): Unit
-}
-
-/**
- * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
- * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
- * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
- */
-class DelayedActionQueue extends Logging with ActionQueue {
-  private val queue = new ConcurrentLinkedQueue[() => Unit]()
-
-  def add(action: () => Unit): Unit = queue.add(action)
-
-  def tryCompleteActions(): Unit = {
-    val maxToComplete = queue.size()
-    var count = 0
-    var done = false
-    while (!done && count < maxToComplete) {
-      try {
-        val action = queue.poll()
-        if (action == null) done = true
-        else action()
-      } catch {
-        case e: Throwable =>
-          error("failed to complete delayed actions", e)
-      } finally count += 1
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index bdd6cc62b69..ff483f10584 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, NodeToControllerChannelManager}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
 import org.apache.kafka.server.config.ConfigType
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e8e8eb566cf..ca5acf79b50 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -55,7 +55,7 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdParti
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.server.common
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, TopicOptionalIdPartition}
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -776,7 +776,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
 
-  def addToActionQueue(action: () => Unit): Unit = defaultActionQueue.add(action)
+  def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
 
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index c00e580aa3d..ba64974437a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.util.timer.{MockTimer, Timer}
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 10f5345819a..efb9e8e96c9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -18,7 +18,7 @@
 package kafka.coordinator.group
 
 import java.util.{OptionalInt, OptionalLong}
-import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager}
 import kafka.utils._
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -36,6 +36,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetAndMetadata}
+import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git a/server/src/main/java/org/apache/kafka/server/ActionQueue.java b/server/src/main/java/org/apache/kafka/server/ActionQueue.java
new file mode 100644
index 00000000000..c1f138da007
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ActionQueue.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+/**
+ * The action queue is used to collect actions which need to be executed later.
+ */
+public interface ActionQueue {
+    /**
+     * Add action to this queue.
+     * @param action action
+     */
+    void add(Runnable action);
+
+    /**
+     * Try to complete all delayed actions.
+     */
+    void tryCompleteActions();
+}
diff --git a/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java b/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
new file mode 100644
index 00000000000..cf00dc1dcba
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes, so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+public class DelayedActionQueue implements ActionQueue {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DelayedActionQueue.class);
+    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
+
+    @Override
+    public void add(Runnable action) {
+        queue.add(action);
+    }
+
+    @Override
+    public void tryCompleteActions() {
+        int maxToComplete = queue.size();
+        for (int count = 0; count < maxToComplete; count++) {
+            try {
+                Runnable action = queue.poll();
+                if (action == null) return;
+                action.run();
+            } catch (Throwable e) {
+                LOGGER.error("failed to complete delayed actions", e);
+            }
+        }
+    }
+}

Reply via email to