HTRACE-164. htrace hrpc: use msgpack for serialization (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/33ab96fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/33ab96fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/33ab96fb Branch: refs/heads/master Commit: 33ab96fbe6e5d12f75ed4f054d693dfd211aa586 Parents: 08d54c9 Author: Colin P. Mccabe <[email protected]> Authored: Fri May 1 18:36:10 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Mon May 11 22:46:20 2015 -0700 ---------------------------------------------------------------------- LICENSE.txt | 8 + htrace-c/src/CMakeLists.txt | 6 + htrace-c/src/core/conf.c | 1 + htrace-c/src/core/htrace.h | 7 + htrace-c/src/core/span.c | 87 +- htrace-c/src/core/span.h | 21 +- htrace-c/src/receiver/hrpc.c | 26 +- htrace-c/src/receiver/hrpc.h | 9 +- htrace-c/src/receiver/htraced.c | 545 ++-- htrace-c/src/test/cmp_util-unit.c | 123 + htrace-c/src/test/span_util.c | 191 ++ htrace-c/src/test/span_util.h | 14 + htrace-c/src/util/cmp.c | 2674 ++++++++++++++++++ htrace-c/src/util/cmp.h | 441 +++ htrace-c/src/util/cmp_util.c | 90 + htrace-c/src/util/cmp_util.h | 76 + htrace-htraced/src/go/Godeps/Godeps.json | 4 + htrace-htraced/src/go/gobuild.sh | 11 +- .../go/src/org/apache/htrace/client/hclient.go | 16 +- .../src/go/src/org/apache/htrace/common/rpc.go | 2 +- .../go/src/org/apache/htrace/htraced/hrpc.go | 13 +- 21 files changed, 4093 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/LICENSE.txt ---------------------------------------------------------------------- diff --git a/LICENSE.txt b/LICENSE.txt index fa6051c..19dd8f7 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -276,3 +276,11 @@ https://github.com/marionettejs/backbone.babysitter/blob/v0.1.6/LICENSE.md Backbone.Marionette is a composite application library for Backbone.js. It is MIT licensed: https://github.com/marionettejs/backbone.marionette/blob/v2.4.1/license.txt + +CMP is an implementation of the MessagePack serialization format in +C. It is licensed under the MIT license: +https://github.com/camgunz/cmp/blob/master/LICENSE + +go-codec is an implementation of several serialization and deserialization +codecs in Go. It is licensed under the MIT license: +https://github.com/ugorji/go/blob/master/LICENSE \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/htrace-c/src/CMakeLists.txt b/htrace-c/src/CMakeLists.txt index 3d51968..da08ee9 100644 --- a/htrace-c/src/CMakeLists.txt +++ b/htrace-c/src/CMakeLists.txt @@ -83,6 +83,8 @@ set(SRC_ALL sampler/never.c sampler/prob.c sampler/sampler.c + util/cmp.c + util/cmp_util.c util/htable.c util/log.c util/process_id.c @@ -130,6 +132,10 @@ macro(add_utest utest) add_test(${utest} ${CMAKE_CURRENT_BINARY_DIR}/${utest} ${utest}) endmacro(add_utest) +add_utest(cmp_util-unit + test/cmp_util-unit.c +) + add_utest(conf-unit test/conf-unit.c ) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/conf.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/core/conf.c b/htrace-c/src/core/conf.c index a561906..d9eed2a 100644 --- a/htrace-c/src/core/conf.c +++ b/htrace-c/src/core/conf.c @@ -34,6 +34,7 @@ ";" HTRACED_READ_TIMEO_MS_KEY "=60000"\ ";" HTRACE_PROCESS_ID "=%{tname}/%{ip}"\ ";" HTRACED_ADDRESS_KEY "=localhost:9095"\ + ";" HTRACED_BUFFER_SEND_TRIGGER_FRACTION "=0.50"\ ) static int parse_key_value(char *str, char **key, char **val) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/htrace.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/core/htrace.h b/htrace-c/src/core/htrace.h index 4e2f1f9..816c222 100644 --- a/htrace-c/src/core/htrace.h +++ b/htrace-c/src/core/htrace.h @@ -135,6 +135,13 @@ extern "C" { #define HTRACED_BUFFER_SIZE_KEY "htraced.buffer.size" /** + * The fraction of the buffer that needs to be full to trigger the spans to be + * sent from the htraced span receiver. + */ +#define HTRACED_BUFFER_SEND_TRIGGER_FRACTION \ + "htraced.buffer.send.trigger.fraction" + +/** * The process ID string to use. * * %{ip} will be replaced by an IP address; http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/span.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/core/span.c b/htrace-c/src/core/span.c index 13ba3cf..790afa5 100644 --- a/htrace-c/src/core/span.c +++ b/htrace-c/src/core/span.c @@ -19,6 +19,7 @@ #include "core/span.h" #include "receiver/receiver.h" #include "sampler/sampler.h" +#include "util/cmp.h" #include "util/log.h" #include "util/rand.h" #include "util/string.h" @@ -26,6 +27,7 @@ #include <inttypes.h> #include <stdint.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> @@ -127,7 +129,7 @@ void htrace_span_sort_and_dedupe_parents(struct htrace_span *span) * containing the span contents. With buf non-NULL, we will write the span * contents to the provided buffer. * - * @param scope The scope + * @param span The span * @param max The maximum number of bytes to write to buf. * @param buf If non-NULL, where the string will be written. * @@ -147,7 +149,7 @@ static int span_json_sprintf_impl(const struct htrace_span *span, ret += fwdprintf(&buf, &max, "{\"s\":\"%016" PRIx64 "\",\"b\":%" PRId64 ",\"e\":%" PRId64",", span->span_id, span->begin_ms, span->end_ms); - if (span->desc) { + if (span->desc[0]) { ret += fwdprintf(&buf, &max, "\"d\":\"%s\",", span->desc); } if (span->prid) { @@ -174,14 +176,87 @@ static int span_json_sprintf_impl(const struct htrace_span *span, return ret + 1; } -int span_json_size(const struct htrace_span *scope) +int span_json_size(const struct htrace_span *span) { - return span_json_sprintf_impl(scope, 0, NULL); + return span_json_sprintf_impl(span, 0, NULL); } -void span_json_sprintf(const struct htrace_span *scope, int max, void *buf) +void span_json_sprintf(const struct htrace_span *span, int max, void *buf) { - span_json_sprintf_impl(scope, max, buf); + span_json_sprintf_impl(span, max, buf); +} + +int span_write_msgpack(const struct htrace_span *span, cmp_ctx_t *ctx) +{ + int i, num_parents; + uint16_t map_size = + 1 + // desc + 1 + // begin_ms + 1 + // end_ms + 1; // span_id + + num_parents = span->num_parents; + if (span->prid) { + map_size++; + } + if (num_parents > 0) { + map_size++; + } + if (!cmp_write_map16(ctx, map_size)) { + return 0; + } + if (!cmp_write_fixstr(ctx, "d", 1)) { + return 0; + } + if (!cmp_write_str16(ctx, span->desc, strlen(span->desc))) { + return 0; + } + if (!cmp_write_fixstr(ctx, "b", 1)) { + return 0; + } + if (!cmp_write_u64(ctx, span->begin_ms)) { + return 0; + } + if (!cmp_write_fixstr(ctx, "e", 1)) { + return 0; + } + if (!cmp_write_u64(ctx, span->end_ms)) { + return 0; + } + if (!cmp_write_fixstr(ctx, "s", 1)) { + return 0; + } + if (!cmp_write_u64(ctx, span->span_id)) { + return 0; + } + if (span->prid) { + if (!cmp_write_fixstr(ctx, "r", 1)) { + return 0; + } + if (!cmp_write_str16(ctx, span->prid, strlen(span->prid))) { + return 0; + } + } + if (num_parents > 0) { + if (!cmp_write_fixstr(ctx, "p", 1)) { + return 0; + } + if (!cmp_write_array16(ctx, num_parents)) { + return 0; + } + if (num_parents == 1) { + if (!cmp_write_u64(ctx, span->parent.single)) { + return 0; + } + } else { + for (i = 0; i < num_parents; i++) { + if (!cmp_write_u64(ctx, span->parent.list[i])) { + return 0; + } + } + } + } + return 1; } // vim:ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/span.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/core/span.h b/htrace-c/src/core/span.h index b19bd94..710cff7 100644 --- a/htrace-c/src/core/span.h +++ b/htrace-c/src/core/span.h @@ -29,6 +29,7 @@ #include <stdint.h> +struct cmp_ctx_s; struct htracer; struct htrace_span { @@ -105,16 +106,6 @@ void htrace_span_free(struct htrace_span *span); void htrace_span_sort_and_dedupe_parents(struct htrace_span *span); /** - * Escape a JSON string. Specifically, put backslashes before double quotes and - * other backslashes. - * - * @param in The string to escape. - * - * @param out The escaped string. Malloced. NULL on OOM. - */ -char *json_escape(const char *in); - -/** * Get the buffer size that would be needed to serialize this span to a buffer. * * @param span The span. @@ -133,6 +124,16 @@ int span_json_size(const struct htrace_span *span); */ void span_json_sprintf(const struct htrace_span *span, int max, void *buf); +/** + * Write a span to the provided CMP context. + * + * @param span The span. + * @param ctx The CMP context. + * + * @return 0 on failure; 1 on success. + */ +int span_write_msgpack(const struct htrace_span *span, struct cmp_ctx_s *ctx); + #endif // vim: ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/receiver/hrpc.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/hrpc.c b/htrace-c/src/receiver/hrpc.c index 32a1f4a..f2e296d 100644 --- a/htrace-c/src/receiver/hrpc.c +++ b/htrace-c/src/receiver/hrpc.c @@ -52,7 +52,7 @@ * Implements sending messages via HRPC. */ -#define HRPC_MAGIC 0x48545243U +#define HRPC_MAGIC 0x43525448U #define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024) @@ -130,7 +130,8 @@ static int try_connect(struct hrpc_client *hcli, struct addrinfo *p); static int set_socket_read_and_write_timeout(struct hrpc_client *hcli, int sock); static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id, - const void *req, size_t req_len, uint64_t *seq); + const void *buf1, size_t buf1_len, + const void *buf2, size_t buf2_len, uint64_t *seq); static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id, uint64_t seq, char **err, void **resp, size_t *resp_len); @@ -185,7 +186,8 @@ void hrpc_client_free(struct hrpc_client *hcli) } int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id, - const void *req, size_t req_len, + const void *buf1, size_t buf1_len, + const void *buf2, size_t buf2_len, char **err, void **resp, size_t *resp_len) { uint64_t seq; @@ -198,7 +200,8 @@ int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id, } else { htrace_log(hcli->lg, "hrpc_client_call: connection was already open\n"); } - if (!hrpc_client_send_req(hcli, method_id, req, req_len, &seq)) { + if (!hrpc_client_send_req(hcli, method_id, + buf1, buf1_len, buf2, buf2_len, &seq)) { goto error; } htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n"); @@ -338,20 +341,25 @@ static int set_socket_read_and_write_timeout(struct hrpc_client *hcli, } static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id, - const void *req, size_t req_len, uint64_t *seq) + const void *buf1, size_t buf1_len, + const void *buf2, size_t buf2_len, uint64_t *seq) { + // We use writev (scatter/gather I/O) here in order to avoid sending + // multiple packets when TCP_NODELAY is turned on. struct hrpc_req_header hdr; - struct iovec iov[2]; + struct iovec iov[3]; hdr.magic = htole64(HRPC_MAGIC); hdr.method_id = htole32(method_id); *seq = hcli->seq++; hdr.seq = htole64(*seq); - hdr.length = htole32(req_len); + hdr.length = htole32(buf1_len + buf2_len); iov[0].iov_base = &hdr; iov[0].iov_len = sizeof(hdr); - iov[1].iov_base = (void*)req; - iov[1].iov_len = req_len; + iov[1].iov_base = (void*)buf1; + iov[1].iov_len = buf1_len; + iov[2].iov_base = (void*)buf2; + iov[2].iov_len = buf2_len; while (1) { ssize_t res = writev(hcli->sock, iov, sizeof(iov)/sizeof(iov[0])); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/receiver/hrpc.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/hrpc.h b/htrace-c/src/receiver/hrpc.h index 8ec20be..c7126b0 100644 --- a/htrace-c/src/receiver/hrpc.h +++ b/htrace-c/src/receiver/hrpc.h @@ -60,8 +60,10 @@ void hrpc_client_free(struct hrpc_client *hcli); * * @param hcli The HRPC client. * @param method_id The method ID to use. - * @param req The request buffer to send. - * @param req_len The size of the request buffer to send. + * @param buf1 The first buffer to send. + * @param buf1_len The size of the first buffer to send. + * @param buf2 The second buffer to send. + * @param buf2_len The size of the second buffer to send. * @param err (out param) Will be set to a malloced * NULL-terminated string if the server returned an * error response. NULL otherwise. @@ -72,7 +74,8 @@ void hrpc_client_free(struct hrpc_client *hcli); * @return 0 on failure, 1 on success. */ int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id, - const void *req, size_t req_len, + const void *buf1, size_t buf1_len, + const void *buf2, size_t buf2_len, char **err, void **resp, size_t *resp_len); /** http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/receiver/htraced.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c index 7e4af81..8f392b7 100644 --- a/htrace-c/src/receiver/htraced.c +++ b/htrace-c/src/receiver/htraced.c @@ -23,6 +23,8 @@ #include "receiver/hrpc.h" #include "receiver/receiver.h" #include "test/test.h" +#include "util/cmp.h" +#include "util/cmp_util.h" #include "util/log.h" #include "util/string.h" #include "util/time.h" @@ -30,6 +32,7 @@ #include <errno.h> #include <inttypes.h> #include <pthread.h> +#include <stddef.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> @@ -38,22 +41,59 @@ /** * @file htraced.c * - * The htraced span receiver which implements sending spans on to htraced. + * The htraced span receiver which implements sending spans on to the htraced + * daemon. + * + * We send spans via the HRPC protocol. HRPC consists of a simple fixed-size + * header specifying a magic number, the length, and a message type, followed by + * data in the msgpack serialization format. The message type will determine + * the type of msgpack data. Messages bodies are sent as maps, essentially + * making all fields optional and allowing the protocol to evolve. See hrpc.c + * and rpc.go for the implementation of HRPC. + * + * Spans are serialized immediately when they are added to the buffer. This is + * one of the advantages of msgpack-- it has a good streaming interface. We do + * not need to keep around the span objects after htraced_rcv_add_span. + * + * The htraced receiver keeps two equally sized buffers around internally. + * While we are writing spans to one buffer, we can be sending the data from the + * other buffer over the wire. The intention here is to avoid copies as much as + * possible. In general, what we send over the wire is exactly what is in the + * buffer, except that we have to add a short "prequel" to it containing the + * other WriteSpansReq fields. + * + * Note that we may change the serialization in the future if we discover better + * alternatives. Sending spans over HTTP as JSON will always be supported + * as a fallback. + */ + +/** + * The maximum length of the message we will send to the server. + * This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go. + */ +#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL) + +/** + * The maximum length of the prequel in a WriteSpans message. */ +#define MAX_WRITESPANS_PREQUEL_LEN 1024 /** - * The minimum buffer size to allow for the htraced circular buffer. + * The maximum length of the span data in a WriteSpans message. + */ +#define MAX_SPAN_DATA_LEN (MAX_HRPC_LEN - MAX_WRITESPANS_PREQUEL_LEN) + +/** + * The minimum total buffer size to allow. * - * This should hopefully allow at least a few spans to be buffered. + * This should allow at least a few spans to be buffered. */ #define HTRACED_MIN_BUFFER_SIZE (4ULL * 1024ULL * 1024ULL) /** - * The maximum buffer size to allow for the htraced circular buffer. - * This is mainly to avoid overflow. Of course, you couldn't allocate a buffer - * anywhere near this size anyway. + * The maximum total buffer size to allow. */ -#define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL +#define HTRACED_MAX_BUFFER_SIZE (2ULL * MAX_SPAN_DATA_LEN) /** * The minimum number of milliseconds to allow for flush_interval_ms. @@ -77,13 +117,6 @@ #define HTRACED_READ_TIMEO_MS_MIN 50LL /** - * The maximum size of the message we will send over the wire. - * This also sets the size of the transmission buffer. - * This constant must not be more than 2^^32 on 32-bit systems. - */ -#define HTRACED_MAX_MSG_LEN (8ULL * 1024ULL * 1024ULL) - -/** * The maximum number of times we will try to add a span to the circular buffer * before giving up. */ @@ -101,6 +134,36 @@ */ #define HTRACED_SEND_RETRY_SLEEP_MS 5000 +/** + * The number of buffers used by htraced. + */ +#define HTRACED_NUM_BUFS 2 + +/** + * An HTraced send buffer. + */ +struct htraced_sbuf { + /** + * Current offset within the buffer. + */ + uint64_t off; + + /** + * Length of the buffer. + */ + uint64_t len; + + /** + * The number of spans in the buffer. + */ + uint64_t num_spans; + + /** + * The buffer data. This field actually has size 'len,' not size 1. + */ + char buf[1]; +}; + /* * A span receiver that writes spans to htraced. */ @@ -136,45 +199,34 @@ struct htraced_rcv { struct hrpc_client *hcli; /** - * Length of the circular buffer. - */ - uint64_t clen; - - /** - * A circular buffer containing span data. - */ - uint8_t *cbuf; - - /** - * 'start' pointer of the circular buffer. + * The monotonic-clock time at which we last did a send operation. */ - uint64_t cstart; + uint64_t last_send_ms; /** - * 'end' pointer of the circular buffer. + * The index of the active buffer. */ - uint64_t cend; + int active_buf; /** - * The monotonic-clock time at which we last did a send operation. + * The two send buffers. */ - uint64_t last_send_ms; + struct htraced_sbuf *sbuf[HTRACED_NUM_BUFS]; /** - * RPC messages are copied into this buffer before being sent. - * Its length is HTRACED_MAX_MSG_LEN. + * Lock protecting the buffers from concurrent writes. */ - uint8_t *sbuf; + pthread_mutex_t lock; /** - * Lock protecting the circular buffer from concurrent writes. + * Condition variable used to wake up the background thread. */ - pthread_mutex_t lock; + pthread_cond_t bg_cond; /** - * Condition variable used to wake up the background thread. + * Condition variable used to wake up flushing threads. */ - pthread_cond_t cond; + pthread_cond_t flush_cond; /** * Background transmitter thread. @@ -185,8 +237,44 @@ struct htraced_rcv { void* run_htraced_xmit_manager(void *data); static int should_xmit(struct htraced_rcv *rcv, uint64_t now); static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now); -static uint64_t cbuf_used(const struct htraced_rcv *rcv); -static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv); + +static int htraced_sbufs_empty(struct htraced_rcv *rcv) +{ + int i; + for (i = 0; i < HTRACED_NUM_BUFS; i++) { + if (rcv->sbuf[i]->off) { + return 0; + } + } + return 1; +} + +static struct htraced_sbuf *htraced_sbuf_alloc(uint64_t len) +{ + struct htraced_sbuf *sbuf; + + // The final field of the htraced_sbuf structure is declared as having size + // 1, but really it has size 'len'. This avoids a pointer dereference when + // accessing data in the sbuf. + sbuf = malloc(offsetof(struct htraced_sbuf, buf) + len); + if (!sbuf) { + return NULL; + } + sbuf->off = 0; + sbuf->len = len; + sbuf->num_spans = 0; + return sbuf; +} + +static void htraced_sbuf_free(struct htraced_sbuf *sbuf) +{ + free(sbuf); +} + +static uint64_t htraced_sbuf_remaining(const struct htraced_sbuf *sbuf) +{ + return sbuf->len - sbuf->off; +} static uint64_t htraced_get_bounded_u64(struct htrace_log *lg, const struct htrace_conf *cnf, const char *prop, @@ -207,13 +295,33 @@ static uint64_t htraced_get_bounded_u64(struct htrace_log *lg, return val; } +static double htraced_get_bounded_double(struct htrace_log *lg, + const struct htrace_conf *cnf, const char *prop, + double min, double max) +{ + double val = htrace_conf_get_double(lg, cnf, prop); + if (val < min) { + htrace_log(lg, "htraced_rcv_create: can't set %s to %g" + ". Using minimum value of %g instead.\n", + prop, val, min); + return min; + } else if (val > max) { + htrace_log(lg, "htraced_rcv_create: can't set %s to %g" + ". Using maximum value of %g instead.\n", + prop, val, max); + return max; + } + return val; +} + static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, const struct htrace_conf *conf) { struct htraced_rcv *rcv; const char *endpoint; - int ret; - uint64_t write_timeo_ms, read_timeo_ms; + int i, ret; + uint64_t write_timeo_ms, read_timeo_ms, buf_len; + double send_fraction; endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY); if (!endpoint) { @@ -223,7 +331,7 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, HTRACED_ADDRESS_KEY); goto error; } - rcv = malloc(sizeof(*rcv)); + rcv = calloc(1, sizeof(*rcv)); if (!rcv) { htrace_log(tracer->lg, "htraced_rcv_create: OOM while " "allocating htraced_rcv.\n"); @@ -247,68 +355,66 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, if (!rcv->hcli) { goto error_free_rcv; } - rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY); - if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) { - htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64 - ". Setting the minimum buffer size of %llu" - " instead.\n", rcv->clen, HTRACED_MIN_BUFFER_SIZE); - rcv->clen = HTRACED_MIN_BUFFER_SIZE; - } else if (rcv->clen > HTRACED_MAX_BUFFER_SIZE) { - htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64 - ". Setting the maximum buffer size of %lld" - " instead.\n", rcv->clen, HTRACED_MAX_BUFFER_SIZE); - rcv->clen = HTRACED_MAX_BUFFER_SIZE; - } - rcv->cbuf = malloc(rcv->clen); - if (!rcv->cbuf) { - htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64 - " bytes for the htraced circular buffer.\n", rcv->clen); - goto error_free_hcli; - } - // Send when the buffer gets 1/4 full. - rcv->send_threshold = rcv->clen * 0.25; - rcv->cstart = 0; - rcv->cend = 0; - rcv->last_send_ms = monotonic_now_ms(tracer->lg); - rcv->sbuf = malloc(HTRACED_MAX_MSG_LEN); - if (!rcv->sbuf) { - goto error_free_cbuf; + buf_len = htraced_get_bounded_u64(tracer->lg, conf, + HTRACED_BUFFER_SIZE_KEY, HTRACED_MIN_BUFFER_SIZE, + HTRACED_MAX_BUFFER_SIZE) / 2; + for (i = 0; i < HTRACED_NUM_BUFS; i++) { + rcv->sbuf[i] = htraced_sbuf_alloc(buf_len); + if (!rcv->sbuf[i]) { + htrace_log(tracer->lg, "htraced_rcv_create: htraced_sbuf_alloc(" + "buf_len=%"PRId64") failed: OOM.\n", buf_len); + goto error_free_bufs; + } } + send_fraction = htraced_get_bounded_double(tracer->lg, conf, + HTRACED_BUFFER_SEND_TRIGGER_FRACTION, 0.1, 1.0); + rcv->send_threshold = buf_len * send_fraction; + if (rcv->send_threshold > buf_len) { + rcv->send_threshold = buf_len; + } + rcv->last_send_ms = monotonic_now_ms(tracer->lg); ret = pthread_mutex_init(&rcv->lock, NULL); if (ret) { htrace_log(tracer->lg, "htraced_rcv_create: pthread_mutex_init " "error %d: %s\n", ret, terror(ret)); - goto error_free_sbuf; + goto error_free_bufs; } - ret = pthread_cond_init(&rcv->cond, NULL); + ret = pthread_cond_init(&rcv->bg_cond, NULL); if (ret) { - htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init " - "error %d: %s\n", ret, terror(ret)); + htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init(" + "bg_cond) error %d: %s\n", ret, terror(ret)); goto error_free_lock; } + ret = pthread_cond_init(&rcv->flush_cond, NULL); + if (ret) { + htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init(" + "flush_cond) error %d: %s\n", ret, terror(ret)); + goto error_free_bg_cond; + } ret = pthread_create(&rcv->xmit_thread, NULL, run_htraced_xmit_manager, rcv); if (ret) { htrace_log(tracer->lg, "htraced_rcv_create: failed to create xmit thread: " "error %d: %s\n", ret, terror(ret)); - goto error_free_cvar; + goto error_free_flush_cond; } htrace_log(tracer->lg, "Initialized htraced receiver for %s" ", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64 ", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64 - ", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli), + ", buf_len=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli), rcv->flush_interval_ms, rcv->send_threshold, - write_timeo_ms, read_timeo_ms, rcv->clen); + write_timeo_ms, read_timeo_ms, buf_len); return (struct htrace_rcv*)rcv; -error_free_cvar: - pthread_cond_destroy(&rcv->cond); +error_free_flush_cond: + pthread_cond_destroy(&rcv->flush_cond); +error_free_bg_cond: + pthread_cond_destroy(&rcv->bg_cond); error_free_lock: pthread_mutex_destroy(&rcv->lock); -error_free_sbuf: - free(rcv->sbuf); -error_free_cbuf: - free(rcv->cbuf); -error_free_hcli: +error_free_bufs: + for (i = 0; i < HTRACED_NUM_BUFS; i++) { + htraced_sbuf_free(rcv->sbuf[i]); + } hrpc_client_free(rcv->hcli); error_free_rcv: free(rcv); @@ -331,7 +437,7 @@ void* run_htraced_xmit_manager(void *data) htraced_xmit(rcv, now); } if (rcv->shutdown) { - while (cbuf_used(rcv) > 0) { + while (!htraced_sbufs_empty(rcv)) { htraced_xmit(rcv, now); } break; @@ -344,7 +450,7 @@ void* run_htraced_xmit_manager(void *data) // buffered. wakeup = now + (rcv->flush_interval_ms / 2); ms_to_timespec(wakeup, &wakeup_ts); - ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts); + ret = pthread_cond_timedwait(&rcv->bg_cond, &rcv->lock, &wakeup_ts); if ((ret != 0) && (ret != ETIMEDOUT)) { htrace_log(lg, "run_htraced_xmit_manager: pthread_cond_timedwait " "error: %d (%s)\n", ret, terror(ret)); @@ -367,67 +473,107 @@ void* run_htraced_xmit_manager(void *data) */ static int should_xmit(struct htraced_rcv *rcv, uint64_t now) { - uint64_t used; + uint64_t off = rcv->sbuf[rcv->active_buf]->off; - used = cbuf_used(rcv); - if (used > rcv->send_threshold) { + if (off > rcv->send_threshold) { // We have buffered a lot of bytes, so let's send. return 1; } if (now - rcv->last_send_ms > rcv->flush_interval_ms) { // It's been too long since the last transmission, so let's send. - if (used > 0) { + if (off > 0) { return 1; } } return 0; // Let's wait. } +#define DEFAULT_PID_STR "DefaultPid" +#define DEFAULT_PID_STR_LEN (sizeof(DEFAULT_PID_STR) - 1) +#define SPANS_STR "Spans" +#define SPANS_STR_LEN (sizeof(SPANS_STR) - 1) + +/** + * Write the prequel to the WriteSpans message. + */ +static int add_writespans_prequel(struct htraced_rcv *rcv, + struct htraced_sbuf *sbuf, uint8_t *prequel) +{ + struct cmp_bcopy_ctx bctx; + struct cmp_ctx_s *ctx = (struct cmp_ctx_s *)&bctx; + cmp_bcopy_ctx_init(&bctx, prequel, MAX_WRITESPANS_PREQUEL_LEN); + if (!cmp_write_fixmap(ctx, 2)) { + return -1; + } + if (!cmp_write_fixstr(ctx, DEFAULT_PID_STR, DEFAULT_PID_STR_LEN)) { + return -1; + } + if (!cmp_write_str(ctx, rcv->tracer->prid, strlen(rcv->tracer->prid))) { + return -1; + } + if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) { + return -1; + } + if (!cmp_write_array(ctx, sbuf->num_spans)) { + return -1; + } + return bctx.off; +} + /** * Send all the spans which we have buffered. * * @param rcv The htraced receiver. - * @param slen The length of the buffer to send. + * @param sbuf The span buffer to send. * * @return 1 on success; 0 otherwise. */ -static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen) +static int htraced_xmit_impl(struct htraced_rcv *rcv, struct htraced_sbuf *sbuf) { struct htrace_log *lg = rcv->tracer->lg; - int res, retval = 0; - char *prequel = NULL, *err = NULL, *resp = NULL; + uint8_t prequel[MAX_WRITESPANS_PREQUEL_LEN]; + int prequel_len, ret; + char *err = NULL, *resp = NULL; size_t resp_len = 0; - res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS, - rcv->sbuf, slen, &err, (void**)&resp, &resp_len); - if (!res) { + prequel_len = add_writespans_prequel(rcv, sbuf, prequel); + if (prequel_len < 0) { + htrace_log(lg, "htrace_xmit_impl: add_writespans_prequel failed.\n"); + ret = 0; + goto done; + } + ret = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS, + prequel, prequel_len, sbuf->buf, sbuf->off, + &err, (void**)&resp, &resp_len); + if (!ret) { htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n"); - retval = 0; + goto done; } else if (err) { htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err); - retval = 0; - } else { - retval = 1; + ret = 0; + goto done; } - free(prequel); + ret = 1; +done: free(err); free(resp); - return retval; + return ret; } static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now) { - int32_t slen; int tries = 0; + struct htraced_sbuf *sbuf; - // Move span data from the circular buffer into the transmission buffer. - slen = cbuf_to_sbuf(rcv); + // Flip to the other buffer. + sbuf = rcv->sbuf[rcv->active_buf]; + rcv->active_buf = !rcv->active_buf; // Release the lock while doing network I/O, so that we don't block threads // adding spans. pthread_mutex_unlock(&rcv->lock); while (1) { - int retry, success = htraced_xmit_impl(rcv, slen); + int retry, success = htraced_xmit_impl(rcv, sbuf); if (success) { break; } @@ -440,175 +586,100 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now) break; } } + sbuf->off = 0; + sbuf->num_spans = 0; pthread_mutex_lock(&rcv->lock); rcv->last_send_ms = now; -} - -/** - * Move data from the circular buffer into the transmission buffer, advancing - * the circular buffer's start offset. - * - * This function must be called with the lock held. - * - * Note that we rely on HTRACED_MAX_MSG_LEN being < 4GB in this function for - * correctness on 32-bit systems. - * - * @param rcv The htraced receiver. - * - * @return The amount of data copied. - */ -static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv) -{ - const char * const SUFFIX = "]}"; - int SUFFIX_LEN = sizeof(SUFFIX) - 1; - int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN; - size_t amt; - char *sbuf = (char*)rcv->sbuf; - - fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[", - rcv->tracer->prid); - if (rcv->cstart < rcv->cend) { - amt = rcv->cend - rcv->cstart; - if (amt > rem) { - amt = rem; - } - memcpy(sbuf, rcv->cbuf + rcv->cstart, amt); - sbuf += amt; - rem -= amt; - rcv->cstart += amt; - } else { - amt = rcv->clen - rcv->cstart; - if (amt > rem) { - amt = rem; - } - memcpy(sbuf, rcv->cbuf + rcv->cstart, amt); - sbuf += amt; - rem -= amt; - rcv->cstart += amt; - if (rem > 0) { - amt = rcv->cend; - if (amt > rem) { - amt = rem; - } - memcpy(sbuf, rcv->cbuf, amt); - sbuf += amt; - rem -= amt; - rcv->cstart = amt; - } - } - // overwrite last comma - rem++; - sbuf--; - rem += SUFFIX_LEN; - fwdprintf(&sbuf, &rem, "%s", SUFFIX); - return HTRACED_MAX_MSG_LEN - rem; -} - -/** - * Returns the current number of bytes used in the htraced circular buffer. - * Must be called under the lock. - * - * @param rcv The htraced receiver. - * - * @return The number of bytes used. - */ -static uint64_t cbuf_used(const struct htraced_rcv *rcv) -{ - if (rcv->cstart <= rcv->cend) { - return rcv->cend - rcv->cstart; - } - return rcv->clen - (rcv->cstart - rcv->cend); + pthread_cond_broadcast(&rcv->flush_cond); } static void htraced_rcv_add_span(struct htrace_rcv *r, struct htrace_span *span) { - int json_len, tries, retry; - uint64_t used, rem; + int tries, retry; + uint64_t rem, off; struct htraced_rcv *rcv = (struct htraced_rcv *)r; + struct htraced_sbuf *sbuf; struct htrace_log *lg = rcv->tracer->lg; + struct cmp_counter_ctx cctx; + struct cmp_bcopy_ctx bctx; + uint64_t msgpack_len; + + // Determine the length of the span when serialized to msgpack. + cmp_counter_ctx_init(&cctx); + if (!span_write_msgpack(span, (cmp_ctx_t*)&cctx)) { + htrace_log(lg, "htraced_rcv_add_span: span_write_msgpack failed.\n"); + return; + } + msgpack_len = cctx.count; - json_len = span_json_size(span); + // Try to get enough space in the current buffer. tries = 0; do { pthread_mutex_lock(&rcv->lock); - used = cbuf_used(rcv); - if (used + json_len >= rcv->clen) { - pthread_cond_signal(&rcv->cond); + sbuf = rcv->sbuf[rcv->active_buf]; + rem = htraced_sbuf_remaining(sbuf); + if (rem < msgpack_len) { + pthread_cond_signal(&rcv->bg_cond); pthread_mutex_unlock(&rcv->lock); tries++; retry = tries < HTRACED_MAX_ADD_TRIES; htrace_log(lg, "htraced_rcv_add_span: not enough space in the " - "circular buffer. Have %" PRId64 ", need %d" - ". %s...\n", (rcv->clen - used), json_len, + "current buffer. Have %" PRId64 ", need %" + PRId64 ". %s...\n", rem, msgpack_len, (retry ? "Retrying" : "Giving up")); if (retry) { pthread_yield(); continue; } + pthread_mutex_unlock(&rcv->lock); return; } } while (0); // OK, now we have the lock, and we know that there is enough space in the - // circular buffer. - rem = rcv->clen - rcv->cend; - if (rem < json_len) { - // Handle a 'torn write' where the circular buffer loops around to the - // beginning in the middle of the write. - char *temp = malloc(json_len); - if (!temp) { - htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte " - "buffer for torn write.\n", json_len); - goto done; - } - span_json_sprintf(span, json_len, temp); - temp[json_len - 1] = ','; - memcpy(rcv->cbuf + rcv->cend, temp, rem); - memcpy(rcv->cbuf, temp + rem, json_len - rem); - rcv->cend = json_len - rem; - free(temp); - } else { - span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend); - rcv->cbuf[rcv->cend + json_len - 1] = ','; - rcv->cend += json_len; - } - used += json_len; - if (used > rcv->send_threshold) { - pthread_cond_signal(&rcv->cond); + // current buffer. + off = sbuf->off; + cmp_bcopy_ctx_init(&bctx, sbuf->buf + off, msgpack_len); + bctx.base.write = cmp_bcopy_write_nocheck_fn; + span_write_msgpack(span, (cmp_ctx_t*)&bctx); + off += msgpack_len; + sbuf->off = off; + sbuf->num_spans++; + if (off > rcv->send_threshold) { + pthread_cond_signal(&rcv->bg_cond); } -done: pthread_mutex_unlock(&rcv->lock); } static void htraced_rcv_flush(struct htrace_rcv *r) { struct htraced_rcv *rcv = (struct htraced_rcv *)r; + uint64_t now; + // Note: This assumes that we only flush one buffer at once, and + // that we flush buffers in order. If we revisit those assumptions we'll + // need to change this. + // The SpanReceiver flush is only used for testing anyway. + pthread_mutex_lock(&rcv->lock); + now = monotonic_now_ms(rcv->tracer->lg); while (1) { - pthread_mutex_lock(&rcv->lock); - if (cbuf_used(rcv) == 0) { - // If the buffer is empty, we're done. - // Note that there is no guarantee that we'll ever be done if spans - // are being added continuously throughout the flush. This is OK, - // since flush() is actually only used by unit tests. - // We could do something more clever here, but it would be a lot more - // complex. - pthread_mutex_unlock(&rcv->lock); + if (rcv->last_send_ms >= now) { + break; + } + if (htraced_sbufs_empty(rcv)) { break; } - // Get the xmit thread to send what it can, by resetting the "last send - // time" to the oldest possible monotonic time. rcv->last_send_ms = 0; - pthread_cond_signal(&rcv->cond); - pthread_mutex_unlock(&rcv->lock); + pthread_cond_wait(&rcv->flush_cond, &rcv->lock); } + pthread_mutex_unlock(&rcv->lock); } static void htraced_rcv_free(struct htrace_rcv *r) { struct htraced_rcv *rcv = (struct htraced_rcv *)r; struct htrace_log *lg; - int ret; + int i, ret; if (!rcv) { return; @@ -618,24 +689,30 @@ static void htraced_rcv_free(struct htrace_rcv *r) hrpc_client_get_endpoint(rcv->hcli)); pthread_mutex_lock(&rcv->lock); rcv->shutdown = 1; - pthread_cond_signal(&rcv->cond); + pthread_cond_signal(&rcv->bg_cond); pthread_mutex_unlock(&rcv->lock); ret = pthread_join(rcv->xmit_thread, NULL); if (ret) { htrace_log(lg, "htraced_rcv_free: pthread_join " "error %d: %s\n", ret, terror(ret)); } - free(rcv->cbuf); - free(rcv->sbuf); + for (i = 0; i < HTRACED_NUM_BUFS; i++) { + htraced_sbuf_free(rcv->sbuf[i]); + } hrpc_client_free(rcv->hcli); ret = pthread_mutex_destroy(&rcv->lock); if (ret) { htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy " "error %d: %s\n", ret, terror(ret)); } - ret = pthread_cond_destroy(&rcv->cond); + ret = pthread_cond_destroy(&rcv->bg_cond); + if (ret) { + htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(bg_cond) " + "error %d: %s\n", ret, terror(ret)); + } + ret = pthread_cond_destroy(&rcv->flush_cond); if (ret) { - htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy " + htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(flush_cond) " "error %d: %s\n", ret, terror(ret)); } free(rcv); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/test/cmp_util-unit.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/cmp_util-unit.c b/htrace-c/src/test/cmp_util-unit.c new file mode 100644 index 0000000..bfe4edc --- /dev/null +++ b/htrace-c/src/test/cmp_util-unit.c @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/span.h" +#include "test/span_util.h" +#include "test/test.h" +#include "util/cmp.h" +#include "util/cmp_util.h" + +#include <inttypes.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#define NUM_TEST_SPANS 3 +#define TEST_BUF_LENGTH (8UL * 1024UL * 1024UL) + +static struct htrace_span **setup_test_spans(void) +{ + struct htrace_span **spans = + xcalloc(sizeof(struct htrace_span*) * NUM_TEST_SPANS); + + spans[0] = xcalloc(sizeof(struct htrace_span)); + spans[0]->desc = xstrdup("FirstSpan"); + spans[0]->begin_ms = 1927; + spans[0]->end_ms = 2000; + spans[0]->span_id = 1; + + spans[1] = xcalloc(sizeof(struct htrace_span)); + spans[1]->desc = xstrdup("SecondSpan"); + spans[1]->begin_ms = 1950; + spans[1]->end_ms = 2000; + spans[1]->span_id = 0xffffffffffffffffULL; + spans[1]->prid = xstrdup("SecondSpanProc"); + spans[1]->num_parents = 1; + spans[1]->parent.single = 1; + + spans[2] = xcalloc(sizeof(struct htrace_span)); + spans[2]->desc = xstrdup("ThirdSpan"); + spans[2]->begin_ms = 1969; + spans[2]->end_ms = 1997; + spans[2]->span_id = 0xcfcfcfcfcfcfcfcfULL; + spans[2]->prid = xstrdup("ThirdSpanProc"); + spans[2]->num_parents = 2; + spans[2]->parent.list = xcalloc(sizeof(uint64_t) * 2); + spans[2]->parent.list[0] = 1; + spans[2]->parent.list[1] = 0xffffffffffffffffULL; + + return spans; +} + +static int serialize_test_spans(struct htrace_span **test_spans, cmp_ctx_t *ctx) +{ + int i; + for (i = 0; i < NUM_TEST_SPANS; i++) { + EXPECT_INT_EQ(1, span_write_msgpack(test_spans[i], ctx)); + } + return EXIT_SUCCESS; +} + +static int test_serialize_spans(struct htrace_span **test_spans) +{ + int i; + struct htrace_span *span; + struct cmp_counter_ctx cctx; + struct cmp_bcopy_ctx bctx; + char *buf; + char err[1024]; + size_t err_len = sizeof(err); + + cmp_counter_ctx_init(&cctx); + EXPECT_INT_ZERO(serialize_test_spans(test_spans, (cmp_ctx_t *)&cctx)); + + buf = xcalloc(TEST_BUF_LENGTH); + cmp_bcopy_ctx_init(&bctx, buf, TEST_BUF_LENGTH); + EXPECT_INT_ZERO(serialize_test_spans(test_spans, (cmp_ctx_t *)&bctx)); + EXPECT_UINT64_EQ(cctx.count, bctx.off); + EXPECT_UINT64_EQ(TEST_BUF_LENGTH, bctx.len); + + bctx.off = 0; + for (i = 0; i < NUM_TEST_SPANS; i++) { + span = span_read_msgpack((cmp_ctx_t*)&bctx, err, err_len); + EXPECT_STR_EQ("", err); + EXPECT_NONNULL(span); + EXPECT_INT_ZERO(span_compare(test_spans[i], span)); + htrace_span_free(span); + } + + free(buf); + return EXIT_SUCCESS; +} + +int main(void) +{ + int i; + struct htrace_span **test_spans; + + test_spans = setup_test_spans(); + EXPECT_NONNULL(test_spans); + EXPECT_INT_ZERO(test_serialize_spans(test_spans)); + for (i = 0; i < NUM_TEST_SPANS; i++) { + htrace_span_free(test_spans[i]); + } + free(test_spans); + return EXIT_SUCCESS; +} + +// vim: ts=4:sw=4:tw=79:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/test/span_util.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/span_util.c b/htrace-c/src/test/span_util.c index fc4b041..8a615b1 100644 --- a/htrace-c/src/test/span_util.c +++ b/htrace-c/src/test/span_util.c @@ -18,6 +18,7 @@ #include "core/span.h" #include "test/span_util.h" +#include "util/cmp.h" #include "util/log.h" #include <errno.h> @@ -291,4 +292,194 @@ int span_compare(struct htrace_span *a, struct htrace_span *b) return compare_parents(a, b); } +static int span_read_key_str(struct cmp_ctx_s *ctx, char *out, + uint32_t out_len, char *err, size_t err_len) +{ + uint32_t size = 0; + + err[0] = '\0'; + if (!cmp_read_str_size(ctx, &size)) { + snprintf(err, err_len, "span_read_key_str: cmp_read_str_size failed."); + return 0; + } + if (size >= out_len) { + snprintf(err, err_len, "span_read_key_str: size of key string was " + "%"PRId32", but we can only handle key strings less than " + "%"PRId32" bytes.", size, out_len); + return 0; + } + if (!ctx->read(ctx, out, size)) { + snprintf(err, err_len, "span_read_key_str: ctx->read failed for " + "%"PRId32"-byte key string.", size); + return 0; + } + out[size] = '\0'; + return 1; +} + +static char *cmp_read_malloced_string(struct cmp_ctx_s *ctx, const char *what, + char *err, size_t err_len) +{ + uint32_t size = 0; + char *str; + + err[0] = '\0'; + if (!cmp_read_str_size(ctx, &size)) { + snprintf(err, err_len, "cmp_read_malloced_string: cmp_read_str_size " + "failed for %s.", what); + return NULL; + } + str = malloc(size + 1); + if (!str) { + snprintf(err, err_len, "cmp_read_malloced_string: failed to malloc " + "failed for %d-byte string for %s.", size + 1, what); + return NULL; + } + if (!ctx->read(ctx, str, size)) { + snprintf(err, err_len, "cmp_read_malloced_string: failed to read " + "%"PRId32"-byte string for %s.", size, what); + free(str); + return NULL; + } + str[size] = '\0'; + return str; +} + +static void span_parse_msgpack_parents(struct cmp_ctx_s *ctx, + struct htrace_span *span, char *err, size_t err_len) +{ + uint32_t i, size; + + err[0] = '\0'; + if (span->num_parents > 1) { + free(span->parent.list); + span->parent.list = NULL; + } + span->parent.single = 0; + span->num_parents = 0; + if (!cmp_read_array(ctx, &size)) { + snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_array " + "failed."); + return; + } + if (size == 1) { + if (!cmp_read_u64(ctx, &span->parent.single)) { + snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_u64 " + "for single child ID failed"); + return; + } + } else if (size > 1) { + span->parent.list = malloc(sizeof(uint64_t) * size); + if (!span->parent.list) { + snprintf(err, err_len, "span_parse_msgpack_parents: failed to " + "malloc %"PRId32"-entry parent array.", size); + return; + } + for (i = 0; i < size; i++) { + if (!cmp_read_u64(ctx, &span->parent.list[i])) { + snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_u64 " + "for child %d ID failed", i); + free(span->parent.list); + span->parent.list = NULL; + return; + } + } + } + span->num_parents = size; +} + +struct htrace_span *span_read_msgpack(struct cmp_ctx_s *ctx, + char *err, size_t err_len) +{ + struct htrace_span *span = NULL; + uint32_t map_size = 0; + char key[8]; + + err[0] = '\0'; + span = calloc(1, sizeof(*span)); + if (!span) { + snprintf(err, err_len, "span_read_msgpack: OOM allocating " + "htrace_span."); + goto error; + } + if (!cmp_read_map(ctx, &map_size)) { + snprintf(err, err_len, "span_read_msgpack: cmp_read_map failed to " + "read enclosing map object.\n"); + goto error; + } + while (map_size > 0) { + if (!span_read_key_str(ctx, key, sizeof(key), err, err_len)) { + goto error; + } + switch (key[0]) { + case 'd': + if (span->desc) { + free(span->desc); + } + span->desc = cmp_read_malloced_string(ctx, "description", + err, err_len); + if (err[0]) { + goto error; + } + break; + case 'b': + if (!cmp_read_u64(ctx, &span->begin_ms)) { + snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 " + "failed for span->begin_ms."); + goto error; + } + break; + case 'e': + if (!cmp_read_u64(ctx, &span->end_ms)) { + snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 " + "failed for span->end_ms."); + goto error; + } + break; + case 's': + if (!cmp_read_u64(ctx, &span->span_id)) { + snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 " + "failed for span->span_id"); + goto error; + } + break; + case 'r': + if (span->prid) { + free(span->prid); + } + span->prid = cmp_read_malloced_string(ctx, "process_id", + err, err_len); + if (err[0]) { + goto error; + } + break; + case 'p': + span_parse_msgpack_parents(ctx, span, err, err_len); + if (err[0]) { + goto error; + } + break; + default: + snprintf(err, err_len, "span_read_msgpack: can't understand key " + "'%s'.\n", key); + goto error; + } + map_size--; + } + // Description cannot be NULL. + if (!span->desc) { + span->desc = strdup(""); + if (!span->desc) { + snprintf(err, err_len, "span_read_msgpack: OOM allocating empty " + "description string."); + goto error; + } + } + return span; + +error: + htrace_span_free(span); + return NULL; +} + // vim:ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/test/span_util.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/span_util.h b/htrace-c/src/test/span_util.h index 393e213..8ab197f 100644 --- a/htrace-c/src/test/span_util.h +++ b/htrace-c/src/test/span_util.h @@ -22,6 +22,7 @@ #include <stdint.h> #include <unistd.h> /* for size_t */ +struct cmp_ctx_s; struct htrace_span; /** @@ -61,6 +62,19 @@ void span_json_parse(const char *in, struct htrace_span **span, */ int span_compare(struct htrace_span *a, struct htrace_span *b); +/** + * Read a span from the provided CMP context. + * + * @param ctx The CMP context. + * @param err (out param) On error, where the error message will be + * written. Will be set to the empty string on success. + * @param err_len The length of the error buffer. Must be nonzero. + * + * @return The span on success; NULL otherwise. + */ +struct htrace_span *span_read_msgpack(struct cmp_ctx_s *ctx, + char *err, size_t err_len); + #endif // vim:ts=4:sw=4:et
