This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12a60b8cd99 KAFKA-17878 Move ActionQueue to server module (#17602)
12a60b8cd99 is described below
commit 12a60b8cd9989a3f128b26c854a547f0c4dd155b
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Oct 28 17:35:23 2024 +0500
KAFKA-17878 Move ActionQueue to server module (#17602)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../server/builders/ReplicaManagerBuilder.java | 2 +-
.../java/kafka/server/share/DelayedShareFetch.java | 9 +--
.../group/CoordinatorPartitionWriter.scala | 7 ++-
core/src/main/scala/kafka/server/ActionQueue.scala | 66 ----------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 4 +-
.../AbstractCoordinatorConcurrencyTest.scala | 1 +
.../coordinator/group/GroupCoordinatorTest.scala | 3 +-
.../java/org/apache/kafka/server/ActionQueue.java | 33 +++++++++++
.../apache/kafka/server/DelayedActionQueue.java | 51 +++++++++++++++++
10 files changed, 98 insertions(+), 80 deletions(-)
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index beb0a25ffed..79dd542f943 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -21,7 +21,6 @@ import kafka.log.LogManager;
import kafka.log.remote.RemoteLogManager;
import kafka.server.AddPartitionsToTxnManager;
import kafka.server.AlterPartitionManager;
-import kafka.server.DelayedActionQueue;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
@@ -38,6 +37,7 @@ import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 73fd699cf31..732381a1976 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -135,12 +135,9 @@ public class DelayedShareFetch extends DelayedOperation {
// then we should check if there is a pending share fetch request
for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite
call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
- replicaManager.addToActionQueue(() -> {
- topicPartitionData.keySet().forEach(topicIdPartition ->
- replicaManager.completeDelayedShareFetchRequest(
- new
DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(),
topicIdPartition.partition())));
- return BoxedUnit.UNIT;
- });
+ replicaManager.addToActionQueue(() ->
topicPartitionData.keySet().forEach(topicIdPartition ->
+ replicaManager.completeDelayedShareFetchRequest(
+ new DelayedShareFetchGroupKey(shareFetchData.groupId(),
topicIdPartition.topicId(), topicIdPartition.partition()))));
}
}
diff --git
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 0e62168877e..79e0cbe630d 100644
---
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -17,12 +17,13 @@
package kafka.coordinator.group
import kafka.cluster.PartitionListener
-import kafka.server.{ActionQueue, ReplicaManager, defaultError, genericError}
+import kafka.server.{ReplicaManager, defaultError, genericError}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
+import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
VerificationGuard}
@@ -63,8 +64,8 @@ class CoordinatorPartitionWriter(
// We use an action queue which directly executes actions. This is possible
// here because we don't hold any conflicting locks.
private val directActionQueue = new ActionQueue {
- override def add(action: () => Unit): Unit = {
- action()
+ override def add(action: Runnable): Unit = {
+ action.run()
}
override def tryCompleteActions(): Unit = {}
diff --git a/core/src/main/scala/kafka/server/ActionQueue.scala
b/core/src/main/scala/kafka/server/ActionQueue.scala
deleted file mode 100644
index d0791a98633..00000000000
--- a/core/src/main/scala/kafka/server/ActionQueue.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import kafka.utils.Logging
-
-/**
- * The action queue is used to collect actions which need to be executed later.
- */
-trait ActionQueue {
-
- /**
- * add action to this queue.
- * @param action action
- */
- def add(action: () => Unit): Unit
-
- /**
- * try to complete all delayed actions
- */
- def tryCompleteActions(): Unit
-}
-
-/**
- * This queue is used to collect actions which need to be executed later. One
use case is that ReplicaManager#appendRecords
- * produces record changes so we need to check and complete delayed requests.
In order to avoid conflicting locking,
- * we add those actions to this queue and then complete them at the end of
KafkaApis.handle() or DelayedJoin.onExpiration.
- */
-class DelayedActionQueue extends Logging with ActionQueue {
- private val queue = new ConcurrentLinkedQueue[() => Unit]()
-
- def add(action: () => Unit): Unit = queue.add(action)
-
- def tryCompleteActions(): Unit = {
- val maxToComplete = queue.size()
- var count = 0
- var done = false
- while (!done && count < maxToComplete) {
- try {
- val action = queue.poll()
- if (action == null) done = true
- else action()
- } catch {
- case e: Throwable =>
- error("failed to complete delayed actions", e)
- } finally count += 1
- }
- }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index bdd6cc62b69..ff483f10584 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRec
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, NodeToControllerChannelManager}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DelayedActionQueue, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.config.ConfigType
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e8e8eb566cf..ca5acf79b50 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -55,7 +55,7 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel,
Node, TopicIdParti
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.server.common
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal,
TopicOptionalIdPartition}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -776,7 +776,7 @@ class ReplicaManager(val config: KafkaConfig,
def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
- def addToActionQueue(action: () => Unit): Unit =
defaultActionQueue.add(action)
+ def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
/**
* Append messages to leader replicas of the partition, and wait for them to
be replicated to other replicas;
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index c00e580aa3d..ba64974437a 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch,
RecordValidationStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 10f5345819a..efb9e8e96c9 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -18,7 +18,7 @@
package kafka.coordinator.group
import java.util.{OptionalInt, OptionalLong}
-import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition,
KafkaConfig, KafkaRequestHandler, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig,
KafkaRequestHandler, ReplicaManager}
import kafka.utils._
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -36,6 +36,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig,
OffsetAndMetadata}
+import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git a/server/src/main/java/org/apache/kafka/server/ActionQueue.java
b/server/src/main/java/org/apache/kafka/server/ActionQueue.java
new file mode 100644
index 00000000000..c1f138da007
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ActionQueue.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+/**
+ * The action queue is used to collect actions which need to be executed later.
+ */
+public interface ActionQueue {
+ /**
+ * Add action to this queue.
+ * @param action action
+ */
+ void add(Runnable action);
+
+ /**
+ * Try to complete all delayed actions.
+ */
+ void tryCompleteActions();
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
b/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
new file mode 100644
index 00000000000..cf00dc1dcba
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This queue is used to collect actions which need to be executed later. One
use case is that ReplicaManager#appendRecords
+ * produces record changes, so we need to check and complete delayed requests.
In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of
KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+public class DelayedActionQueue implements ActionQueue {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DelayedActionQueue.class);
+ private final ConcurrentLinkedQueue<Runnable> queue = new
ConcurrentLinkedQueue<>();
+
+ @Override
+ public void add(Runnable action) {
+ queue.add(action);
+ }
+
+ @Override
+ public void tryCompleteActions() {
+ int maxToComplete = queue.size();
+ for (int count = 0; count < maxToComplete; count++) {
+ try {
+ Runnable action = queue.poll();
+ if (action == null) return;
+ action.run();
+ } catch (Throwable e) {
+ LOGGER.error("failed to complete delayed actions", e);
+ }
+ }
+ }
+}