Repository: qpid-proton Updated Branches: refs/heads/master 3d46b4f02 -> 94dfe1bf0
PROTON-1771: [c] add -close-connnect, -cancel-timeout to threaderciser Also added -no-xxx flags to disable selected actions Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/94dfe1bf Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/94dfe1bf Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/94dfe1bf Branch: refs/heads/master Commit: 94dfe1bf033f7d4b9183bbad75b1801d688a300d Parents: 3d46b4f Author: Alan Conway <[email protected]> Authored: Tue May 8 14:30:46 2018 -0400 Committer: Alan Conway <[email protected]> Committed: Tue May 8 14:32:30 2018 -0400 ---------------------------------------------------------------------- c/tests/threaderciser.c | 110 +++++++++++++++++++++++++++------------ c/tests/threaderciser.tsupp | 5 ++ 2 files changed, 83 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/94dfe1bf/c/tests/threaderciser.c ---------------------------------------------------------------------- diff --git a/c/tests/threaderciser.c b/c/tests/threaderciser.c index 82f39ea..e74db2c 100644 --- a/c/tests/threaderciser.c +++ b/c/tests/threaderciser.c @@ -31,11 +31,17 @@ unpredictable scheduling. Currently using plain old rand(), if quality of randomness is problem we can upgrade. - TODO - - closing connections + NOTE: to narrow down race conditions you have two tools + - specify a limited set of actions with command line flags, e.g. + $ threaderciser -no-close-connect # Do everything but close connections + $ threaderciser -timeout -cancel-timeout # Only do timeout actions + - use suppressions to hide races you know about but are not ready to fix + $ TSAN_OPTIONS="suppressions=/my/suppressions/file" threaderciser + $ valgrind --tool=helgrind --suppressions=/my/suppressions/file threaderciser + + TODO: - pn_proactor_release_connection and re-use with pn_proactor_connect/accept - sending/receiving/tracking messages - - cancel timeout */ #include "thread.h" @@ -60,8 +66,8 @@ #define SLEEP_MAX 100 /* Milliseconds */ /* Set of actions that can be enabled/disabled/counted */ -typedef enum { A_LISTEN, A_LCLOSE, A_CONNECT, A_CCLOSE, A_WAKE, A_TIMEOUT } action; -const char* action_name[] = { "listen", "lclose", "connect", "cclose", "wake", "timeout" }; +typedef enum { A_LISTEN, A_CLOSE_LISTEN, A_CONNECT, A_CLOSE_CONNECT, A_WAKE, A_TIMEOUT, A_CANCEL_TIMEOUT } action; +const char* action_name[] = { "listen", "close-listen", "connect", "close-connect", "wake", "timeout", "cancel-timeout" }; #define action_size (sizeof(action_name)/sizeof(*action_name)) bool action_enabled[action_size] = { 0 } ; @@ -90,7 +96,7 @@ static void debug_impl(const char *fmt, ...) { i += assert_no_err(vsnprintf(i, end-i, fmt, ap)); va_end(ap); } - fputs(msg, stderr); + fprintf(stderr, "%s\n", msg); } /* Shorthand for debugging an action using id as identifier */ @@ -237,6 +243,20 @@ void cpool_wake(cpool *cp) { } } +void cpool_close(cpool *cp) { + if (!action_enabled[A_CLOSE_CONNECT]) return; + connection_ctx *ctx = cpool_pick(cp); + if (ctx) { + pthread_mutex_lock(&ctx->lock); + if (ctx->pn_connection) { + pn_connection_close(ctx->pn_connection); + debuga(A_CLOSE_CONNECT, ctx->pn_connection); + } + pthread_mutex_unlock(&ctx->lock); + cpool_unref(ctx); + } +} + static void connection_ctx_on_close(connection_ctx *ctx) { /* Required locking: mark connection (possibly) closed no more wake calls */ pthread_mutex_lock(&ctx->lock); @@ -318,13 +338,13 @@ static void lpool_addr(lpool *lp, char* a, size_t s) { } void lpool_close(lpool *lp) { - if (!action_enabled[A_LCLOSE]) return; + if (!action_enabled[A_CLOSE_LISTEN]) return; listener_ctx *ctx = lpool_pick(lp); if (ctx) { pthread_mutex_lock(&ctx->lock); if (ctx->pn_listener) { pn_listener_close(ctx->pn_listener); - debuga(A_LCLOSE, ctx->pn_listener); + debuga(A_CLOSE_LISTEN, ctx->pn_listener); } pthread_mutex_unlock(&ctx->lock); lpool_unref(ctx); @@ -391,13 +411,18 @@ static bool maybe(double probability) { static void global_do_stuff(global *g) { if (maybe(0.5)) global_connect(g); if (maybe(0.3)) lpool_listen(&g->listeners, g->proactor); + if (maybe(0.1)) lpool_close(&g->listeners); if (maybe(0.5)) cpool_wake(&g->connections_active); if (maybe(0.5)) cpool_wake(&g->connections_idle); - if (maybe(0.1)) lpool_close(&g->listeners); + if (maybe(0.1)) cpool_close(&g->connections_active); if (action_enabled[A_TIMEOUT] && maybe(0.5)) { debuga(A_TIMEOUT, g->proactor); pn_proactor_set_timeout(g->proactor, rand() % TIMEOUT_MAX); } + if (action_enabled[A_CANCEL_TIMEOUT] && maybe(0.1)) { + debuga(A_CANCEL_TIMEOUT, g->proactor); + pn_proactor_cancel_timeout(g->proactor); + } } static void* user_thread(void* void_g) { @@ -470,66 +495,87 @@ static void* proactor_thread(void* void_g) { static const int default_runtime = 1; static const int default_threads = 8; -void usage(const char **argv, const char **arg, const char **end) { +void usage(const char **argv, const char **arg) { fprintf(stderr, "usage: %s [options]\n", argv[0]); fprintf(stderr, " -time TIME: total run-time in seconds (default %d)\n", default_runtime); fprintf(stderr, " -threads THREADS: total number of threads (default %d)\n", default_threads); fprintf(stderr, " -debug: print debug messages\n"); + fprintf(stderr, "Flags to enable specific actions (all enabled by default)\n"); fprintf(stderr, " "); for (int i = 0; i < (int)action_size; ++i) fprintf(stderr, " -%s", action_name[i]); - fprintf(stderr, ": enable actions\n\n"); - - fprintf(stderr, "bad argument: "); - for (const char **a = argv+1; a < arg; ++a) fprintf(stderr, "%s ", *a); - fprintf(stderr, ">>> %s <<<", *arg++); - for (; arg < end; ++arg) fprintf(stderr, " %s", *arg); + fprintf(stderr, "\n"); + fprintf(stderr, "Flags to disable specific actions (all enabled by default)\n"); + for (int i = 0; i < (int)action_size; ++i) fprintf(stderr, " -no-%s", action_name[i]); + fprintf(stderr, "\n\n"); + fprintf(stderr, "bad argument: %s\n", *arg); exit(1); } +size_t find_action(const char *name, const char **argv, const char **arg) { + for (size_t i = 0; i < action_size; ++i) { + if (!strcmp(name, action_name[i])) return i; + } + usage(argv, arg); + return 0; /* Can't get here. */ +} + int main(int argc, const char* argv[]) { const char **arg = argv + 1; const char **end = argv + argc; int runtime = default_runtime; int threads = default_threads; + bool action_default = true; + for (size_t i = 0; i < action_size; ++i) action_enabled[i] = action_default; while (arg < end) { if (!strcmp(*arg, "-time") && ++arg < end) { runtime = atoi(*arg); - if (runtime <= 0) usage(argv, arg, end); + if (runtime <= 0) usage(argv, arg); } else if (!strcmp(*arg, "-threads") && ++arg < end) { threads = atoi(*arg); - if (threads <= 0) usage(argv, arg, end); + if (threads <= 0) usage(argv, arg); if (threads % 2) threads += 1; /* Round up to even: half proactor, half user */ } else if (!strcmp(*arg, "-debug")) { debug_enable = true; } - else if (**arg == '-') { - size_t i = 0; - while (i < action_size && strcmp((*arg)+1, action_name[i])) ++i; - if (i == action_size) usage(argv, arg, end); - action_enabled[i] = true; - } else { - break; + else if (!strncmp(*arg, "-no-", 4)) { + action_enabled[find_action((*arg) + 4, argv, arg)] = false; + } + else if (!strncmp(*arg, "-", 1)) { + if (action_default) { /* First enable-action flag, switch to default-off */ + action_default = false; + for (size_t i = 0; i < action_size; ++i) action_enabled[i] = action_default; + } + action_enabled[find_action((*arg) + 1, argv, arg)] = true; + } + else { + usage(argv, arg); } ++arg; } - int i = 0; - /* If no actions are requested on command line, enable them all */ - while (i < (int)action_size && !action_enabled[i]) ++i; - if (i == action_size) { - for (i = 0; i < (int)action_size; ++i) action_enabled[i] = true; - } /* Set up global state, start threads */ - debug("threaderciser start threads=%d, time=%d\n", threads, runtime); + + printf("threaderciser start: threads=%d, time=%d, actions=[", threads, runtime); + bool comma = false; + for (size_t i = 0; i < action_size; ++i) { + if (action_enabled[i]) { + printf("%s%s", (comma ? ", " : ""), action_name[i]); + comma = true; + } + } + printf("]\n"); + fflush(stdout); + global g; global_init(&g, threads); lpool_listen(&g.listeners, g.proactor); /* Start initial listener */ pthread_t *user_threads = (pthread_t*)calloc(threads/2, sizeof(pthread_t)); pthread_t *proactor_threads = (pthread_t*)calloc(threads/2, sizeof(pthread_t)); + int i; for (i = 0; i < threads/2; ++i) { pthread_create(&user_threads[i], NULL, user_thread, &g); pthread_create(&proactor_threads[i], NULL, proactor_thread, &g); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/94dfe1bf/c/tests/threaderciser.tsupp ---------------------------------------------------------------------- diff --git a/c/tests/threaderciser.tsupp b/c/tests/threaderciser.tsupp new file mode 100644 index 0000000..18ceb23 --- /dev/null +++ b/c/tests/threaderciser.tsupp @@ -0,0 +1,5 @@ +# TSAN suppressions for threaderciser + +# Benign race in pni_log_enabled +race:pni_log_enabled + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
