Repository: incubator-htrace
Updated Branches:
  refs/heads/master 2ccb38813 -> 35053cfc5


HTRACE-308. Deserialize WriteSpans requests incrementally rather than all at 
once to optimize GC (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/35053cfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/35053cfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/35053cfc

Branch: refs/heads/master
Commit: 35053cfc557c24e94b7336fc24801d8bdfafc973
Parents: 2ccb388
Author: Colin P. Mccabe <[email protected]>
Authored: Tue Dec 1 13:27:53 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Tue Dec 1 13:27:53 2015 -0800

----------------------------------------------------------------------
 htrace-c/src/receiver/htraced.c                 | 10 +--
 htrace-htraced/go/Godeps/Godeps.json            |  2 +-
 .../go/src/org/apache/htrace/client/client.go   | 20 +++--
 .../go/src/org/apache/htrace/client/hclient.go  | 41 +++++++---
 .../go/src/org/apache/htrace/common/rpc.go      |  6 +-
 .../org/apache/htrace/htraced/client_test.go    | 31 +++-----
 .../org/apache/htrace/htraced/datastore_test.go |  4 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    | 83 ++++++++++++--------
 .../org/apache/htrace/htraced/metrics_test.go   |  4 +-
 .../go/src/org/apache/htrace/htraced/rest.go    | 38 ++++-----
 .../go/src/org/apache/htrace/htracedTool/cmd.go |  4 +-
 .../main/java/org/apache/htrace/impl/Conf.java  |  4 +-
 .../org/apache/htrace/impl/PackedBuffer.java    |  8 +-
 .../apache/htrace/impl/RestBufferManager.java   | 23 ++----
 14 files changed, 152 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-c/src/receiver/htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index d92518d..3a2a091 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -71,7 +71,7 @@
  * The maximum length of the message we will send to the server.
  * This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go.
  */
-#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL)
+#define MAX_HRPC_LEN (32ULL * 1024ULL * 1024ULL)
 
 /**
  * The maximum length of the prequel in a WriteSpans message.
@@ -490,8 +490,8 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t 
now)
 
 #define DEFAULT_TRID_STR        "DefaultTrid"
 #define DEFAULT_TRID_STR_LEN    (sizeof(DEFAULT_TRID_STR) - 1)
-#define SPANS_STR               "Spans"
-#define SPANS_STR_LEN           (sizeof(SPANS_STR) - 1)
+#define NUM_SPANS_STR               "NumSpans"
+#define NUM_SPANS_STR_LEN           (sizeof(NUM_SPANS_STR) - 1)
 
 /**
  * Write the prequel to the WriteSpans message.
@@ -511,10 +511,10 @@ static int add_writespans_prequel(struct htraced_rcv *rcv,
     if (!cmp_write_str(ctx, rcv->tracer->trid, strlen(rcv->tracer->trid))) {
         return -1;
     }
-    if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) {
+    if (!cmp_write_fixstr(ctx, NUM_SPANS_STR, NUM_SPANS_STR_LEN)) {
         return -1;
     }
-    if (!cmp_write_array(ctx, sbuf->num_spans)) {
+    if (!cmp_write_uint(ctx, sbuf->num_spans)) {
         return -1;
     }
     return bctx.off;

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/Godeps/Godeps.json
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/Godeps/Godeps.json 
b/htrace-htraced/go/Godeps/Godeps.json
index 7c737fe..2db37be 100644
--- a/htrace-htraced/go/Godeps/Godeps.json
+++ b/htrace-htraced/go/Godeps/Godeps.json
@@ -24,7 +24,7 @@
         },
         {
             "ImportPath": "github.com/ugorji/go/codec",
-            "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58"
+            "Rev": "ea9cd21fa0bc41ee4bdd50ac7ed8cbc7ea2ed960"
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go 
b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 65b04e4..a2a6f8b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -142,26 +142,36 @@ func (hcl *Client) FindSpan(sid common.SpanId) 
(*common.Span, error) {
        return &span, nil
 }
 
-func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
+func (hcl *Client) WriteSpans(spans []*common.Span) error {
        if hcl.hrpcAddr == "" {
-               return hcl.writeSpansHttp(req)
+               return hcl.writeSpansHttp(spans)
        }
        hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks)
        if err != nil {
                return err
        }
        defer hcr.Close()
-       return hcr.writeSpans(req)
+       return hcr.writeSpans(spans)
 }
 
-func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
+func (hcl *Client) writeSpansHttp(spans []*common.Span) error {
+       req := common.WriteSpansReq {
+               NumSpans: len(spans),
+       }
        var w bytes.Buffer
        enc := json.NewEncoder(&w)
        err := enc.Encode(req)
        if err != nil {
-               return errors.New(fmt.Sprintf("Error serializing span: %s",
+               return errors.New(fmt.Sprintf("Error serializing WriteSpansReq: 
%s",
                        err.Error()))
        }
+       for spanIdx := range(spans) {
+               err := enc.Encode(spans[spanIdx])
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Error serializing span 
%d out " +
+                               "of %d: %s", spanIdx, len(spans), err.Error()))
+               }
+       }
        _, _, err = hcl.makeRestRequest("POST", "writeSpans", &w)
        if err != nil {
                return err

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go 
b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
index 2fcd9a0..43f0c6c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
@@ -41,20 +41,41 @@ type HrpcClientCodec struct {
        testHooks *TestHooks
 }
 
-func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) 
error {
-       methodId := common.HrpcMethodNameToId(req.ServiceMethod)
+func (cdc *HrpcClientCodec) WriteRequest(rr *rpc.Request, msg interface{}) 
error {
+       methodId := common.HrpcMethodNameToId(rr.ServiceMethod)
        if methodId == common.METHOD_ID_NONE {
                return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method 
name %s",
-                       req.ServiceMethod))
+                       rr.ServiceMethod))
        }
        mh := new(codec.MsgpackHandle)
        mh.WriteExt = true
        w := bytes.NewBuffer(make([]byte, 0, 2048))
+
+       var err error
        enc := codec.NewEncoder(w, mh)
-       err := enc.Encode(msg)
-       if err != nil {
-               return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to 
marshal "+
-                       "message as msgpack: %s", err.Error()))
+       if methodId == common.METHOD_ID_WRITE_SPANS {
+               spans := msg.([]*common.Span)
+               req := &common.WriteSpansReq {
+                       NumSpans: len(spans),
+               }
+               err = enc.Encode(req)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("HrpcClientCodec: Unable 
to marshal "+
+                               "message as msgpack: %s", err.Error()))
+               }
+               for spanIdx := range(spans) {
+                       err = enc.Encode(spans[spanIdx])
+                       if err != nil {
+                               return errors.New(fmt.Sprintf("HrpcClientCodec: 
Unable to marshal "+
+                                       "span %d out of %d as msgpack: %s", 
spanIdx, len(spans), err.Error()))
+                       }
+               }
+       } else {
+               err = enc.Encode(msg)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("HrpcClientCodec: Unable 
to marshal "+
+                               "message as msgpack: %s", err.Error()))
+               }
        }
        buf := w.Bytes()
        if len(buf) > common.MAX_HRPC_BODY_LENGTH {
@@ -65,7 +86,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, 
msg interface{}) erro
        hdr := common.HrpcRequestHeader{
                Magic:    common.HRPC_MAGIC,
                MethodId: methodId,
-               Seq:      req.Seq,
+               Seq:      rr.Seq,
                Length:   uint32(len(buf)),
        }
        err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr)
@@ -154,9 +175,9 @@ func newHClient(hrpcAddr string, testHooks *TestHooks) 
(*hClient, error) {
        return &hcr, nil
 }
 
-func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error {
+func (hcr *hClient) writeSpans(spans []*common.Span) error {
        resp := common.WriteSpansResp{}
-       return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp)
+       return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, spans, &resp)
 }
 
 func (hcr *hClient) Close() {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go 
b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 2ec5fe9..5f02db6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -34,13 +34,13 @@ const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans"
 const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024
 
 // Maximum length of HRPC message body
-const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
+const MAX_HRPC_BODY_LENGTH = 32 * 1024 * 1024
 
 // A request to write spans to htraced.
+// This request is followed by a sequence of spans.
 type WriteSpansReq struct {
-       Addr        string `json:",omitempty"` // This gets filled in by the 
RPC layer.
        DefaultTrid string `json:",omitempty"`
-       Spans       []*Span
+       NumSpans    int
 }
 
 // Info returned by /server/version

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 3a877f6..7b64914 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -114,9 +114,7 @@ func TestClientOperations(t *testing.T) {
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
 
        // Write half of the spans to htraced via the client.
-       err = hcl.WriteSpans(&common.WriteSpansReq{
-               Spans: allSpans[0 : NUM_TEST_SPANS/2],
-       })
+       err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
        if err != nil {
                t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
                        err.Error())
@@ -209,9 +207,7 @@ func TestDumpAll(t *testing.T) {
        NUM_TEST_SPANS := 100
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
        sort.Sort(allSpans)
-       err = hcl.WriteSpans(&common.WriteSpansReq{
-               Spans: allSpans,
-       })
+       err = hcl.WriteSpans(allSpans)
        if err != nil {
                t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }
@@ -325,9 +321,7 @@ func TestHrpcAdmissionsControl(t *testing.T) {
        allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
        for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
                go func(i int) {
-                       err = hcl.WriteSpans(&common.WriteSpansReq{
-                               Spans: allSpans[i : i+1],
-                       })
+                       err = hcl.WriteSpans(allSpans[i : i+1])
                        if err != nil {
                                t.Fatalf("WriteSpans failed: %s\n", err.Error())
                        }
@@ -379,9 +373,7 @@ func TestHrpcIoTimeout(t *testing.T) {
                        // Keep in mind that we only block until we have seen
                        // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- 
after that,
                        // we let requests through so that the test can exit 
cleanly.
-                       hcl.WriteSpans(&common.WriteSpansReq{
-                               Spans: allSpans[i : i+1],
-                       })
+                       hcl.WriteSpans(allSpans[i : i+1])
                }(iter)
        }
        for {
@@ -398,6 +390,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc 
uint32, b *testing.B) {
        htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
                Cnf: map[string]string{
                        conf.HTRACE_LOG_LEVEL: "INFO",
+                       conf.HTRACE_NUM_HRPC_HANDLERS: "20",
                },
                WrittenSpans: common.NewSemaphore(int64(1 - N)),
        }
@@ -416,7 +409,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc 
uint32, b *testing.B) {
        // body length limit.  TODO: a production-quality golang client would do
        // this internally rather than needing us to do it here in the unit 
test.
        bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
-       reqs := make([]*common.WriteSpansReq, 0, 4)
+       reqs := make([][]*common.Span, 0, 4)
        curReq := -1
        curReqLen := bodyLen
        var curReqSpans uint32
@@ -429,7 +422,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc 
uint32, b *testing.B) {
                span := allSpans[n]
                if (curReqSpans >= maxSpansPerRpc) ||
                        (curReqLen >= bodyLen) {
-                       reqs = append(reqs, &common.WriteSpansReq{})
+                       reqs = append(reqs, make([]*common.Span, 0, 16))
                        curReqLen = 0
                        curReq++
                        curReqSpans = 0
@@ -446,7 +439,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc 
uint32, b *testing.B) {
                        panic(fmt.Sprintf("Span too long at %d bytes\n", 
bufLen))
                }
                curReqLen += bufLen
-               reqs[curReq].Spans = append(reqs[curReq].Spans, span)
+               reqs[curReq] = append(reqs[curReq], span)
                curReqSpans++
        }
        ht.Store.lg.Infof("num spans: %d.  num WriteSpansReq calls: %d\n", N, 
len(reqs))
@@ -465,13 +458,13 @@ func doWriteSpans(name string, N int, maxSpansPerRpc 
uint32, b *testing.B) {
 
        // Write many random spans.
        for reqIdx := range reqs {
-               go func() {
-                       err = hcl.WriteSpans(reqs[reqIdx])
+               go func(i int) {
+                       err = hcl.WriteSpans(reqs[i])
                        if err != nil {
                                panic(fmt.Sprintf("failed to send WriteSpans 
request %d: %s",
-                                       reqIdx, err.Error()))
+                                       i, err.Error()))
                        }
-               }()
+               }(reqIdx)
        }
        // Wait for all the spans to be written.
        ht.Store.WrittenSpans.Wait()

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 4fc400a..ebf3c47 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -478,9 +478,7 @@ func TestReloadDataStore(t *testing.T) {
        // Create some random trace spans.
        NUM_TEST_SPANS := 5
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-       err = hcl.WriteSpans(&common.WriteSpansReq{
-               Spans: allSpans,
-       })
+       err = hcl.WriteSpans(allSpans)
        if err != nil {
                t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index a6f6751..ecd13d4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,7 +32,6 @@ import (
        "net/rpc"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
-       "reflect"
        "sync"
        "sync/atomic"
        "time"
@@ -101,6 +100,13 @@ type HrpcServerCodec struct {
 
        // The number of messages this connection has handled.
        numHandled int
+
+       // The buffer for reading requests.  These buffers are reused for 
multiple
+       // requests to avoid allocating memory.
+       buf []byte
+
+       // Configuration for msgpack decoding
+       msgpackHandle codec.MsgpackHandle
 }
 
 func asJson(val interface{}) string {
@@ -164,29 +170,69 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 }
 
+// ReadRequestBody reads the cdc.length-byte request body into the codec's
+// reusable buffer and msgpack-decodes it into body.  When body is a
+// WriteSpansReq, the NumSpans spans that follow it in the same buffer are
+// decoded and ingested one at a time.  A nil body means the caller only wants
+// the bytes read and discarded.
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+       remoteAddr := cdc.conn.RemoteAddr().String()
        if cdc.lg.TraceEnabled() {
                cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
-                       cdc.conn.RemoteAddr(), cdc.length)
+                       remoteAddr, cdc.length)
+       }
+       if cap(cdc.buf) < int(cdc.length) {
+               var pow uint
+               for pow=0;(1<<pow) < int(cdc.length);pow++ {
+               }
+               cdc.buf = make([]byte, 0, 1<<pow)
        }
-       mh := new(codec.MsgpackHandle)
-       mh.WriteExt = true
-       dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
-       err := dec.Decode(body)
+       _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
        if err != nil {
                return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
                        "request body: %s", cdc.length, err.Error()))
        }
+       var zeroTime time.Time
+       cdc.conn.SetDeadline(zeroTime)
+
+       if body == nil {
+               // The payload was consumed above; a nil body has nothing to
+               // decode into, and a type assertion on it would panic.
+               return nil
+       }
+       dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
+       err = dec.Decode(body)
+       if err != nil {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode %d-byte "+
+                       "request body: %s", cdc.length, err.Error()))
+       }
        if cdc.lg.TraceEnabled() {
-               cdc.lg.Tracef("%s: read %d-byte request body %s\n",
-                       cdc.conn.RemoteAddr(), cdc.length, asJson(&body))
+               cdc.lg.Tracef("%s: read HRPC message: %s\n",
+                       remoteAddr, asJson(&body))
        }
-       val := reflect.ValueOf(body)
-       addr := val.Elem().FieldByName("Addr")
-       if addr.IsValid() {
-               addr.SetString(cdc.conn.RemoteAddr().String())
+       req, ok := body.(*common.WriteSpansReq)
+       if !ok || req == nil {
+               return nil
        }
-       var zeroTime time.Time
-       cdc.conn.SetDeadline(zeroTime)
+       // We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
+       // collector with a ton of trace spans all at once.
+       startTime := time.Now()
+       client, _, err := net.SplitHostPort(remoteAddr)
+       if err != nil {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
+                       "for %s: %s\n", remoteAddr, err.Error()))
+       }
+       hand := cdc.hsv.hand
+       ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+       for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
+               var span *common.Span
+               err := dec.Decode(&span)
+               if err != nil {
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " +
+                               "out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
+               }
+               ing.IngestSpan(span)
+       }
+       ing.Close(startTime)
        return nil
 }
 
@@ -197,10 +229,8 @@ func (cdc *HrpcServerCodec) WriteResponse(resp 
*rpc.Response, msg interface{}) e
        var err error
        buf := EMPTY
        if msg != nil {
-               mh := new(codec.MsgpackHandle)
-               mh.WriteExt = true
                w := bytes.NewBuffer(make([]byte, 0, 128))
-               enc := codec.NewEncoder(w, mh)
+               enc := codec.NewEncoder(w, &cdc.msgpackHandle)
                err := enc.Encode(msg)
                if err != nil {
                        return newIoErrorWarn(cdc, fmt.Sprintf("Failed to 
marshal "+
@@ -257,20 +287,8 @@ func (cdc *HrpcServerCodec) Close() error {
 }
 
 func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
-       resp *common.WriteSpansResp) (err error) {
-       startTime := time.Now()
-       hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s).  "+
-               "defaultTrid = %s\n", len(req.Spans), req.DefaultTrid)
-       client, _, err := net.SplitHostPort(req.Addr)
-       if err != nil {
-               return errors.New(fmt.Sprintf("Failed to split host and port "+
-                       "for %s: %s\n", req.Addr, err.Error()))
-       }
-       ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
-       for spanIdx := range req.Spans {
-               ing.IngestSpan(req.Spans[spanIdx])
-       }
-       ing.Close(startTime)
+               resp *common.WriteSpansResp) (err error) {
+       // Nothing to do here; WriteSpans is handled in ReadRequestBody.
        return nil
 }
 
@@ -303,6 +321,9 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore,
                hsv.cdcs <- &HrpcServerCodec{
                        lg:  lg,
                        hsv: hsv,
+                       msgpackHandle: codec.MsgpackHandle {
+                               WriteExt: true,
+                       },
                }
        }
        var err error

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index bad7889..6daf640 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -118,9 +118,7 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked 
bool) {
 
        NUM_TEST_SPANS := 12
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-       err = hcl.WriteSpans(&common.WriteSpansReq{
-               Spans: allSpans,
-       })
+       err = hcl.WriteSpans(allSpans)
        if err != nil {
                t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index da82912..74ec0cf 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -24,8 +24,6 @@ import (
        "encoding/json"
        "fmt"
        "github.com/gorilla/mux"
-       "io"
-       "io/ioutil"
        "net"
        "net/http"
        "org/apache/htrace/common"
@@ -230,34 +228,32 @@ func (hand *writeSpansHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reques
                                req.RemoteAddr, serr.Error()))
                return
        }
-       var dec *json.Decoder
-       if hand.lg.TraceEnabled() {
-               b, err := ioutil.ReadAll(req.Body)
-               if err != nil {
-                       writeError(hand.lg, w, http.StatusBadRequest,
-                               fmt.Sprintf("Error reading span data: %s", 
err.Error()))
-                       return
-               }
-               hand.lg.Tracef("writeSpansHandler: read %s\n", string(b))
-               dec = json.NewDecoder(bytes.NewBuffer(b))
-       } else {
-               dec = json.NewDecoder(req.Body)
-       }
+       dec := json.NewDecoder(req.Body)
        var msg common.WriteSpansReq
        err := dec.Decode(&msg)
-       if (err != nil) && (err != io.EOF) {
+       if (err != nil) {
                writeError(hand.lg, w, http.StatusBadRequest,
                        fmt.Sprintf("Error parsing WriteSpansReq: %s", 
err.Error()))
                return
        }
-       hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultTrid = 
%s\n",
-               len(msg.Spans), msg.DefaultTrid)
-
+       if hand.lg.TraceEnabled() {
+               hand.lg.Tracef("%s: read WriteSpans REST message: %s\n",
+                       req.RemoteAddr, asJson(&msg))
+       }
        ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
-       for spanIdx := range msg.Spans {
-               ing.IngestSpan(msg.Spans[spanIdx])
+       for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ {
+               var span *common.Span
+               err := dec.Decode(&span)
+               if err != nil {
+                       writeError(hand.lg, w, http.StatusBadRequest,
+                               fmt.Sprintf("Failed to decode span %d out of %d: %s",
+                                       spanIdx, msg.NumSpans, err.Error()))
+                       return
+               }
+               ing.IngestSpan(span)
        }
        ing.Close(startTime)
+       return
 }
 
 type queryHandler struct {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go 
b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 9837e94..2eff0a8 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -362,9 +362,7 @@ func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
                }
                fmt.Printf("\n")
        }
-       err = hcl.WriteSpans(&common.WriteSpansReq{
-               Spans: spans,
-       })
+       err = hcl.WriteSpans(spans)
        if err != nil {
                fmt.Println(err.Error())
                return EXIT_FAILURE

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java 
b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
index 3206dd6..e5059f7 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
@@ -113,11 +113,11 @@ class Conf {
    */
   final static String BUFFER_SIZE_KEY =
       "htraced.receiver.buffer.size";
-  final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024;
+  final static int BUFFER_SIZE_DEFAULT = 16 * 1024 * 1024;
   static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
   // The maximum buffer size should not be longer than
   // PackedBuffer.MAX_HRPC_BODY_LENGTH.
-  final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024;
+  final static int BUFFER_SIZE_MAX = 32 * 1024 * 1024;
 
   /**
    * Set the fraction of the span buffer which needs to fill up before we

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
----------------------------------------------------------------------
diff --git 
a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java 
b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
index f867ad7..dd0a4b9 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
@@ -78,7 +78,7 @@ class PackedBuffer {
 
   private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
   private static final Charset UTF8 = StandardCharsets.UTF_8;
-  private static final byte SPANS[] = "Spans".getBytes(UTF8);
+  private static final byte NUM_SPANS[] = "NumSpans".getBytes(UTF8);
   private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
   private static final byte A[] = "a".getBytes(UTF8);
   private static final byte B[] = "b".getBytes(UTF8);
@@ -401,9 +401,9 @@ class PackedBuffer {
         packer.writePayload(DEFAULT_PID);
         packer.packString(defaultPid);
       }
-      packer.packRawStringHeader(SPANS.length);
-      packer.writePayload(SPANS);
-      packer.packArrayHeader(numSpans);
+      packer.packRawStringHeader(NUM_SPANS.length);
+      packer.writePayload(NUM_SPANS);
+      packer.packInt(numSpans);
       packer.flush();
       success = true;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
----------------------------------------------------------------------
diff --git 
a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java 
b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
index 377d19f..39e5f99 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
@@ -42,15 +42,12 @@ import org.eclipse.jetty.http.HttpStatus;
 class RestBufferManager implements BufferManager {
   private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
   private static final Charset UTF8 = Charset.forName("UTF-8");
-  private static final byte COMMA_BYTE = (byte)0x2c;
   private static final int MAX_PREQUEL_LENGTH = 512;
-  private static final int MAX_EPILOGUE_LENGTH = 32;
   private final Conf conf;
   private final HttpClient httpClient;
   private final String urlString;
   private final ByteBuffer prequel;
   private final ByteBuffer spans;
-  private final ByteBuffer epilogue;
   private int numSpans;
 
   private static class RestBufferManagerContentProvider
@@ -122,7 +119,6 @@ class RestBufferManager implements BufferManager {
         conf.endpoint.getPort(), "/writeSpans").toString();
     this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
     this.spans = ByteBuffer.allocate(conf.bufferSize);
-    this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
     clear();
     this.httpClient.start();
   }
@@ -130,11 +126,10 @@ class RestBufferManager implements BufferManager {
   @Override
   public void writeSpan(Span span) throws IOException {
     byte[] spanJsonBytes = span.toString().getBytes(UTF8);
-    if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) {
-      // Make sure we have enough space for the span JSON and a comma.
+    if ((spans.capacity() - spans.position()) < spanJsonBytes.length) {
+      // Make sure we have enough space for the span JSON.
       throw new IOException("Not enough space remaining in span buffer.");
     }
-    spans.put(COMMA_BYTE);
     spans.put(spanJsonBytes);
     numSpans++;
   }
@@ -151,16 +146,14 @@ class RestBufferManager implements BufferManager {
 
   @Override
   public void prepare() throws IOException {
-    String prequelString = "{\"Spans\":[";
+    StringBuilder bld = new StringBuilder();
+    bld.append("{\"NumSpans\":").append(numSpans).append("}");
+    String prequelString = bld.toString();
     prequel.put(prequelString.getBytes(UTF8));
     prequel.flip();
 
     spans.flip();
 
-    String epilogueString = "]}";
-    epilogue.put(epilogueString.toString().getBytes(UTF8));
-    epilogue.flip();
-
     if (LOG.isTraceEnabled()) {
       LOG.trace("Preparing to send " + contentLength() + " bytes of span " +
           "data to " + conf.endpointStr + ", containing " + numSpans +
@@ -172,12 +165,11 @@ class RestBufferManager implements BufferManager {
   public void flush() throws IOException {
     // Position the buffers at the beginning.
     prequel.position(0);
-    spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
-    epilogue.position(0);
+    spans.position(0);
 
     RestBufferManagerContentProvider contentProvider =
         new RestBufferManagerContentProvider(
-            new ByteBuffer[] { prequel, spans, epilogue });
+            new ByteBuffer[] { prequel, spans });
     long rpcLength = contentProvider.getLength();
     try {
       Request request = httpClient.
@@ -206,7 +198,6 @@ class RestBufferManager implements BufferManager {
   public void clear() {
     prequel.clear();
     spans.clear();
-    epilogue.clear();
     numSpans = 0;
   }
 

Reply via email to