Repository: incubator-htrace Updated Branches: refs/heads/master 26fdb7cd8 -> 5210d97d2
HTRACE-150. htraced: add HRPC endpoint for writeSpans (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/5210d97d Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/5210d97d Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/5210d97d Branch: refs/heads/master Commit: 5210d97d2ff5c275570a916a1f6758578b23a772 Parents: 26fdb7c Author: Colin P. Mccabe <[email protected]> Authored: Wed Apr 22 15:16:55 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Wed Apr 22 15:16:55 2015 -0700 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/client/client.go | 75 ++++++- .../go/src/org/apache/htrace/client/hclient.go | 151 ++++++++++++++ .../src/go/src/org/apache/htrace/common/rpc.go | 81 ++++++++ .../src/org/apache/htrace/conf/config_keys.go | 9 +- .../src/go/src/org/apache/htrace/htrace/cmd.go | 44 +++-- .../org/apache/htrace/htraced/client_test.go | 20 +- .../org/apache/htrace/htraced/datastore_test.go | 9 +- .../go/src/org/apache/htrace/htraced/hrpc.go | 197 +++++++++++++++++++ .../go/src/org/apache/htrace/htraced/htraced.go | 15 ++ .../org/apache/htrace/htraced/mini_htraced.go | 14 +- 10 files changed, 569 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go index 5e594d9..44e2f69 100644 --- a/htrace-core/src/go/src/org/apache/htrace/client/client.go +++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go @@ -35,13 +35,24 @@ import ( // TODO: fancier APIs for streaming spans in the background, 
optimize TCP stuff func NewClient(cnf *conf.Config) (*Client, error) { - hcl := Client{restAddr: cnf.Get(conf.HTRACE_WEB_ADDRESS)} + hcl := Client{} + hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS) + if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { + var err error + hcl.hcr, err = newHClient(cnf) + if err != nil { + return nil, err + } + } return &hcl, nil } type Client struct { // REST address of the htraced server. restAddr string + + // The HRPC client, or null if it is not enabled. + hcr *hClient } // Get the htraced server information. @@ -77,12 +88,42 @@ func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) { return &span, nil } -func (hcl *Client) WriteSpan(span *common.Span) error { - buf, err := json.Marshal(span) - if err != nil { - return err +func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error { + if hcl.hcr != nil { + return hcl.hcr.writeSpans(req) + } else { + return hcl.writeSpansHttp(req) } - _, _, err = hcl.makeRestRequest("POST", "writeSpans", bytes.NewReader(buf)) +} + +func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error { + var w bytes.Buffer + var err error + for i := range req.Spans { + var buf []byte + buf, err = json.Marshal(req.Spans[i]) + if err != nil { + return errors.New(fmt.Sprintf("Error serializing span: %s", + err.Error())) + } + _, err = w.Write(buf) + if err != nil { + return errors.New(fmt.Sprintf("Error writing span: %s", + err.Error())) + } + _, err = w.Write([]byte{'\n'}) + //err = io.WriteString(&w, "\n") + if err != nil { + return errors.New(fmt.Sprintf("Error writing: %s", + err.Error())) + } + } + customHeaders := make(map[string]string) + if req.DefaultPid != "" { + customHeaders["htrace-pid"] = req.DefaultPid + } + _, _, err = hcl.makeRestRequest("POST", "writeSpans", + &w, customHeaders) if err != nil { return err } @@ -90,7 +131,6 @@ func (hcl *Client) WriteSpan(span *common.Span) error { } // Find the child IDs of a given span ID. -// TODO: add offset as well as limit? 
func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, error) { buf, _, err := hcl.makeGetRequest(fmt.Sprintf("span/%016x/children?lim=%d", uint64(sid), lim)) @@ -126,17 +166,24 @@ func (hcl *Client) Query(query *common.Query) ([]common.Span, error) { return spans, nil } +var EMPTY = make(map[string]string) + func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) { - return hcl.makeRestRequest("GET", reqName, nil) + return hcl.makeRestRequest("GET", reqName, nil, EMPTY) } // Make a general JSON REST request. // Returns the request body, the response code, and the error. // Note: if the response code is non-zero, the error will also be non-zero. -func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader) ([]byte, int, error) { - url := fmt.Sprintf("http://%s/%s", hcl.restAddr, reqName) +func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader, + customHeaders map[string]string) ([]byte, int, error) { + url := fmt.Sprintf("http://%s/%s", + hcl.restAddr, reqName) req, err := http.NewRequest(reqType, url, reqBody) req.Header.Set("Content-Type", "application/json") + for k, v := range customHeaders { + req.Header.Set(k, v) + } client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -187,3 +234,11 @@ func (hcl *Client) DumpAll(lim int, out chan *common.Span) error { searchId = spans[len(spans)-1].Id + 1 } } + +func (hcl *Client) Close() { + if hcl.hcr != nil { + hcl.hcr.Close() + } + hcl.restAddr = "" + hcl.hcr = nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/client/hclient.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/client/hclient.go b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go new file mode 100644 index 0000000..1730c02 --- /dev/null +++ 
b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/rpc" + "org/apache/htrace/common" + "org/apache/htrace/conf" +) + +type hClient struct { + rpcClient *rpc.Client +} + +type HrpcClientCodec struct { + rwc io.ReadWriteCloser + length uint32 +} + +func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error { + methodId := common.HrpcMethodNameToId(req.ServiceMethod) + if methodId == common.METHOD_ID_NONE { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s", + req.ServiceMethod)) + } + buf, err := json.Marshal(msg) + if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "message as JSON: %s", err.Error())) + } + if len(buf) > common.MAX_HRPC_BODY_LENGTH { + return errors.New(fmt.Sprintf("HrpcClientCodec: message body is %d "+ + "bytes, but the maximum message size is %d bytes.", + len(buf), common.MAX_HRPC_BODY_LENGTH)) + } + hdr := common.HrpcRequestHeader{ + Magic: common.HRPC_MAGIC, + MethodId: methodId, + Seq: req.Seq, + Length: uint32(len(buf)), 
+ } + err = binary.Write(cdc.rwc, binary.BigEndian, &hdr) + if err != nil { + return errors.New(fmt.Sprintf("Error writing header bytes: %s", + err.Error())) + } + _, err = cdc.rwc.Write(buf) + if err != nil { + return errors.New(fmt.Sprintf("Error writing body bytes: %s", + err.Error())) + } + return nil +} + +func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error { + hdr := common.HrpcResponseHeader{} + err := binary.Read(cdc.rwc, binary.BigEndian, &hdr) + if err != nil { + return errors.New(fmt.Sprintf("Error reading response header "+ + "bytes: %s", err.Error())) + } + resp.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) + if resp.ServiceMethod == "" { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "invalid method ID %d.", hdr.MethodId)) + } + resp.Seq = hdr.Seq + if hdr.ErrLength > 0 { + if hdr.ErrLength > common.MAX_HRPC_ERROR_LENGTH { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "error message was %d bytes long, but "+ + "MAX_HRPC_ERROR_LENGTH is %d.", + hdr.ErrLength, common.MAX_HRPC_ERROR_LENGTH)) + } + buf := make([]byte, hdr.ErrLength) + var nread int + nread, err = cdc.rwc.Read(buf) + if uint32(nread) != hdr.ErrLength { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "failed to read %d bytes of error message.", nread)) + } + if err != nil { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "failed to read %d bytes of error message: %s", + nread, err.Error())) + } + resp.Error = string(buf) + } else { + resp.Error = "" + } + cdc.length = hdr.Length + return nil +} + +func (cdc *HrpcClientCodec) ReadResponseBody(body interface{}) error { + dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length))) + err := dec.Decode(body) + if err != nil { + return errors.New(fmt.Sprintf("Failed to read response body: %s", + err.Error())) + } + return nil +} + +func (cdc *HrpcClientCodec) Close() error { + return cdc.rwc.Close() +} + +func 
newHClient(cnf *conf.Config) (*hClient, error) { + hcr := hClient{} + addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS) + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+ + "at %s: %s", addr, err.Error())) + } + hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn}) + return &hcr, nil +} + +func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error { + resp := common.WriteSpansResp{} + return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp) +} + +func (hcr *hClient) Close() { + hcr.rpcClient.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/rpc.go b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go new file mode 100644 index 0000000..cdf7e08 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package common + +// The 4-byte magic number which is sent first in the HRPC header +const HRPC_MAGIC = 0x48545243 + +// Method ID codes. Do not reorder these. +const ( + METHOD_ID_NONE = 0 + METHOD_ID_WRITE_SPANS = iota +) + +const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans" + +// Maximum length of the error message passed in an HRPC response +const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024 + +// Maximum length of HRPC message body +const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024 + +// A request to write spans to htraced. +type WriteSpansReq struct { + DefaultPid string + Spans []*Span +} + +// A response to a WriteSpansReq +type WriteSpansResp struct { +} + +// The header which is sent over the wire for HRPC +type HrpcRequestHeader struct { + Magic uint32 + MethodId uint32 + Seq uint64 + Length uint32 +} + +// The response which is sent over the wire for HRPC +type HrpcResponseHeader struct { + Seq uint64 + MethodId uint32 + ErrLength uint32 + Length uint32 +} + +func HrpcMethodIdToMethodName(id uint32) string { + switch id { + case METHOD_ID_WRITE_SPANS: + return METHOD_NAME_WRITE_SPANS + default: + return "" + } +} + +func HrpcMethodNameToId(name string) uint32 { + switch name { + case METHOD_NAME_WRITE_SPANS: + return METHOD_ID_WRITE_SPANS + default: + return METHOD_ID_NONE + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go index ba63f2d..ccb09e0 100644 --- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go +++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go @@ -47,6 +47,12 @@ const HTRACE_WEB_ADDRESS = "web.address" // The default port for the Htrace web address. 
const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9095 +// The web address to start the REST server on. +const HTRACE_HRPC_ADDRESS = "hrpc.address" + +// The default port for the Htrace HRPC address. +const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075 + // The directories to put the data store into. Separated by PATH_LIST_SEP. const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories" @@ -69,7 +75,8 @@ const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address" // Default values for HTrace configuration keys. var DEFAULTS = map[string]string{ - HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), + HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), + HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT), HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" + PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2", HTRACE_DATA_STORE_CLEAR: "false", http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go index 0317237..38cdb58 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go @@ -21,6 +21,7 @@ package main import ( "bufio" + "bytes" "encoding/json" "errors" "fmt" @@ -203,8 +204,17 @@ func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int { return EXIT_FAILURE } defer file.Close() - in := bufio.NewReader(file) - dec := json.NewDecoder(in) + return doLoadSpans(hcl, bufio.NewReader(file)) +} + +func doLoadSpanJson(hcl *htrace.Client, spanJson string) int { + return doLoadSpans(hcl, bytes.NewBufferString(spanJson)) +} + +func doLoadSpans(hcl *htrace.Client, reader io.Reader) int { + dec := json.NewDecoder(reader) + spans 
:= make([]*common.Span, 0, 32) + var err error for { var span common.Span if err = dec.Decode(&span); err != nil { @@ -214,26 +224,20 @@ func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int { fmt.Printf("Failed to decode JSON: %s\n", err.Error()) return EXIT_FAILURE } - if *verbose { - fmt.Printf("wrote %s\n", span.ToJson()) - } - if err = hcl.WriteSpan(&span); err != nil { - fmt.Println(err.Error()) - return EXIT_FAILURE - } + spans = append(spans, &span) } - return EXIT_SUCCESS -} - -func doLoadSpanJson(hcl *htrace.Client, spanJson string) int { - spanBytes := []byte(spanJson) - var span common.Span - err := json.Unmarshal(spanBytes, &span) - if err != nil { - fmt.Printf("Error parsing provided JSON: %s\n", err.Error()) - return EXIT_FAILURE + if *verbose { + fmt.Printf("Writing ") + prefix := "" + for i := range spans { + fmt.Printf("%s%s", prefix, spans[i].ToJson()) + prefix = ", " + } + fmt.Printf("\n") } - err = hcl.WriteSpan(&span) + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: spans, + }) if err != nil { fmt.Println(err.Error()) return EXIT_FAILURE http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go index f9e66ce..218c1c8 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go @@ -79,10 +79,12 @@ func TestClientOperations(t *testing.T) { allSpans := createRandomTestSpans(NUM_TEST_SPANS) // Write half of the spans to htraced via the client. 
- for i := 0; i < NUM_TEST_SPANS/2; i++ { - if err := hcl.WriteSpan(allSpans[i]); err != nil { - t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error()) - } + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans[0 : NUM_TEST_SPANS/2], + }) + if err != nil { + t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2, + err.Error()) } // Look up the first half of the spans. They should be found. @@ -165,11 +167,11 @@ func TestDumpAll(t *testing.T) { NUM_TEST_SPANS := 100 allSpans := createRandomTestSpans(NUM_TEST_SPANS) sort.Sort(allSpans) - for i := range allSpans { - err = hcl.WriteSpan(allSpans[i]) - if err != nil { - t.Fatalf("failed to write span %d: %s", i, err.Error()) - } + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans, + }) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) } out := make(chan *common.Span, 50) var dumpErr error http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go index 77497c5..79a7c4f 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go @@ -374,10 +374,11 @@ func TestReloadDataStore(t *testing.T) { // Create some random trace spans. NUM_TEST_SPANS := 5 allSpans := createRandomTestSpans(NUM_TEST_SPANS) - for i := 0; i < NUM_TEST_SPANS; i++ { - if err := hcl.WriteSpan(allSpans[i]); err != nil { - t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error()) - } + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans, + }) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) } // Look up the spans we wrote. 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go new file mode 100644 index 0000000..9696cbc --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/rpc" + "org/apache/htrace/common" + "org/apache/htrace/conf" +) + +// Handles HRPC calls +type HrpcHandler struct { + lg *common.Logger + store *dataStore +} + +// The HRPC server +type HrpcServer struct { + *rpc.Server + hand *HrpcHandler + listener net.Listener +} + +// Codec which encodes HRPC data via JSON +type HrpcServerCodec struct { + rwc io.ReadWriteCloser + length uint32 +} + +func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { + hdr := common.HrpcRequestHeader{} + err := binary.Read(cdc.rwc, binary.BigEndian, &hdr) + if err != nil { + return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error())) + } + if hdr.Magic != common.HRPC_MAGIC { + return errors.New(fmt.Sprintf("Invalid request header: expected "+ + "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic)) + } + if hdr.Length > common.MAX_HRPC_BODY_LENGTH { + return errors.New(fmt.Sprintf("Length prefix was too long. 
Maximum "+ + "length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length)) + } + req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) + if req.ServiceMethod == "" { + return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x", + hdr.MethodId)) + } + req.Seq = hdr.Seq + cdc.length = hdr.Length + return nil +} + +func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { + dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length))) + err := dec.Decode(body) + if err != nil { + return errors.New(fmt.Sprintf("Failed to read request body: %s", + err.Error())) + } + return nil +} + +var EMPTY []byte = make([]byte, 0) + +func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error { + var err error + buf := EMPTY + if msg != nil { + buf, err = json.Marshal(msg) + if err != nil { + return errors.New(fmt.Sprintf("Failed to marshal response message: %s", + err.Error())) + } + } + hdr := common.HrpcResponseHeader{} + hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod) + hdr.Seq = resp.Seq + hdr.ErrLength = uint32(len(resp.Error)) + hdr.Length = uint32(len(buf)) + writer := bufio.NewWriterSize(cdc.rwc, 256) + err = binary.Write(writer, binary.BigEndian, &hdr) + if err != nil { + return errors.New(fmt.Sprintf("Failed to write response header: %s", + err.Error())) + } + if hdr.ErrLength > 0 { + _, err = io.WriteString(writer, resp.Error) + if err != nil { + return errors.New(fmt.Sprintf("Failed to write error string: %s", + err.Error())) + } + } + if hdr.Length > 0 { + var length int + length, err = writer.Write(buf) + if err != nil { + return errors.New(fmt.Sprintf("Failed to write response "+ + "message: %s", err.Error())) + } + if uint32(length) != hdr.Length { + return errors.New(fmt.Sprintf("Failed to write all of response "+ + "message: %s", err.Error())) + } + } + err = writer.Flush() + if err != nil { + return errors.New(fmt.Sprintf("Failed to write the response bytes: "+ + "%s", err.Error())) 
+ } + return nil +} + +func (cdc *HrpcServerCodec) Close() error { + return cdc.rwc.Close() +} + +func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, + resp *common.WriteSpansResp) (err error) { + hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+ + "defaultPid = %s\n", len(req.Spans), req.DefaultPid) + for i := range req.Spans { + span := req.Spans[i] + if span.ProcessId == "" { + span.ProcessId = req.DefaultPid + } + hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson()) + hand.store.WriteSpan(span) + } + return nil +} + +func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) { + lg := common.NewLogger("hrpc", cnf) + hsv := &HrpcServer{ + Server: rpc.NewServer(), + hand: &HrpcHandler{ + lg: lg, + store: store, + }, + } + var err error + hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS)) + if err != nil { + return nil, err + } + hsv.Server.Register(hsv.hand) + go hsv.run() + lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String()) + return hsv, nil +} + +func (hsv *HrpcServer) run() { + lg := hsv.hand.lg + for { + conn, err := hsv.listener.Accept() + if err != nil { + lg.Errorf("HRPC Accept error: %s\n", err.Error()) + continue + } + go hsv.ServeCodec(&HrpcServerCodec{ + rwc: conn, + }) + } +} + +func (hsv *HrpcServer) Addr() net.Addr { + return hsv.listener.Addr() +} + +func (hsv *HrpcServer) Close() { + hsv.listener.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go index c3432b4..64da457 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go @@ -78,12 +78,26 @@ func main() { lg.Errorf("Error creating 
REST server: %s\n", err.Error()) os.Exit(1) } + var hsv *HrpcServer + if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { + hsv, err = CreateHrpcServer(cnf, store) + if err != nil { + lg.Errorf("Error creating HRPC server: %s\n", err.Error()) + os.Exit(1) + } + } else { + lg.Infof("Not starting HRPC server because no value was given for %s.\n", + conf.HTRACE_HRPC_ADDRESS) + } naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS) if naddr != "" { notif := StartupNotification{ HttpAddr: rsv.Addr().String(), ProcessId: os.Getpid(), } + if hsv != nil { + notif.HrpcAddr = hsv.Addr().String() + } err = sendStartupNotification(naddr, &notif) if err != nil { fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+ @@ -100,6 +114,7 @@ func main() { // Used by unit tests. type StartupNotification struct { HttpAddr string + HrpcAddr string ProcessId int } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go index be7e284..a54f2cb 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -62,6 +62,7 @@ type MiniHTraced struct { DataDirs []string Store *dataStore Rsv *RestServer + Hsv *HrpcServer Lg *common.Logger KeepDataDirsOnClose bool } @@ -70,6 +71,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { var err error var store *dataStore var rsv *RestServer + var hsv *HrpcServer if bld.Name == "" { bld.Name = "HTraceTest" } @@ -90,7 +92,8 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { } bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] = strings.Join(bld.DataDirs, conf.PATH_LIST_SEP) - bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST 
server + bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server + bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the HRPC server bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE" cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS} cnf, err := cnfBld.Build() @@ -123,6 +126,11 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { if err != nil { return nil, err } + hsv, err = CreateHrpcServer(cnf, store) + if err != nil { + return nil, err + } + lg.Infof("Created MiniHTraced %s\n", bld.Name) return &MiniHTraced{ Name: bld.Name, @@ -130,6 +138,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { DataDirs: bld.DataDirs, Store: store, Rsv: rsv, + Hsv: hsv, Lg: lg, KeepDataDirsOnClose: bld.KeepDataDirsOnClose, }, nil @@ -137,7 +146,8 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { // Return a Config object that clients can use to connect to this MiniHTraceD. func (ht *MiniHTraced) ClientConf() *conf.Config { - return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String()) + return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(), + conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String()) } func (ht *MiniHTraced) Close() {
