HTRACE-357. Rename htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace (Colin Patrick McCabe via iwasakims)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/5737e65b Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/5737e65b Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/5737e65b Branch: refs/heads/master Commit: 5737e65b04b0fe7f26ae8bf9a5a9abcf22f14f55 Parents: e629995 Author: Masatake Iwasaki <[email protected]> Authored: Wed Apr 20 00:27:32 2016 +0900 Committer: Masatake Iwasaki <[email protected]> Committed: Wed Apr 20 00:27:32 2016 +0900 ---------------------------------------------------------------------- htrace-htraced/go/gobuild.sh | 6 +- htrace-htraced/go/src/htrace/client/client.go | 285 ++++ htrace-htraced/go/src/htrace/client/hclient.go | 185 +++ htrace-htraced/go/src/htrace/common/log.go | 332 +++++ htrace-htraced/go/src/htrace/common/log_test.go | 170 +++ htrace-htraced/go/src/htrace/common/process.go | 101 ++ .../go/src/htrace/common/process_test.go | 116 ++ htrace-htraced/go/src/htrace/common/query.go | 128 ++ .../go/src/htrace/common/query_test.go | 50 + htrace-htraced/go/src/htrace/common/rpc.go | 159 +++ .../go/src/htrace/common/semaphore.go | 78 + .../go/src/htrace/common/semaphore_test.go | 86 ++ htrace-htraced/go/src/htrace/common/span.go | 217 +++ .../go/src/htrace/common/span_test.go | 116 ++ .../go/src/htrace/common/test_util.go | 91 ++ htrace-htraced/go/src/htrace/common/time.go | 34 + .../go/src/htrace/common/time_test.go | 38 + htrace-htraced/go/src/htrace/conf/config.go | 302 ++++ .../go/src/htrace/conf/config_keys.go | 134 ++ .../go/src/htrace/conf/config_test.go | 144 ++ htrace-htraced/go/src/htrace/conf/xml.go | 61 + .../go/src/htrace/htraced/client_test.go | 484 +++++++ .../go/src/htrace/htraced/datastore.go | 1339 +++++++++++++++++ .../go/src/htrace/htraced/datastore_test.go | 761 ++++++++++ .../go/src/htrace/htraced/heartbeater.go | 125 ++ .../go/src/htrace/htraced/heartbeater_test.go | 100 ++ 
htrace-htraced/go/src/htrace/htraced/hrpc.go | 386 +++++ htrace-htraced/go/src/htrace/htraced/htraced.go | 181 +++ htrace-htraced/go/src/htrace/htraced/loader.go | 511 +++++++ htrace-htraced/go/src/htrace/htraced/metrics.go | 194 +++ .../go/src/htrace/htraced/metrics_test.go | 172 +++ .../go/src/htrace/htraced/mini_htraced.go | 193 +++ .../go/src/htrace/htraced/reaper_test.go | 83 ++ htrace-htraced/go/src/htrace/htraced/rest.go | 376 +++++ htrace-htraced/go/src/htrace/htracedTool/cmd.go | 442 ++++++ .../go/src/htrace/htracedTool/file.go | 138 ++ .../go/src/htrace/htracedTool/file_test.go | 161 +++ .../go/src/htrace/htracedTool/graph.go | 116 ++ .../go/src/htrace/htracedTool/graph_test.go | 80 ++ .../go/src/htrace/htracedTool/queries.go | 172 +++ .../go/src/htrace/htracedTool/query_test.go | 88 ++ htrace-htraced/go/src/htrace/test/random.go | 80 ++ .../go/src/org/apache/htrace/client/client.go | 285 ---- .../go/src/org/apache/htrace/client/hclient.go | 185 --- .../go/src/org/apache/htrace/common/log.go | 332 ----- .../go/src/org/apache/htrace/common/log_test.go | 170 --- .../go/src/org/apache/htrace/common/process.go | 101 -- .../org/apache/htrace/common/process_test.go | 116 -- .../go/src/org/apache/htrace/common/query.go | 128 -- .../src/org/apache/htrace/common/query_test.go | 50 - .../go/src/org/apache/htrace/common/rpc.go | 159 --- .../src/org/apache/htrace/common/semaphore.go | 78 - .../org/apache/htrace/common/semaphore_test.go | 86 -- .../go/src/org/apache/htrace/common/span.go | 217 --- .../src/org/apache/htrace/common/span_test.go | 116 -- .../src/org/apache/htrace/common/test_util.go | 91 -- .../go/src/org/apache/htrace/common/time.go | 34 - .../src/org/apache/htrace/common/time_test.go | 38 - .../go/src/org/apache/htrace/conf/config.go | 302 ---- .../src/org/apache/htrace/conf/config_keys.go | 134 -- .../src/org/apache/htrace/conf/config_test.go | 144 -- .../go/src/org/apache/htrace/conf/xml.go | 61 - .../org/apache/htrace/htraced/client_test.go | 484 
------- .../src/org/apache/htrace/htraced/datastore.go | 1340 ------------------ .../org/apache/htrace/htraced/datastore_test.go | 761 ---------- .../org/apache/htrace/htraced/heartbeater.go | 125 -- .../apache/htrace/htraced/heartbeater_test.go | 100 -- .../go/src/org/apache/htrace/htraced/hrpc.go | 386 ----- .../go/src/org/apache/htrace/htraced/htraced.go | 181 --- .../go/src/org/apache/htrace/htraced/loader.go | 511 ------- .../go/src/org/apache/htrace/htraced/metrics.go | 194 --- .../org/apache/htrace/htraced/metrics_test.go | 172 --- .../org/apache/htrace/htraced/mini_htraced.go | 193 --- .../org/apache/htrace/htraced/reaper_test.go | 83 -- .../go/src/org/apache/htrace/htraced/rest.go | 376 ----- .../go/src/org/apache/htrace/htracedTool/cmd.go | 442 ------ .../src/org/apache/htrace/htracedTool/file.go | 138 -- .../org/apache/htrace/htracedTool/file_test.go | 161 --- .../src/org/apache/htrace/htracedTool/graph.go | 116 -- .../org/apache/htrace/htracedTool/graph_test.go | 80 -- .../org/apache/htrace/htracedTool/queries.go | 172 --- .../org/apache/htrace/htracedTool/query_test.go | 88 -- .../go/src/org/apache/htrace/test/random.go | 80 -- 83 files changed, 9012 insertions(+), 9013 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/gobuild.sh ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/gobuild.sh b/htrace-htraced/go/gobuild.sh index de9e687..98123be 100755 --- a/htrace-htraced/go/gobuild.sh +++ b/htrace-htraced/go/gobuild.sh @@ -132,15 +132,15 @@ install) # Inject the release and git version into the htraced ldflags. echo "Building ${RELEASE_VERSION} [${GIT_VERSION}]" FLAGS="-X main.RELEASE_VERSION ${RELEASE_VERSION} -X main.GIT_VERSION ${GIT_VERSION}" - go install ${TAGS} -ldflags "${FLAGS}" -v org/apache/htrace/... "$@" \ + go install ${TAGS} -ldflags "${FLAGS}" -v htrace/... 
"$@" \ || die "go install failed." # Set the RPATH to make bundling leveldb and snappy easier. set_rpath "${GOBIN}/htraced" ;; bench) - go test org/apache/htrace/... ${TAGS} -test.bench=. "$@" + go test htrace/... ${TAGS} -test.bench=. "$@" ;; *) - go ${ACTION} org/apache/htrace/... ${TAGS} "$@" + go ${ACTION} htrace/... ${TAGS} "$@" ;; esac http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/client/client.go b/htrace-htraced/go/src/htrace/client/client.go new file mode 100644 index 0000000..81b45d3 --- /dev/null +++ b/htrace-htraced/go/src/htrace/client/client.go @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "htrace/common" + "htrace/conf" + "io" + "io/ioutil" + "net/http" +) + +// A golang client for htraced. 
+// TODO: fancier APIs for streaming spans in the background, optimize TCP stuff +func NewClient(cnf *conf.Config, testHooks *TestHooks) (*Client, error) { + hcl := Client{testHooks: testHooks} + hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS) + if testHooks != nil && testHooks.HrpcDisabled { + hcl.hrpcAddr = "" + } else { + hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS) + } + return &hcl, nil +} + +type TestHooks struct { + // If true, HRPC is disabled. + HrpcDisabled bool + + // A function which gets called after we connect to the server and send the + // message frame, but before we write the message body. + HandleWriteRequestBody func() +} + +type Client struct { + // REST address of the htraced server. + restAddr string + + // HRPC address of the htraced server. + hrpcAddr string + + // The test hooks to use, or nil if test hooks are not enabled. + testHooks *TestHooks +} + +// Get the htraced server version information. +func (hcl *Client) GetServerVersion() (*common.ServerVersion, error) { + buf, _, err := hcl.makeGetRequest("server/info") + if err != nil { + return nil, err + } + var info common.ServerVersion + err = json.Unmarshal(buf, &info) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error unmarshalling response "+ + "body %s: %s", string(buf), err.Error())) + } + return &info, nil +} + +// Get the htraced server debug information. +func (hcl *Client) GetServerDebugInfo() (*common.ServerDebugInfo, error) { + buf, _, err := hcl.makeGetRequest("server/debugInfo") + if err != nil { + return nil, err + } + var debugInfo common.ServerDebugInfo + err = json.Unmarshal(buf, &debugInfo) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error unmarshalling response "+ + "body %s: %s", string(buf), err.Error())) + } + return &debugInfo, nil +} + +// Get the htraced server statistics. 
+func (hcl *Client) GetServerStats() (*common.ServerStats, error) { + buf, _, err := hcl.makeGetRequest("server/stats") + if err != nil { + return nil, err + } + var stats common.ServerStats + err = json.Unmarshal(buf, &stats) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error unmarshalling response "+ + "body %s: %s", string(buf), err.Error())) + } + return &stats, nil +} + +// Get the htraced server statistics. +func (hcl *Client) GetServerConf() (map[string]string, error) { + buf, _, err := hcl.makeGetRequest("server/conf") + if err != nil { + return nil, err + } + cnf := make(map[string]string) + err = json.Unmarshal(buf, &cnf) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error unmarshalling response "+ + "body %s: %s", string(buf), err.Error())) + } + return cnf, nil +} + +// Get information about a trace span. Returns nil, nil if the span was not found. +func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) { + buf, rc, err := hcl.makeGetRequest(fmt.Sprintf("span/%s", sid.String())) + if err != nil { + if rc == http.StatusNoContent { + return nil, nil + } + return nil, err + } + var span common.Span + err = json.Unmarshal(buf, &span) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error unmarshalling response "+ + "body %s: %s", string(buf), err.Error())) + } + return &span, nil +} + +func (hcl *Client) WriteSpans(spans []*common.Span) error { + if hcl.hrpcAddr == "" { + return hcl.writeSpansHttp(spans) + } + hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks) + if err != nil { + return err + } + defer hcr.Close() + return hcr.writeSpans(spans) +} + +func (hcl *Client) writeSpansHttp(spans []*common.Span) error { + req := common.WriteSpansReq{ + NumSpans: len(spans), + } + var w bytes.Buffer + enc := json.NewEncoder(&w) + err := enc.Encode(req) + if err != nil { + return errors.New(fmt.Sprintf("Error serializing WriteSpansReq: %s", + err.Error())) + } + for spanIdx := range spans { + err := 
enc.Encode(spans[spanIdx]) + if err != nil { + return errors.New(fmt.Sprintf("Error serializing span %d out "+ + "of %d: %s", spanIdx, len(spans), err.Error())) + } + } + _, _, err = hcl.makeRestRequest("POST", "writeSpans", &w) + if err != nil { + return err + } + return nil +} + +// Find the child IDs of a given span ID. +func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, error) { + buf, _, err := hcl.makeGetRequest(fmt.Sprintf("span/%s/children?lim=%d", + sid.String(), lim)) + if err != nil { + return nil, err + } + var spanIds []common.SpanId + err = json.Unmarshal(buf, &spanIds) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error unmarshalling response "+ + "body %s: %s", string(buf), err.Error())) + } + return spanIds, nil +} + +// Make a query +func (hcl *Client) Query(query *common.Query) ([]common.Span, error) { + in, err := json.Marshal(query) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error marshalling query: %s", err.Error())) + } + var out []byte + var url = fmt.Sprintf("query?query=%s", in) + out, _, err = hcl.makeGetRequest(url) + if err != nil { + return nil, err + } + var spans []common.Span + err = json.Unmarshal(out, &spans) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error unmarshalling results: %s", err.Error())) + } + return spans, nil +} + +func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) { + return hcl.makeRestRequest("GET", reqName, nil) +} + +// Make a general JSON REST request. +// Returns the request body, the response code, and the error. +// Note: if the response code is non-zero, the error will also be non-zero. 
+func (hcl *Client) makeRestRequest(reqType string, reqName string, + reqBody io.Reader) ([]byte, int, error) { + url := fmt.Sprintf("http://%s/%s", + hcl.restAddr, reqName) + req, err := http.NewRequest(reqType, url, reqBody) + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, -1, errors.New(fmt.Sprintf("Error: error making http request to %s: %s\n", url, + err.Error())) + } + defer resp.Body.Close() + body, err2 := ioutil.ReadAll(resp.Body) + if err2 != nil { + return nil, -1, errors.New(fmt.Sprintf("Error: error reading response body: %s\n", err2.Error())) + } + if resp.StatusCode != http.StatusOK { + return nil, resp.StatusCode, + errors.New(fmt.Sprintf("Error: got bad response status from "+ + "%s: %s\n%s\n", url, resp.Status, body)) + } + return body, 0, nil +} + +// Dump all spans from the htraced daemon. +func (hcl *Client) DumpAll(lim int, out chan *common.Span) error { + defer func() { + close(out) + }() + searchId := common.INVALID_SPAN_ID + for { + q := common.Query{ + Lim: lim, + Predicates: []common.Predicate{ + common.Predicate{ + Op: "ge", + Field: "spanid", + Val: searchId.String(), + }, + }, + } + spans, err := hcl.Query(&q) + if err != nil { + return errors.New(fmt.Sprintf("Error querying spans with IDs at or after "+ + "%s: %s", searchId.String(), err.Error())) + } + if len(spans) == 0 { + return nil + } + for i := range spans { + out <- &spans[i] + } + searchId = spans[len(spans)-1].Id.Next() + } +} + +func (hcl *Client) Close() { + hcl.restAddr = "" + hcl.hrpcAddr = "" +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/client/hclient.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/client/hclient.go b/htrace-htraced/go/src/htrace/client/hclient.go new file mode 100644 index 0000000..a196f2d --- /dev/null +++ 
b/htrace-htraced/go/src/htrace/client/hclient.go @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "github.com/ugorji/go/codec" + "htrace/common" + "io" + "net" + "net/rpc" +) + +type hClient struct { + rpcClient *rpc.Client +} + +type HrpcClientCodec struct { + rwc io.ReadWriteCloser + length uint32 + testHooks *TestHooks +} + +func (cdc *HrpcClientCodec) WriteRequest(rr *rpc.Request, msg interface{}) error { + methodId := common.HrpcMethodNameToId(rr.ServiceMethod) + if methodId == common.METHOD_ID_NONE { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s", + rr.ServiceMethod)) + } + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + w := bytes.NewBuffer(make([]byte, 0, 2048)) + + var err error + enc := codec.NewEncoder(w, mh) + if methodId == common.METHOD_ID_WRITE_SPANS { + spans := msg.([]*common.Span) + req := &common.WriteSpansReq{ + NumSpans: len(spans), + } + err = enc.Encode(req) + if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "message as msgpack: %s", err.Error())) + } + for spanIdx := range spans { + err = enc.Encode(spans[spanIdx]) + 
if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "span %d out of %d as msgpack: %s", spanIdx, len(spans), err.Error())) + } + } + } else { + err = enc.Encode(msg) + if err != nil { + return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+ + "message as msgpack: %s", err.Error())) + } + } + buf := w.Bytes() + if len(buf) > common.MAX_HRPC_BODY_LENGTH { + return errors.New(fmt.Sprintf("HrpcClientCodec: message body is %d "+ + "bytes, but the maximum message size is %d bytes.", + len(buf), common.MAX_HRPC_BODY_LENGTH)) + } + hdr := common.HrpcRequestHeader{ + Magic: common.HRPC_MAGIC, + MethodId: methodId, + Seq: rr.Seq, + Length: uint32(len(buf)), + } + err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr) + if err != nil { + return errors.New(fmt.Sprintf("Error writing header bytes: %s", + err.Error())) + } + if cdc.testHooks != nil && cdc.testHooks.HandleWriteRequestBody != nil { + cdc.testHooks.HandleWriteRequestBody() + } + _, err = cdc.rwc.Write(buf) + if err != nil { + return errors.New(fmt.Sprintf("Error writing body bytes: %s", + err.Error())) + } + return nil +} + +func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error { + hdr := common.HrpcResponseHeader{} + err := binary.Read(cdc.rwc, binary.LittleEndian, &hdr) + if err != nil { + return errors.New(fmt.Sprintf("Error reading response header "+ + "bytes: %s", err.Error())) + } + resp.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) + if resp.ServiceMethod == "" { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "invalid method ID %d.", hdr.MethodId)) + } + resp.Seq = hdr.Seq + if hdr.ErrLength > 0 { + if hdr.ErrLength > common.MAX_HRPC_ERROR_LENGTH { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "error message was %d bytes long, but "+ + "MAX_HRPC_ERROR_LENGTH is %d.", + hdr.ErrLength, common.MAX_HRPC_ERROR_LENGTH)) + } + buf := make([]byte, hdr.ErrLength) + var nread int + nread, 
err = cdc.rwc.Read(buf) + if uint32(nread) != hdr.ErrLength { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "failed to read %d bytes of error message.", nread)) + } + if err != nil { + return errors.New(fmt.Sprintf("Error reading response header: "+ + "failed to read %d bytes of error message: %s", + nread, err.Error())) + } + resp.Error = string(buf) + } else { + resp.Error = "" + } + cdc.length = hdr.Length + return nil +} + +func (cdc *HrpcClientCodec) ReadResponseBody(body interface{}) error { + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + dec := codec.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)), mh) + err := dec.Decode(body) + if err != nil { + return errors.New(fmt.Sprintf("Failed to read response body: %s", + err.Error())) + } + return nil +} + +func (cdc *HrpcClientCodec) Close() error { + return cdc.rwc.Close() +} + +func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) { + hcr := hClient{} + conn, err := net.Dial("tcp", hrpcAddr) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+ + "at %s: %s", hrpcAddr, err.Error())) + } + hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{ + rwc: conn, + testHooks: testHooks, + }) + return &hcr, nil +} + +func (hcr *hClient) writeSpans(spans []*common.Span) error { + resp := common.WriteSpansResp{} + return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, spans, &resp) +} + +func (hcr *hClient) Close() { + hcr.rpcClient.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/log.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/log.go b/htrace-htraced/go/src/htrace/common/log.go new file mode 100644 index 0000000..16c94b4 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/log.go @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "errors" + "fmt" + "htrace/conf" + "log" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" +) + +// A logSink is a place logs can be written to. +type logSink struct { + path logPath + file *os.File + lock sync.Mutex + refCount int // protected by logFilesLock +} + +// Write to the logSink. +func (sink *logSink) write(str string) { + sink.lock.Lock() + defer sink.lock.Unlock() + _, err := sink.file.Write([]byte(str)) + if err != nil { + fmt.Fprintf(os.Stderr, "Error logging to '%s': %s\n", sink.path, err.Error()) + } +} + +// Unreference the logSink. If there are no more references, and the logSink is +// closeable, then we will close it here. +func (sink *logSink) Unref() { + logFilesLock.Lock() + defer logFilesLock.Unlock() + sink.refCount-- + if sink.refCount <= 0 { + if sink.path.IsCloseable() { + err := sink.file.Close() + if err != nil { + fmt.Fprintf(os.Stderr, "Error closing log file %s: %s\n", + sink.path, err.Error()) + } + } + logSinks[sink.path] = nil + } +} + +type logPath string + +// An empty LogPath represents "stdout." +const STDOUT_LOG_PATH = "" + +// Convert a path to a logPath. 
+func logPathFromString(path string) logPath { + if path == STDOUT_LOG_PATH { + return logPath("") + } + absPath, err := filepath.Abs(path) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to get absolute path of %s: %s\n", + path, err.Error()) + return logPath(path) + } + return logPath(absPath) +} + +// Convert the path to a human-readable string. +func (path logPath) String() string { + if path == "" { + return "(stdout)" + } else { + return string(path) + } +} + +// Return true if the path is closeable. stdout is not closeable. +func (path logPath) IsCloseable() bool { + return path != STDOUT_LOG_PATH +} + +func (path logPath) Open() *logSink { + if path == STDOUT_LOG_PATH { + return &logSink{path: path, file: os.Stdout} + } + file, err := os.OpenFile(string(path), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + sink := &logSink{path: STDOUT_LOG_PATH, file: os.Stdout} + fmt.Fprintf(os.Stderr, "Failed to open log file %s: %s\n", + path, err.Error()) + return sink + } + return &logSink{path: path, file: file} +} + +var logFilesLock sync.Mutex + +var logSinks map[logPath]*logSink = make(map[logPath]*logSink) + +func getOrCreateLogSink(pathStr string) *logSink { + path := logPathFromString(pathStr) + logFilesLock.Lock() + defer logFilesLock.Unlock() + sink := logSinks[path] + if sink == nil { + sink = path.Open() + logSinks[path] = sink + } + sink.refCount++ + return sink +} + +type Level int + +const ( + TRACE Level = iota + DEBUG + INFO + WARN + ERROR +) + +var levelToString map[Level]string = map[Level]string{ + TRACE: "TRACE", + DEBUG: "DEBUG", + INFO: "INFO", + WARN: "WARN", + ERROR: "ERROR", +} + +func (level Level) String() string { + return levelToString[level] +} + +func (level Level) LogString() string { + return level.String()[0:1] +} + +func LevelFromString(str string) (Level, error) { + for k, v := range levelToString { + if strings.ToLower(v) == strings.ToLower(str) { + return k, nil + } + } + var levelNames sort.StringSlice + 
levelNames = make([]string, len(levelToString)) + var i int + for _, v := range levelToString { + levelNames[i] = v + i++ + } + sort.Sort(levelNames) + return TRACE, errors.New(fmt.Sprintf("No such level as '%s'. Valid "+ + "levels are '%v'\n", str, levelNames)) +} + +type Logger struct { + sink *logSink + Level Level +} + +func NewLogger(faculty string, cnf *conf.Config) *Logger { + path, level := parseConf(faculty, cnf) + sink := getOrCreateLogSink(path) + return &Logger{sink: sink, Level: level} +} + +func parseConf(faculty string, cnf *conf.Config) (string, Level) { + facultyLogPathKey := faculty + "." + conf.HTRACE_LOG_PATH + var facultyLogPath string + if cnf.Contains(facultyLogPathKey) { + facultyLogPath = cnf.Get(facultyLogPathKey) + } else { + facultyLogPath = cnf.Get(conf.HTRACE_LOG_PATH) + } + facultyLogLevelKey := faculty + "." + conf.HTRACE_LOG_LEVEL + var facultyLogLevelStr string + if cnf.Contains(facultyLogLevelKey) { + facultyLogLevelStr = cnf.Get(facultyLogLevelKey) + } else { + facultyLogLevelStr = cnf.Get(conf.HTRACE_LOG_LEVEL) + } + level, err := LevelFromString(facultyLogLevelStr) + if err != nil { + fmt.Fprintf(os.Stderr, "Error configuring log level: %s. Using TRACE.\n") + level = TRACE + } + return facultyLogPath, level +} + +func (lg *Logger) Trace(str string) { + lg.Write(TRACE, str) +} + +func (lg *Logger) Tracef(format string, v ...interface{}) { + lg.Write(TRACE, fmt.Sprintf(format, v...)) +} + +func (lg *Logger) Debug(str string) { + lg.Write(DEBUG, str) +} + +func (lg *Logger) Debugf(format string, v ...interface{}) { + lg.Write(DEBUG, fmt.Sprintf(format, v...)) +} + +func (lg *Logger) Info(str string) { + lg.Write(INFO, str) +} + +func (lg *Logger) Infof(format string, v ...interface{}) { + lg.Write(INFO, fmt.Sprintf(format, v...)) +} + +func (lg *Logger) Warn(str string) error { + lg.Write(WARN, str) + return errors.New(str) +} + +func (lg *Logger) Warnf(format string, v ...interface{}) error { + str := fmt.Sprintf(format, v...) 
+ lg.Write(WARN, str) + return errors.New(str) +} + +func (lg *Logger) Error(str string) error { + lg.Write(ERROR, str) + return errors.New(str) +} + +func (lg *Logger) Errorf(format string, v ...interface{}) error { + str := fmt.Sprintf(format, v...) + lg.Write(ERROR, str) + return errors.New(str) +} + +func (lg *Logger) Write(level Level, str string) { + if level >= lg.Level { + lg.sink.write(time.Now().UTC().Format(time.RFC3339) + " " + + level.LogString() + ": " + str) + } +} + +// +// A few functions which can be used to determine if a certain level of tracing +// is enabled. These are useful in situations when evaluating the parameters +// of a logging function is expensive. (Note, however, that we don't pay the +// cost of string concatenation and manipulation when a log message doesn't +// trigger.) +// + +func (lg *Logger) TraceEnabled() bool { + return lg.Level <= TRACE +} + +func (lg *Logger) DebugEnabled() bool { + return lg.Level <= DEBUG +} + +func (lg *Logger) InfoEnabled() bool { + return lg.Level <= INFO +} + +func (lg *Logger) WarnEnabled() bool { + return lg.Level <= WARN +} + +func (lg *Logger) ErrorEnabled() bool { + return lg.Level <= ERROR +} + +func (lg *Logger) LevelEnabled(level Level) bool { + return lg.Level <= level +} + +func (lg *Logger) Close() { + lg.sink.Unref() + lg.sink = nil +} + +// Wraps an htrace logger in a golang standard logger. +// +// This is a bit messy because of the difference in interfaces between the +// golang standard logger and the htrace logger. The golang standard logger +// doesn't support log levels directly, so you must choose up front what htrace +// log level all messages should be treated as. Golang standard loggers expect +// to be able to write to an io.Writer, but make no guarantees about whether +// they will break messages into multiple Write() calls (although this does +// not seem to be a major problem in practice.) 
+// +// Despite these limitations, it's still useful to have this method to be able +// to log things that come out of the go HTTP server and other standard library +// systems. +type WrappedLogger struct { + lg *Logger + level Level +} + +func (lg *Logger) Wrap(prefix string, level Level) *log.Logger { + wlg := &WrappedLogger{ + lg: lg, + level: level, + } + return log.New(wlg, prefix, 0) +} + +func (wlg *WrappedLogger) Write(p []byte) (int, error) { + wlg.lg.Write(wlg.level, string(p)) + return len(p), nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/log_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/log_test.go b/htrace-htraced/go/src/htrace/common/log_test.go new file mode 100644 index 0000000..adb6a57 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/log_test.go @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package common + +import ( + "bufio" + "fmt" + "htrace/conf" + "io" + "io/ioutil" + "os" + "strings" + "testing" +) + +func newLogger(faculty string, args ...string) *Logger { + cnfBld := conf.Builder{Defaults: conf.DEFAULTS} + cnf, err := cnfBld.Build() + if err != nil { + panic(fmt.Sprintf("failed to create conf: %s", err.Error())) + } + cnf2 := cnf.Clone(args...) + lg := NewLogger(faculty, cnf2) + return lg +} + +func TestNewLogger(t *testing.T) { + lg := newLogger("foo", "log.level", "TRACE") + lg.Close() +} + +func verifyLines(t *testing.T, rdr io.Reader, lines []string) { + scanner := bufio.NewScanner(rdr) + lineIdx := 0 + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, lines[lineIdx]) { + t.Fatalf("Error on line %d: didn't find substring '%s' in line '%s'\n", + (lineIdx + 1), lines[lineIdx], line) + } + lineIdx++ + } + if err := scanner.Err(); err != nil { + t.Fatal(err.Error()) + } +} + +func TestFileLogs(t *testing.T) { + tempDir, err := ioutil.TempDir(os.TempDir(), "testFileLogs") + if err != nil { + panic(fmt.Sprintf("error creating tempdir: %s\n", err.Error())) + } + defer os.RemoveAll(tempDir) + logPath := tempDir + conf.PATH_SEP + "log" + lg := newLogger("foo", "log.level", "DEBUG", + "foo.log.level", "INFO", + "log.path", logPath) + lg.Tracef("Non-important stuff, ignore this.\n") + lg.Infof("problem with the foobar\n") + lg.Tracef("More non-important stuff, also ignore this.\n") + lg.Infof("and another problem with the foobar\n") + logFile, err := os.Open(logPath) + if err != nil { + t.Fatalf("failed to open file %s: %s\n", logPath, err.Error()) + } + verifyLines(t, logFile, []string{ + "problem with the foobar", + "and another problem with the foobar", + }) + logFile.Close() + lg.Close() +} + +func TestMultipleFileLogs(t *testing.T) { + tempDir, err := ioutil.TempDir(os.TempDir(), "testMultipleFileLogs") + if err != nil { + panic(fmt.Sprintf("error creating tempdir: %s\n", err.Error())) + } + defer 
os.RemoveAll(tempDir) + logPath := tempDir + conf.PATH_SEP + "log" + fooLg := newLogger("foo", "log.level", "DEBUG", + "foo.log.level", "INFO", + "log.path", logPath) + fooLg.Infof("The foo needs maintenance.\n") + barLg := newLogger("bar", "log.level", "DEBUG", + "foo.log.level", "INFO", + "log.path", logPath) + barLg.Debugf("The bar is open\n") + fooLg.Errorf("Fizz buzz\n") + logFile, err := os.Open(logPath) + if err != nil { + t.Fatalf("failed to open file %s: %s\n", logPath, err.Error()) + } + fooLg.Tracef("Fizz buzz2\n") + barLg.Tracef("Fizz buzz3\n") + verifyLines(t, logFile, []string{ + "The foo needs maintenance.", + "The bar is open", + "Fizz buzz", + "Fizz buzz3", + }) + logFile.Close() + fooLg.Close() + barLg.Close() +} + +func TestLogLevelEnabled(t *testing.T) { + tempDir, err := ioutil.TempDir(os.TempDir(), "TestLogLevelEnabled") + if err != nil { + panic(fmt.Sprintf("error creating tempdir: %s\n", err.Error())) + } + defer os.RemoveAll(tempDir) + // set log level to DEBUG for facility "foo" + logPath := tempDir + conf.PATH_SEP + "log" + lg := newLogger("foo", "log.level", "DEBUG", + "foo.log.level", "INFO", + "log.path", logPath) + if lg.TraceEnabled() { + t.Fatalf("foo logger has TraceEnabled") + } + if lg.DebugEnabled() { + t.Fatalf("foo logger have DebugEnabled") + } + if !lg.InfoEnabled() { + t.Fatalf("foo logger does not have InfoEnabled") + } + if !lg.WarnEnabled() { + t.Fatalf("foo logger does not have WarnEnabled") + } + if !lg.ErrorEnabled() { + t.Fatalf("foo logger does not have ErrorEnabled") + } + lg.Close() + lg = newLogger("foo", "log.level", "WARN", + "foo.log.level", "INFO", + "log.path", logPath) + if lg.TraceEnabled() { + t.Fatalf("foo logger has TraceEnabled") + } + if lg.DebugEnabled() { + t.Fatalf("foo logger has DebugEnabled") + } + if !lg.InfoEnabled() { + t.Fatalf("foo logger does not have InfoEnabled") + } + if !lg.WarnEnabled() { + t.Fatalf("foo logger does not have WarnEnabled") + } + if !lg.ErrorEnabled() { + t.Fatalf("foo 
logger does not have ErrorEnabled") + } + lg.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/process.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/process.go b/htrace-htraced/go/src/htrace/common/process.go new file mode 100644 index 0000000..8e2a415 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/process.go @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "bytes" + "fmt" + "htrace/conf" + "os" + "os/signal" + "runtime" + "runtime/debug" + "syscall" +) + +func InstallSignalHandlers(cnf *conf.Config) { + fatalSigs := []os.Signal{ + os.Interrupt, + os.Kill, + syscall.SIGINT, + syscall.SIGABRT, + syscall.SIGALRM, + syscall.SIGBUS, + syscall.SIGFPE, + syscall.SIGILL, + syscall.SIGSEGV, + syscall.SIGTERM, + } + fatalSigChan := make(chan os.Signal, 1) + signal.Notify(fatalSigChan, fatalSigs...) 
+ lg := NewLogger("signal", cnf) + go func() { + sig := <-fatalSigChan + lg.Errorf("Terminating on signal: %v\n", sig) + lg.Close() + os.Exit(1) + }() + + sigQuitChan := make(chan os.Signal, 1) + signal.Notify(sigQuitChan, syscall.SIGQUIT) + go func() { + stackTraceBuf := make([]byte, 1<<20) + for { + <-sigQuitChan + GetStackTraces(&stackTraceBuf) + lg.Info("=== received SIGQUIT ===\n") + lg.Info("=== GOROUTINE STACKS ===\n") + lg.Info(string(stackTraceBuf)) + lg.Info("\n=== END GOROUTINE STACKS ===\n") + lg.Info("=== GC STATISTICS ===\n") + lg.Info(GetGCStats()) + lg.Info("=== END GC STATISTICS ===\n") + } + }() +} + +func GetStackTraces(buf *[]byte) { + *buf = (*buf)[0:cap(*buf)] + neededBytes := runtime.Stack(*buf, true) + for neededBytes > len(*buf) { + *buf = make([]byte, neededBytes) + runtime.Stack(*buf, true) + } + *buf = (*buf)[0:neededBytes] +} + +func GetGCStats() string { + gcs := debug.GCStats{} + debug.ReadGCStats(&gcs) + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf("LastGC: %s\n", gcs.LastGC.UTC().String())) + buf.WriteString(fmt.Sprintf("NumGC: %d\n", gcs.NumGC)) + buf.WriteString(fmt.Sprintf("PauseTotal: %v\n", gcs.PauseTotal)) + if gcs.Pause != nil { + pauseStr := "" + prefix := "" + for p := range gcs.Pause { + pauseStr += prefix + gcs.Pause[p].String() + prefix = ", " + } + buf.WriteString(fmt.Sprintf("Pause History: %s\n", pauseStr)) + } + return buf.String() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/process_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/process_test.go b/htrace-htraced/go/src/htrace/common/process_test.go new file mode 100644 index 0000000..cbbf613 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/process_test.go @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "bufio" + "fmt" + "htrace/conf" + "os" + "os/exec" + "strings" + "syscall" + "testing" + "time" +) + +const HTRACED_TEST_HELPER_PROCESS = "HTRACED_TEST_HELPER_PROCESS" + +// This test runs a helper process which will install our htraced signal +// handlers. We will send signals to the subprocess and verify that it has +// caught them and responded appropriately. 
+func TestSignals(t *testing.T) { + if os.Getenv(HTRACED_TEST_HELPER_PROCESS) == "1" { + runHelperProcess() + os.Exit(0) + } + helper := exec.Command(os.Args[0], "-test.run=TestSignals", "--") + helper.Env = []string{HTRACED_TEST_HELPER_PROCESS + "=1"} + stdoutPipe, err := helper.StdoutPipe() + if err != nil { + panic(fmt.Sprintf("Failed to open pipe to process stdout: %s", + err.Error())) + } + stderrPipe, err := helper.StderrPipe() + if err != nil { + panic(fmt.Sprintf("Failed to open pipe to process stderr: %s", + err.Error())) + } + err = helper.Start() + if err != nil { + t.Fatal("Failed to start command %s: %s\n", os.Args[0], err.Error()) + } + t.Logf("Started suprocess...\n") + done := make(chan interface{}) + go func() { + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + text := scanner.Text() + if strings.Contains(text, "=== GOROUTINE STACKS ===") { + break + } + } + t.Logf("Saw 'GOROUTINE STACKS on stdout.' Sending SIGINT.\n") + helper.Process.Signal(syscall.SIGINT) + for scanner.Scan() { + text := scanner.Text() + if strings.Contains(text, "Terminating on signal: SIGINT") { + break + } + } + t.Logf("Saw 'Terminating on signal: SIGINT'. " + + "Helper goroutine exiting.\n") + done <- nil + }() + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + text := scanner.Text() + if strings.Contains(text, "Signal handler installed.") { + break + } + } + t.Logf("Saw 'Signal handler installed.' Sending SIGINT.") + helper.Process.Signal(syscall.SIGQUIT) + t.Logf("Waiting for helper goroutine to exit.\n") + <-done + t.Logf("Waiting for subprocess to exit.\n") + helper.Wait() + t.Logf("Done.") +} + +// Run the helper process which TestSignals spawns. 
+func runHelperProcess() { + cnfMap := map[string]string{ + conf.HTRACE_LOG_LEVEL: "TRACE", + conf.HTRACE_LOG_PATH: "", // log to stdout + } + cnfBld := conf.Builder{Values: cnfMap, Defaults: conf.DEFAULTS} + cnf, err := cnfBld.Build() + if err != nil { + fmt.Printf("Error building configuration: %s\n", err.Error()) + os.Exit(1) + } + InstallSignalHandlers(cnf) + fmt.Fprintf(os.Stderr, "Signal handler installed.\n") + // Wait for a signal to be delivered + for { + time.Sleep(time.Hour * 100) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/query.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/query.go b/htrace-htraced/go/src/htrace/common/query.go new file mode 100644 index 0000000..7a9e523 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/query.go @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "encoding/json" +) + +// +// Represents queries that can be sent to htraced. +// +// Each query consists of set of predicates that will be 'AND'ed together to +// return a set of spans. 
Predicates contain an operation, a field, and a +// value. +// +// For example, a query might be "return the first 100 spans between 5:00pm +// and 5:01pm" This query would have two predicates: time greater than or +// equal to 5:00pm, and time less than or equal to 5:01pm. +// In HTrace, times are always expressed in milliseconds since the Epoch. +// So this would become: +// { "lim" : 100, "pred" : [ +// { "op" : "ge", "field" : "begin", "val" : 1234 }, +// { "op" : "le", "field" : "begin", "val" : 5678 }, +// ] } +// +// Where '1234' and '5678' were replaced by times since the epoch in +// milliseconds. +// + +type Op string + +const ( + CONTAINS Op = "cn" + EQUALS Op = "eq" + LESS_THAN_OR_EQUALS Op = "le" + GREATER_THAN_OR_EQUALS Op = "ge" + GREATER_THAN Op = "gt" +) + +func (op Op) IsDescending() bool { + return op == LESS_THAN_OR_EQUALS +} + +func (op Op) IsValid() bool { + ops := ValidOps() + for i := range ops { + if ops[i] == op { + return true + } + } + return false +} + +func ValidOps() []Op { + return []Op{CONTAINS, EQUALS, LESS_THAN_OR_EQUALS, GREATER_THAN_OR_EQUALS, + GREATER_THAN} +} + +type Field string + +const ( + SPAN_ID Field = "spanid" + DESCRIPTION Field = "description" + BEGIN_TIME Field = "begin" + END_TIME Field = "end" + DURATION Field = "duration" + TRACER_ID Field = "tracerid" +) + +func (field Field) IsValid() bool { + fields := ValidFields() + for i := range fields { + if fields[i] == field { + return true + } + } + return false +} + +func ValidFields() []Field { + return []Field{SPAN_ID, DESCRIPTION, BEGIN_TIME, END_TIME, + DURATION, TRACER_ID} +} + +type Predicate struct { + Op Op `json:"op"` + Field Field `json:"field"` + Val string `val:"val"` +} + +func (pred *Predicate) String() string { + buf, err := json.Marshal(pred) + if err != nil { + panic(err) + } + return string(buf) +} + +type Query struct { + Predicates []Predicate `json:"pred"` + Lim int `json:"lim"` + Prev *Span `json:"prev"` +} + +func (query *Query) String() string 
{ + buf, err := json.Marshal(query) + if err != nil { + panic(err) + } + return string(buf) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/query_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/query_test.go b/htrace-htraced/go/src/htrace/common/query_test.go new file mode 100644 index 0000000..2697d9c --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/query_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package common + +import ( + "testing" +) + +func TestValidOps(t *testing.T) { + for i := range ValidOps() { + op := ValidOps()[i] + if !op.IsValid() { + t.Fatalf("op %s was in ValidOps, but IsValid returned false.\n", op) + } + } + invalidOp := Op("completelybogus") + if invalidOp.IsValid() { + t.Fatalf("op %s was invalid, but IsValid returned true.\n", invalidOp) + } +} + +func TestValidFields(t *testing.T) { + for i := range ValidFields() { + field := ValidFields()[i] + if !field.IsValid() { + t.Fatalf("field %s was in ValidFields, but IsValid returned false.\n", field) + } + } + invalidField := Field("completelybogus") + if invalidField.IsValid() { + t.Fatalf("field %s was invalid, but IsValid returned true.\n", invalidField) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/rpc.go b/htrace-htraced/go/src/htrace/common/rpc.go new file mode 100644 index 0000000..5f02db6 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/rpc.go @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package common + +// The 4-byte magic number which is sent first in the HRPC header +const HRPC_MAGIC = 0x43525448 + +// Method ID codes. Do not reorder these. +const ( + METHOD_ID_NONE = 0 + METHOD_ID_WRITE_SPANS = iota +) + +const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans" + +// Maximum length of the error message passed in an HRPC response +const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024 + +// Maximum length of HRPC message body +const MAX_HRPC_BODY_LENGTH = 32 * 1024 * 1024 + +// A request to write spans to htraced. +// This request is followed by a sequence of spans. +type WriteSpansReq struct { + DefaultTrid string `json:",omitempty"` + NumSpans int +} + +// Info returned by /server/version +type ServerVersion struct { + // The server release version. + ReleaseVersion string + + // The git hash that this software was built with. + GitVersion string +} + +// A response to a WriteSpansReq +type WriteSpansResp struct { +} + +// The header which is sent over the wire for HRPC +type HrpcRequestHeader struct { + Magic uint32 + MethodId uint32 + Seq uint64 + Length uint32 +} + +// The response which is sent over the wire for HRPC +type HrpcResponseHeader struct { + Seq uint64 + MethodId uint32 + ErrLength uint32 + Length uint32 +} + +func HrpcMethodIdToMethodName(id uint32) string { + switch id { + case METHOD_ID_WRITE_SPANS: + return METHOD_NAME_WRITE_SPANS + default: + return "" + } +} + +func HrpcMethodNameToId(name string) uint32 { + switch name { + case METHOD_NAME_WRITE_SPANS: + return METHOD_ID_WRITE_SPANS + default: + return METHOD_ID_NONE + } +} + +type SpanMetrics struct { + // The total number of spans written to HTraced. + Written uint64 + + // The total number of spans dropped by the server. + ServerDropped uint64 +} + +// A map from network address strings to SpanMetrics structures. 
+type SpanMetricsMap map[string]*SpanMetrics + +// Info returned by /server/stats +type ServerStats struct { + // Statistics for each shard (directory) + Dirs []StorageDirectoryStats + + // Per-host Span Metrics + HostSpanMetrics SpanMetricsMap + + // The time (in UTC milliseconds since the epoch) when the + // datastore was last started. + LastStartMs int64 + + // The current time (in UTC milliseconds since the epoch) on the server. + CurMs int64 + + // The total number of spans which have been reaped. + ReapedSpans uint64 + + // The total number of spans which have been ingested since the server started, by WriteSpans + // requests. This number counts spans that didn't get written to persistent storage as well as + // those that did. + IngestedSpans uint64 + + // The total number of spans which have been written to leveldb since the server started. + WrittenSpans uint64 + + // The total number of spans dropped by the server since the server started. + ServerDroppedSpans uint64 + + // The maximum latency of a writeSpans request, in milliseconds. + MaxWriteSpansLatencyMs uint32 + + // The average latency of a writeSpans request, in milliseconds. + AverageWriteSpansLatencyMs uint32 +} + +type StorageDirectoryStats struct { + Path string + + // The approximate number of bytes on disk present in this shard. 
+ ApproximateBytes uint64 + + // leveldb.stats information + LevelDbStats string +} + +type ServerDebugInfoReq struct { +} + +type ServerDebugInfo struct { + // Stack traces from all goroutines + StackTraces string + + // Garbage collection statistics + GCStats string +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/semaphore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/semaphore.go b/htrace-htraced/go/src/htrace/common/semaphore.go new file mode 100644 index 0000000..1acde76 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/semaphore.go @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "sync" +) + +// A simple lock-and-condition-variable based semaphore implementation. 
+type Semaphore struct { + lock sync.Mutex + cond *sync.Cond + count int64 +} + +func NewSemaphore(count int64) *Semaphore { + sem := &Semaphore{ + count: int64(count), + } + sem.cond = &sync.Cond{ + L: &sem.lock, + } + return sem +} + +func (sem *Semaphore) Post() { + sem.lock.Lock() + sem.count++ + if sem.count > 0 { + sem.cond.Broadcast() + } + sem.lock.Unlock() +} + +func (sem *Semaphore) Posts(amt int64) { + sem.lock.Lock() + sem.count += amt + if sem.count > 0 { + sem.cond.Broadcast() + } + sem.lock.Unlock() +} + +func (sem *Semaphore) Wait() { + sem.lock.Lock() + for { + if sem.count > 0 { + sem.count-- + sem.lock.Unlock() + return + } + sem.cond.Wait() + } +} + +func (sem *Semaphore) Waits(amt int64) { + var i int64 + for i = 0; i < amt; i++ { + sem.Wait() + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/semaphore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/semaphore_test.go b/htrace-htraced/go/src/htrace/common/semaphore_test.go new file mode 100644 index 0000000..089c51b --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/semaphore_test.go @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestSemaphoreWake(t *testing.T) { + var done uint32 + sem := NewSemaphore(0) + go func() { + time.Sleep(10 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + sem.Wait() + doneVal := atomic.LoadUint32(&done) + if doneVal != 1 { + t.Fatalf("sem.Wait did not wait for sem.Post") + } +} + +func TestSemaphoreCount(t *testing.T) { + sem := NewSemaphore(1) + sem.Post() + sem.Wait() + sem.Wait() + + sem = NewSemaphore(-1) + sem.Post() + sem.Post() + sem.Wait() +} + +func TestSemaphoreMultipleGoroutines(t *testing.T) { + var done uint32 + sem := NewSemaphore(0) + sem2 := NewSemaphore(0) + go func() { + sem.Wait() + atomic.AddUint32(&done, 1) + sem2.Post() + }() + go func() { + time.Sleep(10 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + go func() { + time.Sleep(20 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + sem.Wait() + go func() { + time.Sleep(10 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + sem.Wait() + sem2.Wait() + doneVal := atomic.LoadUint32(&done) + if doneVal != 4 { + t.Fatalf("sem.Wait did not wait for sem.Posts") + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/span.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/span.go b/htrace-htraced/go/src/htrace/common/span.go new file mode 100644 index 0000000..1716c5a --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/span.go @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "hash/fnv" +) + +// +// Represents a trace span. +// +// Compatibility notes: +// When converting to JSON, we store the 64-bit numbers as hexadecimal strings rather than as +// integers. This is because JavaScript lacks the ability to handle 64-bit integers. Numbers above +// about 55 bits will be rounded by Javascript. Since the Javascript UI is a primary consumer of +// this JSON data, we have to simply pass it as a string. +// + +type TraceInfoMap map[string]string + +type TimelineAnnotation struct { + Time int64 `json:"t"` + Msg string `json:"m"` +} + +type SpanId []byte + +var INVALID_SPAN_ID SpanId = make([]byte, 16) // all zeroes + +func (id SpanId) String() string { + return fmt.Sprintf("%02x%02x%02x%02x"+ + "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", + id[0], id[1], id[2], id[3], id[4], id[5], id[6], id[7], id[8], + id[9], id[10], id[11], id[12], id[13], id[14], id[15]) +} + +func (id SpanId) Val() []byte { + return []byte(id) +} + +func (id SpanId) FindProblem() string { + if id == nil { + return "The span ID is nil" + } + if len(id) != 16 { + return "The span ID is not exactly 16 bytes." + } + if bytes.Equal(id.Val(), INVALID_SPAN_ID.Val()) { + return "The span ID is all zeros." 
+ } + return "" +} + +func (id SpanId) ToArray() [16]byte { + var ret [16]byte + copy(ret[:], id.Val()[:]) + return ret +} + +// Return the next ID in lexicographical order. For the maximum ID, +// returns the minimum. +func (id SpanId) Next() SpanId { + next := make([]byte, 16) + copy(next, id) + for i := len(next) - 1; i >= 0; i-- { + if next[i] == 0xff { + next[i] = 0 + } else { + next[i] = next[i] + 1 + break + } + } + return next +} + +// Return the previous ID in lexicographical order. For the minimum ID, +// returns the maximum ID. +func (id SpanId) Prev() SpanId { + prev := make([]byte, 16) + copy(prev, id) + for i := len(prev) - 1; i >= 0; i-- { + if prev[i] == 0x00 { + prev[i] = 0xff + } else { + prev[i] = prev[i] - 1 + break + } + } + return prev +} + +func (id SpanId) MarshalJSON() ([]byte, error) { + return []byte(`"` + id.String() + `"`), nil +} + +func (id SpanId) Compare(other SpanId) int { + return bytes.Compare(id.Val(), other.Val()) +} + +func (id SpanId) Equal(other SpanId) bool { + return bytes.Equal(id.Val(), other.Val()) +} + +func (id SpanId) Hash32() uint32 { + h := fnv.New32a() + h.Write(id.Val()) + return h.Sum32() +} + +type SpanSlice []*Span + +func (s SpanSlice) Len() int { + return len(s) +} + +func (s SpanSlice) Less(i, j int) bool { + return s[i].Id.Compare(s[j].Id) < 0 +} + +func (s SpanSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +type SpanIdSlice []SpanId + +func (s SpanIdSlice) Len() int { + return len(s) +} + +func (s SpanIdSlice) Less(i, j int) bool { + return s[i].Compare(s[j]) < 0 +} + +func (s SpanIdSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +const DOUBLE_QUOTE = 0x22 + +func (id *SpanId) UnmarshalJSON(b []byte) error { + if b[0] != DOUBLE_QUOTE { + return errors.New("Expected spanID to start with a string quote.") + } + if b[len(b)-1] != DOUBLE_QUOTE { + return errors.New("Expected spanID to end with a string quote.") + } + return id.FromString(string(b[1 : len(b)-1])) +} + +func (id *SpanId) 
FromString(str string) error { + i := SpanId(make([]byte, 16)) + n, err := fmt.Sscanf(str, "%02x%02x%02x%02x"+ + "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", + &i[0], &i[1], &i[2], &i[3], &i[4], &i[5], &i[6], &i[7], &i[8], + &i[9], &i[10], &i[11], &i[12], &i[13], &i[14], &i[15]) + if err != nil { + return err + } + if n != 16 { + return errors.New("Failed to find 16 hex digits in the SpanId") + } + *id = i + return nil +} + +type SpanData struct { + Begin int64 `json:"b"` + End int64 `json:"e"` + Description string `json:"d"` + Parents []SpanId `json:"p"` + Info TraceInfoMap `json:"n,omitempty"` + TracerId string `json:"r"` + TimelineAnnotations []TimelineAnnotation `json:"t,omitempty"` +} + +type Span struct { + Id SpanId `json:"a"` + SpanData +} + +func (span *Span) ToJson() []byte { + jbytes, err := json.Marshal(*span) + if err != nil { + panic(err) + } + return jbytes +} + +func (span *Span) String() string { + return string(span.ToJson()) +} + +// Compute the span duration. We ignore overflow since we never deal with negative times. +func (span *Span) Duration() int64 { + return span.End - span.Begin +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/span_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/span_test.go b/htrace-htraced/go/src/htrace/common/span_test.go new file mode 100644 index 0000000..7fb128d --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/span_test.go @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "bytes" + "encoding/hex" + "fmt" + "github.com/ugorji/go/codec" + "testing" +) + +func TestSpanToJson(t *testing.T) { + t.Parallel() + span := Span{Id: TestId("33f25a1a750a471db5bafa59309d7d6f"), + SpanData: SpanData{ + Begin: 123, + End: 456, + Description: "getFileDescriptors", + Parents: []SpanId{}, + TracerId: "testTracerId", + }} + ExpectStrEqual(t, + `{"a":"33f25a1a750a471db5bafa59309d7d6f","b":123,"e":456,"d":"getFileDescriptors","p":[],"r":"testTracerId"}`, + string(span.ToJson())) +} + +func TestAnnotatedSpanToJson(t *testing.T) { + t.Parallel() + span := Span{Id: TestId("11eace42e6404b40a7644214cb779a08"), + SpanData: SpanData{ + Begin: 1234, + End: 4567, + Description: "getFileDescriptors2", + Parents: []SpanId{}, + TracerId: "testAnnotatedTracerId", + TimelineAnnotations: []TimelineAnnotation{ + TimelineAnnotation{ + Time: 7777, + Msg: "contactedServer", + }, + TimelineAnnotation{ + Time: 8888, + Msg: "passedFd", + }, + }, + }} + ExpectStrEqual(t, + `{"a":"11eace42e6404b40a7644214cb779a08","b":1234,"e":4567,"d":"getFileDescriptors2","p":[],"r":"testAnnotatedTracerId","t":[{"t":7777,"m":"contactedServer"},{"t":8888,"m":"passedFd"}]}`, + string(span.ToJson())) +} + +func TestSpanNext(t *testing.T) { + ExpectStrEqual(t, TestId("00000000000000000000000000000001").String(), + TestId("00000000000000000000000000000000").Next().String()) + ExpectStrEqual(t, TestId("00000000000000000000000000f00000").String(), + TestId("00000000000000000000000000efffff").Next().String()) + ExpectStrEqual(t, 
TestId("00000000000000000000000000000000").String(), + TestId("ffffffffffffffffffffffffffffffff").Next().String()) +} + +func TestSpanPrev(t *testing.T) { + ExpectStrEqual(t, TestId("00000000000000000000000000000000").String(), + TestId("00000000000000000000000000000001").Prev().String()) + ExpectStrEqual(t, TestId("00000000000000000000000000efffff").String(), + TestId("00000000000000000000000000f00000").Prev().String()) + ExpectStrEqual(t, TestId("ffffffffffffffffffffffffffffffff").String(), + TestId("00000000000000000000000000000000").Prev().String()) +} + +func TestSpanMsgPack(t *testing.T) { + span := Span{Id: TestId("33f25a1a750a471db5bafa59309d7d6f"), + SpanData: SpanData{ + Begin: 1234, + End: 5678, + Description: "getFileDescriptors", + Parents: []SpanId{}, + TracerId: "testTracerId", + }} + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + w := bytes.NewBuffer(make([]byte, 0, 2048)) + enc := codec.NewEncoder(w, mh) + err := enc.Encode(span) + if err != nil { + t.Fatal("Error encoding span as msgpack: " + err.Error()) + } + buf := w.Bytes() + fmt.Printf("span: %s\n", hex.EncodeToString(buf)) + mh = new(codec.MsgpackHandle) + mh.WriteExt = true + dec := codec.NewDecoder(bytes.NewReader(buf), mh) + var span2 Span + err = dec.Decode(&span2) + if err != nil { + t.Fatal("Failed to reverse msgpack encoding for " + span.String()) + } + ExpectSpansEqual(t, &span, &span2) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/test_util.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/test_util.go b/htrace-htraced/go/src/htrace/common/test_util.go new file mode 100644 index 0000000..740354c --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/test_util.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
// Int64Slice is a slice of int64 values that provides the Len, Less and Swap
// methods, ordering its elements in ascending order.
type Int64Slice []int64

// Len reports the number of elements in the slice.
func (p Int64Slice) Len() int {
	return len(p)
}

// Less reports whether the element at index i is smaller than the one at index j.
func (p Int64Slice) Less(i, j int) bool {
	return p[i] < p[j]
}

// Swap exchanges the elements at indexes i and j.
func (p Int64Slice) Swap(i, j int) {
	p[j], p[i] = p[i], p[j]
}

// SupplierFun is a predicate polled by WaitFor.
type SupplierFun func() bool

//
// Wait for a configurable amount of time for a precondition to become true.
// If poll is zero, one tenth of dur is used as the polling interval.
// Panics if the resulting interval is not positive, or if the precondition is
// still false once dur has elapsed.
//
// Example:
//   WaitFor(time.Minute * 1, time.Millisecond * 1, func() bool {
//      return ht.Store.GetStatistics().NumSpansWritten >= 3
//   })
//
func WaitFor(dur time.Duration, poll time.Duration, fun SupplierFun) {
	interval := poll
	if interval == 0 {
		interval = dur / 10
	}
	if interval <= 0 {
		panic("Can't have a polling time less than zero.")
	}
	deadline := time.Now().Add(dur)
	// The precondition is always evaluated at least once, even if dur is tiny.
	for !fun() {
		if !time.Now().Before(deadline) {
			panic(fmt.Sprintf("Timed out after %s", dur))
		}
		time.Sleep(interval)
	}
}

// Trigger a test failure if two strings are not equal.
func ExpectStrEqual(t *testing.T, expect string, actual string) {
	if expect == actual {
		return
	}
	t.Fatalf("Expected:\n%s\nGot:\n%s\n", expect, actual)
}

// Trigger a test failure if the JSON representations of two spans are not equal.
+func ExpectSpansEqual(t *testing.T, spanA *Span, spanB *Span) { + ExpectStrEqual(t, string(spanA.ToJson()), string(spanB.ToJson())) +} + +func TestId(str string) SpanId { + var spanId SpanId + err := spanId.FromString(str) + if err != nil { + panic(err.Error()) + } + return spanId +} + +func AssertErrContains(t *testing.T, err error, str string) { + if !strings.Contains(err.Error(), str) { + t.Fatalf("expected the error to contain %s, but it was %s\n", + str, err.Error()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/common/time.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/common/time.go b/htrace-htraced/go/src/htrace/common/time.go new file mode 100644 index 0000000..8b4b6b8 --- /dev/null +++ b/htrace-htraced/go/src/htrace/common/time.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
// Convert a time.Time to the number of milliseconds since the Unix epoch.
// Any sub-millisecond remainder is discarded (integer division of the
// nanosecond timestamp).
func TimeToUnixMs(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// Convert a number of milliseconds since the Unix epoch to a time.Time.
func UnixMsToTime(u int64) time.Time {
	secs := u / 1000
	// The remainder is a count of milliseconds, but time.Unix expects its second
	// argument in nanoseconds, so it has to be scaled.  Passing it unscaled
	// silently dropped the sub-second part of every timestamp.
	nanos := (u % 1000) * int64(time.Millisecond)
	return time.Unix(secs, nanos)
}
+ */ + +package common + +import ( + "testing" +) + +func testRoundTrip(t *testing.T, u int64) { + tme := UnixMsToTime(u) + u2 := TimeToUnixMs(tme) + if u2 != u { + t.Fatalf("Error taking %d on a round trip: came back as "+ + "%d instead.\n", u, u2) + } +} + +func TestTimeConversions(t *testing.T) { + testRoundTrip(t, 0) + testRoundTrip(t, 1445540632000) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/conf/config.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/conf/config.go b/htrace-htraced/go/src/htrace/conf/config.go new file mode 100644 index 0000000..24170b2 --- /dev/null +++ b/htrace-htraced/go/src/htrace/conf/config.go @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "bufio" + "bytes" + "fmt" + "io" + "log" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "syscall" +) + +// +// The configuration code for HTraced. +// +// HTraced can be configured via Hadoop-style XML configuration files, or by passing -Dkey=value +// command line arguments. 
// Command-line arguments without an equals sign, such as "-Dkey", will be
// treated as setting the key to "true".
//
// Configuration key constants should be defined in config_keys.go.  Each key should have a default,
// which will be used if the user supplies no value, or supplies an invalid value.
// For that reason, it is not necessary for the Get, GetInt, etc. functions to take a default value
// argument.
//
// Configuration objects are immutable.  However, you can make a copy of a configuration which adds
// some changes using Configuration#Clone().
//

// An immutable set of configuration values, plus the defaults that are
// consulted when a key has no explicit value.
type Config struct {
	settings map[string]string
	defaults map[string]string
}

// The inputs from which a Config is assembled.
type Builder struct {
	// If non-nil, the XML configuration file to read.
	Reader io.Reader

	// If non-nil, the configuration values to use.
	Values map[string]string

	// If non-nil, the default configuration values to use.
	Defaults map[string]string

	// If non-nil, the command-line arguments to use.
	Argv []string

	// The name of the application.  Configuration keys that start with this
	// string will be converted to their unprefixed forms.
	AppPrefix string
}

// The directory searched for the configuration file when HTRACED_CONF_DIR
// does not name any: <sep>etc<sep>htraced<sep>conf.
func getDefaultHTracedConfDir() string {
	return filepath.Join(string(os.PathSeparator), "etc", "htraced", "conf")
}

// Return the list of directories named by the HTRACED_CONF_DIR environment
// variable, or a one-element list holding the default directory when the
// variable yields no paths.  The choice is noted on dlog.
func getHTracedConfDirs(dlog io.Writer) []string {
	fromEnv := os.Getenv("HTRACED_CONF_DIR")
	if dirs := filepath.SplitList(fromEnv); len(dirs) >= 1 {
		fmt.Fprintf(dlog, "HTRACED_CONF_DIR=%s\n", fromEnv)
		return dirs
	}
	fallback := getDefaultHTracedConfDir()
	fmt.Fprintf(dlog, "HTRACED_CONF_DIR defaulting to %s\n", fallback)
	return []string{fallback}
}

// Load a configuration from the application's argv, configuration file, and the standard
// defaults.
+func LoadApplicationConfig(appPrefix string) (*Config, io.Reader) { + dlog := new(bytes.Buffer) + reader := openFile(CONFIG_FILE_NAME, getHTracedConfDirs(dlog), dlog) + bld := Builder{} + if reader != nil { + defer reader.Close() + bld.Reader = bufio.NewReader(reader) + } + bld.Argv = os.Args[1:] + bld.Defaults = DEFAULTS + bld.AppPrefix = appPrefix + cnf, err := bld.Build() + if err != nil { + log.Fatal("Error building configuration: " + err.Error()) + } + os.Args = append(os.Args[0:1], bld.Argv...) + keys := make(sort.StringSlice, 0, 20) + for k, _ := range cnf.settings { + keys = append(keys, k) + } + sort.Sort(keys) + prefix := "" + io.WriteString(dlog, "Read configuration: ") + for i := range keys { + io.WriteString(dlog, fmt.Sprintf(`%s%s = "%s"`, + prefix, keys[i], cnf.settings[keys[i]])) + prefix = ", " + } + return cnf, dlog +} + +// Attempt to open a configuration file somewhere on the provided list of paths. +func openFile(cnfName string, paths []string, dlog io.Writer) io.ReadCloser { + for p := range paths { + path := fmt.Sprintf("%s%c%s", paths[p], os.PathSeparator, cnfName) + file, err := os.Open(path) + if err == nil { + io.WriteString(dlog, fmt.Sprintf("Reading configuration from %s.\n", path)) + return file + } + if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOENT { + continue + } + io.WriteString(dlog, fmt.Sprintf("Error opening %s for read: %s\n", path, err.Error())) + } + return nil +} + +// Try to parse a command-line element as a key=value pair. +func parseAsConfigFlag(flag string) (string, string) { + var confPart string + if strings.HasPrefix(flag, "-D") { + confPart = flag[2:] + } else if strings.HasPrefix(flag, "--D") { + confPart = flag[3:] + } else { + return "", "" + } + if len(confPart) == 0 { + return "", "" + } + idx := strings.Index(confPart, "=") + if idx == -1 { + return confPart, "true" + } + return confPart[0:idx], confPart[idx+1:] +} + +// Build a new configuration object from the provided conf.Builder. 
+func (bld *Builder) Build() (*Config, error) { + // Load values and defaults + cnf := Config{} + cnf.settings = make(map[string]string) + if bld.Values != nil { + for k, v := range bld.Values { + cnf.settings[k] = v + } + } + cnf.defaults = make(map[string]string) + if bld.Defaults != nil { + for k, v := range bld.Defaults { + cnf.defaults[k] = v + } + } + + // Process the configuration file, if we have one + if bld.Reader != nil { + parseXml(bld.Reader, cnf.settings) + } + + // Process command line arguments + var i int + for i < len(bld.Argv) { + str := bld.Argv[i] + key, val := parseAsConfigFlag(str) + if key != "" { + cnf.settings[key] = val + bld.Argv = append(bld.Argv[:i], bld.Argv[i+1:]...) + } else { + i++ + } + } + cnf.settings = bld.removeApplicationPrefixes(cnf.settings) + cnf.defaults = bld.removeApplicationPrefixes(cnf.defaults) + return &cnf, nil +} + +func (bld *Builder) removeApplicationPrefixes(in map[string]string) map[string]string { + out := make(map[string]string) + for k, v := range in { + if strings.HasPrefix(k, bld.AppPrefix) { + out[k[len(bld.AppPrefix):]] = v + } else { + out[k] = v + } + } + return out +} + +// Returns true if the configuration has a non-default value for the given key. +func (cnf *Config) Contains(key string) bool { + _, ok := cnf.settings[key] + return ok +} + +// Get a string configuration key. +func (cnf *Config) Get(key string) string { + ret, hadKey := cnf.settings[key] + if hadKey { + return ret + } + return cnf.defaults[key] +} + +// Get a boolean configuration key. +func (cnf *Config) GetBool(key string) bool { + str := cnf.settings[key] + ret, err := strconv.ParseBool(str) + if err == nil { + return ret + } + str = cnf.defaults[key] + ret, err = strconv.ParseBool(str) + if err == nil { + return ret + } + return false +} + +// Get an integer configuration key. 
+func (cnf *Config) GetInt(key string) int { + str := cnf.settings[key] + ret, err := strconv.Atoi(str) + if err == nil { + return ret + } + str = cnf.defaults[key] + ret, err = strconv.Atoi(str) + if err == nil { + return ret + } + return 0 +} + +// Get an int64 configuration key. +func (cnf *Config) GetInt64(key string) int64 { + str := cnf.settings[key] + ret, err := strconv.ParseInt(str, 10, 64) + if err == nil { + return ret + } + str = cnf.defaults[key] + ret, err = strconv.ParseInt(str, 10, 64) + if err == nil { + return ret + } + return 0 +} + +// Make a deep copy of the given configuration. +// Optionally, you can specify particular key/value pairs to change. +// Example: +// cnf2 := cnf.Copy("my.changed.key", "my.new.value") +func (cnf *Config) Clone(args ...string) *Config { + if len(args)%2 != 0 { + panic("The arguments to Config#copy are key1, value1, " + + "key2, value2, and so on. You must specify an even number of arguments.") + } + ncnf := &Config{defaults: cnf.defaults} + ncnf.settings = make(map[string]string) + for k, v := range cnf.settings { + ncnf.settings[k] = v + } + for i := 0; i < len(args); i += 2 { + ncnf.settings[args[i]] = args[i+1] + } + return ncnf +} + +// Export the configuration as a map +func (cnf *Config) Export() map[string]string { + m := make(map[string]string) + for k, v := range cnf.defaults { + m[k] = v + } + for k, v := range cnf.settings { + m[k] = v + } + return m +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/conf/config_keys.go b/htrace-htraced/go/src/htrace/conf/config_keys.go new file mode 100644 index 0000000..08e2de4 --- /dev/null +++ b/htrace-htraced/go/src/htrace/conf/config_keys.go @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "fmt" + "os" +) + +// +// Configuration keys for HTrace. +// + +// The platform-specific path separator. Usually slash. +var PATH_SEP string = fmt.Sprintf("%c", os.PathSeparator) + +// The platform-specific path list separator. Usually colon. +var PATH_LIST_SEP string = fmt.Sprintf("%c", os.PathListSeparator) + +// The name of the XML configuration file to look for. +const CONFIG_FILE_NAME = "htraced-conf.xml" + +// An environment variable containing a list of paths to search for the +// configuration file in. +const HTRACED_CONF_DIR = "HTRACED_CONF_DIR" + +// The web address to start the REST server on. +const HTRACE_WEB_ADDRESS = "web.address" + +// The default port for the Htrace web address. +const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9096 + +// The address to start the HRPC server on. +const HTRACE_HRPC_ADDRESS = "hrpc.address" + +// The default port for the Htrace HRPC address. +const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075 + +// The directories to put the data store into. Separated by PATH_LIST_SEP. +const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories" + +// Boolean key which indicates whether we should clear data on startup. 
+const HTRACE_DATA_STORE_CLEAR = "data.store.clear" + +// How many writes to buffer before applying backpressure to span senders. +const HTRACE_DATA_STORE_SPAN_BUFFER_SIZE = "data.store.span.buffer.size" + +// Path to put the logs from htrace, or the empty string to use stdout. +const HTRACE_LOG_PATH = "log.path" + +// The log level to use for the logs in htrace. +const HTRACE_LOG_LEVEL = "log.level" + +// The period between datastore heartbeats. This is the approximate interval at which we will +// prune expired spans. +const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms" + +// The maximum number of addresses for which we will maintain metrics. +const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries" + +// The number of milliseconds we should keep spans before discarding them. +const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms" + +// The period between updates to the span reaper +const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms" + +// A host:port pair to send information to on startup. This is used in unit +// tests to determine the (random) port of the htraced process that has been +// started. +const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address" + +// The maximum number of HRPC handler goroutines we will create at once. If +// this is too small, we won't get enough concurrency; if it's too big, we will +// buffer too much data in memory while waiting for the datastore to process +// requests. +const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers" + +// The I/O timeout HRPC will use, in milliseconds. If it takes longer than +// this to read or write a message, we will abort the connection. +const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms" + +// The leveldb write buffer size, or 0 to use the library default, which is 4 +// MB in leveldb 1.16. See leveldb's options.h for more details. 
+const HTRACE_LEVELDB_WRITE_BUFFER_SIZE = "leveldb.write.buffer.size" + +// The LRU cache size for leveldb, in bytes. +const HTRACE_LEVELDB_CACHE_SIZE = "leveldb.cache.size" + +// Default values for HTrace configuration keys. +var DEFAULTS = map[string]string{ + HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), + HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT), + HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" + + PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2", + HTRACE_DATA_STORE_CLEAR: "false", + HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100", + HTRACE_LOG_PATH: "", + HTRACE_LOG_LEVEL: "INFO", + HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000), + HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000", + HTRACE_SPAN_EXPIRY_MS: "0", + HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000), + HTRACE_NUM_HRPC_HANDLERS: "20", + HTRACE_HRPC_IO_TIMEOUT_MS: "60000", + HTRACE_LEVELDB_WRITE_BUFFER_SIZE: "0", + HTRACE_LEVELDB_CACHE_SIZE: fmt.Sprintf("%d", 100*1024*1024), +} + +// Values to be used when creating test configurations +func TEST_VALUES() map[string]string { + return map[string]string{ + HTRACE_HRPC_ADDRESS: ":0", // use a random port for the HRPC server + HTRACE_LOG_LEVEL: "TRACE", // show all log messages in tests + HTRACE_WEB_ADDRESS: ":0", // use a random port for the REST server + HTRACE_SPAN_EXPIRY_MS: "0", // never time out spans (unless testing the reaper) + } +}
