HTRACE-164. htrace hrpc: use msgpack for serialization (cmccabe)

Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/33ab96fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/33ab96fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/33ab96fb

Branch: refs/heads/master
Commit: 33ab96fbe6e5d12f75ed4f054d693dfd211aa586
Parents: 08d54c9
Author: Colin P. Mccabe <[email protected]>
Authored: Fri May 1 18:36:10 2015 -0700
Committer: Colin P. Mccabe <[email protected]>
Committed: Mon May 11 22:46:20 2015 -0700

----------------------------------------------------------------------
 LICENSE.txt                                     |    8 +
 htrace-c/src/CMakeLists.txt                     |    6 +
 htrace-c/src/core/conf.c                        |    1 +
 htrace-c/src/core/htrace.h                      |    7 +
 htrace-c/src/core/span.c                        |   87 +-
 htrace-c/src/core/span.h                        |   21 +-
 htrace-c/src/receiver/hrpc.c                    |   26 +-
 htrace-c/src/receiver/hrpc.h                    |    9 +-
 htrace-c/src/receiver/htraced.c                 |  545 ++--
 htrace-c/src/test/cmp_util-unit.c               |  123 +
 htrace-c/src/test/span_util.c                   |  191 ++
 htrace-c/src/test/span_util.h                   |   14 +
 htrace-c/src/util/cmp.c                         | 2674 ++++++++++++++++++
 htrace-c/src/util/cmp.h                         |  441 +++
 htrace-c/src/util/cmp_util.c                    |   90 +
 htrace-c/src/util/cmp_util.h                    |   76 +
 htrace-htraced/src/go/Godeps/Godeps.json        |    4 +
 htrace-htraced/src/go/gobuild.sh                |   11 +-
 .../go/src/org/apache/htrace/client/hclient.go  |   16 +-
 .../src/go/src/org/apache/htrace/common/rpc.go  |    2 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    |   13 +-
 21 files changed, 4093 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index fa6051c..19dd8f7 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -276,3 +276,11 @@ https://github.com/marionettejs/backbone.babysitter/blob/v0.1.6/LICENSE.md
 Backbone.Marionette is a composite application library for Backbone.js.
 It is MIT licensed:
 https://github.com/marionettejs/backbone.marionette/blob/v2.4.1/license.txt
+
+CMP is an implementation of the MessagePack serialization format in
+C.  It is licensed under the MIT license:
+https://github.com/camgunz/cmp/blob/master/LICENSE
+
+go-codec is an implementation of several serialization and deserialization
+codecs in Go.  It is licensed under the MIT license:
+https://github.com/ugorji/go/blob/master/LICENSE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/htrace-c/src/CMakeLists.txt b/htrace-c/src/CMakeLists.txt
index 3d51968..da08ee9 100644
--- a/htrace-c/src/CMakeLists.txt
+++ b/htrace-c/src/CMakeLists.txt
@@ -83,6 +83,8 @@ set(SRC_ALL
     sampler/never.c
     sampler/prob.c
     sampler/sampler.c
+    util/cmp.c
+    util/cmp_util.c
     util/htable.c
     util/log.c
     util/process_id.c
@@ -130,6 +132,10 @@ macro(add_utest utest)
     add_test(${utest} ${CMAKE_CURRENT_BINARY_DIR}/${utest} ${utest})
 endmacro(add_utest)
 
+add_utest(cmp_util-unit
+    test/cmp_util-unit.c
+)
+
 add_utest(conf-unit
     test/conf-unit.c
 )

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/conf.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/conf.c b/htrace-c/src/core/conf.c
index a561906..d9eed2a 100644
--- a/htrace-c/src/core/conf.c
+++ b/htrace-c/src/core/conf.c
@@ -34,6 +34,7 @@
      ";" HTRACED_READ_TIMEO_MS_KEY "=60000"\
      ";" HTRACE_PROCESS_ID "=%{tname}/%{ip}"\
      ";" HTRACED_ADDRESS_KEY "=localhost:9095"\
+     ";" HTRACED_BUFFER_SEND_TRIGGER_FRACTION "=0.50"\
     )
 
 static int parse_key_value(char *str, char **key, char **val)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/htrace.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/htrace.h b/htrace-c/src/core/htrace.h
