This is an automated email from the ASF dual-hosted git repository.

ivandika3 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5164c39cf HDDS-15338. Fix affinity executor queue assignment skew (#10325)
6b5164c39cf is described below

commit 6b5164c39cfb785cc60dec01a6686ac5f8f969d0
Author: Ivan Andika <[email protected]>
AuthorDate: Fri May 22 12:01:04 2026 +0800

    HDDS-15338. Fix affinity executor queue assignment skew (#10325)
    
    Generated-by: Codex (GPT 5.5)
---
 .../FixedThreadPoolWithAffinityExecutor.java       |  2 +-
 .../hadoop/hdds/server/events/TestEventQueue.java  | 46 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
index 07804c2f2e9..ab6017411ec 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
@@ -145,7 +145,7 @@ public void onMessage(EventHandler<P> handler, P message, EventPublisher
     // For messages that need to be routed to the same thread need to
     // implement hashCode to match the messages. This should be safe for
     // other messages that implement the native hash.
-    int index = message.hashCode() & (workQueues.size() - 1);
+    int index = Math.floorMod(message.hashCode(), workQueues.size());
     BlockingQueue<Q> queue = workQueues.get(index);
     queue.add((Q) message);
     if (queue instanceof IQueueMetrics) {
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 8582455b5ee..5cbd5fe0f37 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -21,8 +21,10 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -123,6 +125,34 @@ public void simpleEventWithFixedThreadPoolExecutor()
     eventExecutor.close();
   }
 
+  @Test
+  public void fixedThreadPoolExecutorUsesAllQueuesWithNonPowerOfTwoQueueCount() {
+    Set<Integer> selectedQueues = new HashSet<>();
+    List<BlockingQueue<Integer>> queues = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      queues.add(new TrackingQueue<>(i, selectedQueues));
+    }
+    Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
+        = new ConcurrentHashMap<>();
+    FixedThreadPoolWithAffinityExecutor<Integer, Integer>
+        executor = new FixedThreadPoolWithAffinityExecutor<>(
+            "non-power-of-two-queue-count", (payload, publisher) -> { },
+            queues, queue, Integer.class,
+            FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues),
+            reportExecutorMap);
+
+    try {
+      for (int hash = 0; hash < queues.size(); ++hash) {
+        executor.onMessage((payload, publisher) -> { }, hash, queue);
+      }
+
+      assertThat(selectedQueues).containsExactlyInAnyOrder(
+          0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+    } finally {
+      executor.close();
+    }
+  }
+
   /**
    * Event handler used in tests.
    */
@@ -138,6 +168,22 @@ public void onMessage(Object payload, EventPublisher publisher) {
     }
   }
 
+  private static class TrackingQueue<T> extends LinkedBlockingQueue<T> {
+    private final int index;
+    private final Set<Integer> selectedQueues;
+
+    TrackingQueue(int index, Set<Integer> selectedQueues) {
+      this.index = index;
+      this.selectedQueues = selectedQueues;
+    }
+
+    @Override
+    public boolean add(T payload) {
+      selectedQueues.add(index);
+      return super.add(payload);
+    }
+  }
+
   @Test
   public void multipleSubscriber() {
     final long[] result = new long[2];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to