This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch branch-panama
in repository https://gitbox.apache.org/repos/asf/artemis-native.git

commit ca8cf7fd1b0abf74d99965f86e3874e458049f88
Author: mayankkunwar <[email protected]>
AuthorDate: Tue Apr 28 13:08:06 2026 +0100

    removing hashMap registry
---
 .../artemis/nativo/jlibaio/LibaioContext.java      |  6 +--
 .../nativo/jlibaio/ffm/CallbackRegistry.java       | 58 ----------------------
 .../nativo/jlibaio/ffm/FFMNativeHelper.java        | 57 +++++++++++----------
 .../artemis/nativo/jlibaio/ffm/IOControl.java      | 26 +++++++++-
 4 files changed, 58 insertions(+), 89 deletions(-)

diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
index 78e3f7c..355708f 100644
--- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
+++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java
@@ -150,7 +150,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
    /**
     * the native ioContext including the structure created.
     */
-   private final IOControl ioContext;
+   private final IOControl<Callback> ioContext;
 
    private final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -376,7 +376,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
    /**
     * This is the queue for libaio, initialized with queueSize.
     */
-   private IOControl newContext(int queueSize) {
+   private IOControl<Callback> newContext(int queueSize) {
       return this.ffmNativeHelper.newContext(queueSize);
    }
 
@@ -464,7 +464,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
    /**
     * This method will block as long as the context is open.
     */
-   void blockedPoll(IOControl ioControl, boolean useFdatasync) {
+   void blockedPoll(IOControl<Callback> ioControl, boolean useFdatasync) {
       this.ffmNativeHelper.blockedPoll(ioControl, useFdatasync);
    }
 
diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/CallbackRegistry.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/CallbackRegistry.java
deleted file mode 100644
index 8d86038..0000000
--- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/CallbackRegistry.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.nativo.jlibaio.ffm;
-
-import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class CallbackRegistry<Callback extends SubmitInfo> {
-//    private static final Logger logger = LoggerFactory.getLogger(CallbackRegistry.class);
-    private final ConcurrentHashMap<Long, Callback> registry = new ConcurrentHashMap<>();
-    private final AtomicLong idGenerator = new AtomicLong(0);
-
-    /*
-    * <p>
-    * This method is thread-safe and lock-free. It avoids returning an ID of -1 and 0,
-    * as -1 and 0 are reserved as a NULL/Invalid marker in the native libaio layer.
-    *
-    * @param callback the I/O completion handler to register.
-    * @return a unique identifier (other than -1 or 0) for this callback.
-    * */
-    public long register(Callback callback) {
-        Objects.requireNonNull(callback, "Callbacks cannot be null");
-        long id;
-        do {
-            id = idGenerator.incrementAndGet();
-        } while (id == -1 || id == 0);
-        registry.put(id, callback);
-//        logger.debug("CallbackRegistry::register id = {}, callback = {}", id, callback);
-        return id;
-    }
-
-    public Callback findCallbackById(long id) {
-        return registry.get(id);
-    }
-
-    public void removeCallbackById(long id) {
-        this.registry.remove(id);
-    }
-}
diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java
index 9101936..79eed7f 100644
--- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java
+++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java
@@ -135,12 +135,9 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
         }
     }
 
-    private final CallbackRegistry<Callback> registry;
-
     private final ReleaseCallback releaseCallback;
 
     public FFMNativeHelper(ReleaseCallback releaseCallback) {
-        this.registry = new CallbackRegistry<>();
         this.releaseCallback = releaseCallback;
     }
 
@@ -319,10 +316,10 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
         return forceSysCall.get() || !RING_REAPER;
     }
 
-    public IOControl newContext(int queueSize) {
+    public IOControl<Callback> newContext(int queueSize) {
 //        logger.debug("Initializing context with QueueSize={}", queueSize);
 
-        IOControl ioControl = new IOControl();
+        IOControl<Callback> ioControl = new IOControl<>();
         try {
             MemorySegment ioContext = ioQueueInit(queueSize);
             ioControl.setIoContext(ioContext);
@@ -348,7 +345,7 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
                     throw new OutOfMemoryError(
                             String.format("Arena memory allocation failed: iocb[%d/%d]", i, queueSize));
                 }
-                IOCBInit.setAioData(iocb, 0L);
+                IOCBInit.setAioData(iocb, i);
                 iocbPool[i] = iocb;
             }
             ioControl.setIocbPool(iocbPool);
@@ -430,7 +427,8 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
                 throw new IOException("Not enough space in libaio queue during shutdown");
             }
             ioPrepPOp(dumbIocb, DUMB_WRITE_HANDLER, MemorySegment.NULL, 0L, 0L, 1);