index 4e2f1f9..816c222 100644
--- a/htrace-c/src/core/htrace.h
+++ b/htrace-c/src/core/htrace.h
@@ -135,6 +135,13 @@ extern  "C" {
 #define HTRACED_BUFFER_SIZE_KEY "htraced.buffer.size"
 
 /**
+ * The fraction of the buffer that needs to be full to trigger the spans to be
+ * sent from the htraced span receiver.
+ */
+#define HTRACED_BUFFER_SEND_TRIGGER_FRACTION \
+    "htraced.buffer.send.trigger.fraction"
+
+/**
  * The process ID string to use.
  *
  * %{ip} will be replaced by an IP address;

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/span.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/span.c b/htrace-c/src/core/span.c
index 13ba3cf..790afa5 100644
--- a/htrace-c/src/core/span.c
+++ b/htrace-c/src/core/span.c
@@ -19,6 +19,7 @@
 #include "core/span.h"
 #include "receiver/receiver.h"
 #include "sampler/sampler.h"
+#include "util/cmp.h"
 #include "util/log.h"
 #include "util/rand.h"
 #include "util/string.h"
@@ -26,6 +27,7 @@
 
 #include <inttypes.h>
 #include <stdint.h>
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
@@ -127,7 +129,7 @@ void htrace_span_sort_and_dedupe_parents(struct htrace_span *span)
  * containing the span contents.  With buf non-NULL, we will write the span
  * contents to the provided buffer.
  *
- * @param scope             The scope
+ * @param span              The span
  * @param max               The maximum number of bytes to write to buf.
  * @param buf               If non-NULL, where the string will be written.
  *
@@ -147,7 +149,7 @@ static int span_json_sprintf_impl(const struct htrace_span *span,
     ret += fwdprintf(&buf, &max, "{\"s\":\"%016" PRIx64 "\",\"b\":%" PRId64
                  ",\"e\":%" PRId64",", span->span_id, span->begin_ms,
                  span->end_ms);
-    if (span->desc) {
+    if (span->desc[0]) {
         ret += fwdprintf(&buf, &max, "\"d\":\"%s\",", span->desc);
     }
     if (span->prid) {
@@ -174,14 +176,87 @@ static int span_json_sprintf_impl(const struct htrace_span *span,
     return ret + 1;
 }
 
-int span_json_size(const struct htrace_span *scope)
+int span_json_size(const struct htrace_span *span)
 {
-    return span_json_sprintf_impl(scope, 0, NULL);
+    return span_json_sprintf_impl(span, 0, NULL);
 }
 
-void span_json_sprintf(const struct htrace_span *scope, int max, void *buf)
+void span_json_sprintf(const struct htrace_span *span, int max, void *buf)
 {
-    span_json_sprintf_impl(scope, max, buf);
+    span_json_sprintf_impl(span, max, buf);
+}
+
+int span_write_msgpack(const struct htrace_span *span, cmp_ctx_t *ctx)
+{
+    int i, num_parents;
+    uint16_t map_size =
+        1 + // desc
+        1 + // begin_ms
+        1 + // end_ms
+        1; // span_id
+
+    num_parents = span->num_parents;
+    if (span->prid) {
+        map_size++;
+    }
+    if (num_parents > 0) {
+        map_size++;
+    }
+    if (!cmp_write_map16(ctx, map_size)) {
+        return 0;
+    }
+    if (!cmp_write_fixstr(ctx, "d", 1)) {
+        return 0;
+    }
+    if (!cmp_write_str16(ctx, span->desc, strlen(span->desc))) {
+        return 0;
+    }
+    if (!cmp_write_fixstr(ctx, "b", 1)) {
+        return 0;
+    }
+    if (!cmp_write_u64(ctx, span->begin_ms)) {
+        return 0;
+    }
+    if (!cmp_write_fixstr(ctx, "e", 1)) {
+        return 0;
+    }
+    if (!cmp_write_u64(ctx, span->end_ms)) {
+        return 0;
+    }
+    if (!cmp_write_fixstr(ctx, "s", 1)) {
+        return 0;
+    }
+    if (!cmp_write_u64(ctx, span->span_id)) {
+        return 0;
+    }
+    if (span->prid) {
+        if (!cmp_write_fixstr(ctx, "r", 1)) {
+            return 0;
+        }
+        if (!cmp_write_str16(ctx, span->prid, strlen(span->prid))) {
+            return 0;
+        }
+    }
+    if (num_parents > 0) {
+        if (!cmp_write_fixstr(ctx, "p", 1)) {
+            return 0;
+        }
+        if (!cmp_write_array16(ctx, num_parents)) {
+            return 0;
+        }
+        if (num_parents == 1) {
+            if (!cmp_write_u64(ctx, span->parent.single)) {
+                return 0;
+            }
+        } else {
+            for (i = 0; i < num_parents; i++) {
+                if (!cmp_write_u64(ctx, span->parent.list[i])) {
+                    return 0;
+                }
+            }
+        }
+    }
+    return 1;
 }
 
 // vim:ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/core/span.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/span.h b/htrace-c/src/core/span.h
index b19bd94..710cff7 100644
--- a/htrace-c/src/core/span.h
+++ b/htrace-c/src/core/span.h
@@ -29,6 +29,7 @@
 
 #include <stdint.h>
 
+struct cmp_ctx_s;
 struct htracer;
 
 struct htrace_span {
@@ -105,16 +106,6 @@ void htrace_span_free(struct htrace_span *span);
 void htrace_span_sort_and_dedupe_parents(struct htrace_span *span);
 
 /**
- * Escape a JSON string.  Specifically, put backslashes before double quotes and
- * other backslashes.
- *
- * @param in            The string to escape.
- *
- * @param out           The escaped string.  Malloced. NULL on OOM.
- */
-char *json_escape(const char *in);
-
-/**
  * Get the buffer size that would be needed to serialize this span to a buffer.
  *
  * @param span          The span.
@@ -133,6 +124,16 @@ int span_json_size(const struct htrace_span *span);
  */
 void span_json_sprintf(const struct htrace_span *span, int max, void *buf);
 
+/**
+ * Write a span to the provided CMP context.
+ *
+ * @param span          The span.
+ * @param ctx           The CMP context.
+ *
+ * @return              0 on failure; 1 on success.
+ */
+int span_write_msgpack(const struct htrace_span *span, struct cmp_ctx_s *ctx);
+
 #endif
 
 // vim: ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/receiver/hrpc.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/hrpc.c b/htrace-c/src/receiver/hrpc.c
index 32a1f4a..f2e296d 100644
--- a/htrace-c/src/receiver/hrpc.c
+++ b/htrace-c/src/receiver/hrpc.c
@@ -52,7 +52,7 @@
  * Implements sending messages via HRPC.
  */
 
-#define HRPC_MAGIC 0x48545243U
+#define HRPC_MAGIC 0x43525448U
 
 #define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024)
 
@@ -130,7 +130,8 @@ static int try_connect(struct hrpc_client *hcli, struct addrinfo *p);
 static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
                                              int sock);
 static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
-                        const void *req, size_t req_len, uint64_t *seq);
+                    const void *buf1, size_t buf1_len,
+                    const void *buf2, size_t buf2_len, uint64_t *seq);
 static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
                        uint64_t seq, char **err, void **resp,
                        size_t *resp_len);
@@ -185,7 +186,8 @@ void hrpc_client_free(struct hrpc_client *hcli)
 }
 
 int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
