stoddard 99/10/08 12:07:19
Modified: src/include ap_listen.h src/modules/mpm/winnt winnt.c winnt.h src/os/win32 iol_socket.c iol_socket.h Log: Use AcceptEx with an async completion port. What we are gaining here is LIFO dispatching out of the worker thread pool. Revision Changes Path 1.9 +3 -0 apache-2.0/src/include/ap_listen.h Index: ap_listen.h =================================================================== RCS file: /home/cvs/apache-2.0/src/include/ap_listen.h,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- ap_listen.h 1999/10/07 21:47:36 1.8 +++ ap_listen.h 1999/10/08 19:06:55 1.9 @@ -65,6 +65,9 @@ ap_listen_rec *next; ap_socket_t *sd; int active; +#ifdef WIN32 + int count; +#endif /* more stuff here, like which protocol is bound to the port */ }; 1.17 +183 -135 apache-2.0/src/modules/mpm/winnt/winnt.c Index: winnt.c =================================================================== RCS file: /home/cvs/apache-2.0/src/modules/mpm/winnt/winnt.c,v retrieving revision 1.16 retrieving revision 1.17 diff -u -r1.16 -r1.17 --- winnt.c 1999/10/07 20:48:26 1.16 +++ winnt.c 1999/10/08 19:06:58 1.17 @@ -90,6 +90,7 @@ static char ap_coredump_dir[MAX_STRING_LEN]; static server_rec *server_conf; +HANDLE AcceptExCompPort = NULL; static int one_process = 0; @@ -363,7 +364,7 @@ } return NULL; } -static int setup_listeners(server_rec *s) +static int setup_listeners(ap_context_t *pconf, server_rec *s) { ap_listen_rec *lr; int num_listeners = 0; @@ -372,7 +373,7 @@ /* Setup the listeners */ FD_ZERO(&listenfds); - if (ap_listen_open(s->process, s->port)) { + if (ap_listen_open(pconf, s->port)) { return 0; } for (lr = ap_listeners; lr; lr = lr->next) { @@ -384,6 +385,7 @@ listenmaxfd = nsd; } } + lr->count = 0; } head_listener = ap_listeners; @@ -445,6 +447,7 @@ } // ap_register_cleanup(p, (void *)lr->sd, socket_cleanup, NULL); ap_put_os_sock(&lr->sd, &nsd, pconf); + lr->count = 0; } CloseHandle(pipe); @@ -724,106 +727,126 @@ } } #endif -static PCOMP_CONTEXT winnt_get_connection(PCOMP_CONTEXT context) +/* + * Windows NT specific code... + */ +static int create_and_queue_completion_context(ap_context_t *p, ap_listen_rec *lr) { - int requests_this_child = 0; - int count_select_errors = 0; - struct timeval tv; - fd_set main_fds; - int wait_time = 1; + PCOMP_CONTEXT context; + DWORD BytesRead; SOCKET nsd; - int rc; + context = ap_pcalloc(p, sizeof(COMP_CONTEXT)); + + if (!context) + return -1; + + context->lr = lr; + context->Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - /* AcceptEx needs a pre-allocated accept socket */ context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + ap_create_context(&context->ptrans, p); +// context->ptrans = ap_make_sub_pool(p); + context->conn_io = ap_bcreate(context->ptrans, B_RDWR); + context->recv_buf = context->conn_io->inbase; + context->recv_buf_size = context->conn_io->bufsiz - 2*PADDED_ADDR_SIZE; + ap_get_os_sock(context->lr->sd, &nsd); + + AcceptEx(nsd,//context->lr->fd, + context->accept_socket, + context->recv_buf, + context->recv_buf_size, + PADDED_ADDR_SIZE, + PADDED_ADDR_SIZE, + &BytesRead, + (LPOVERLAPPED) context); - ap_lock(allowed_globals.jobmutex); + lr->count++; +// num_comp_contexts++; - while (!workers_may_exit) { - workers_may_exit |= ((ap_max_requests_per_child != 0) && (requests_this_child > ap_max_requests_per_child)); - if (workers_may_exit) - break; + return 0; +} +static ap_inline void reset_completion_context(PCOMP_CONTEXT context) +{ + DWORD BytesRead; + SOCKET nsd; + int rc; + context->lr->count++; - tv.tv_sec = wait_time; - tv.tv_usec = 0; - memcpy(&main_fds, &listenfds, sizeof(fd_set)); + if (context->accept_socket == -1) + context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - rc = ap_select(listenmaxfd + 1, &main_fds, NULL, NULL, &tv); + ap_clear_pool(context->ptrans); + context->conn_io = ap_bcreate(context->ptrans, B_RDWR); + context->recv_buf = context->conn_io->inbase; + context->recv_buf_size = context->conn_io->bufsiz - 2*PADDED_ADDR_SIZE; + ap_get_os_sock(context->lr->sd, &nsd); + + rc = AcceptEx(nsd, //context->lr->fd, + context->accept_socket, + context->recv_buf, + context->recv_buf_size, + PADDED_ADDR_SIZE, + PADDED_ADDR_SIZE, + &BytesRead, + (LPOVERLAPPED) context); +} +static PCOMP_CONTEXT winnt_get_connection(PCOMP_CONTEXT context) +{ + int requests_this_child = 0; + int rc; - if (rc == 0 || (rc == SOCKET_ERROR && h_errno == WSAEINTR)) { - count_select_errors = 0; /* reset count of errors */ - continue; - } - else if (rc == SOCKET_ERROR) { - /* A "real" error occurred, log it and increment the count of - * select errors. This count is used to ensure we don't go into - * a busy loop of continuous errors. - */ - ap_log_error(APLOG_MARK, APLOG_INFO|APLOG_WIN32ERROR, server_conf, "select failed with errno %d", h_errno); - count_select_errors++; - if (count_select_errors > MAX_SELECT_ERRORS) { - workers_may_exit = 1; - ap_log_error(APLOG_MARK, APLOG_ERR|APLOG_WIN32ERROR, server_conf, - "Too many errors in select loop. Child process exiting."); - break; - } + LPOVERLAPPED pol; + DWORD CompKey; + DWORD BytesRead; + + if (context != NULL) { + context->accept_socket = -1; /* Don't reuse the socket */ + reset_completion_context(context); + } + + rc = GetQueuedCompletionStatus(AcceptExCompPort, + &BytesRead, + &CompKey, + &pol, + INFINITE); + context = (PCOMP_CONTEXT) pol; + if (CompKey == 999) { + if (context) { + closesocket(context->accept_socket); + CloseHandle(context->Overlapped.hEvent); + return NULL; } - else { - DWORD BytesRead; - ap_listen_rec *lr; - - lr = find_ready_listener(&main_fds); - if (lr != NULL) { - ap_get_os_sock(lr->sd, &nsd); - } - else { - ap_log_error(APLOG_MARK, APLOG_ERR|APLOG_WIN32ERROR, server_conf, - "select returned but there are no ready listeners! Exiting."); - break; - } + } - rc = AcceptEx(nsd, context->accept_socket, - context->conn_io->inbase, - context->conn_io->bufsiz - 2*PADDED_ADDR_SIZE, - PADDED_ADDR_SIZE, - PADDED_ADDR_SIZE, - &BytesRead, - &context->Overlapped); - - if (!rc && (h_errno == WSA_IO_PENDING)) { - rc = GetOverlappedResult(context->Overlapped.hEvent, - &context->Overlapped, - &BytesRead, - INFINITE); /* TODO: get timeout from the config file */ - } - if (!rc) { - if (h_errno != WSAECONNABORTED) { - ap_log_error(APLOG_MARK, APLOG_ERR|APLOG_WIN32ERROR, server_conf, - "AcceptEx failed."); - } - continue; /* go back to select */ - } - requests_this_child++; - context->conn_io->incnt = BytesRead; - GetAcceptExSockaddrs(context->conn_io->inbase, - context->conn_io->bufsiz - 2*PADDED_ADDR_SIZE, - PADDED_ADDR_SIZE, - PADDED_ADDR_SIZE, - &context->sa_server, - &context->sa_server_len, - &context->sa_client, - &context->sa_client_len); + ap_lock(allowed_globals.jobmutex); - ap_unlock(allowed_globals.jobmutex); - return context; + context->lr->count--; + if (context->lr->count < 2) { + if (create_and_queue_completion_context(pconf, context->lr) == -1) { + /* log error and continue */ } } - CloseHandle(context->Overlapped.hEvent); ap_unlock(allowed_globals.jobmutex); + + context->conn_io->incnt = BytesRead; +/* + GetAcceptExSockaddrs(context->conn_io->inbase, + context->conn_io->bufsiz - 2*PADDED_ADDR_SIZE, + PADDED_ADDR_SIZE, + PADDED_ADDR_SIZE, + &context->sa_server, + &context->sa_server_len, + &context->sa_client, + &context->sa_client_len); + +*/ + return context; +/* + CloseHandle(context->Overlapped.hEvent); SetEvent(exit_event); return NULL; +*/ } - /* * child_main() - this is the main loop for the worker threads * @@ -844,66 +867,52 @@ */ static void child_main(int child_num) { - PCOMP_CONTEXT lpCompContext; - ap_iol *iol; + PCOMP_CONTEXT context = NULL; - /* Create and initialize the static (unchangeing) portion of the - * completion context - */ - lpCompContext = ap_pcalloc(pconf, sizeof(COMP_CONTEXT)); - lpCompContext->Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - ap_create_context(&(lpCompContext->ptrans), pconf); +// ap_create_context(&(lpCompContext->ptrans), pconf); #if 0 (void) ap_update_child_status(child_num, SERVER_READY, (request_rec *) NULL); #endif while (1) { - BUFF *conn_io; - ap_context_t *ptrans; - int csd = -1; conn_rec *current_conn; - - /* Initialize the dynamic portion of the completion context */ - ap_clear_pool(lpCompContext->ptrans); - lpCompContext->conn_io = ap_bcreate(lpCompContext->ptrans, B_RDWR); + ap_iol *iol; /* Grab a connection off the network */ - if (osver.dwPlatformId == VER_PLATFORM_WIN32_NT) - lpCompContext = winnt_get_connection(lpCompContext); + if (osver.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS) { +// context = win9x_get_connection(context); + } else { -// lpCompContext = win9x_get_connection(lpCompContext); + context = winnt_get_connection(context); } - - if (!lpCompContext) + if (!context) break; - conn_io = lpCompContext->conn_io; - ptrans = lpCompContext->ptrans; - csd = lpCompContext->accept_socket; +// ap_note_cleanups_for_socket(context->ptrans, context->accept_socket); -// ap_note_cleanups_for_socket(ptrans, csd); #if 0 (void) ap_update_child_status(child_num, SERVER_BUSY_READ, (request_rec *) NULL); #endif - sock_disable_nagle(csd); + + sock_disable_nagle(context->accept_socket); - iol = win32_attach_socket(csd); + iol = win32_attach_socket(context->ptrans, context->accept_socket); if (iol == NULL) { ap_log_error(APLOG_MARK, APLOG_ERR, server_conf, "error attaching to socket"); - close(csd); + closesocket(context->accept_socket); continue; } - ap_bpush_iol(conn_io, iol); + ap_bpush_iol(context->conn_io, iol); - current_conn = ap_new_connection(ptrans, server_conf, conn_io, - (struct sockaddr_in *) &lpCompContext->sa_client, - (struct sockaddr_in *) &lpCompContext->sa_server, + current_conn = ap_new_connection(context->ptrans, server_conf, context->conn_io, + (struct sockaddr_in *) &context->sa_client, + (struct sockaddr_in *) &context->sa_server, child_num); ap_process_connection(current_conn); @@ -1019,7 +1028,7 @@ ap_log_error(APLOG_MARK,APLOG_ERR|APLOG_WIN32ERROR, server_conf, "Waiting for start_mutex or exit_event -- process will exit"); - ap_destroy_pool(pchild); + ap_destroy_context(pchild); // ap_destroy_pool(pchild): #if 0 cleanup_scoreboard(); #endif @@ -1028,7 +1037,7 @@ /* start_mutex obtained, continue into the select() loop */ if (one_process) { - setup_listeners(server_conf); + setup_listeners(pconf, server_conf); } else { /* Get listeners from the parent process */ setup_inherited_listeners(pconf, server_conf); @@ -1041,7 +1050,8 @@ signal_parent(0); /* tell parent to die */ - ap_destroy_pool(pchild); +// ap_destroy_pool(pchild); + ap_destroy_context(pchild); #if 0 cleanup_scoreboard(); #endif @@ -1051,33 +1061,71 @@ allowed_globals.jobsemaphore = create_semaphore(0); ap_create_lock(pchild, APR_MUTEX, APR_INTRAPROCESS, NULL, &allowed_globals.jobmutex); - /* spawn off the worker threads */ - child_handles = (thread *) alloca(nthreads * sizeof(int)); - for (i = 0; i < nthreads; i++) { - child_handles[i] = create_thread((void (*)(void *)) child_main, (void *) i); - } - /* spawn off accept thread (WIN9x only) */ -// if (osver.dwPlatformId != VER_PLATFORM_WIN32_NT) + if (osver.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS) { + /* spawn off the worker threads */ + child_handles = (thread *) alloca(nthreads * sizeof(int)); + for (i = 0; i < nthreads; i++) { + child_handles[i] = create_thread((void (*)(void *)) child_main, (void *) i); + } + // create_thread((void (*)(void *)) accept_and_queue_connections, (void *) NULL); + } + else { + ap_listen_rec *lr; + SOCKET nsd; + /* Create the AcceptEx completion port */ + AcceptExCompPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + NULL, + 0, + 0); /* CONCURRENT ACTIVE THREADS */ + /* Associate each listener with the completion port */ + for (lr = ap_listeners; lr != NULL; lr = lr->next) { + ap_get_os_sock(lr->sd, &nsd); + CreateIoCompletionPort((HANDLE) nsd, //(HANDLE)lr->fd, + AcceptExCompPort, + 0, + 0); + } + + /* spawn off the worker threads */ + child_handles = (thread *) alloca(nthreads * sizeof(int)); + for (i = 0; i < nthreads; i++) { + child_handles[i] = create_thread((void (*)(void *)) child_main, (void *) i); + } + + /* Create an AcceptEx context for each listener and queue it to the + * AcceptEx completion port + */ + for (lr = ap_listeners; lr != NULL; lr = lr->next) { + for(i=0; i<1; i++) { + if (create_and_queue_completion_context(pconf, lr) == -1) { + /* log error and exit */ + } + } + } + } + rv = WaitForSingleObject(exit_event, INFINITE); printf("exit event signalled \n"); workers_may_exit = 1; /* Get ready to shutdown and exit */ ap_unlock(start_mutex); -#if 0 - if (osver.dwPlatformId != VER_PLATFORM_WIN32_NT) { - /* This is only needed for platforms that use the accept queue code - * (WIN9x only). It should work on NT but not as efficiently as the - * code written specifically for Windows NT. - */ + + /* Tell the workers to stop */ + if (osver.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS) { for (i = 0; i < nthreads; i++) { - add_job(-1); +// add_job(-1); } } -#endif + else { + for (i=0; i < nthreads; i++) { + PostQueuedCompletionStatus(AcceptExCompPort, 0, 999, NULL); + } + } + /* Wait for all your children */ end_time = time(NULL) + 180; while (nthreads) { @@ -1100,7 +1148,7 @@ destroy_semaphore(allowed_globals.jobsemaphore); ap_destroy_lock(allowed_globals.jobmutex); - ap_destroy_pool(pchild); + ap_destroy_context(pchild); #if 0 cleanup_scoreboard(); @@ -1343,7 +1391,7 @@ HANDLE process_handles[MAX_PROCESSES]; HANDLE process_kill_events[MAX_PROCESSES]; - setup_listeners(s); + setup_listeners(pconf, s); /* Create child process * Should only be one in this version of Apache for WIN32 1.4 +16 -1 apache-2.0/src/modules/mpm/winnt/winnt.h Index: winnt.h =================================================================== RCS file: /home/cvs/apache-2.0/src/modules/mpm/winnt/winnt.h,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- winnt.h 1999/08/31 05:33:28 1.3 +++ winnt.h 1999/10/08 19:07:05 1.4 @@ -64,9 +64,24 @@ extern int ap_extended_status; extern void clean_child_exit(int); + typedef struct CompContext { OVERLAPPED Overlapped; SOCKET accept_socket; + ap_listen_rec *lr; + BUFF *conn_io; + char *recv_buf; + int recv_buf_size; + ap_context_t *ptrans; + struct sockaddr sa_server; + int sa_server_len; + struct sockaddr sa_client; + int sa_client_len; +} COMP_CONTEXT, *PCOMP_CONTEXT; +#if 0 +typedef struct CompContext { + OVERLAPPED Overlapped; + SOCKET accept_socket; BUFF* conn_io; ap_context_t *ptrans; struct sockaddr sa_server; @@ -74,5 +89,5 @@ struct sockaddr sa_client; int sa_client_len; } COMP_CONTEXT, *PCOMP_CONTEXT; - +#endif #endif /* APACHE_MPM_WINNT_H */ 1.5 +4 -3 apache-2.0/src/os/win32/iol_socket.c Index: iol_socket.c =================================================================== RCS file: /home/cvs/apache-2.0/src/os/win32/iol_socket.c,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- iol_socket.c 1999/10/08 14:35:34 1.4 +++ iol_socket.c 1999/10/08 19:07:11 1.5 @@ -116,7 +116,7 @@ rv = closesocket(iol->fd); - free(iol); +// free(iol); return rv; } @@ -603,11 +603,12 @@ #endif -ap_iol *win32_attach_socket(int fd) +ap_iol *win32_attach_socket(ap_context_t *p, int fd) { iol_socket *iol; - iol = malloc(sizeof(iol_socket)); +// iol = malloc(sizeof(iol_socket)); + iol = ap_palloc(p,sizeof(iol_socket)); if (!iol) return (ap_iol*) NULL; iol->iol.methods = &socket_methods; 1.3 +1 -1 apache-2.0/src/os/win32/iol_socket.h Index: iol_socket.h =================================================================== RCS file: /home/cvs/apache-2.0/src/os/win32/iol_socket.h,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- iol_socket.h 1999/09/24 18:41:11 1.2 +++ iol_socket.h 1999/10/08 19:07:12 1.3 @@ -58,7 +58,7 @@ #ifndef OS_WIN32_IOL_SOCKET_H #define OS_WIN32_IOL_SOCKET_H -ap_iol *win32_attach_socket(int fd); +ap_iol *win32_attach_socket(ap_context_t *p, int fd); #endif