-            IOCBInit.setAioData(dumbIocb, -1L);
+            int iocbId = (int) IOCBInit.getAioData(dumbIocb);
+            ioControl.getIocbState().set(iocbId, -1);
 
             if(!submit(ioControl, dumbIocb)) {
 //                logger.warn("deleteContext: submit failed: Continuing cleanup");
@@ -631,21 +629,24 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
             throw new IOException("IOCB pool exhausted (used=" + ioControl.used() +
                     "/queueSize=" + ioControl.queueSize() + ")");
         }
-//        logger.trace("submitWrite called!");
-        long callbackId = registry.register(callback);
+        int callbackId = (int) IOCBInit.getAioData(iocb);
+//        logger.trace("submitWrite called! callbackId: {}", callbackId);
         boolean submitted = false;
         try {
+            if (!ioControl.getIocbState().compareAndSet(callbackId, 0, 1)) {
+                throw new IOException("submitWrite failed: callbackId=" + callbackId + " already in use");
+            }
+            ioControl.addCallback(callbackId, callback);
             bufferWrite.clear();
             ioPrepPOp(iocb, fd, MemorySegment.ofBuffer(bufferWrite), size, position, 1);
 
-            IOCBInit.setAioData(iocb, callbackId);
             submit(ioControl, iocb);
             submitted = true;
         } catch (Throwable e) {
             throw new IOException("submitWrite failed", e);
         } finally {
             if(!submitted) {
-                registry.removeCallbackById(callbackId);
+                ioControl.takeCallback(callbackId);
             }
         }
     }
@@ -690,25 +691,28 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
         }
 
 //        logger.trace("submitRead called!");
-        long callbackId = registry.register(callback);
+        long callbackId = IOCBInit.getAioData(iocb);
         boolean submitted = false;
         try {
+            if (!ioControl.getIocbState().compareAndSet((int) callbackId, 0, 1)) {
+                throw new IOException("submitRead failed: callbackId=" + callbackId + " already in use");
+            }
+            ioControl.addCallback((int) callbackId, callback);
             bufferWrite.clear();
             ioPrepPOp(iocb, fd, MemorySegment.ofBuffer(bufferWrite), size, position, 0);
 
-            IOCBInit.setAioData(iocb, callbackId);
             submit(ioControl, iocb);
             submitted = true;
         } catch (Throwable e) {
             throw new IOException("submitRead failed", e);
         } finally {
             if(!submitted) {
-                registry.removeCallbackById(callbackId);
+                ioControl.takeCallback((int) callbackId);
             }
         }
     }
 