-                    const void *req, size_t req_len,
+                    const void *buf1, size_t buf1_len,
+                    const void *buf2, size_t buf2_len,
                     char **err, void **resp, size_t *resp_len)
 {
     uint64_t seq;
@@ -198,7 +200,8 @@ int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
     } else {
         htrace_log(hcli->lg, "hrpc_client_call: connection was already 
open\n");
     }
-    if (!hrpc_client_send_req(hcli, method_id, req, req_len, &seq)) {
+    if (!hrpc_client_send_req(hcli, method_id,
+                              buf1, buf1_len, buf2, buf2_len, &seq)) {
         goto error;
     }
     htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n");
@@ -338,20 +341,25 @@ static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
 }
 
 static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
-                        const void *req, size_t req_len, uint64_t *seq)
+                    const void *buf1, size_t buf1_len,
+                    const void *buf2, size_t buf2_len, uint64_t *seq)
 {
+    // We use writev (scatter/gather I/O) here in order to avoid sending
+    // multiple packets when TCP_NODELAY is turned on.
     struct hrpc_req_header hdr;
-    struct iovec iov[2];
+    struct iovec iov[3];
 
     hdr.magic = htole64(HRPC_MAGIC);
     hdr.method_id = htole32(method_id);
     *seq = hcli->seq++;
     hdr.seq = htole64(*seq);
-    hdr.length = htole32(req_len);
+    hdr.length = htole32(buf1_len + buf2_len);
     iov[0].iov_base = &hdr;
     iov[0].iov_len = sizeof(hdr);
-    iov[1].iov_base = (void*)req;
-    iov[1].iov_len = req_len;
+    iov[1].iov_base = (void*)buf1;
+    iov[1].iov_len = buf1_len;
+    iov[2].iov_base = (void*)buf2;
+    iov[2].iov_len = buf2_len;
 
     while (1) {
         ssize_t res = writev(hcli->sock, iov, sizeof(iov)/sizeof(iov[0]));

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/receiver/hrpc.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/hrpc.h b/htrace-c/src/receiver/hrpc.h
index 8ec20be..c7126b0 100644
--- a/htrace-c/src/receiver/hrpc.h
+++ b/htrace-c/src/receiver/hrpc.h
@@ -60,8 +60,10 @@ void hrpc_client_free(struct hrpc_client *hcli);
  *
  * @param hcli              The HRPC client.
  * @param method_id         The method ID to use.
- * @param req               The request buffer to send.
- * @param req_len           The size of the request buffer to send.
+ * @param buf1              The first buffer to send.
+ * @param buf1_len          The size of the first buffer to send.
+ * @param buf2              The second buffer to send.
+ * @param buf2_len          The size of the second buffer to send.
 * @param err               (out param) Will be set to a malloced
 *                              NULL-terminated string if the server returned an
 *                              error response.  NULL otherwise.
@@ -72,7 +74,8 @@ void hrpc_client_free(struct hrpc_client *hcli);
  * @return                  0 on failure, 1 on success.
  */
 int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
-                     const void *req, size_t req_len,
+                     const void *buf1, size_t buf1_len,
+                     const void *buf2, size_t buf2_len,
                      char **err, void **resp, size_t *resp_len);
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/receiver/htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index 7e4af81..8f392b7 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -23,6 +23,8 @@
 #include "receiver/hrpc.h"
 #include "receiver/receiver.h"
 #include "test/test.h"
+#include "util/cmp.h"
+#include "util/cmp_util.h"
 #include "util/log.h"
 #include "util/string.h"
 #include "util/time.h"
@@ -30,6 +32,7 @@
 #include <errno.h>
 #include <inttypes.h>
 #include <pthread.h>
+#include <stddef.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -38,22 +41,59 @@
 /**
  * @file htraced.c
  *
- * The htraced span receiver which implements sending spans on to htraced.
+ * The htraced span receiver which implements sending spans on to the htraced
+ * daemon.
+ *
+ * We send spans via the HRPC protocol.  HRPC consists of a simple fixed-size
+ * header specifying a magic number, the length, and a message type, followed by
+ * data in the msgpack serialization format.  The message type will determine
+ * the type of msgpack data.  Message bodies are sent as maps, essentially
+ * making all fields optional and allowing the protocol to evolve.  See hrpc.c
+ * and rpc.go for the implementation of HRPC.
+ *
+ * Spans are serialized immediately when they are added to the buffer.  This is
+ * one of the advantages of msgpack-- it has a good streaming interface.  We do
+ * not need to keep around the span objects after htraced_rcv_add_span.
+ *
+ * The htraced receiver keeps two equally sized buffers around internally.
+ * While we are writing spans to one buffer, we can be sending the data from the
+ * other buffer over the wire.  The intention here is to avoid copies as much as
+ * possible.  In general, what we send over the wire is exactly what is in the
+ * buffer, except that we have to add a short "prequel" to it containing the
+ * other WriteSpansReq fields.
+ *
+ * Note that we may change the serialization in the future if we discover better
+ * alternatives.  Sending spans over HTTP as JSON will always be supported
+ * as a fallback.
+ */
+
+/**
+ * The maximum length of the message we will send to the server.
+ * This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go.
+ */
+#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL)
+
+/**
+ * The maximum length of the prequel in a WriteSpans message.
  */
+#define MAX_WRITESPANS_PREQUEL_LEN 1024
 
 /**
- * The minimum buffer size to allow for the htraced circular buffer.
+ * The maximum length of the span data in a WriteSpans message.
+ */
+#define MAX_SPAN_DATA_LEN (MAX_HRPC_LEN - MAX_WRITESPANS_PREQUEL_LEN)
+
+/**
+ * The minimum total buffer size to allow.
  *
- * This should hopefully allow at least a few spans to be buffered.
+ * This should allow at least a few spans to be buffered.
  */
 #define HTRACED_MIN_BUFFER_SIZE (4ULL * 1024ULL * 1024ULL)
 
 /**
- * The maximum buffer size to allow for the htraced circular buffer.
- * This is mainly to avoid overflow.  Of course, you couldn't allocate a buffer
- * anywhere near this size anyway.
+ * The maximum total buffer size to allow.
  */
-#define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL
+#define HTRACED_MAX_BUFFER_SIZE (2ULL * MAX_SPAN_DATA_LEN)
 
 /**
  * The minimum number of milliseconds to allow for flush_interval_ms.
@@ -77,13 +117,6 @@
 #define HTRACED_READ_TIMEO_MS_MIN 50LL
 
 /**
- * The maximum size of the message we will send over the wire.
- * This also sets the size of the transmission buffer.
- * This constant must not be more than 2^^32 on 32-bit systems.
- */
-#define HTRACED_MAX_MSG_LEN (8ULL * 1024ULL * 1024ULL)
-
-/**
  * The maximum number of times we will try to add a span to the circular buffer
  * before giving up.
  */
@@ -101,6 +134,36 @@
  */
 #define HTRACED_SEND_RETRY_SLEEP_MS 5000
 
+/**
+ * The number of buffers used by htraced.
+ */
+#define HTRACED_NUM_BUFS 2
+
+/**
+ * An HTraced send buffer.
+ */
+struct htraced_sbuf {
+    /**
+     * Current offset within the buffer.
+     */
+    uint64_t off;
+
+    /**
+     * Length of the buffer.
+     */
+    uint64_t len;
+
+    /**
+     * The number of spans in the buffer.
+     */
+    uint64_t num_spans;
+
+    /**
+     * The buffer data.  This field actually has size 'len,' not size 1.
+     */
+    char buf[1];
+};
+
 /*
  * A span receiver that writes spans to htraced.
  */
@@ -136,45 +199,34 @@ struct htraced_rcv {
     struct hrpc_client *hcli;
 
     /**
-     * Length of the circular buffer.
-     */
-    uint64_t clen;
-
-    /**
-     * A circular buffer containing span data.
-     */
-    uint8_t *cbuf;
-
-    /**
-     * 'start' pointer of the circular buffer.
+     * The monotonic-clock time at which we last did a send operation.
      */
-    uint64_t cstart;
+    uint64_t last_send_ms;
 
     /**
-     * 'end' pointer of the circular buffer.
+     * The index of the active buffer.
      */
-    uint64_t cend;
+    int active_buf;
 
     /**
-     * The monotonic-clock time at which we last did a send operation.
+     * The two send buffers.
      */
-    uint64_t last_send_ms;
+    struct htraced_sbuf *sbuf[HTRACED_NUM_BUFS];
 
     /**
-     * RPC messages are copied into this buffer before being sent.
-     * Its length is HTRACED_MAX_MSG_LEN.
+     * Lock protecting the buffers from concurrent writes.
      */
-    uint8_t *sbuf;
+    pthread_mutex_t lock;
 
     /**
-     * Lock protecting the circular buffer from concurrent writes.
+     * Condition variable used to wake up the background thread.
      */
-    pthread_mutex_t lock;
+    pthread_cond_t bg_cond;
 
     /**
-     * Condition variable used to wake up the background thread.
+     * Condition variable used to wake up flushing threads.
      */
-    pthread_cond_t cond;
+    pthread_cond_t flush_cond;
 
     /**
      * Background transmitter thread.
@@ -185,8 +237,44 @@ struct htraced_rcv {
 void* run_htraced_xmit_manager(void *data);
 static int should_xmit(struct htraced_rcv *rcv, uint64_t now);
 static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now);
-static uint64_t cbuf_used(const struct htraced_rcv *rcv);
-static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv);
+
+static int htraced_sbufs_empty(struct htraced_rcv *rcv)
+{
+    int i;
+    for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+        if (rcv->sbuf[i]->off) {
+            return 0;
+        }
+    }
+    return 1;
+}
+
+static struct htraced_sbuf *htraced_sbuf_alloc(uint64_t len)
+{
+    struct htraced_sbuf *sbuf;
+
+    // The final field of the htraced_sbuf structure is declared as having size
+    // 1, but really it has size 'len'.  This avoids a pointer dereference when
+    // accessing data in the sbuf.
+    sbuf = malloc(offsetof(struct htraced_sbuf, buf) + len);
+    if (!sbuf) {
+        return NULL;
+    }
+    sbuf->off = 0;
+    sbuf->len = len;
+    sbuf->num_spans = 0;
+    return sbuf;
+}
+
+static void htraced_sbuf_free(struct htraced_sbuf *sbuf)
+{
+    free(sbuf);
+}
+
+static uint64_t htraced_sbuf_remaining(const struct htraced_sbuf *sbuf)
+{
+    return sbuf->len - sbuf->off;
+}
 
 static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
                 const struct htrace_conf *cnf, const char *prop,
@@ -207,13 +295,33 @@ static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
     return val;
 }
 
+static double htraced_get_bounded_double(struct htrace_log *lg,
+                const struct htrace_conf *cnf, const char *prop,
+                double min, double max)
+{
+    double val = htrace_conf_get_double(lg, cnf, prop);
+    if (val < min) {
+        htrace_log(lg, "htraced_rcv_create: can't set %s to %g"
+                   ".  Using minimum value of %g instead.\n",
+                   prop, val, min);
+        return min;
+    } else if (val > max) {
+        htrace_log(lg, "htraced_rcv_create: can't set %s to %g"
+                   ".  Using maximum value of %g instead.\n",
+                   prop, val, max);
+        return max;
+    }
+    return val;
+}
+
 static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
                                              const struct htrace_conf *conf)
 {
     struct htraced_rcv *rcv;
     const char *endpoint;
-    int ret;
-    uint64_t write_timeo_ms, read_timeo_ms;
+    int i, ret;
+    uint64_t write_timeo_ms, read_timeo_ms, buf_len;
+    double send_fraction;
 
     endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
     if (!endpoint) {
@@ -223,7 +331,7 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
                    HTRACED_ADDRESS_KEY);
         goto error;
     }
-    rcv = malloc(sizeof(*rcv));
+    rcv = calloc(1, sizeof(*rcv));
     if (!rcv) {
         htrace_log(tracer->lg, "htraced_rcv_create: OOM while "
                    "allocating htraced_rcv.\n");
@@ -247,68 +355,66 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
     if (!rcv->hcli) {
         goto error_free_rcv;
     }
-    rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY);
-    if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) {
-        htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" 
PRId64
-                   ".  Setting the minimum buffer size of %llu"
-                   " instead.\n", rcv->clen, HTRACED_MIN_BUFFER_SIZE);
-        rcv->clen = HTRACED_MIN_BUFFER_SIZE;
-    } else if (rcv->clen > HTRACED_MAX_BUFFER_SIZE) {
-        htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" 
PRId64
-                   ".  Setting the maximum buffer size of %lld"
-                   " instead.\n", rcv->clen, HTRACED_MAX_BUFFER_SIZE);
-        rcv->clen = HTRACED_MAX_BUFFER_SIZE;
-    }
-    rcv->cbuf = malloc(rcv->clen);
-    if (!rcv->cbuf) {
-        htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64
-                   " bytes for the htraced circular buffer.\n", rcv->clen);
-        goto error_free_hcli;
-    }
-    // Send when the buffer gets 1/4 full.
-    rcv->send_threshold = rcv->clen * 0.25;
-    rcv->cstart = 0;
-    rcv->cend = 0;
-    rcv->last_send_ms = monotonic_now_ms(tracer->lg);
-    rcv->sbuf = malloc(HTRACED_MAX_MSG_LEN);
-    if (!rcv->sbuf) {
-        goto error_free_cbuf;
+    buf_len = htraced_get_bounded_u64(tracer->lg, conf,
+                HTRACED_BUFFER_SIZE_KEY, HTRACED_MIN_BUFFER_SIZE,
+                HTRACED_MAX_BUFFER_SIZE) / 2;
+    for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+        rcv->sbuf[i] = htraced_sbuf_alloc(buf_len);
+        if (!rcv->sbuf[i]) {
+            htrace_log(tracer->lg, "htraced_rcv_create: htraced_sbuf_alloc("
+                       "buf_len=%"PRId64") failed: OOM.\n", buf_len);
+            goto error_free_bufs;
+        }
     }
+    send_fraction = htraced_get_bounded_double(tracer->lg, conf,
+                HTRACED_BUFFER_SEND_TRIGGER_FRACTION, 0.1, 1.0);
+    rcv->send_threshold = buf_len * send_fraction;
+    if (rcv->send_threshold > buf_len) {
+        rcv->send_threshold = buf_len;
+    }
+    rcv->last_send_ms = monotonic_now_ms(tracer->lg);
     ret = pthread_mutex_init(&rcv->lock, NULL);
     if (ret) {
         htrace_log(tracer->lg, "htraced_rcv_create: pthread_mutex_init "
                    "error %d: %s\n", ret, terror(ret));
-        goto error_free_sbuf;
+        goto error_free_bufs;
     }
-    ret = pthread_cond_init(&rcv->cond, NULL);
+    ret = pthread_cond_init(&rcv->bg_cond, NULL);
     if (ret) {
-        htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init "
-                   "error %d: %s\n", ret, terror(ret));
+        htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init("
+                   "bg_cond) error %d: %s\n", ret, terror(ret));
         goto error_free_lock;
     }
+    ret = pthread_cond_init(&rcv->flush_cond, NULL);
+    if (ret) {
+        htrace_log(tracer->lg, "htraced_rcv_create: pthread_cond_init("
+                   "flush_cond) error %d: %s\n", ret, terror(ret));
+        goto error_free_bg_cond;
+    }
     ret = pthread_create(&rcv->xmit_thread, NULL, run_htraced_xmit_manager, 
rcv);
     if (ret) {
         htrace_log(tracer->lg, "htraced_rcv_create: failed to create xmit 
thread: "
                    "error %d: %s\n", ret, terror(ret));
-        goto error_free_cvar;
+        goto error_free_flush_cond;
     }
     htrace_log(tracer->lg, "Initialized htraced receiver for %s"
                 ", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64
                 ", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64
-                ", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
+                ", buf_len=%" PRId64 ".\n", 
hrpc_client_get_endpoint(rcv->hcli),
                 rcv->flush_interval_ms, rcv->send_threshold,
-                write_timeo_ms, read_timeo_ms, rcv->clen);
+                write_timeo_ms, read_timeo_ms, buf_len);
     return (struct htrace_rcv*)rcv;
 
-error_free_cvar:
-    pthread_cond_destroy(&rcv->cond);
+error_free_flush_cond:
+    pthread_cond_destroy(&rcv->flush_cond);
+error_free_bg_cond:
+    pthread_cond_destroy(&rcv->bg_cond);
 error_free_lock:
     pthread_mutex_destroy(&rcv->lock);
-error_free_sbuf:
-    free(rcv->sbuf);
-error_free_cbuf:
-    free(rcv->cbuf);
-error_free_hcli:
+error_free_bufs:
+    for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+        htraced_sbuf_free(rcv->sbuf[i]);
+    }
     hrpc_client_free(rcv->hcli);
 error_free_rcv:
     free(rcv);
@@ -331,7 +437,7 @@ void* run_htraced_xmit_manager(void *data)
             htraced_xmit(rcv, now);
         }
         if (rcv->shutdown) {
-            while (cbuf_used(rcv) > 0) {
+            while (!htraced_sbufs_empty(rcv)) {
                 htraced_xmit(rcv, now);
             }
             break;
@@ -344,7 +450,7 @@ void* run_htraced_xmit_manager(void *data)
         //      buffered.
         wakeup = now + (rcv->flush_interval_ms / 2);
         ms_to_timespec(wakeup, &wakeup_ts);
-        ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts);
+        ret = pthread_cond_timedwait(&rcv->bg_cond, &rcv->lock, &wakeup_ts);
         if ((ret != 0) && (ret != ETIMEDOUT)) {
             htrace_log(lg, "run_htraced_xmit_manager: pthread_cond_timedwait "
                        "error: %d (%s)\n", ret, terror(ret));
@@ -367,67 +473,107 @@ void* run_htraced_xmit_manager(void *data)
  */
 static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
 {
-    uint64_t used;
+    uint64_t off = rcv->sbuf[rcv->active_buf]->off;
 
-    used = cbuf_used(rcv);
-    if (used > rcv->send_threshold) {
+    if (off > rcv->send_threshold) {
         // We have buffered a lot of bytes, so let's send.
         return 1;
     }
     if (now - rcv->last_send_ms > rcv->flush_interval_ms) {
         // It's been too long since the last transmission, so let's send.
-        if (used > 0) {
+        if (off > 0) {
             return 1;
         }
     }
     return 0; // Let's wait.
 }
 
+#define DEFAULT_PID_STR         "DefaultPid"
+#define DEFAULT_PID_STR_LEN     (sizeof(DEFAULT_PID_STR) - 1)
+#define SPANS_STR               "Spans"
+#define SPANS_STR_LEN           (sizeof(SPANS_STR) - 1)
+
+/**
+ * Write the prequel to the WriteSpans message.
+ */
+static int add_writespans_prequel(struct htraced_rcv *rcv,
+                                  struct htraced_sbuf *sbuf, uint8_t *prequel)
+{
+    struct cmp_bcopy_ctx bctx;
+    struct cmp_ctx_s *ctx =  (struct cmp_ctx_s *)&bctx;
+    cmp_bcopy_ctx_init(&bctx, prequel, MAX_WRITESPANS_PREQUEL_LEN);
+    if (!cmp_write_fixmap(ctx, 2)) {
+        return -1;
+    }
+    if (!cmp_write_fixstr(ctx, DEFAULT_PID_STR, DEFAULT_PID_STR_LEN)) {
+        return -1;
+    }
+    if (!cmp_write_str(ctx, rcv->tracer->prid, strlen(rcv->tracer->prid))) {
+        return -1;
+    }
+    if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) {
+        return -1;
+    }
+    if (!cmp_write_array(ctx, sbuf->num_spans)) {
+        return -1;
+    }
+    return bctx.off;
+}
+
 /**
  * Send all the spans which we have buffered.
  *
  * @param rcv           The htraced receiver.
- * @param slen          The length of the buffer to send.
+ * @param sbuf          The span buffer to send.
  *
  * @return              1 on success; 0 otherwise.
  */
-static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen)
+static int htraced_xmit_impl(struct htraced_rcv *rcv, struct htraced_sbuf 
*sbuf)
 {
     struct htrace_log *lg = rcv->tracer->lg;
-    int res, retval = 0;
-    char *prequel = NULL, *err = NULL, *resp = NULL;
+    uint8_t prequel[MAX_WRITESPANS_PREQUEL_LEN];
+    int prequel_len, ret;
+    char *err = NULL, *resp = NULL;
     size_t resp_len = 0;
 
-    res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
-                     rcv->sbuf, slen, &err, (void**)&resp, &resp_len);
-    if (!res) {
+    prequel_len = add_writespans_prequel(rcv, sbuf, prequel);
+    if (prequel_len < 0) {
+        htrace_log(lg, "htrace_xmit_impl: add_writespans_prequel failed.\n");
+        ret = 0;
+        goto done;
+    }
+    ret = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
+                    prequel, prequel_len, sbuf->buf, sbuf->off,
+                    &err, (void**)&resp, &resp_len);
+    if (!ret) {
         htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n");
-        retval = 0;
+        goto done;
     } else if (err) {
         htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err);
-        retval = 0;
-    } else {
-        retval = 1;
+        ret = 0;
+        goto done;
     }
-    free(prequel);
+    ret = 1;
+done:
     free(err);
     free(resp);
-    return retval;
+    return ret;
 }
 
 static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
 {
-    int32_t slen;
     int tries = 0;
+    struct htraced_sbuf *sbuf;
 
-    // Move span data from the circular buffer into the transmission buffer.
-    slen = cbuf_to_sbuf(rcv);
+    // Flip to the other buffer.
+    sbuf = rcv->sbuf[rcv->active_buf];
+    rcv->active_buf = !rcv->active_buf;
 
     // Release the lock while doing network I/O, so that we don't block threads
     // adding spans.
     pthread_mutex_unlock(&rcv->lock);
     while (1) {
-        int retry, success = htraced_xmit_impl(rcv, slen);
+        int retry, success = htraced_xmit_impl(rcv, sbuf);
         if (success) {
             break;
         }
@@ -440,175 +586,100 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
             break;
         }
     }
