This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis-native.git


The following commit(s) were added to refs/heads/master by this push:
     new 74a94f1  ARTEMIS-1977 ASYNCIO can reduce sys-calls to retrieve I/O 
events
74a94f1 is described below

commit 74a94f19367233872f792ae8731358a3ae6e0078
Author: Francesco Nigro <[email protected]>
AuthorDate: Tue Jan 29 10:54:32 2019 +0100

    ARTEMIS-1977 ASYNCIO can reduce sys-calls to retrieve I/O events
    
    With LibAIO it is possible to retrieve the I/O completion
    events without io_getevents sys-calls, by reading the
    user-space ring buffer that the kernel uses to store them.
    This commit also includes another optimization that avoids
    calling a method to obtain the buffer's address, saving
    safepoint polls, a method call and the implicit instance
    checks it performs.
---
 ...activemq_artemis_nativo_jlibaio_LibaioContext.c | 117 +++++++++++++++++++--
 1 file changed, 110 insertions(+), 7 deletions(-)

diff --git 
a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c 
b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
index 694194e..768e62a 100644
--- a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
+++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c
@@ -36,6 +36,11 @@
 #include "org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h"
 #include "exception_helper.h"
 
+// On x86/x86-64 the memory model is strong enough that, as long as only
+// Write-Back (WB) memory is used, no hardware fence is needed: a compiler
+// barrier that forbids reordering across it is sufficient.
+// On any other architecture a compiler-only barrier does not order the
+// accesses performed by the CPU, so real fences are emitted instead.
+#if defined(__i386__) || defined(__x86_64__)
+#define mem_barrier() __asm__ __volatile__ ("":::"memory")
+#define read_barrier() __asm__ __volatile__("":::"memory")
+#define store_barrier()        __asm__ __volatile__("":::"memory")
+#else
+#define mem_barrier() __atomic_thread_fence(__ATOMIC_SEQ_CST)
+#define read_barrier() __atomic_thread_fence(__ATOMIC_ACQUIRE)
+#define store_barrier() __atomic_thread_fence(__ATOMIC_RELEASE)
+#endif
+
 struct io_control {
     io_context_t ioContext;
     struct io_event * events;
@@ -57,6 +62,90 @@ struct io_control {
 
 };
 
+//These are used to check if the user-space io_getevents is supported:
+//Linux ABI for the ring buffer: https://elixir.bootlin.com/linux/latest/source/fs/aio.c#L54
+//aio_read_events_ring: https://elixir.bootlin.com/linux/latest/source/fs/aio.c#L1148
+#define AIO_RING_MAGIC 0xa10a10a1
+#define AIO_RING_INCOMPAT_FEATURES     0
+
+
+//Header of the completion ring, immediately followed by the io_event slots
+struct aio_ring {
+       unsigned        id;     /* kernel internal index number */
+       unsigned        nr;     /* number of io_events */
+       unsigned        head;   /* index of the next event to consume */
+       unsigned        tail;   /* index one past the last available event */
+
+       unsigned        magic;  /* compared against AIO_RING_MAGIC */
+       unsigned        compat_features;
+       unsigned        incompat_features;      /* compared against AIO_RING_INCOMPAT_FEATURES */
+       unsigned        header_length;  /* size of aio_ring */
+
+
+       /* C99 flexible array member: adds nothing to sizeof(struct aio_ring) */
+       struct io_event         io_events[];
+}; /* 128 bytes + ring size */
+
+// Tells whether the completion ring can be read directly from user space:
+// its magic must be AIO_RING_MAGIC and its incompat_features must be exactly
+// AIO_RING_INCOMPAT_FEATURES. Returns 1 when both hold, 0 otherwise.
+static inline int has_usable_ring(struct aio_ring *ring) {
+    if (ring->magic != AIO_RING_MAGIC) {
+        return 0;
+    }
+    return ring->incompat_features == AIO_RING_INCOMPAT_FEATURES;
+}
+
+// Reinterprets the AIO context handle as a pointer to the ring header.
+static inline struct aio_ring* to_aio_ring(io_context_t aio_ctx) {
+    struct aio_ring *ring = (struct aio_ring*) aio_ctx;
+    return ring;
+}
+
+// User-space batch read of I/O completion events that tries to avoid any sys-call.
+// When the ring behind aio_ctx is usable and the request can be satisfied right away,
+// the events are copied straight out of the ring; in every other case the call is
+// delegated to io_getevents.
+//
+// aio_ctx: AIO context, reinterpreted as a pointer to the completion ring
+// min_nr:  minimum number of events requested
+// max:     maximum number of events to copy into events
+// events:  destination array
+// timeout: NULL, or the wait limit; a zero timeout makes the ring path return
+//          whatever is available, even if it is less than min_nr
+//
+// Returns the number of events copied into events, or the io_getevents result.
+static int artemis_io_getevents(io_context_t aio_ctx, long min_nr, long max,
+                                struct io_event *events, struct timespec *timeout) {
+    struct aio_ring *ring = to_aio_ring(aio_ctx);
+    // a NULL context or inconsistent arguments are left to io_getevents, so that they are
+    // reported by the sys-call instead of being served (or dereferenced) through the ring
+    const int ring_eligible = ring != NULL && min_nr >= 0 && max > 0 && min_nr <= max;
+    //checks if it could be completed in user space, saving a sys call
+    if (ring_eligible && has_usable_ring(ring)) {
+        const unsigned ring_nr = ring->nr;
+        // We're assuming to be the exclusive writer to head, so we just need a compiler barrier
+        unsigned head = ring->head;
+        mem_barrier();
+        const unsigned tail = ring->tail;
+        int available = tail - head;
+        if (available < 0) {
+            //a wrap has occurred
+            available += ring_nr;
+        }
+        #ifdef DEBUG
+            fprintf(stdout, "tail = %u head= %u nr = %u available = %d\n", tail, head, ring_nr, available);
+        #endif
+        if ((available >= min_nr) || (timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0)) {
+            if (!available) {
+                return 0;
+            }
+            //the kernel has written ring->tail from an interrupt:
+            //we need to load acquire the completed events here
+            read_barrier();
+            // max > 0 here, hence the result is in [1, available] and fits an int
+            const int available_nr = available < max ? available : (int) max;
+            for (int i = 0; i < available_nr; i++) {
+                events[i] = ring->io_events[head];
+                //wrap with a compare instead of a % operation, which is quite expensive
+                head++;
+                if (head >= ring_nr) {
+                    head = 0;
+                }
+            }
+            //it allows the kernel to build its own view of the ring buffer size
+            //and push new events if there are any
+            store_barrier();
+            ring->head = head;
+            #ifdef DEBUG
+                fprintf(stdout, "consumed non sys-call = %d\n", available_nr);
+            #endif
+            return available_nr;
+        }
+    }
+    int sys_call_events = io_getevents(aio_ctx, min_nr, max, events, timeout);
+    #ifdef DEBUG
+        fprintf(stdout, "consumed sys-call = %d\n", sys_call_events);
+    #endif
+    return sys_call_events;
+}
+
 // We need a fast and reliable way to stop the blocked poller
 // for that we need a dumb file,
 // We are using a temporary file for this.
@@ -76,6 +165,8 @@ jmethodID libaioContextDone = NULL;
 jclass libaioContextClass = NULL;
 jclass runtimeExceptionClass = NULL;
 jclass ioExceptionClass = NULL;
+jclass nioBufferClass = NULL;
+jfieldID nioBufferAddressFieldId = NULL;
 
 // util methods
 void throwRuntimeException(JNIEnv* env, char* message) {
@@ -229,6 +320,13 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
            return JNI_ERR;
         }
 
+        nioBufferClass = (*env)->FindClass(env, "java/nio/Buffer");
+        if (nioBufferClass == NULL) {
+           return JNI_ERR;
+        }
+        nioBufferClass = (jclass)(*env)->NewGlobalRef(env, 
(jobject)nioBufferClass);
+        nioBufferAddressFieldId = (*env)->GetFieldID(env, nioBufferClass, 
"address", "J");
+
         return JNI_VERSION_1_6;
     }
 }
