This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac416e38e0b HDFS-17372. KeyUpdateCommand should be processed at high 
priority level to avoid access key not update in time when meet huge many 
commands. (#6530). Contributed by Haobo Zhang.
ac416e38e0b is described below

commit ac416e38e0b149ad0f3de7ac63faf22c04027249
Author: hfutatzhanghb <[email protected]>
AuthorDate: Fri Feb 13 16:43:23 2026 +0800

    HDFS-17372. KeyUpdateCommand should be processed at high priority level to 
avoid access key not update in time when meet huge many commands. (#6530). 
Contributed by Haobo Zhang.
    
    Signed-off-by: He Xiaoqiao <[email protected]>
---
 .../hdfs/server/datanode/BPServiceActor.java       | 34 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 2ceca34edd9..9af3c39b33d 100755
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -36,7 +36,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -62,6 +62,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -741,7 +742,18 @@ private void offerService() throws Exception {
             if (state == HAServiceState.ACTIVE) {
               handleRollingUpgradeStatus(resp);
             }
-            commandProcessingThread.enqueue(resp.getCommands());
+            DatanodeCommand[] cmds = resp.getCommands();
+            if (cmds != null && cmds.length != 0) {
+              int length = cmds.length;
+              for (int i = length - 1; i >= 0; i--) {
+                if (cmds[i] instanceof KeyUpdateCommand) {
+                  commandProcessingThread.enqueueFirst(cmds[i]);
+                  cmds[i] = null;
+                  break;
+                }
+              }
+              commandProcessingThread.enqueue(cmds);
+            }
             isSlownode = resp.getIsSlownode();
           }
         }
@@ -1392,7 +1404,7 @@ class CommandProcessingThread extends 
SubjectInheritingThread {
     CommandProcessingThread(BPServiceActor actor) {
       super("Command processor");
       this.actor = actor;
-      this.queue = new LinkedBlockingQueue<>();
+      this.queue = new LinkedBlockingDeque<>();
       setDaemon(true);
     }
 
@@ -1479,6 +1491,22 @@ void enqueue(DatanodeCommand cmd) throws 
InterruptedException {
       dn.getMetrics().incrActorCmdQueueLength(1);
     }
 
+    /**
+     * Enqueue DatanodeCommand to the head of queue.
+     * @param cmd
+     * @throws InterruptedException
+     */
+    void enqueueFirst(DatanodeCommand cmd) throws InterruptedException {
+      if (cmd == null) {
+        return;
+      }
+      ((LinkedBlockingDeque<Runnable>) queue).putFirst(
+          () -> processCommand(new DatanodeCommand[]{cmd}));
+
+      LOG.info("Enqueue command: {} to the head of queue", cmd);
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
     void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
       if (cmds == null) {
         return;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to