Repository: incubator-htrace Updated Branches: refs/heads/master 2ccb38813 -> 35053cfc5
HTRACE-308. Deserialize WriteSpans requests incrementally rather than all at once to optimize GC (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/35053cfc Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/35053cfc Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/35053cfc Branch: refs/heads/master Commit: 35053cfc557c24e94b7336fc24801d8bdfafc973 Parents: 2ccb388 Author: Colin P. Mccabe <[email protected]> Authored: Tue Dec 1 13:27:53 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Tue Dec 1 13:27:53 2015 -0800 ---------------------------------------------------------------------- htrace-c/src/receiver/htraced.c | 10 +-- htrace-htraced/go/Godeps/Godeps.json | 2 +- .../go/src/org/apache/htrace/client/client.go | 20 +++-- .../go/src/org/apache/htrace/client/hclient.go | 41 +++++++--- .../go/src/org/apache/htrace/common/rpc.go | 6 +- .../org/apache/htrace/htraced/client_test.go | 31 +++----- .../org/apache/htrace/htraced/datastore_test.go | 4 +- .../go/src/org/apache/htrace/htraced/hrpc.go | 83 ++++++++++++-------- .../org/apache/htrace/htraced/metrics_test.go | 4 +- .../go/src/org/apache/htrace/htraced/rest.go | 38 ++++----- .../go/src/org/apache/htrace/htracedTool/cmd.go | 4 +- .../main/java/org/apache/htrace/impl/Conf.java | 4 +- .../org/apache/htrace/impl/PackedBuffer.java | 8 +- .../apache/htrace/impl/RestBufferManager.java | 23 ++---- 14 files changed, 152 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-c/src/receiver/htraced.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c index d92518d..3a2a091 100644 --- a/htrace-c/src/receiver/htraced.c +++ b/htrace-c/src/receiver/htraced.c @@ -71,7 +71,7 @@ * The maximum length of the message we will send to the server. * This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go. */ -#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL) +#define MAX_HRPC_LEN (32ULL * 1024ULL * 1024ULL) /** * The maximum length of the prequel in a WriteSpans message. @@ -490,8 +490,8 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now) #define DEFAULT_TRID_STR "DefaultTrid" #define DEFAULT_TRID_STR_LEN (sizeof(DEFAULT_TRID_STR) - 1) -#define SPANS_STR "Spans" -#define SPANS_STR_LEN (sizeof(SPANS_STR) - 1) +#define NUM_SPANS_STR "NumSpans" +#define NUM_SPANS_STR_LEN (sizeof(NUM_SPANS_STR) - 1) /** * Write the prequel to the WriteSpans message. @@ -511,10 +511,10 @@ static int add_writespans_prequel(struct htraced_rcv *rcv, if (!cmp_write_str(ctx, rcv->tracer->trid, strlen(rcv->tracer->trid))) { return -1; } - if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) { + if (!cmp_write_fixstr(ctx, NUM_SPANS_STR, NUM_SPANS_STR_LEN)) { return -1; } - if (!cmp_write_array(ctx, sbuf->num_spans)) { + if (!cmp_write_uint(ctx, sbuf->num_spans)) { return -1; } return bctx.off; http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/Godeps/Godeps.json ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/Godeps/Godeps.json b/htrace-htraced/go/Godeps/Godeps.json index 7c737fe..2db37be 100644 --- a/htrace-htraced/go/Godeps/Godeps.json +++ b/htrace-htraced/go/Godeps/Godeps.json @@ -24,7 +24,7 @@ }, { "ImportPath": "github.com/ugorji/go/codec", - "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58" + "Rev": "ea9cd21fa0bc41ee4bdd50ac7ed8cbc7ea2ed960" } ] } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go index 65b04e4..a2a6f8b 100644 --- a/htrace-htraced/go/src/org/apache/htrace/client/client.go +++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go @@ -142,26 +142,36 @@ func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) { return &span, nil } -func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error { +func (hcl *Client) WriteSpans(spans []*common.Span) error { if hcl.hrpcAddr == "" { - return hcl.writeSpansHttp(req) + return hcl.writeSpansHttp(spans) } hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks) if err != nil { return err } defer hcr.Close() - return hcr.writeSpans(req) + return hcr.writeSpans(spans) } -func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error { +func (hcl *Client) writeSpansHttp(spans []*common.Span) error { + req := common.WriteSpansReq { + NumSpans: len(spans), + } var w bytes.Buffer enc := json.NewEncoder(&w) err := enc.Encode(req) if err != nil { - return errors.New(fmt.Sprintf("Error serializing span: %s", + return errors.New(fmt.Sprintf("Error serializing WriteSpansReq: %s", err.Error())) } + for spanIdx := range(spans) { + err := enc.Encode(spans[spanIdx]) + if err != nil { + return errors.New(fmt.Sprintf("Error serializing span %d out " + + "of %d: %s", spanIdx, len(spans), err.Error())) + } + } _, _, err = hcl.makeRestRequest("POST", "writeSpans", &w) if err != nil { return err http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/client/hclient.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go index 2fcd9a0..43f0c6c 100644 --- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go +++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go @@ -41,20 +41,41 @@ type HrpcClientCodec struct { testHooks *TestHooks } -func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error { - methodId := common.HrpcMethodNameToId(req.ServiceMethod) +func (cdc *HrpcClientCodec) WriteRequest(rr *rpc.Request, msg interface{}) error { + methodId := common.HrpcMethodNameToId(rr.ServiceMethod) if methodId == common.METHOD_ID_NONE { return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s", - req.ServiceMethod)) + rr.ServiceMethod)) } mh := new(codec.MsgpackHandle) mh.WriteExt = true w := bytes.NewBuffer(make([]byte, 0, 2048)) + + var err error enc := codec.NewEncoder(w, mh) - err := enc.Encode(msg) - if err != nil { - return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ - "message as msgpack: %s", err.Error())) + if methodId == common.METHOD_ID_WRITE_SPANS { + spans := msg.([]*common.Span) + req := &common.WriteSpansReq { + NumSpans: len(spans), + } + err = enc.Encode(req) + if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "message as msgpack: %s", err.Error())) + } + for spanIdx := range(spans) { + err = enc.Encode(spans[spanIdx]) + if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "span %d out of %d as msgpack: %s", spanIdx, len(spans), err.Error())) + } + } + } else { + err = enc.Encode(msg) + if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "message as msgpack: %s", err.Error())) + } } buf := w.Bytes() if len(buf) > common.MAX_HRPC_BODY_LENGTH { @@ -65,7 +86,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro hdr := common.HrpcRequestHeader{ Magic: common.HRPC_MAGIC, MethodId: methodId, - Seq: req.Seq, + Seq: rr.Seq, Length: uint32(len(buf)), } err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr) @@ -154,9 +175,9 @@ func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) { return &hcr, nil } -func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error { +func (hcr *hClient) writeSpans(spans []*common.Span) error { resp := common.WriteSpansResp{} - return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp) + return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, spans, &resp) } func (hcr *hClient) Close() { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go index 2ec5fe9..5f02db6 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go @@ -34,13 +34,13 @@ const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans" const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024 // Maximum length of HRPC message body -const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024 +const MAX_HRPC_BODY_LENGTH = 32 * 1024 * 1024 // A request to write spans to htraced. +// This request is followed by a sequence of spans. type WriteSpansReq struct { - Addr string `json:",omitempty"` // This gets filled in by the RPC layer. DefaultTrid string `json:",omitempty"` - Spans []*Span + NumSpans int } // Info returned by /server/version http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go index 3a877f6..7b64914 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go @@ -114,9 +114,7 @@ func TestClientOperations(t *testing.T) { allSpans := createRandomTestSpans(NUM_TEST_SPANS) // Write half of the spans to htraced via the client. - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans[0 : NUM_TEST_SPANS/2], - }) + err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2]) if err != nil { t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2, err.Error()) @@ -209,9 +207,7 @@ func TestDumpAll(t *testing.T) { NUM_TEST_SPANS := 100 allSpans := createRandomTestSpans(NUM_TEST_SPANS) sort.Sort(allSpans) - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans, - }) + err = hcl.WriteSpans(allSpans) if err != nil { t.Fatalf("WriteSpans failed: %s\n", err.Error()) } @@ -325,9 +321,7 @@ func TestHrpcAdmissionsControl(t *testing.T) { allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { go func(i int) { - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans[i : i+1], - }) + err = hcl.WriteSpans(allSpans[i : i+1]) if err != nil { t.Fatalf("WriteSpans failed: %s\n", err.Error()) } @@ -379,9 +373,7 @@ func TestHrpcIoTimeout(t *testing.T) { // Keep in mind that we only block until we have seen // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that, // we let requests through so that the test can exit cleanly. - hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans[i : i+1], - }) + hcl.WriteSpans(allSpans[i : i+1]) }(iter) } for { @@ -398,6 +390,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans", Cnf: map[string]string{ conf.HTRACE_LOG_LEVEL: "INFO", + conf.HTRACE_NUM_HRPC_HANDLERS: "20", }, WrittenSpans: common.NewSemaphore(int64(1 - N)), } @@ -416,7 +409,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { // body length limit. TODO: a production-quality golang client would do // this internally rather than needing us to do it here in the unit test. bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5 - reqs := make([]*common.WriteSpansReq, 0, 4) + reqs := make([][]*common.Span, 0, 4) curReq := -1 curReqLen := bodyLen var curReqSpans uint32 @@ -429,7 +422,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { span := allSpans[n] if (curReqSpans >= maxSpansPerRpc) || (curReqLen >= bodyLen) { - reqs = append(reqs, &common.WriteSpansReq{}) + reqs = append(reqs, make([]*common.Span, 0, 16)) curReqLen = 0 curReq++ curReqSpans = 0 @@ -446,7 +439,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen)) } curReqLen += bufLen - reqs[curReq].Spans = append(reqs[curReq].Spans, span) + reqs[curReq] = append(reqs[curReq], span) curReqSpans++ } ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs)) @@ -465,13 +458,13 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { // Write many random spans. for reqIdx := range reqs { - go func() { - err = hcl.WriteSpans(reqs[reqIdx]) + go func(i int) { + err = hcl.WriteSpans(reqs[i]) if err != nil { panic(fmt.Sprintf("failed to send WriteSpans request %d: %s", - reqIdx, err.Error())) + i, err.Error())) } - }() + }(reqIdx) } // Wait for all the spans to be written. ht.Store.WrittenSpans.Wait() http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index 4fc400a..ebf3c47 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -478,9 +478,7 @@ func TestReloadDataStore(t *testing.T) { // Create some random trace spans. NUM_TEST_SPANS := 5 allSpans := createRandomTestSpans(NUM_TEST_SPANS) - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans, - }) + err = hcl.WriteSpans(allSpans) if err != nil { t.Fatalf("WriteSpans failed: %s\n", err.Error()) } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go index a6f6751..ecd13d4 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go @@ -32,7 +32,6 @@ import ( "net/rpc" "org/apache/htrace/common" "org/apache/htrace/conf" - "reflect" "sync" "sync/atomic" "time" @@ -101,6 +100,13 @@ type HrpcServerCodec struct { // The number of messages this connection has handled. numHandled int + + // The buffer for reading requests. These buffers are reused for multiple + // requests to avoid allocating memory. + buf []byte + + // Configuration for msgpack decoding + msgpackHandle codec.MsgpackHandle } func asJson(val interface{}) string { @@ -164,29 +170,55 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { } func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { + remoteAddr := cdc.conn.RemoteAddr().String() if cdc.lg.TraceEnabled() { cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n", - cdc.conn.RemoteAddr(), cdc.length) + remoteAddr, cdc.length) + } + if cap(cdc.buf) < int(cdc.length) { + var pow uint + for pow=0;(1<<pow) < int(cdc.length);pow++ { + } + cdc.buf = make([]byte, 0, 1<<pow) } - mh := new(codec.MsgpackHandle) - mh.WriteExt = true - dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh) - err := dec.Decode(body) + _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length]) if err != nil { return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+ "request body: %s", cdc.length, err.Error())) } + var zeroTime time.Time + cdc.conn.SetDeadline(zeroTime) + + dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle) + err = dec.Decode(body) if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("%s: read %d-byte request body %s\n", - cdc.conn.RemoteAddr(), cdc.length, asJson(&body)) + cdc.lg.Tracef("%s: read HRPC message: %s\n", + remoteAddr, asJson(&body)) } - val := reflect.ValueOf(body) - addr := val.Elem().FieldByName("Addr") - if addr.IsValid() { - addr.SetString(cdc.conn.RemoteAddr().String()) + req := body.(*common.WriteSpansReq) + if req == nil { + return nil } - var zeroTime time.Time - cdc.conn.SetDeadline(zeroTime) + // We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage + // collector with a ton of trace spans all at once. + startTime := time.Now() + client, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+ + "for %s: %s\n", remoteAddr, err.Error())) + } + hand := cdc.hsv.hand + ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid) + for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ { + var span *common.Span + err := dec.Decode(&span) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " + + "out of %d: %s\n", spanIdx, req.NumSpans, err.Error())) + } + ing.IngestSpan(span) + } + ing.Close(startTime) return nil } @@ -197,10 +229,8 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e var err error buf := EMPTY if msg != nil { - mh := new(codec.MsgpackHandle) - mh.WriteExt = true w := bytes.NewBuffer(make([]byte, 0, 128)) - enc := codec.NewEncoder(w, mh) + enc := codec.NewEncoder(w, &cdc.msgpackHandle) err := enc.Encode(msg) if err != nil { return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+ @@ -257,20 +287,8 @@ func (cdc *HrpcServerCodec) Close() error { } func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, - resp *common.WriteSpansResp) (err error) { - startTime := time.Now() - hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+ - "defaultTrid = %s\n", len(req.Spans), req.DefaultTrid) - client, _, err := net.SplitHostPort(req.Addr) - if err != nil { - return errors.New(fmt.Sprintf("Failed to split host and port "+ - "for %s: %s\n", req.Addr, err.Error())) - } - ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid) - for spanIdx := range req.Spans { - ing.IngestSpan(req.Spans[spanIdx]) - } - ing.Close(startTime) + resp *common.WriteSpansResp) (err error) { + // Nothing to do here; WriteSpans is handled in ReadRequestBody. return nil } @@ -303,6 +321,9 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore, hsv.cdcs <- &HrpcServerCodec{ lg: lg, hsv: hsv, + msgpackHandle: codec.MsgpackHandle { + WriteExt: true, + }, } } var err error http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go index bad7889..6daf640 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go @@ -118,9 +118,7 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) { NUM_TEST_SPANS := 12 allSpans := createRandomTestSpans(NUM_TEST_SPANS) - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans, - }) + err = hcl.WriteSpans(allSpans) if err != nil { t.Fatalf("WriteSpans failed: %s\n", err.Error()) } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go index da82912..74ec0cf 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go @@ -24,8 +24,6 @@ import ( "encoding/json" "fmt" "github.com/gorilla/mux" - "io" - "io/ioutil" "net" "net/http" "org/apache/htrace/common" @@ -230,34 +228,32 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques req.RemoteAddr, serr.Error())) return } - var dec *json.Decoder - if hand.lg.TraceEnabled() { - b, err := ioutil.ReadAll(req.Body) - if err != nil { - writeError(hand.lg, w, http.StatusBadRequest, - fmt.Sprintf("Error reading span data: %s", err.Error())) - return - } - hand.lg.Tracef("writeSpansHandler: read %s\n", string(b)) - dec = json.NewDecoder(bytes.NewBuffer(b)) - } else { - dec = json.NewDecoder(req.Body) - } + dec := json.NewDecoder(req.Body) var msg common.WriteSpansReq err := dec.Decode(&msg) - if (err != nil) && (err != io.EOF) { + if (err != nil) { writeError(hand.lg, w, http.StatusBadRequest, fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error())) return } - hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultTrid = %s\n", - len(msg.Spans), msg.DefaultTrid) - + if hand.lg.TraceEnabled() { + hand.lg.Tracef("%s: read WriteSpans REST message: %s\n", + req.RemoteAddr, asJson(&msg)) + } ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid) - for spanIdx := range msg.Spans { - ing.IngestSpan(msg.Spans[spanIdx]) + for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ { + var span *common.Span + err := dec.Decode(&span) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Failed to decode span %d out of %d: ", + spanIdx, msg.NumSpans, err.Error())) + return + } + ing.IngestSpan(span) } ing.Close(startTime) + return } type queryHandler struct { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go index 9837e94..2eff0a8 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go @@ -362,9 +362,7 @@ func doLoadSpans(hcl *htrace.Client, reader io.Reader) int { } fmt.Printf("\n") } - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: spans, - }) + err = hcl.WriteSpans(spans) if err != nil { fmt.Println(err.Error()) return EXIT_FAILURE http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java index 3206dd6..e5059f7 100644 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java @@ -113,11 +113,11 @@ class Conf { */ final static String BUFFER_SIZE_KEY = "htraced.receiver.buffer.size"; - final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024; + final static int BUFFER_SIZE_DEFAULT = 16 * 1024 * 1024; static int BUFFER_SIZE_MIN = 4 * 1024 * 1024; // The maximum buffer size should not be longer than // PackedBuffer.MAX_HRPC_BODY_LENGTH. - final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024; + final static int BUFFER_SIZE_MAX = 32 * 1024 * 1024; /** * Set the fraction of the span buffer which needs to fill up before we http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java index f867ad7..dd0a4b9 100644 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java @@ -78,7 +78,7 @@ class PackedBuffer { private static final Log LOG = LogFactory.getLog(PackedBuffer.class); private static final Charset UTF8 = StandardCharsets.UTF_8; - private static final byte SPANS[] = "Spans".getBytes(UTF8); + private static final byte NUM_SPANS[] = "NumSpans".getBytes(UTF8); private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8); private static final byte A[] = "a".getBytes(UTF8); private static final byte B[] = "b".getBytes(UTF8); @@ -401,9 +401,9 @@ class PackedBuffer { packer.writePayload(DEFAULT_PID); packer.packString(defaultPid); } - packer.packRawStringHeader(SPANS.length); - packer.writePayload(SPANS); - packer.packArrayHeader(numSpans); + packer.packRawStringHeader(NUM_SPANS.length); + packer.writePayload(NUM_SPANS); + packer.packInt(numSpans); packer.flush(); success = true; } finally { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java index 377d19f..39e5f99 100644 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java @@ -42,15 +42,12 @@ import org.eclipse.jetty.http.HttpStatus; class RestBufferManager implements BufferManager { private static final Log LOG = LogFactory.getLog(RestBufferManager.class); private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final byte COMMA_BYTE = (byte)0x2c; private static final int MAX_PREQUEL_LENGTH = 512; - private static final int MAX_EPILOGUE_LENGTH = 32; private final Conf conf; private final HttpClient httpClient; private final String urlString; private final ByteBuffer prequel; private final ByteBuffer spans; - private final ByteBuffer epilogue; private int numSpans; private static class RestBufferManagerContentProvider @@ -122,7 +119,6 @@ class RestBufferManager implements BufferManager { conf.endpoint.getPort(), "/writeSpans").toString(); this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH); this.spans = ByteBuffer.allocate(conf.bufferSize); - this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH); clear(); this.httpClient.start(); } @@ -130,11 +126,10 @@ class RestBufferManager implements BufferManager { @Override public void writeSpan(Span span) throws IOException { byte[] spanJsonBytes = span.toString().getBytes(UTF8); - if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) { - // Make sure we have enough space for the span JSON and a comma. + if ((spans.capacity() - spans.position()) < spanJsonBytes.length) { + // Make sure we have enough space for the span JSON. throw new IOException("Not enough space remaining in span buffer."); } - spans.put(COMMA_BYTE); spans.put(spanJsonBytes); numSpans++; } @@ -151,16 +146,14 @@ class RestBufferManager implements BufferManager { @Override public void prepare() throws IOException { - String prequelString = "{\"Spans\":["; + StringBuilder bld = new StringBuilder(); + bld.append("{\"NumSpans\":").append(numSpans).append("}"); + String prequelString = bld.toString(); prequel.put(prequelString.getBytes(UTF8)); prequel.flip(); spans.flip(); - String epilogueString = "]}"; - epilogue.put(epilogueString.toString().getBytes(UTF8)); - epilogue.flip(); - if (LOG.isTraceEnabled()) { LOG.trace("Preparing to send " + contentLength() + " bytes of span " + "data to " + conf.endpointStr + ", containing " + numSpans + @@ -172,12 +165,11 @@ class RestBufferManager implements BufferManager { public void flush() throws IOException { // Position the buffers at the beginning. prequel.position(0); - spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma - epilogue.position(0); + spans.position(0); RestBufferManagerContentProvider contentProvider = new RestBufferManagerContentProvider( - new ByteBuffer[] { prequel, spans, epilogue }); + new ByteBuffer[] { prequel, spans }); long rpcLength = contentProvider.getLength(); try { Request request = httpClient. @@ -206,7 +198,6 @@ class RestBufferManager implements BufferManager { public void clear() { prequel.clear(); spans.clear(); - epilogue.clear(); numSpans = 0; }