+    sbuf->off = 0;
+    sbuf->num_spans = 0;
     pthread_mutex_lock(&rcv->lock);
     rcv->last_send_ms = now;
-}
-
-/**
- * Move data from the circular buffer into the transmission buffer, advancing
- * the circular buffer's start offset.
- *
- * This function must be called with the lock held.
- *
- * Note that we rely on HTRACED_MAX_MSG_LEN being < 4GB in this function for
- * correctness on 32-bit systems.
- *
- * @param rcv           The htraced receiver.
- *
- * @return              The amount of data copied.
- */
-static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
-{
-    const char * const SUFFIX = "]}";
-    int SUFFIX_LEN = sizeof(SUFFIX) - 1;
-    int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN;
-    size_t amt;
-    char *sbuf = (char*)rcv->sbuf;
-
-    fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[",
-             rcv->tracer->prid);
-    if (rcv->cstart < rcv->cend) {
-        amt = rcv->cend - rcv->cstart;
-        if (amt > rem) {
-            amt = rem;
-        }
-        memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
-        sbuf += amt;
-        rem -= amt;
-        rcv->cstart += amt;
-    } else {
-        amt = rcv->clen - rcv->cstart;
-        if (amt > rem) {
-            amt = rem;
-        }
-        memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
-        sbuf += amt;
-        rem -= amt;
-        rcv->cstart += amt;
-        if (rem > 0) {
-            amt = rcv->cend;
-            if (amt > rem) {
-                amt = rem;
-            }
-            memcpy(sbuf, rcv->cbuf, amt);
-            sbuf += amt;
-            rem -= amt;
-            rcv->cstart = amt;
-        }
-    }
-    // overwrite last comma
-    rem++;
-    sbuf--;
-    rem += SUFFIX_LEN;
-    fwdprintf(&sbuf, &rem, "%s", SUFFIX);
-    return HTRACED_MAX_MSG_LEN - rem;
-}
-
-/**
- * Returns the current number of bytes used in the htraced circular buffer.
- * Must be called under the lock.
- *
- * @param rcv           The htraced receiver.
- *
- * @return              The number of bytes used.
- */
-static uint64_t cbuf_used(const struct htraced_rcv *rcv)
-{
-    if (rcv->cstart <= rcv->cend) {
-        return rcv->cend - rcv->cstart;
-    }
-    return rcv->clen - (rcv->cstart - rcv->cend);
+    pthread_cond_broadcast(&rcv->flush_cond);
 }
 
 static void htraced_rcv_add_span(struct htrace_rcv *r,
                                  struct htrace_span *span)
 {
-    int json_len, tries, retry;
-    uint64_t used, rem;
+    int tries, retry;
+    uint64_t rem, off;
     struct htraced_rcv *rcv = (struct htraced_rcv *)r;
+    struct htraced_sbuf *sbuf;
     struct htrace_log *lg = rcv->tracer->lg;
+    struct cmp_counter_ctx cctx;
+    struct cmp_bcopy_ctx bctx;
+    uint64_t msgpack_len;
+
+    // Determine the length of the span when serialized to msgpack.
+    cmp_counter_ctx_init(&cctx);
+    if (!span_write_msgpack(span, (cmp_ctx_t*)&cctx)) {
+        htrace_log(lg, "htraced_rcv_add_span: span_write_msgpack failed.\n");
+        return;
+    }
+    msgpack_len = cctx.count;
 
-    json_len = span_json_size(span);
+    // Try to get enough space in the current buffer.
     tries = 0;
     do {
         pthread_mutex_lock(&rcv->lock);
-        used = cbuf_used(rcv);
-        if (used + json_len >= rcv->clen) {
-            pthread_cond_signal(&rcv->cond);
+        sbuf = rcv->sbuf[rcv->active_buf];
+        rem = htraced_sbuf_remaining(sbuf);
+        if (rem < msgpack_len) {
+            pthread_cond_signal(&rcv->bg_cond);
             pthread_mutex_unlock(&rcv->lock);
             tries++;
             retry = tries < HTRACED_MAX_ADD_TRIES;
             htrace_log(lg, "htraced_rcv_add_span: not enough space in the "
-                           "circular buffer.  Have %" PRId64 ", need %d"
-                           ".  %s...\n", (rcv->clen - used), json_len,
+                           "current buffer.  Have %" PRId64 ", need %"
+                           PRId64 ".  %s...\n", rem, msgpack_len,
                            (retry ? "Retrying" : "Giving up"));
             if (retry) {
                 pthread_yield();
                 continue;
             }
+            pthread_mutex_unlock(&rcv->lock);
             return;
         }
     } while (0);
     // OK, now we have the lock, and we know that there is enough space in the
-    // circular buffer.
-    rem = rcv->clen - rcv->cend;
-    if (rem < json_len) {
-        // Handle a 'torn write' where the circular buffer loops around to the
-        // beginning in the middle of the write.
-        char *temp = malloc(json_len);
-        if (!temp) {
-            htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte "
-                       "buffer for torn write.\n", json_len);
-            goto done;
-        }
-        span_json_sprintf(span, json_len, temp);
-        temp[json_len - 1] = ',';
-        memcpy(rcv->cbuf + rcv->cend, temp, rem);
-        memcpy(rcv->cbuf, temp + rem, json_len - rem);
-        rcv->cend = json_len - rem;
-        free(temp);
-    } else {
-        span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend);
-        rcv->cbuf[rcv->cend + json_len - 1] = ',';
-        rcv->cend += json_len;
-    }
-    used += json_len;
-    if (used > rcv->send_threshold) {
-        pthread_cond_signal(&rcv->cond);
+    // current buffer.
+    off = sbuf->off;
+    cmp_bcopy_ctx_init(&bctx, sbuf->buf + off, msgpack_len);
+    bctx.base.write = cmp_bcopy_write_nocheck_fn;
+    span_write_msgpack(span, (cmp_ctx_t*)&bctx);
+    off += msgpack_len;
+    sbuf->off = off;
+    sbuf->num_spans++;
+    if (off > rcv->send_threshold) {
+        pthread_cond_signal(&rcv->bg_cond);
     }
-done:
     pthread_mutex_unlock(&rcv->lock);
 }
 
 static void htraced_rcv_flush(struct htrace_rcv *r)
 {
     struct htraced_rcv *rcv = (struct htraced_rcv *)r;
+    uint64_t now;
 
+    // Note: This assumes that we only flush one buffer at once, and
+    // that we flush buffers in order.  If we revisit those assumptions we'll
+    // need to change this.
+    // The SpanReceiver flush is only used for testing anyway.
+    pthread_mutex_lock(&rcv->lock);
+    now = monotonic_now_ms(rcv->tracer->lg);
     while (1) {
-        pthread_mutex_lock(&rcv->lock);
-        if (cbuf_used(rcv) == 0) {
-            // If the buffer is empty, we're done.
-            // Note that there is no guarantee that we'll ever be done if spans
-            // are being added continuously throughout the flush.  This is OK,
-            // since flush() is actually only used by unit tests.
-            // We could do something more clever here, but it would be a lot more
-            // complex.
-            pthread_mutex_unlock(&rcv->lock);
+        if (rcv->last_send_ms >= now) {
+            break;
+        }
+        if (htraced_sbufs_empty(rcv)) {
             break;
         }
-        // Get the xmit thread to send what it can, by resetting the "last send
-        // time" to the oldest possible monotonic time.
         rcv->last_send_ms = 0;
-        pthread_cond_signal(&rcv->cond);
-        pthread_mutex_unlock(&rcv->lock);
+        pthread_cond_wait(&rcv->flush_cond, &rcv->lock);
     }
+    pthread_mutex_unlock(&rcv->lock);
 }
 
 static void htraced_rcv_free(struct htrace_rcv *r)
 {
     struct htraced_rcv *rcv = (struct htraced_rcv *)r;
     struct htrace_log *lg;
-    int ret;
+    int i, ret;
 
     if (!rcv) {
         return;
@@ -618,24 +689,30 @@ static void htraced_rcv_free(struct htrace_rcv *r)
                hrpc_client_get_endpoint(rcv->hcli));
     pthread_mutex_lock(&rcv->lock);
     rcv->shutdown = 1;
-    pthread_cond_signal(&rcv->cond);
+    pthread_cond_signal(&rcv->bg_cond);
     pthread_mutex_unlock(&rcv->lock);
     ret = pthread_join(rcv->xmit_thread, NULL);
     if (ret) {
         htrace_log(lg, "htraced_rcv_free: pthread_join "
                    "error %d: %s\n", ret, terror(ret));
     }
-    free(rcv->cbuf);
-    free(rcv->sbuf);
+    for (i = 0; i < HTRACED_NUM_BUFS; i++) {
+        htraced_sbuf_free(rcv->sbuf[i]);
+    }
     hrpc_client_free(rcv->hcli);
     ret = pthread_mutex_destroy(&rcv->lock);
     if (ret) {
         htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy "
                    "error %d: %s\n", ret, terror(ret));
     }
-    ret = pthread_cond_destroy(&rcv->cond);
+    ret = pthread_cond_destroy(&rcv->bg_cond);
+    if (ret) {
+        htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(bg_cond) "
+                   "error %d: %s\n", ret, terror(ret));
+    }
+    ret = pthread_cond_destroy(&rcv->flush_cond);
     if (ret) {
-        htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy "
+        htrace_log(lg, "htraced_rcv_free: pthread_cond_destroy(flush_cond) "
                    "error %d: %s\n", ret, terror(ret));
     }
     free(rcv);

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/test/cmp_util-unit.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/cmp_util-unit.c b/htrace-c/src/test/cmp_util-unit.c
new file mode 100644
index 0000000..bfe4edc
--- /dev/null
+++ b/htrace-c/src/test/cmp_util-unit.c
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/span.h"
+#include "test/span_util.h"
+#include "test/test.h"
+#include "util/cmp.h"
+#include "util/cmp_util.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define NUM_TEST_SPANS 3
+#define TEST_BUF_LENGTH (8UL * 1024UL * 1024UL)
+
+static struct htrace_span **setup_test_spans(void)
+{
+    struct htrace_span **spans =
+        xcalloc(sizeof(struct htrace_span*) * NUM_TEST_SPANS);
+
+    spans[0] = xcalloc(sizeof(struct htrace_span));
+    spans[0]->desc = xstrdup("FirstSpan");
+    spans[0]->begin_ms = 1927;
+    spans[0]->end_ms = 2000;
+    spans[0]->span_id = 1;
+
+    spans[1] = xcalloc(sizeof(struct htrace_span));
+    spans[1]->desc = xstrdup("SecondSpan");
+    spans[1]->begin_ms = 1950;
+    spans[1]->end_ms = 2000;
+    spans[1]->span_id = 0xffffffffffffffffULL;
+    spans[1]->prid = xstrdup("SecondSpanProc");
+    spans[1]->num_parents = 1;
+    spans[1]->parent.single = 1;
+
+    spans[2] = xcalloc(sizeof(struct htrace_span));
+    spans[2]->desc = xstrdup("ThirdSpan");
+    spans[2]->begin_ms = 1969;
+    spans[2]->end_ms = 1997;
+    spans[2]->span_id = 0xcfcfcfcfcfcfcfcfULL;
+    spans[2]->prid = xstrdup("ThirdSpanProc");
+    spans[2]->num_parents = 2;
+    spans[2]->parent.list = xcalloc(sizeof(uint64_t) * 2);
+    spans[2]->parent.list[0] = 1;
+    spans[2]->parent.list[1] = 0xffffffffffffffffULL;
+
+    return spans;
+}
+
+static int serialize_test_spans(struct htrace_span **test_spans, cmp_ctx_t *ctx)
+{
+    int i;
+    for (i = 0; i < NUM_TEST_SPANS; i++) {
+        EXPECT_INT_EQ(1, span_write_msgpack(test_spans[i], ctx));
+    }
+    return EXIT_SUCCESS;
+}
+
+static int test_serialize_spans(struct htrace_span **test_spans)
+{
+    int i;
+    struct htrace_span *span;
+    struct cmp_counter_ctx cctx;
+    struct cmp_bcopy_ctx bctx;
+    char *buf;
+    char err[1024];
+    size_t err_len = sizeof(err);
+
+    cmp_counter_ctx_init(&cctx);
+    EXPECT_INT_ZERO(serialize_test_spans(test_spans, (cmp_ctx_t *)&cctx));
+
+    buf = xcalloc(TEST_BUF_LENGTH);
+    cmp_bcopy_ctx_init(&bctx, buf, TEST_BUF_LENGTH);
+    EXPECT_INT_ZERO(serialize_test_spans(test_spans, (cmp_ctx_t *)&bctx));
+    EXPECT_UINT64_EQ(cctx.count, bctx.off);
+    EXPECT_UINT64_EQ(TEST_BUF_LENGTH, bctx.len);
+
+    bctx.off = 0;
+    for (i = 0; i < NUM_TEST_SPANS; i++) {
+        span = span_read_msgpack((cmp_ctx_t*)&bctx, err, err_len);
+        EXPECT_STR_EQ("", err);
+        EXPECT_NONNULL(span);
+        EXPECT_INT_ZERO(span_compare(test_spans[i], span));
+        htrace_span_free(span);
+    }
+
+    free(buf);
+    return EXIT_SUCCESS;
+}
+
+int main(void)
+{
+    int i;
+    struct htrace_span **test_spans;
+
+    test_spans = setup_test_spans();
+    EXPECT_NONNULL(test_spans);
+    EXPECT_INT_ZERO(test_serialize_spans(test_spans));
+    for (i = 0; i < NUM_TEST_SPANS; i++) {
+        htrace_span_free(test_spans[i]);
+    }
+    free(test_spans);
+    return EXIT_SUCCESS;
+}
+
+// vim: ts=4:sw=4:tw=79:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/test/span_util.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/span_util.c b/htrace-c/src/test/span_util.c
index fc4b041..8a615b1 100644
--- a/htrace-c/src/test/span_util.c
+++ b/htrace-c/src/test/span_util.c
@@ -18,6 +18,7 @@
 
 #include "core/span.h"
 #include "test/span_util.h"
