This is an automated email from the ASF dual-hosted git repository.
ivandika3 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6b5164c39cf HDDS-15338. Fix affinity executor queue assignment skew (#10325)
6b5164c39cf is described below
commit 6b5164c39cfb785cc60dec01a6686ac5f8f969d0
Author: Ivan Andika <[email protected]>
AuthorDate: Fri May 22 12:01:04 2026 +0800
HDDS-15338. Fix affinity executor queue assignment skew (#10325)
Generated-by: Codex (GPT 5.5)
---
.../FixedThreadPoolWithAffinityExecutor.java | 2 +-
.../hadoop/hdds/server/events/TestEventQueue.java | 46 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
index 07804c2f2e9..ab6017411ec 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
@@ -145,7 +145,7 @@ public void onMessage(EventHandler<P> handler, P message, EventPublisher
// For messages that need to be routed to the same thread need to
// implement hashCode to match the messages. This should be safe for
// other messages that implement the native hash.
- int index = message.hashCode() & (workQueues.size() - 1);
+ int index = Math.floorMod(message.hashCode(), workQueues.size());
BlockingQueue<Q> queue = workQueues.get(index);
queue.add((Q) message);
if (queue instanceof IQueueMetrics) {
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 8582455b5ee..5cbd5fe0f37 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -21,8 +21,10 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -123,6 +125,34 @@ public void simpleEventWithFixedThreadPoolExecutor()
eventExecutor.close();
}
+ @Test
+ public void
fixedThreadPoolExecutorUsesAllQueuesWithNonPowerOfTwoQueueCount() {
+ Set<Integer> selectedQueues = new HashSet<>();
+ List<BlockingQueue<Integer>> queues = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ queues.add(new TrackingQueue<>(i, selectedQueues));
+ }
+ Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
+ = new ConcurrentHashMap<>();
+ FixedThreadPoolWithAffinityExecutor<Integer, Integer>
+ executor = new FixedThreadPoolWithAffinityExecutor<>(
+ "non-power-of-two-queue-count", (payload, publisher) -> { },
+ queues, queue, Integer.class,
+ FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues),
+ reportExecutorMap);
+
+ try {
+ for (int hash = 0; hash < queues.size(); ++hash) {
+ executor.onMessage((payload, publisher) -> { }, hash, queue);
+ }
+
+ assertThat(selectedQueues).containsExactlyInAnyOrder(
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ } finally {
+ executor.close();
+ }
+ }
+
/**
* Event handler used in tests.
*/
@@ -138,6 +168,22 @@ public void onMessage(Object payload, EventPublisher publisher) {
}
}
+ private static class TrackingQueue<T> extends LinkedBlockingQueue<T> {
+ private final int index;
+ private final Set<Integer> selectedQueues;
+
+ TrackingQueue(int index, Set<Integer> selectedQueues) {
+ this.index = index;
+ this.selectedQueues = selectedQueues;
+ }
+
+ @Override
+ public boolean add(T payload) {
+ selectedQueues.add(index);
+ return super.add(payload);
+ }
+ }
+
@Test
public void multipleSubscriber() {
final long[] result = new long[2];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]