@@ -274,6 +372,10 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
         if (libaioContextClass != NULL) {
             (*env)->DeleteGlobalRef(env, (jobject)libaioContextClass);
         }
+
+        if (nioBufferClass != NULL) {
+            (*env)->DeleteGlobalRef(env, (jobject)nioBufferClass);
+        }
     }
 }
 
@@ -284,7 +386,8 @@ JNIEXPORT void JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
 
 
 static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
-    struct io_control * ioControl = (struct io_control *) 
(*env)->GetDirectBufferAddress(env, pointer);
+    jlong address = (*env)->GetLongField(env, pointer, 
nioBufferAddressFieldId);
+    struct io_control * ioControl = (struct io_control *) address;
     if (ioControl == NULL) {
        throwRuntimeException(env, "Controller not initialized");
     }
@@ -352,7 +455,7 @@ static inline short submit(JNIEnv * env, struct io_control 
* theControl, struct
 }
 
 static inline void * getBuffer(JNIEnv* env, jobject pointer) {
-    return (*env)->GetDirectBufferAddress(env, pointer);
+    // reads the native address from the java.nio.Buffer "address" field resolved in JNI_OnLoad
+    return (void *) (*env)->GetLongField(env, pointer, nioBufferAddressFieldId);
 }
 
 JNIEXPORT jboolean JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock
@@ -513,7 +616,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
     pthread_mutex_unlock(&(theControl->pollLock));
 
     // To return any pending IOCBs
-    int result = io_getevents(theControl->ioContext, 0, 1, theControl->events, 
0);
+    int result = artemis_io_getevents(theControl->ioContext, 0, 1, 
theControl->events, 0);
     for (i = 0; i < result; i++) {
         struct io_event * event = &(theControl->events[i]);
         struct iocb * iocbp = event->obj;
@@ -635,7 +738,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
 
     while (running) {
 
-        int result = io_getevents(theControl->ioContext, 1, max, 
theControl->events, 0);
+        int result = artemis_io_getevents(theControl->ioContext, 1, max, 
theControl->events, 0);
 
         if (result == -EINTR)
         {
@@ -726,7 +829,7 @@ JNIEXPORT jint JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
     }
 
 
-    int result = io_getevents(theControl->ioContext, min, max, 
theControl->events, 0);
+    int result = artemis_io_getevents(theControl->ioContext, min, max, 
theControl->events, 0);
     int retVal = result;
 
     for (i = 0; i < result; i++) {
@@ -792,7 +895,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
        throwRuntimeException(env, "Null pointer");
        return;
     }
-       void *  buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
+       void *  buffer = getBuffer(env, jbuffer);
        free(buffer);
 }
 
@@ -907,7 +1010,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
     #ifdef DEBUG
         fprintf (stdout, "Mem setting buffer with %d bytes\n", size);
     #endif
-    void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
+    void * buffer = getBuffer(env, jbuffer);
 
     if (buffer == 0)
     {

Reply via email to