+#include "util/cmp.h"
 #include "util/log.h"
 
 #include <errno.h>
@@ -291,4 +292,194 @@ int span_compare(struct htrace_span *a, struct htrace_span *b)
     return compare_parents(a, b);
 }
 
+static int span_read_key_str(struct cmp_ctx_s *ctx, char *out,
+                             uint32_t out_len, char *err, size_t err_len)
+{
+    uint32_t size = 0;
+
+    err[0] = '\0';
+    if (!cmp_read_str_size(ctx, &size)) {
+        snprintf(err, err_len, "span_read_key_str: cmp_read_str_size failed.");
+        return 0;
+    }
+    if (size >= out_len) {
+        snprintf(err, err_len, "span_read_key_str: size of key string was "
+                 "%"PRId32", but we can only handle key strings less than "
+                 "%"PRId32" bytes.", size, out_len);
+        return 0;
+    }
+    if (!ctx->read(ctx, out, size)) {
+        snprintf(err, err_len, "span_read_key_str: ctx->read failed for "
+                 "%"PRId32"-byte key string.", size);
+        return 0;
+    }
+    out[size] = '\0';
+    return 1;
+}
+
+static char *cmp_read_malloced_string(struct cmp_ctx_s *ctx, const char *what,
+                                      char *err, size_t err_len)
+{
+    uint32_t size = 0;
+    char *str;
+
+    err[0] = '\0';
+    if (!cmp_read_str_size(ctx, &size)) {
+        snprintf(err, err_len, "cmp_read_malloced_string: cmp_read_str_size "
+                 "failed for %s.", what);
+        return NULL;
+    }
+    str = malloc(size + 1);
+    if (!str) {
+        snprintf(err, err_len, "cmp_read_malloced_string: failed to malloc "
+                 "failed for %d-byte string for %s.", size + 1, what);
+        return NULL;
+    }
+    if (!ctx->read(ctx, str, size)) {
+        snprintf(err, err_len, "cmp_read_malloced_string: failed to read "
+                 "%"PRId32"-byte string for %s.", size, what);
+        free(str);
+        return NULL;
+    }
+    str[size] = '\0';
+    return str;
+}
+
+static void span_parse_msgpack_parents(struct cmp_ctx_s *ctx,
+                struct htrace_span *span, char *err, size_t err_len)
+{
+    uint32_t i, size;
+
+    err[0] = '\0';
+    if (span->num_parents > 1) {
+        free(span->parent.list);
+        span->parent.list = NULL;
+    }
+    span->parent.single = 0;
+    span->num_parents = 0;
+    if (!cmp_read_array(ctx, &size)) {
+        snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_array "
+                 "failed.");
+        return;
+    }
+    if (size == 1) {
+        if (!cmp_read_u64(ctx, &span->parent.single)) {
+            snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_u64 "
+                     "for single child ID failed");
+            return;
+        }
+    } else if (size > 1) {
+        span->parent.list = malloc(sizeof(uint64_t) * size);
+        if (!span->parent.list) {
+            snprintf(err, err_len, "span_parse_msgpack_parents: failed to "
+                     "malloc %"PRId32"-entry parent array.", size);
+            return;
+        }
+        for (i = 0; i < size; i++) {
+            if (!cmp_read_u64(ctx, &span->parent.list[i])) {
+                snprintf(err, err_len, "span_parse_msgpack_parents: cmp_read_u64 "
+                         "for child %d ID failed", i);
+                free(span->parent.list);
+                span->parent.list = NULL;
+                return;
+            }
+        }
+    }
+    span->num_parents = size;
+}
+
+struct htrace_span *span_read_msgpack(struct cmp_ctx_s *ctx,
+                                      char *err, size_t err_len)
+{
+    struct htrace_span *span = NULL;
+    uint32_t map_size = 0;
+    char key[8];
+
+    err[0] = '\0';
+    span = calloc(1, sizeof(*span));
+    if (!span) {
+        snprintf(err, err_len, "span_read_msgpack: OOM allocating "
+                 "htrace_span.");
+        goto error;
+    }
+    if (!cmp_read_map(ctx, &map_size)) {
+        snprintf(err, err_len, "span_read_msgpack: cmp_read_map failed to "
+                 "read enclosing map object.\n");
+        goto error;
+    }
+    while (map_size > 0) {
+        if (!span_read_key_str(ctx, key, sizeof(key), err, err_len)) {
+            goto error;
+        }
+        switch (key[0]) {
+        case 'd':
+            if (span->desc) {
+                free(span->desc);
+            }
+            span->desc = cmp_read_malloced_string(ctx, "description",
+                                                  err, err_len);
+            if (err[0]) {
+                goto error;
+            }
+            break;
+        case 'b':
+            if (!cmp_read_u64(ctx, &span->begin_ms)) {
+                snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 "
+                         "failed for span->begin_ms.");
+                goto error;
+            }
+            break;
+        case 'e':
+            if (!cmp_read_u64(ctx, &span->end_ms)) {
+                snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 "
+                         "failed for span->end_ms.");
+                goto error;
+            }
+            break;
+        case 's':
+            if (!cmp_read_u64(ctx, &span->span_id)) {
+                snprintf(err, err_len, "span_read_msgpack: cmp_read_u64 "
+                         "failed for span->span_id");
+                goto error;
+            }
+            break;
+        case 'r':
+            if (span->prid) {
+                free(span->prid);
+            }
+            span->prid = cmp_read_malloced_string(ctx, "process_id",
+                                                  err, err_len);
+            if (err[0]) {
+                goto error;
+            }
+            break;
+        case 'p':
+            span_parse_msgpack_parents(ctx, span, err, err_len);
+            if (err[0]) {
+                goto error;
+            }
+            break;
+        default:
+            snprintf(err, err_len, "span_read_msgpack: can't understand key "
+                     "'%s'.\n", key);
+            goto error;
+        }
+        map_size--;
+    }
+    // Description cannot be NULL.
+    if (!span->desc) {
+        span->desc = strdup("");
+        if (!span->desc) {
+            snprintf(err, err_len, "span_read_msgpack: OOM allocating empty "
+                     "description string.");
+            goto error;
+        }
+    }
+    return span;
+
+error:
+    htrace_span_free(span);
+    return NULL;
+}
+
 // vim:ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/33ab96fb/htrace-c/src/test/span_util.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/span_util.h b/htrace-c/src/test/span_util.h
index 393e213..8ab197f 100644
--- a/htrace-c/src/test/span_util.h
+++ b/htrace-c/src/test/span_util.h
@@ -22,6 +22,7 @@
 #include <stdint.h>
 #include <unistd.h> /* for size_t */
 
+struct cmp_ctx_s;
 struct htrace_span;
 
 /**
@@ -61,6 +62,19 @@ void span_json_parse(const char *in, struct htrace_span **span,
  */
 int span_compare(struct htrace_span *a, struct htrace_span *b);
 
+/**
+ * Read a span from the provided CMP context.
+ *
+ * @param ctx       The CMP context.
+ * @param err       (out param) On error, where the error message will be
+ *                      written.  Will be set to the empty string on success.
+ * @param err_len   The length of the error buffer.  Must be nonzero.
+ *
+ * @return          The span on success; NULL otherwise.
+ */
+struct htrace_span *span_read_msgpack(struct cmp_ctx_s *ctx,
+                                      char *err, size_t err_len);
+
 #endif
 
 // vim:ts=4:sw=4:et

Reply via email to