This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac416e38e0b HDFS-17372. KeyUpdateCommand should be processed at high
priority level to avoid access key not update in time when meet huge many
commands. (#6530). Contributed by Haobo Zhang.
ac416e38e0b is described below
commit ac416e38e0b149ad0f3de7ac63faf22c04027249
Author: hfutatzhanghb <[email protected]>
AuthorDate: Fri Feb 13 16:43:23 2026 +0800
HDFS-17372. KeyUpdateCommand should be processed at high priority level to
avoid access key not update in time when meet huge many commands. (#6530).
Contributed by Haobo Zhang.
Signed-off-by: He Xiaoqiao <[email protected]>
---
.../hdfs/server/datanode/BPServiceActor.java | 34 ++++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 2ceca34edd9..9af3c39b33d 100755
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -36,7 +36,7 @@
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -62,6 +62,7 @@
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -741,7 +742,18 @@ private void offerService() throws Exception {
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
- commandProcessingThread.enqueue(resp.getCommands());
+ DatanodeCommand[] cmds = resp.getCommands();
+ if (cmds != null && cmds.length != 0) {
+ int length = cmds.length;
+ for (int i = length - 1; i >= 0; i--) {
+ if (cmds[i] instanceof KeyUpdateCommand) {
+ commandProcessingThread.enqueueFirst(cmds[i]);
+ cmds[i] = null;
+ break;
+ }
+ }
+ commandProcessingThread.enqueue(cmds);
+ }
isSlownode = resp.getIsSlownode();
}
}
@@ -1392,7 +1404,7 @@ class CommandProcessingThread extends
SubjectInheritingThread {
CommandProcessingThread(BPServiceActor actor) {
super("Command processor");
this.actor = actor;
- this.queue = new LinkedBlockingQueue<>();
+ this.queue = new LinkedBlockingDeque<>();
setDaemon(true);
}
@@ -1479,6 +1491,22 @@ void enqueue(DatanodeCommand cmd) throws
InterruptedException {
dn.getMetrics().incrActorCmdQueueLength(1);
}
+ /**
+ * Enqueue DatanodeCommand to the head of queue.
+ * @param cmd
+ * @throws InterruptedException
+ */
+ void enqueueFirst(DatanodeCommand cmd) throws InterruptedException {
+ if (cmd == null) {
+ return;
+ }
+ ((LinkedBlockingDeque<Runnable>) queue).putFirst(
+ () -> processCommand(new DatanodeCommand[]{cmd}));
+
+ LOG.info("Enqueue command: {} to the head of queue", cmd);
+ dn.getMetrics().incrActorCmdQueueLength(1);
+ }
+
void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
if (cmds == null) {
return;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]