Repository: qpid-proton Updated Branches: refs/heads/master db3ee8284 -> 5105b6418
PROTON-1349: Windows iocp proactor additional changes for 0.18 Beta Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5105b641 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5105b641 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5105b641 Branch: refs/heads/master Commit: 5105b641894bfe4dedd87d80d501a5c4601e720a Parents: db3ee82 Author: Clifford Jansen <[email protected]> Authored: Thu Sep 28 23:37:23 2017 -0700 Committer: Clifford Jansen <[email protected]> Committed: Thu Sep 28 23:37:23 2017 -0700 ---------------------------------------------------------------------- examples/c/CMakeLists.txt | 17 +++++----- examples/c/thread.h | 53 +++++++++++++++++++++--------- examples/cpp/CMakeLists.txt | 4 ++- proton-c/CMakeLists.txt | 1 - proton-c/bindings/cpp/CMakeLists.txt | 5 ++- proton-c/src/proactor/win_iocp.c | 54 ++++++++++++++++++++++--------- proton-c/src/tests/CMakeLists.txt | 6 +--- proton-c/src/tests/proactor.c | 9 ++++++ 8 files changed, 101 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 9447ade..3bd5c6b 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -34,16 +34,15 @@ else() set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}") endif() -if(WIN32) - message(STATUS "Windows IOCP proactor examples temporarily disabled for build") -else() - foreach (name broker send receive direct send-abort) - add_executable(c-${name} ${name}.c) - target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) - set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name}) - endforeach() -endif() +foreach (name broker send receive direct send-abort) + add_executable(c-${name} ${name}.c) + target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) + set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name}) +endforeach() set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV}) +# windows exclusion only for 0.18 beta +if(NOT WIN32) add_test(c-example-tests ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v) +endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/c/thread.h ---------------------------------------------------------------------- diff --git a/examples/c/thread.h b/examples/c/thread.h index 1bd5595..d96104a 100644 --- a/examples/c/thread.h +++ b/examples/c/thread.h @@ -22,23 +22,46 @@ /* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */ #ifdef _WIN32 - #include <windows.h> -#include <time.h> -#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */ #include <process.h> -#include <windows.h> -#define pthread_function DWORD WINAPI -#define pthread_function_return DWORD -#define pthread_t HANDLE -#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL) -#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread)) -#define pthread_mutex_T HANDLE -#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL)) -#define pthread_mutex_destroy(pobject) CloseHandle(*pobject) -#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE) -#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject) +typedef struct { + HANDLE handle; + void *(*func)(void *); + void *arg; +} pthread_t; + +static unsigned __stdcall pthread_run(void *thr0) { + pthread_t *t = (pthread_t *) thr0; + t->func(t->arg); + return 0; +} + +static int pthread_create(pthread_t *t, void *unused, void *(*f)(void *), void *arg) { + t->handle = 0; + t->func = f; + t->arg = arg; + HANDLE th = (HANDLE) _beginthreadex(0, 0, &pthread_run, t, 0, 0); + if (th) { + t->handle = th; + return 0; + } + return -1; +} + +static int pthread_join(pthread_t t, void **unused) { + if (t.handle) { + WaitForSingleObject(t.handle, INFINITE); + CloseHandle(t.handle); + } + return 0; +} + +typedef CRITICAL_SECTION pthread_mutex_t; +#define pthread_mutex_init(m, unused) InitializeCriticalSectionAndSpinCount(m, 4000) +#define pthread_mutex_destroy(m) DeleteCriticalSection(m) +#define pthread_mutex_lock(m) EnterCriticalSection(m) +#define pthread_mutex_unlock(m) LeaveCriticalSection(m) #else @@ -46,4 +69,4 @@ #endif -#endif /* thread.h */ +#endif /* thread.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index a8d9d34..eeab787 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -73,10 +73,12 @@ endif() add_cpp_test(cpp-example-container ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest) +# windows exclusion only for 0.18 beta +if (NOT WIN32) if (NOT SSL_IMPL STREQUAL none) add_cpp_test(cpp-example-container-ssl ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleSSLTest) endif() - +endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index cb05ce5..921906e 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -529,7 +529,6 @@ endif() if (PROACTOR STREQUAL "iocp" OR (NOT PROACTOR AND NOT PROACTOR_OK)) if(WIN32 AND NOT CYGWIN) - message(WARNING "Windows IOCP proactor will be built as a prototype but does not yet pass tests") set (PROACTOR_OK iocp) set (qpid-proton-proactor src/proactor/win_iocp.c src/proactor/proactor-internal.c) set_source_files_properties (${qpid-proton-proactor} PROPERTIES http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 9409d30..83f564f 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -207,4 +207,7 @@ add_cpp_test(scalar_test) add_cpp_test(value_test) add_cpp_test(container_test) add_cpp_test(url_test) -add_cpp_test(reconnect_test) +# windows exclusion only for 0.18 beta +if(NOT WIN32) + add_cpp_test(reconnect_test) +endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/proactor/win_iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c index 0ebaf90..36467ff 100644 --- a/proton-c/src/proactor/win_iocp.c +++ b/proton-c/src/proactor/win_iocp.c @@ -64,6 +64,7 @@ */ // TODO: make all code C++ or all C90-ish +// change INACTIVE to be from begin_close instead of zombie reap, to be more like Posix // make the global write lock window much smaller // 2 exclusive write buffers per connection // make the zombie processing thread safe @@ -1515,7 +1516,7 @@ void pni_iocp_initialize(void *obj) iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); assert(iocp->completion_port != NULL); iocp->zombie_list = pn_list(PN_OBJECT, 0); - iocp->iocp_trace = true; + iocp->iocp_trace = false; } void pni_iocp_finalize(void *obj) @@ -1564,6 +1565,7 @@ class csguard { bool set_; }; + // Get string from error status std::string errno_str2(DWORD status) { char buf[512]; @@ -1813,7 +1815,7 @@ void do_complete(iocp_result_t *result) { switch (result->type) { case IOCP_ACCEPT: - /* accept is now processed inline to do in parallel except on teardown */ + /* accept is now processed inline to do in parallel, except on teardown */ assert(iocpd->closing); complete_accept((accept_result_t *) result, result->status); // free's result and retires new_sock break; @@ -1897,7 +1899,7 @@ VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ); class reaper { public: reaper(pn_proactor_t *p, CRITICAL_SECTION *wlock, iocp_t *iocp) - : iocp_(iocp), global_wlock_(wlock), timer_(NULL) { + : iocp_(iocp), global_wlock_(wlock), timer_(NULL), running(true) { InitializeCriticalSectionAndSpinCount(&lock_, 4000); timer_queue_ = CreateTimerQueue(); if (!timer_queue_) { @@ -1948,6 +1950,7 @@ class reaper { // Called when all competing threads have terminated except our own reap_check timer. void final_shutdown() { + running = false; DeleteTimerQueueEx(timer_queue_, INVALID_HANDLE_VALUE); // No pending or active timers from thread pool remain. Truly single threaded now. pn_free((void *) iocp_); // calls pni_iocp_finalize(); cleans up all sockets, completions, completion port. @@ -1965,13 +1968,14 @@ class reaper { private: void reap_timer() { // Call with lock - if (timer_) + if (timer_ || !running) return; pn_timestamp_t now = pn_i_now2(); pni_zombie_check(iocp_, now); pn_timestamp_t zd = pni_zombie_deadline(iocp_); - if (zd && zd > now) { - if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, zd - now, + if (zd) { + DWORD tm = (zd > now) ? zd - now : 1; + if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm, 0, WT_EXECUTEONLYONCE)) { perror("CreateTimerQueueTimer"); abort(); @@ -1984,6 +1988,7 @@ class reaper { CRITICAL_SECTION *global_wlock_; HANDLE timer_queue_; HANDLE timer_; + bool running; }; VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ) { @@ -2033,25 +2038,27 @@ static pn_event_t *log_event(void* p, pn_event_t *e) { return e; } -static void psocket_error(psocket_t *ps, int err, const char* what) { +static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) { if (ps->is_reaper) return; if (!ps->listener) { pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver; pn_connection_driver_bind(driver); /* Bind so errors will be reported */ - pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", - what, ps->host, ps->port, - errno_str2(err).c_str()); + pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg); pn_connection_driver_close(driver); } else { pn_listener_t *l = as_listener(ps); - pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", - what, ps->host, ps->port, - errno_str2(err).c_str()); + pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg); listener_begin_close(l); } } +static void psocket_error(psocket_t *ps, int err, const char* what) { + psocket_error_str(ps, errno_str2(err).c_str(), what); +} + + + // ======================================================================== // pconnection // ======================================================================== @@ -2168,6 +2175,17 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) { return NULL; } +// Call after successful accept +static void set_sock_names(pconnection_t *pc) { + // This works. Note possible use of GetAcceptExSockaddrs() + pn_socket_t sock = pc->psocket.iocpd->socket; + socklen_t len = sizeof(pc->local.ss); + getsockname(sock, (struct sockaddr*)&pc->local.ss, &len); + len = sizeof(pc->remote.ss); + getpeername(sock, (struct sockaddr*)&pc->remote.ss, &len); +} + + // Call with lock held when closing and transitioning away from working context static inline bool pconnection_can_free(pconnection_t *pc) { return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0 @@ -2632,6 +2650,8 @@ static bool connect_step(pconnection_t *pc) { if (success || WSAGetLastError() == ERROR_IO_PENDING) { iocpd->ops_in_progress++; iocpd->active_completer = &pc->psocket; + // getpeername unreliable for outgoing connections, but we know it at this point + memcpy(&pc->remote.ss, ai->ai_addr, ai->ai_addrlen); return true; // logic resumes at connect_step_done() } pn_free(result); @@ -2660,6 +2680,8 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) { pc->psocket.iocpd->write_closed = false; pc->psocket.iocpd->read_closed = false; if (pc->addrinfo) { + socklen_t len = sizeof(pc->local.ss); + getsockname(pc->psocket.iocpd->socket, (struct sockaddr*)&pc->local.ss, &len); freeaddrinfo(pc->addrinfo); pc->addrinfo = NULL; } @@ -2671,6 +2693,7 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) { // Connect failed, no IO started, i.e. no pending iocpd based events pc->context.proactor->reaper->fast_reap(iocpd); pc->psocket.iocpd = NULL; + memset(&pc->remote.ss, 0, sizeof(pc->remote.ss)); // Is there a next connection target in the addrinfo to try? if (pc->ai && connect_step(pc)) { // Trying the next addrinfo possibility. Will return here. @@ -2700,7 +2723,6 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) { pc->ai = pc->addrinfo; if (connect_step(pc)) { - g.release(); return; } } @@ -3153,6 +3175,7 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { iocpdesc_t *conn_iocpd = accept_result->new_sock; pc->psocket.iocpd = conn_iocpd; conn_iocpd->active_completer =&pc->psocket; + set_sock_names(pc); pni_iocpdesc_start(conn_iocpd); } @@ -3287,7 +3310,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { { csguard g(&p->context.cslock); // Move the whole contexts list into a disconnecting state - pcontext_t *disconnecting_pcontexts = p->contexts; + disconnecting_pcontexts = p->contexts; p->contexts = NULL; // First pass: mark each pcontext as disconnecting and update global pending count. pcontext_t *ctx = disconnecting_pcontexts; @@ -3331,7 +3354,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } } else { - assert(l); if (!ctx->closing) { if (cond) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt index c562025..ce7d25e 100644 --- a/proton-c/src/tests/CMakeLists.txt +++ b/proton-c/src/tests/CMakeLists.txt @@ -56,11 +56,7 @@ pn_add_c_test (c-condition-tests condition.c) pn_add_c_test (c-connection-driver-tests connection_driver.c) if(HAS_PROACTOR) - if(WIN32) - message(STATUS "Windows IOCP proactor tests temporarily suspended") - else(WIN32) - pn_add_c_test (c-proactor-tests proactor.c) - endif(WIN32) + pn_add_c_test (c-proactor-tests proactor.c) if(WIN32) # set(path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton>") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5105b641/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c index 3995c1d..be00f46 100644 --- a/proton-c/src/tests/proactor.c +++ b/proton-c/src/tests/proactor.c @@ -216,9 +216,14 @@ typedef struct test_listener_t { test_listener_t test_listen(test_proactor_t *tp, const char *host) { test_listener_t l = { test_port(host), pn_listener() }; +#if defined(_WIN32) + sock_close(l.port.sock); // small chance another process will steal the port in Windows +#endif pn_proactor_listen(tp->proactor, l.listener, l.port.host_port, 4); TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1)); +#if !defined(_WIN32) sock_close(l.port.sock); +#endif return l; } @@ -686,7 +691,9 @@ static void test_ipv4_ipv6(test_t *t) { EXPECT_CONNECT(l.port, ""); /* local->all */ if (has_ipv6) { +#if !defined(_WIN32) EXPECT_CONNECT(l6.port, "::"); /* v6->v6 */ +#endif EXPECT_CONNECT(l6.port, ""); /* local->v6 */ EXPECT_CONNECT(l.port, "::1"); /* v6->all */ @@ -1098,7 +1105,9 @@ int main(int argc, char **argv) { RUN_ARGV_TEST(failed, t, test_connection_wake(&t)); RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t)); RUN_ARGV_TEST(failed, t, test_release_free(&t)); +#if !defined(_WIN32) RUN_ARGV_TEST(failed, t, test_ssl(&t)); +#endif RUN_ARGV_TEST(failed, t, test_proactor_addr(&t)); RUN_ARGV_TEST(failed, t, test_parse_addr(&t)); RUN_ARGV_TEST(failed, t, test_netaddr(&t)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