-    public int poll(IOControl ioControl, Callback[] callbacks, int min, int max) {
+    public int poll(IOControl<Callback> ioControl, Callback[] callbacks, int min, int max) {
         if(ioControl == null || !ioControl.isValid()) {
 //            logger.warn("poll: invalid context");
             return 0;
@@ -734,20 +738,22 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
                 MemorySegment iocbp = event.get(ValueLayout.ADDRESS, 8L)
                         .reinterpret(64);
                 int eventResult = (int) event.get(ValueLayout.JAVA_LONG, 16L);
-//                logger.trace("poll[{}]: res={}, iocbp=0x{}", i, eventResult, Long.toHexString(iocbp.address()));
+//                logger.trace("poll[{}]: res={}, iocbp=0x{}, AioData: {}", i, eventResult, Long.toHexString(iocbp.address()),
+//                        IOCBInit.getAioData(iocbp));
 
                 if(eventResult < 0) {
 //                    logger.warn("poll[{}]: I/O error: {}", i, eventResult);
                 }
 
-                long callbackIdRaw = IOCBInit.getAioData(iocbp);
-                if(callbackIdRaw == 0L || callbackIdRaw == -1L) {
+                int callbackIdRaw = (int) IOCBInit.getAioData(iocbp);
+                int iocbState = ioControl.getIocbState().get(callbackIdRaw);
+                if(iocbState == 0 || iocbState == -1) {
 //                    logger.warn("poll[{}]: invalid callback=0x{}", i, Long.toHexString(callbackIdRaw));
                     ioControl.putIOCB(iocbp);
                     continue;
                 }
 
-                Callback callback = registry.findCallbackById(callbackIdRaw);
+                Callback callback = ioControl.takeCallback(callbackIdRaw);
                 if(callback != null) {
                     callbacks[i] = callback;
                     if(eventResult < 0) {
@@ -758,11 +764,11 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
                     if (releaseCallback != null) {
                         releaseCallback.release();
                     }
-                    registry.removeCallbackById(callbackIdRaw);
                 } else {
 //                    logger.warn("poll[{}]: callback not found for id=0x{}",
 //                            i, Long.toHexString(callbackIdRaw));
                 }
+                ioControl.getIocbState().set(callbackIdRaw, 0);
                 ioControl.putIOCB(iocbp);
             }
             return result;
@@ -772,7 +778,7 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
         }
     }
 
-    public void blockedPoll(IOControl ioControl, boolean useFdatasync) {
+    public void blockedPoll(IOControl<Callback> ioControl, boolean useFdatasync) {
 //        logger.debug("blockedPoll starting(useFdatasync={})", useFdatasync);
         if(ioControl == null || !ioControl.isValid()) {
 //            logger.warn("blockedPoll: invalid context");
@@ -830,13 +836,13 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
 
                         int eventResult = (int) event.get(ValueLayout.JAVA_LONG, 16L);
 
-                        long callbackIdRaw = IOCBInit.getAioData(iocbp);
+                        int callbackIdRaw = (int) IOCBInit.getAioData(iocbp);
 //                        logger.trace("blockedPoll: callbackIdRaw: {}", callbackIdRaw);
-                        IOCBInit.setAioData(iocbp, 0L); // this is to detect invalid elements on the buffer.
 
-                        if(callbackIdRaw != 0L) {
+                        // this IOCB state is to detect invalid elements on the buffer.
+                        if(ioControl.getIocbState().compareAndSet(callbackIdRaw, 1, 0)) {
                             ioControl.putIOCB(iocbp);
-                            Callback callback = registry.findCallbackById(callbackIdRaw);
+                            Callback callback = ioControl.takeCallback(callbackIdRaw);
                             if (callback != null) {
                                 if (eventResult < 0) {
 //                                    logger.error("blockedPoll[{}]: I/O error fd={}, {}", i, fd, eventResult);
@@ -848,7 +854,6 @@ public class FFMNativeHelper<Callback extends SubmitInfo> {
                                 if (releaseCallback != null) {
                                     releaseCallback.release();
                                 }
-                                registry.removeCallbackById(callbackIdRaw);
                             }
                         } else {
                             if(!forceSysCall.get()) {
diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java
index 8777335..8ba9eef 100644
--- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java
+++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/ffm/IOControl.java
@@ -16,13 +16,15 @@
  */
 package org.apache.activemq.artemis.nativo.jlibaio.ffm;
 
+import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.foreign.MemorySegment;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
-public class IOControl {
+public class IOControl<Callback extends SubmitInfo> {
 //    private static final Logger logger = LoggerFactory.getLogger(IOControl.class);
 
     private final Object iocbLock = new Object();
@@ -35,6 +37,10 @@ public class IOControl {
     private int iocbGet;
     private int used;
     private MemorySegment[] iocbPool;
+    private AtomicReferenceArray<Callback> callbackRegistry;
+
+    // -1: delete, 0: free, 1: used
+    private AtomicIntegerArray iocbState;
 
     public MemorySegment ioContext() {
         return this.ioContext;
@@ -55,6 +61,8 @@ public class IOControl {
     }
     public void setQueueSize(int size) {
         this.queueSize = size;
+        callbackRegistry = new AtomicReferenceArray<>(size);
+        iocbState = new AtomicIntegerArray(size);
     }
 
     public int iocbPut() {
@@ -74,6 +82,20 @@ public class IOControl {
         this.iocbPool = iocbPool;
     }
 
+    public void addCallback(int idx, Callback callback) {
+        if (callbackRegistry.get(idx) != null) {
+            throw new IllegalStateException("callback already registered");
+        }
+        callbackRegistry.set(idx, callback);
+    }
+    public Callback takeCallback(int idx) {
+        return callbackRegistry.getAndSet(idx, null);
+    }
+
+    public AtomicIntegerArray getIocbState() {
+        return this.iocbState;
+    }
+
     public void withIocbLock(Runnable action) {
         synchronized ( iocbLock){
             action.run();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to