Repository: incubator-htrace Updated Branches: refs/heads/master 817742a0a -> 5f871b619
HTRACE-104. Add more capabilities to the htrace command. (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/5f871b61 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/5f871b61 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/5f871b61 Branch: refs/heads/master Commit: 5f871b619f4138e1b07b804c97beee5b2d2f982e Parents: 817742a Author: Colin P. Mccabe <[email protected]> Authored: Thu Feb 5 19:59:52 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Tue Feb 24 18:21:36 2015 -0800 ---------------------------------------------------------------------- htrace-core/src/go/gobuild.sh | 4 +- .../go/src/org/apache/htrace/client/client.go | 32 ++++ .../go/src/org/apache/htrace/common/query.go | 29 ++++ .../src/org/apache/htrace/common/query_test.go | 50 ++++++ .../src/go/src/org/apache/htrace/common/span.go | 16 +- .../src/go/src/org/apache/htrace/htrace/cmd.go | 137 +++++++++++++--- .../src/go/src/org/apache/htrace/htrace/file.go | 138 ++++++++++++++++ .../src/org/apache/htrace/htrace/file_test.go | 164 +++++++++++++++++++ .../go/src/org/apache/htrace/htrace/graph.go | 111 +++++++++++++ .../src/org/apache/htrace/htrace/graph_test.go | 83 ++++++++++ .../go/src/org/apache/htrace/htrace/queries.go | 161 ++++++++++++++++++ .../org/apache/htrace/htraced/client_test.go | 76 ++++++++- .../src/go/src/org/apache/htrace/test/util.go | 33 ++++ 13 files changed, 1004 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/gobuild.sh ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/gobuild.sh b/htrace-core/src/go/gobuild.sh index 4aad01b..91cefbf 100755 --- a/htrace-core/src/go/gobuild.sh +++ b/htrace-core/src/go/gobuild.sh @@ -108,9 +108,9 @@ install) go install -ldflags "${FLAGS}" -v org/apache/htrace/... "$@" ;; bench) - go test -v org/apache/htrace/... -test.bench=. "$@" + go test org/apache/htrace/... -test.bench=. "$@" ;; *) - go ${ACTION} -v org/apache/htrace/... "$@" + go ${ACTION} org/apache/htrace/... "$@" ;; esac http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go index 82400fe..5e594d9 100644 --- a/htrace-core/src/go/src/org/apache/htrace/client/client.go +++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go @@ -155,3 +155,35 @@ func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Re } return body, 0, nil } + +// Dump all spans from the htraced daemon. +func (hcl *Client) DumpAll(lim int, out chan *common.Span) error { + defer func() { + close(out) + }() + searchId := common.SpanId(0) + for { + q := common.Query{ + Lim: lim, + Predicates: []common.Predicate{ + common.Predicate{ + Op: "ge", + Field: "spanid", + Val: searchId.String(), + }, + }, + } + spans, err := hcl.Query(&q) + if err != nil { + return errors.New(fmt.Sprintf("Error querying spans with IDs at or after "+ + "%s: %s", searchId.String(), err.Error())) + } + if len(spans) == 0 { + return nil + } + for i := range spans { + out <- &spans[i] + } + searchId = spans[len(spans)-1].Id + 1 + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/common/query.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go b/htrace-core/src/go/src/org/apache/htrace/common/query.go index b59cbbe..6e7d6c7 100644 --- a/htrace-core/src/go/src/org/apache/htrace/common/query.go +++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go @@ -58,6 +58,21 @@ func (op Op) IsDescending() bool { return op == LESS_THAN_OR_EQUALS } +func (op Op) IsValid() bool { + ops := ValidOps() + for i := range(ops) { + if ops[i] == op { + return true + } + } + return false +} + +func ValidOps() []Op { + return []Op{CONTAINS, EQUALS, LESS_THAN_OR_EQUALS, GREATER_THAN_OR_EQUALS, + GREATER_THAN} +} + type Field string const ( @@ -68,6 +83,20 @@ const ( DURATION Field = "duration" ) +func (field Field) IsValid() bool { + fields := ValidFields() + for i := range(fields) { + if fields[i] == field { + return true + } + } + return false +} + +func ValidFields() []Field { + return []Field{SPAN_ID, DESCRIPTION, BEGIN_TIME, END_TIME, DURATION} +} + type Predicate struct { Op Op `json:"op"` Field Field `json:"field"` http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/common/query_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query_test.go b/htrace-core/src/go/src/org/apache/htrace/common/query_test.go new file mode 100644 index 0000000..2697d9c --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/query_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "testing" +) + +func TestValidOps(t *testing.T) { + for i := range ValidOps() { + op := ValidOps()[i] + if !op.IsValid() { + t.Fatalf("op %s was in ValidOps, but IsValid returned false.\n", op) + } + } + invalidOp := Op("completelybogus") + if invalidOp.IsValid() { + t.Fatalf("op %s was invalid, but IsValid returned true.\n", invalidOp) + } +} + +func TestValidFields(t *testing.T) { + for i := range ValidFields() { + field := ValidFields()[i] + if !field.IsValid() { + t.Fatalf("field %s was in ValidFields, but IsValid returned false.\n", field) + } + } + invalidField := Field("completelybogus") + if invalidField.IsValid() { + t.Fatalf("field %s was invalid, but IsValid returned true.\n", invalidField) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/common/span.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go b/htrace-core/src/go/src/org/apache/htrace/common/span.go index 64975d2..9b7af22 100644 --- a/htrace-core/src/go/src/org/apache/htrace/common/span.go +++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go @@ -36,7 +36,7 @@ import ( // this JSON data, we have to simply pass it as a string. // -type TraceInfoMap map[string][]byte +type TraceInfoMap map[string]string type TimelineAnnotation struct { Time int64 `json:"t"` @@ -57,6 +57,20 @@ func (id SpanId) MarshalJSON() ([]byte, error) { return []byte(`"` + fmt.Sprintf("%016x", uint64(id)) + `"`), nil } +type SpanSlice []*Span + +func (s SpanSlice) Len() int { + return len(s) +} + +func (s SpanSlice) Less(i, j int) bool { + return s[i].Id < s[j].Id +} + +func (s SpanSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + type SpanIdSlice []SpanId func (s SpanIdSlice) Len() int { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go index 7007343..290984e 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go @@ -22,6 +22,7 @@ package main import ( "bufio" "encoding/json" + "errors" "fmt" "github.com/alecthomas/kingpin" "io" @@ -29,6 +30,7 @@ import ( "org/apache/htrace/common" "org/apache/htrace/conf" "os" + "time" ) var RELEASE_VERSION string @@ -65,10 +67,27 @@ func main() { parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: 0x123456789abcdef"). Required().Uint64() childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int() - writeSpans := app.Command("writeSpans", "Write spans to the server in JSON form.") - spanJson := writeSpans.Flag("json", "The JSON span data to write to the server.").String() - spanFile := writeSpans.Flag("file", - "A file containing JSON span data to write to the server.").String() + loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.") + loadFilePath := loadFile.Arg("path", + "A file containing whitespace-separated span JSON.").Required().String() + dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.") + dumpAllOutPath := dumpAll.Flag("path", "The path to dump the trace spans to.").Default("-").String() + dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once."). + Default("100").Int() + loadJson := app.Command("load", "Write JSON spans from the command-line to the server.") + loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String() + graph := app.Command("graph", "Visualize span JSON as a graph.") + graphJsonFile := graph.Flag("input", "The JSON file to load").Required().String() + graphDotFile := graph.Flag("output", + "The path to write a GraphViz dotfile to. This file can be used as input to "+ + "GraphViz, in order to generate a pretty picture. See graphviz.org for more "+ + "information about generating pictures of graphs.").Default("-").String() + query := app.Command("query", "Send a query to htraced.") + queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int() + queryArg := query.Arg("query", "The query string to send. Query strings have the format "+ + "[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String() + rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.") + rawQueryArg := query.Arg("json", "The query JSON to send.").Required().String() cmd := kingpin.MustParse(app.Parse(os.Args[1:])) // Add the command-line settings into the configuration. @@ -76,6 +95,19 @@ func main() { cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr) } + // Handle commands that don't require an HTrace client. + switch cmd { + case version.FullCommand(): + os.Exit(printVersion()) + case graph.FullCommand(): + err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile) + if err != nil { + fmt.Printf("graphing error: %s\n", err.Error()) + os.Exit(EXIT_FAILURE) + } + os.Exit(EXIT_SUCCESS) + } + // Create HTrace client hcl, err := htrace.NewClient(cnf) if err != nil { @@ -83,7 +115,7 @@ func main() { os.Exit(EXIT_FAILURE) } - // Handle operation + // Handle commands that require an HTrace client. switch cmd { case version.FullCommand(): os.Exit(printVersion()) @@ -93,19 +125,31 @@ func main() { os.Exit(doFindSpan(hcl, common.SpanId(*findSpanId))) case findChildren.FullCommand(): os.Exit(doFindChildren(hcl, common.SpanId(*parentSpanId), *childLim)) - case writeSpans.FullCommand(): - if *spanJson != "" { - if *spanFile != "" { - fmt.Printf("You must specify either --json or --file, " + - "but not both.\n") - os.Exit(EXIT_FAILURE) - } - os.Exit(doWriteSpanJson(hcl, *spanJson)) - } else if *spanFile != "" { - os.Exit(doWriteSpanJsonFile(hcl, *spanFile)) + case loadJson.FullCommand(): + os.Exit(doLoadSpanJson(hcl, *loadJsonArg)) + case loadFile.FullCommand(): + os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath)) + case dumpAll.FullCommand(): + err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim) + if err != nil { + fmt.Printf("dumpAll error: %s\n", err.Error()) + os.Exit(EXIT_FAILURE) } - fmt.Printf("You must specify either --json or --file.\n") - os.Exit(EXIT_FAILURE) + os.Exit(EXIT_SUCCESS) + case query.FullCommand(): + err := doQueryFromString(hcl, *queryArg, *queryLim) + if err != nil { + fmt.Printf("query error: %s\n", err.Error()) + os.Exit(EXIT_FAILURE) + } + os.Exit(EXIT_SUCCESS) + case rawQuery.FullCommand(): + err := doRawQuery(hcl, *rawQueryArg) + if err != nil { + fmt.Printf("raw query error: %s\n", err.Error()) + os.Exit(EXIT_FAILURE) + } + os.Exit(EXIT_SUCCESS) } app.UsageErrorf(os.Stderr, "You must supply a command to run.") @@ -148,8 +192,12 @@ func doFindSpan(hcl *htrace.Client, sid common.SpanId) int { return EXIT_SUCCESS } -func doWriteSpanJsonFile(hcl *htrace.Client, spanFile string) int { - file, err := os.Open(spanFile) +func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int { + if spanFile == "" { + fmt.Printf("You must specify the json file to load.\n") + return EXIT_FAILURE + } + file, err := OpenInputFile(spanFile) if err != nil { fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error()) return EXIT_FAILURE @@ -177,7 +225,7 @@ func doWriteSpanJsonFile(hcl *htrace.Client, spanFile string) int { return EXIT_SUCCESS } -func doWriteSpanJson(hcl *htrace.Client, spanJson string) int { +func doLoadSpanJson(hcl *htrace.Client, spanJson string) int { spanBytes := []byte(spanJson) var span common.Span err := json.Unmarshal(spanBytes, &span) @@ -208,3 +256,52 @@ func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int { fmt.Printf("%s\n", string(pbuf)) return 0 } + +// Dump all spans from the htraced daemon. +func doDumpAll(hcl *htrace.Client, outPath string, lim int) error { + file, err := CreateOutputFile(outPath) + if err != nil { + return err + } + defer func() { + if file != nil { + file.Close() + } + }() + out := make(chan *common.Span, 50) + var dumpErr error + go func() { + dumpErr = hcl.DumpAll(lim, out) + }() + var numSpans int64 + nextLogTime := time.Now().Add(time.Second * 5) + for { + span, channelOpen := <-out + if !channelOpen { + break + } + if err != nil { + _, err = fmt.Fprintf(file, "%s\n", span.ToJson()) + } + if *verbose { + numSpans++ + now := time.Now() + if !now.Before(nextLogTime) { + nextLogTime = now.Add(time.Second * 5) + fmt.Printf("wrote %d span(s)...\n", numSpans) + } + } + } + if err != nil { + return errors.New(fmt.Sprintf("Write error %s", err.Error())) + } + if dumpErr != nil { + return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error())) + } + err = file.Close() + file = nil + if err != nil { + return err + } + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htrace/file.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/file.go b/htrace-core/src/go/src/org/apache/htrace/htrace/file.go new file mode 100644 index 0000000..ea214be --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/file.go @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "org/apache/htrace/common" + "os" +) + +// A file used for input. +// Transparently supports using stdin for input. +type InputFile struct { + *os.File + path string +} + +// Open an input file. Stdin will be used when path is - +func OpenInputFile(path string) (*InputFile, error) { + if path == "-" { + return &InputFile{File: os.Stdin, path: path}, nil + } + file, err := os.Open(path) + if err != nil { + return nil, err + } + return &InputFile{File: file, path: path}, nil +} + +func (file *InputFile) Close() { + if file.path != "-" { + file.File.Close() + } +} + +// A file used for output. +// Transparently supports using stdout for output. +type OutputFile struct { + *os.File + path string +} + +// Create an output file. Stdout will be used when path is - +func CreateOutputFile(path string) (*OutputFile, error) { + if path == "-" { + return &OutputFile{File: os.Stdout, path: path}, nil + } + file, err := os.Create(path) + if err != nil { + return nil, err + } + return &OutputFile{File: file, path: path}, nil +} + +func (file *OutputFile) Close() error { + if file.path != "-" { + return file.File.Close() + } + return nil +} + +// FailureDeferringWriter is a writer which allows us to call Printf multiple +// times and then check if all the printfs succeeded at the very end, rather +// than checking after each call. We will not attempt to write more data +// after the first write failure. +type FailureDeferringWriter struct { + io.Writer + err error +} + +func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter { + return &FailureDeferringWriter{writer, nil} +} + +func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) { + if w.err != nil { + return + } + str := fmt.Sprintf(format, v...) + _, err := w.Writer.Write([]byte(str)) + if err != nil { + w.err = err + } +} + +func (w *FailureDeferringWriter) Error() error { + return w.err +} + +// Read a file full of whitespace-separated span JSON into a slice of spans. +func readSpansFile(path string) (common.SpanSlice, error) { + file, err := OpenInputFile(path) + if err != nil { + return nil, err + } + defer file.Close() + return readSpans(bufio.NewReader(file)) +} + +// Read whitespace-separated span JSON into a slice of spans. +func readSpans(reader io.Reader) (common.SpanSlice, error) { + spans := make(common.SpanSlice, 0) + dec := json.NewDecoder(reader) + for { + var span common.Span + err := dec.Decode(&span) + if err != nil { + if err != io.EOF { + return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+ + "span(s): %s", len(spans), err.Error())) + } + break + } + spans = append(spans, &span) + } + return spans, nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go b/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go new file mode 100644 index 0000000..b6f9cac --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "errors" + "io" + "io/ioutil" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "org/apache/htrace/test" + "os" + "strings" + "testing" +) + +func TestInputFileAndOutputFile(t *testing.T) { + tdir, err := ioutil.TempDir(os.TempDir(), "TestInputFileAndOutputFile") + if err != nil { + t.Fatalf("failed to create TempDir: %s\n", err.Error()) + } + defer os.RemoveAll(tdir) + tpath := tdir + conf.PATH_SEP + "test" + var ofile *OutputFile + ofile, err = CreateOutputFile(tpath) + if err != nil { + t.Fatalf("failed to create OutputFile at %s: %s\n", tpath, err.Error()) + } + defer func() { + if ofile != nil { + ofile.Close() + } + }() + w := NewFailureDeferringWriter(ofile) + w.Printf("Hello, world!\n") + w.Printf("2 + 2 = %d\n", 4) + if w.Error() != nil { + t.Fatalf("got unexpected error writing to %s: %s\n", tpath, w.Error().Error()) + } + err = ofile.Close() + ofile = nil + if err != nil { + t.Fatalf("error on closing OutputFile for %s: %s\n", tpath, err.Error()) + } + var ifile *InputFile + ifile, err = OpenInputFile(tpath) + defer ifile.Close() + expected := "Hello, world!\n2 + 2 = 4\n" + buf := make([]byte, len(expected)) + _, err = io.ReadAtLeast(ifile, buf, len(buf)) + if err != nil { + t.Fatalf("unexpected error on reading %s: %s\n", tpath, err.Error()) + } + str := string(buf) + if str != expected { + t.Fatalf("Could not read back what we wrote to %s.\n"+ + "Got:\n%s\nExpected:\n%s\n", tpath, str, expected) + } +} + +type LimitedBufferWriter struct { + buf []byte + off int +} + +const LIMITED_BUFFER_MESSAGE = "There isn't enough buffer to go around!" + +func (w *LimitedBufferWriter) Write(p []byte) (int, error) { + var nwritten int + for i := range p { + if w.off >= len(w.buf) { + return nwritten, errors.New(LIMITED_BUFFER_MESSAGE) + } + w.buf[w.off] = p[i] + w.off = w.off + 1 + nwritten++ + } + return nwritten, nil +} + +func TestFailureDeferringWriter(t *testing.T) { + lw := LimitedBufferWriter{buf: make([]byte, 20), off: 0} + w := NewFailureDeferringWriter(&lw) + w.Printf("Zippity do dah #%d\n", 1) + w.Printf("Zippity do dah #%d\n", 2) + if w.Error() == nil { + t.Fatalf("expected FailureDeferringWriter to experience a failure due to " + + "limited buffer size, but it did not.") + } + if w.Error().Error() != LIMITED_BUFFER_MESSAGE { + t.Fatalf("expected FailureDeferringWriter to have the error message %s, but "+ + "the message was %s\n", LIMITED_BUFFER_MESSAGE, w.Error().Error()) + } + expected := "Zippity do dah #1\nZi" + if string(lw.buf) != expected { + t.Fatalf("expected LimitedBufferWriter to contain %s, but it contained %s "+ + "instead.\n", expected, string(lw.buf)) + } +} + +func TestReadSpans(t *testing.T) { + SPAN_TEST_STR := `{"i":"bdd6d4ee48de59bf","s":"c0681027d3ea4928",` + + `"b":1424736225037,"e":1424736225901,"d":"ClientNamenodeProtocol#getFileInfo",` + + `"r":"FsShell","p":["60538dfb4df91418"]} +{"i":"bdd6d4ee48de59bf","s":"60538dfb4df91418","b":1424736224969,` + + `"e":1424736225960,"d":"getFileInfo","r":"FsShell","p":[],"n":{"path":"/"}} +` + r := strings.NewReader(SPAN_TEST_STR) + spans, err := readSpans(r) + if err != nil { + t.Fatalf("Failed to read spans from string via readSpans: %s\n", err.Error()) + } + SPAN_TEST_EXPECTED := common.SpanSlice{ + &common.Span{ + Id: test.SpanId("c0681027d3ea4928"), + SpanData: common.SpanData{ + TraceId: test.SpanId("bdd6d4ee48de59bf"), + Begin: 1424736225037, + End: 1424736225901, + Description: "ClientNamenodeProtocol#getFileInfo", + ProcessId: "FsShell", + Parents: []common.SpanId{test.SpanId("60538dfb4df91418")}, + }, + }, + &common.Span{ + Id: test.SpanId("60538dfb4df91418"), + SpanData: common.SpanData{ + TraceId: test.SpanId("bdd6d4ee48de59bf"), + Begin: 1424736224969, + End: 1424736225960, + Description: "getFileInfo", + ProcessId: "FsShell", + Parents: []common.SpanId{}, + Info: common.TraceInfoMap{ + "path": "/", + }, + }, + }, + } + if len(spans) != len(SPAN_TEST_EXPECTED) { + t.Fatalf("Expected %d spans, but got %d\n", + len(SPAN_TEST_EXPECTED), len(spans)) + } + for i := range SPAN_TEST_EXPECTED { + common.ExpectSpansEqual(t, spans[i], SPAN_TEST_EXPECTED[i]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go b/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go new file mode 100644 index 0000000..fd55592 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bufio" + "errors" + "fmt" + "io" + "org/apache/htrace/common" + "os" + "sort" +) + +// Create a dotfile from a json file. +func jsonSpanFileToDotFile(jsonFile string, dotFile string) error { + spans, err := readSpansFile(jsonFile) + if err != nil { + return errors.New(fmt.Sprintf("error reading %s: %s", + jsonFile, err.Error())) + } + var file *OutputFile + file, err = CreateOutputFile(dotFile) + if err != nil { + return errors.New(fmt.Sprintf("error opening %s for write: %s", + dotFile, err.Error())) + } + defer func() { + if file != nil { + file.Close() + } + }() + writer := bufio.NewWriter(file) + err = spansToDot(spans, writer) + if err != nil { + return err + } + err = file.Close() + file = nil + return err +} + +// Create output in dotfile format from a set of spans. +func spansToDot(spans common.SpanSlice, writer io.Writer) error { + sort.Sort(spans) + idMap := make(map[common.SpanId]*common.Span) + for i := range spans { + span := spans[i] + if idMap[span.Id] != nil { + fmt.Fprintf(os.Stderr, "There were multiple spans listed which "+ + "had ID %s.\nFirst:%s\nOther:%s\n", span.Id.String(), + idMap[span.Id].ToJson(), span.ToJson()) + } else { + idMap[span.Id] = span + } + } + childMap := make(map[common.SpanId][]*common.Span) + for i := range spans { + child := spans[i] + for j := range child.Parents { + parent := idMap[child.Parents[j]] + if parent == nil { + fmt.Fprintf(os.Stderr, "Can't find parent id %s for %s\n", + child.Parents[j].String(), child.ToJson()) + } else { + children := childMap[parent.Id] + if children == nil { + children = make([]*common.Span, 0) + } + children = append(children, child) + childMap[parent.Id] = children + } + } + } + w := NewFailureDeferringWriter(writer) + w.Printf("digraph spans {\n") + // Write out the nodes with their descriptions. + for i := range spans { + w.Printf(fmt.Sprintf(` "%s" [label="%s"];`+"\n", + spans[i].Id.String(), spans[i].Description)) + } + // Write out the edges between nodes... the parent/children relationships + for i := range spans { + children := childMap[spans[i].Id] + if children != nil { + for c := range children { + w.Printf(fmt.Sprintf(` "%s" -> "%s";`+"\n", + spans[i].Id.String(), children[c].Id)) + } + } + } + w.Printf("}") + return w.Error() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go b/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go new file mode 100644 index 0000000..6cf0c3a --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "org/apache/htrace/common" + "org/apache/htrace/test" + "testing" +) + +func TestSpansToDot(t *testing.T) { + TEST_SPANS := common.SpanSlice{ + &common.Span{ + Id: test.SpanId("6af3cc058e5d829d"), + SpanData: common.SpanData{ + TraceId: test.SpanId("0e4716fe911244de"), + Begin: 1424813349020, + End: 1424813349134, + Description: "newDFSInputStream", + ProcessId: "FsShell", + Parents: []common.SpanId{}, + Info: common.TraceInfoMap{ + "path": "/", + }, + }, + }, + &common.Span{ + Id: test.SpanId("75d16cc5b2c07d8a"), + SpanData: common.SpanData{ + TraceId: test.SpanId("0e4716fe911244de"), + Begin: 1424813349025, + End: 1424813349133, + Description: "getBlockLocations", + ProcessId: "FsShell", + Parents: []common.SpanId{test.SpanId("6af3cc058e5d829d")}, + }, + }, + &common.Span{ + Id: test.SpanId("e2c7273efb280a8c"), + SpanData: common.SpanData{ + TraceId: test.SpanId("0e4716fe911244de"), + Begin: 1424813349027, + End: 1424813349073, + Description: "ClientNamenodeProtocol#getBlockLocations", + ProcessId: "FsShell", + Parents: []common.SpanId{test.SpanId("75d16cc5b2c07d8a")}, + }, + }, + } + w := bytes.NewBuffer(make([]byte, 0, 2048)) + err := spansToDot(TEST_SPANS, w) + if err != nil { + t.Fatalf("spansToDot failed: error %s\n", err.Error()) + } + EXPECTED_STR := `digraph spans { + "e2c7273efb280a8c" [label="ClientNamenodeProtocol#getBlockLocations"]; + "6af3cc058e5d829d" [label="newDFSInputStream"]; + "75d16cc5b2c07d8a" [label="getBlockLocations"]; + "6af3cc058e5d829d" -> "75d16cc5b2c07d8a"; + "75d16cc5b2c07d8a" -> "e2c7273efb280a8c"; +}` + if w.String() != EXPECTED_STR { + t.Fatalf("Expected to get:\n%s\nGot:\n%s\n", EXPECTED_STR, w.String()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go b/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go new file mode 100644 index 0000000..4ff246c --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "encoding/json" + "errors" + "fmt" + htrace "org/apache/htrace/client" + "org/apache/htrace/common" + "strings" + "unicode" +) + +// Convert a string into a whitespace-separated sequence of strings. +func tokenize(str string) []string { + prevQuote := rune(0) + f := func(c rune) bool { + switch { + case c == prevQuote: + prevQuote = rune(0) + return false + case prevQuote != rune(0): + return false + case unicode.In(c, unicode.Quotation_Mark): + prevQuote = c + return false + default: + return unicode.IsSpace(c) + } + } + return strings.FieldsFunc(str, f) +} + +// Parses a query string in the format of a series of +// [TYPE] [OPERATOR] [CONST] tuples, joined by AND statements. +type predicateParser struct { + tokens []string + curToken int +} + +func (ps *predicateParser) Parse() (*common.Predicate, error) { + if ps.curToken > len(ps.tokens) { + return nil, nil + } + if ps.curToken > 0 { + if strings.ToLower(ps.tokens[ps.curToken]) != "and" { + return nil, errors.New(fmt.Sprintf("Error parsing on token %d: "+ + "expected predicates to be joined by 'and', but found '%s'", + ps.curToken, ps.tokens[ps.curToken])) + } + ps.curToken++ + if ps.curToken > len(ps.tokens) { + return nil, errors.New(fmt.Sprintf("Nothing found after 'and' at "+ + "token %d", ps.curToken)) + } + } + field := common.Field(ps.tokens[ps.curToken]) + if !field.IsValid() { + return nil, errors.New(fmt.Sprintf("Invalid field specifier at token %d. "+ + "Can't understand %s. Valid field specifiers are %v", ps.curToken, + ps.tokens[ps.curToken], common.ValidFields())) + } + ps.curToken++ + if ps.curToken > len(ps.tokens) { + return nil, errors.New(fmt.Sprintf("Nothing found after field specifier "+ + "at token %d", ps.curToken)) + } + op := common.Op(ps.tokens[ps.curToken]) + if !op.IsValid() { + return nil, errors.New(fmt.Sprintf("Invalid operation specifier at token %d. "+ + "Can't understand %s. Valid operation specifiers are %v", ps.curToken, + ps.tokens[ps.curToken], common.ValidOps())) + } + ps.curToken++ + if ps.curToken > len(ps.tokens) { + return nil, errors.New(fmt.Sprintf("Nothing found after field specifier "+ + "at token %d", ps.curToken)) + } + val := ps.tokens[ps.curToken] + return &common.Predicate{Op: op, Field: field, Val: val}, nil +} + +func parseQueryString(str string) ([]common.Predicate, error) { + ps := predicateParser{tokens: tokenize(str)} + preds := make([]common.Predicate, 0) + for { + pred, err := ps.Parse() + if pred == nil { + break + } + if err != nil { + return nil, err + } + } + if len(preds) == 0 { + return nil, errors.New("Empty query string") + } + return preds, nil +} + +// Send a query from a query string. +func doQueryFromString(hcl *htrace.Client, str string, lim int) error { + query := &common.Query{Lim: lim} + var err error + query.Predicates, err = parseQueryString(str) + if err != nil { + return err + } + return doQuery(hcl, query) +} + +// Send a query from a raw JSON string. +func doRawQuery(hcl *htrace.Client, str string) error { + jsonBytes := []byte(str) + var query common.Query + err := json.Unmarshal(jsonBytes, &query) + if err != nil { + return errors.New(fmt.Sprintf("Error parsing provided JSON: %s\n", err.Error())) + } + return doQuery(hcl, &query) +} + +// Send a query. +func doQuery(hcl *htrace.Client, query *common.Query) error { + if *verbose { + qbytes, err := json.Marshal(*query) + if err != nil { + qbytes = []byte("marshaling error: " + err.Error()) + } + fmt.Printf("Sending query: %s\n", string(qbytes)) + } + spans, err := hcl.Query(query) + if err != nil { + return err + } + if *verbose { + fmt.Printf("%d results...\n", len(spans)) + } + for i := range spans { + fmt.Printf("%s\n", spans[i].ToJson()) + } + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go index 044c7ba..07e7a2c 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go @@ -20,11 +20,14 @@ package main import ( + "fmt" "math/rand" htrace "org/apache/htrace/client" "org/apache/htrace/common" "org/apache/htrace/test" + "sort" "testing" + "time" ) func TestClientGetServerInfo(t *testing.T) { @@ -45,6 +48,17 @@ func TestClientGetServerInfo(t *testing.T) { } } +func createRandomTestSpans(amount int) common.SpanSlice { + rnd := rand.New(rand.NewSource(2)) + allSpans := make(common.SpanSlice, amount) + allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0]) + for i := 1; i < amount; i++ { + allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i]) + } + allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)} + return allSpans +} + func TestClientOperations(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", NumDataDirs: 2} ht, err := htraceBld.Build() @@ -60,13 +74,7 @@ func TestClientOperations(t *testing.T) { // Create some random trace spans. NUM_TEST_SPANS := 30 - rnd := rand.New(rand.NewSource(2)) - allSpans := make([]*common.Span, NUM_TEST_SPANS) - allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0]) - for i := 1; i < NUM_TEST_SPANS; i++ { - allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i]) - } - allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)} + allSpans := createRandomTestSpans(NUM_TEST_SPANS) // Write half of the spans to htraced via the client. for i := 0; i < NUM_TEST_SPANS/2; i++ { @@ -137,3 +145,57 @@ func TestClientOperations(t *testing.T) { "children: expected %d, got %d\n", 10, 10, len(spans)) } } + +func TestDumpAll(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", NumDataDirs: 2} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf()) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + + NUM_TEST_SPANS := 100 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + sort.Sort(allSpans) + for i := range allSpans { + err = hcl.WriteSpan(allSpans[i]) + if err != nil { + t.Fatalf("failed to write span %d: %s", i, err.Error()) + } + } + out := make(chan *common.Span, 50) + var dumpErr error + go func() { + dumpErr = hcl.DumpAll(3, out) + }() + var numSpans int + nextLogTime := time.Now().Add(time.Millisecond * 5) + for { + span, channelOpen := <-out + if !channelOpen { + break + } + common.ExpectSpansEqual(t, allSpans[numSpans], span) + numSpans++ + if testing.Verbose() { + now := time.Now() + if !now.Before(nextLogTime) { + nextLogTime = now + nextLogTime = nextLogTime.Add(time.Millisecond * 5) + fmt.Printf("read back %d span(s)...\n", numSpans) + } + } + } + if numSpans != len(allSpans) { + t.Fatalf("expected to read %d spans... but only read %d\n", + len(allSpans), numSpans) + } + if dumpErr != nil { + t.Fatalf("got dump error %s\n", dumpErr.Error()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5f871b61/htrace-core/src/go/src/org/apache/htrace/test/util.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/test/util.go b/htrace-core/src/go/src/org/apache/htrace/test/util.go new file mode 100644 index 0000000..cc058e0 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/test/util.go @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package test + +import ( + "org/apache/htrace/common" +) + +func SpanId(str string) common.SpanId { + var spanId common.SpanId + err := spanId.FromString(str) + if err != nil { + panic(err.Error()) + } + return spanId +}
