http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go b/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go deleted file mode 100644 index de14bc5..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package conf - -import ( - "encoding/xml" - "io" - "log" -) - -type configuration struct { - Properties []propertyXml `xml:"property"` -} - -type propertyXml struct { - Name string `xml:"name"` - Value string `xml:"value"` -} - -// Parse an XML configuration file. 
-func parseXml(reader io.Reader, m map[string]string) error { - dec := xml.NewDecoder(reader) - configurationXml := configuration{} - err := dec.Decode(&configurationXml) - if err != nil { - return err - } - props := configurationXml.Properties - for p := range props { - key := props[p].Name - value := props[p].Value - if key == "" { - log.Println("Warning: ignoring element with missing or empty <name>.") - continue - } - if value == "" { - log.Println("Warning: ignoring element with key " + key + " with missing or empty <value>.") - continue - } - //log.Printf("setting %s to %s\n", key, value) - m[key] = value - } - return nil -}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go deleted file mode 100644 index 38cdb58..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bufio" - "bytes" - "encoding/json" - "errors" - "fmt" - "github.com/alecthomas/kingpin" - "io" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "os" - "time" -) - -var RELEASE_VERSION string -var GIT_VERSION string - -const EXIT_SUCCESS = 0 -const EXIT_FAILURE = 1 - -var verbose *bool - -const USAGE = `The Apache HTrace command-line tool. This tool retrieves and modifies settings and -other data on a running htraced daemon. 
- -If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of directories -specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; otherwise, -the defaults will be used. -` - -func main() { - // Load htraced configuration - cnf := common.LoadApplicationConfig() - - // Parse argv - app := kingpin.New(os.Args[0], USAGE) - app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'. Replace 'my.key' "+ - "with any key you want to set.").Default("my.value").String() - addr := app.Flag("addr", "Server address.").String() - verbose = app.Flag("verbose", "Verbose.").Default("false").Bool() - version := app.Command("version", "Print the version of this program.") - serverInfo := app.Command("serverInfo", "Print information retrieved from an htraced server.") - findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.") - findSpanId := findSpan.Arg("id", "Span ID to find. Example: 0x123456789abcdef").Required().Uint64() - findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.") - parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: 0x123456789abcdef"). 
- Required().Uint64() - childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int() - loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.") - loadFilePath := loadFile.Arg("path", - "A file containing whitespace-separated span JSON.").Required().String() - loadJson := app.Command("load", "Write JSON spans from the command-line to the server.") - loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String() - dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.") - dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans to.").Default("-").String() - dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once."). - Default("100").Int() - graph := app.Command("graph", "Visualize span JSON as a graph.") - graphJsonFile := graph.Arg("input", "The JSON file to load").Required().String() - graphDotFile := graph.Flag("output", - "The path to write a GraphViz dotfile to. This file can be used as input to "+ - "GraphViz, in order to generate a pretty picture. See graphviz.org for more "+ - "information about generating pictures of graphs.").Default("-").String() - query := app.Command("query", "Send a query to htraced.") - queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int() - queryArg := query.Arg("query", "The query string to send. Query strings have the format "+ - "[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String() - rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.") - rawQueryArg := query.Arg("json", "The query JSON to send.").Required().String() - cmd := kingpin.MustParse(app.Parse(os.Args[1:])) - - // Add the command-line settings into the configuration. - if *addr != "" { - cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr) - } - - // Handle commands that don't require an HTrace client. 
- switch cmd { - case version.FullCommand(): - os.Exit(printVersion()) - case graph.FullCommand(): - err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile) - if err != nil { - fmt.Printf("graphing error: %s\n", err.Error()) - os.Exit(EXIT_FAILURE) - } - os.Exit(EXIT_SUCCESS) - } - - // Create HTrace client - hcl, err := htrace.NewClient(cnf) - if err != nil { - fmt.Printf("Failed to create HTrace client: %s\n", err.Error()) - os.Exit(EXIT_FAILURE) - } - - // Handle commands that require an HTrace client. - switch cmd { - case version.FullCommand(): - os.Exit(printVersion()) - case serverInfo.FullCommand(): - os.Exit(printServerInfo(hcl)) - case findSpan.FullCommand(): - os.Exit(doFindSpan(hcl, common.SpanId(*findSpanId))) - case findChildren.FullCommand(): - os.Exit(doFindChildren(hcl, common.SpanId(*parentSpanId), *childLim)) - case loadJson.FullCommand(): - os.Exit(doLoadSpanJson(hcl, *loadJsonArg)) - case loadFile.FullCommand(): - os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath)) - case dumpAll.FullCommand(): - err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim) - if err != nil { - fmt.Printf("dumpAll error: %s\n", err.Error()) - os.Exit(EXIT_FAILURE) - } - os.Exit(EXIT_SUCCESS) - case query.FullCommand(): - err := doQueryFromString(hcl, *queryArg, *queryLim) - if err != nil { - fmt.Printf("query error: %s\n", err.Error()) - os.Exit(EXIT_FAILURE) - } - os.Exit(EXIT_SUCCESS) - case rawQuery.FullCommand(): - err := doRawQuery(hcl, *rawQueryArg) - if err != nil { - fmt.Printf("raw query error: %s\n", err.Error()) - os.Exit(EXIT_FAILURE) - } - os.Exit(EXIT_SUCCESS) - } - - app.UsageErrorf(os.Stderr, "You must supply a command to run.") -} - -// Print the version of the htrace binary. 
-func printVersion() int { - fmt.Printf("Running htrace command version %s.\n", RELEASE_VERSION) - return EXIT_SUCCESS -} - -// Print information retrieved from an htraced server via /server/info -func printServerInfo(hcl *htrace.Client) int { - info, err := hcl.GetServerInfo() - if err != nil { - fmt.Println(err.Error()) - return EXIT_FAILURE - } - fmt.Printf("HTraced server version %s (%s)\n", info.ReleaseVersion, info.GitVersion) - return EXIT_SUCCESS -} - -// Print information about a trace span. -func doFindSpan(hcl *htrace.Client, sid common.SpanId) int { - span, err := hcl.FindSpan(sid) - if err != nil { - fmt.Println(err.Error()) - return EXIT_FAILURE - } - if span == nil { - fmt.Printf("Span ID not found.\n") - return EXIT_FAILURE - } - pbuf, err := json.MarshalIndent(span, "", " ") - if err != nil { - fmt.Printf("Error: error pretty-printing span to JSON: %s\n", err.Error()) - return EXIT_FAILURE - } - fmt.Printf("%s\n", string(pbuf)) - return EXIT_SUCCESS -} - -func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int { - if spanFile == "" { - fmt.Printf("You must specify the json file to load.\n") - return EXIT_FAILURE - } - file, err := OpenInputFile(spanFile) - if err != nil { - fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error()) - return EXIT_FAILURE - } - defer file.Close() - return doLoadSpans(hcl, bufio.NewReader(file)) -} - -func doLoadSpanJson(hcl *htrace.Client, spanJson string) int { - return doLoadSpans(hcl, bytes.NewBufferString(spanJson)) -} - -func doLoadSpans(hcl *htrace.Client, reader io.Reader) int { - dec := json.NewDecoder(reader) - spans := make([]*common.Span, 0, 32) - var err error - for { - var span common.Span - if err = dec.Decode(&span); err != nil { - if err == io.EOF { - break - } - fmt.Printf("Failed to decode JSON: %s\n", err.Error()) - return EXIT_FAILURE - } - spans = append(spans, &span) - } - if *verbose { - fmt.Printf("Writing ") - prefix := "" - for i := range spans { - fmt.Printf("%s%s", prefix, 
spans[i].ToJson()) - prefix = ", " - } - fmt.Printf("\n") - } - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: spans, - }) - if err != nil { - fmt.Println(err.Error()) - return EXIT_FAILURE - } - return EXIT_SUCCESS -} - -// Find information about the children of a span. -func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int { - spanIds, err := hcl.FindChildren(sid, lim) - if err != nil { - fmt.Printf("%s\n", err.Error()) - return EXIT_FAILURE - } - pbuf, err := json.MarshalIndent(spanIds, "", " ") - if err != nil { - fmt.Println("Error: error pretty-printing span IDs to JSON: %s", err.Error()) - return 1 - } - fmt.Printf("%s\n", string(pbuf)) - return 0 -} - -// Dump all spans from the htraced daemon. -func doDumpAll(hcl *htrace.Client, outPath string, lim int) error { - file, err := CreateOutputFile(outPath) - if err != nil { - return err - } - w := bufio.NewWriter(file) - defer func() { - if file != nil { - w.Flush() - file.Close() - } - }() - out := make(chan *common.Span, 50) - var dumpErr error - go func() { - dumpErr = hcl.DumpAll(lim, out) - }() - var numSpans int64 - nextLogTime := time.Now().Add(time.Second * 5) - for { - span, channelOpen := <-out - if !channelOpen { - break - } - if err == nil { - _, err = fmt.Fprintf(w, "%s\n", span.ToJson()) - } - if *verbose { - numSpans++ - now := time.Now() - if !now.Before(nextLogTime) { - nextLogTime = now.Add(time.Second * 5) - fmt.Printf("received %d span(s)...\n", numSpans) - } - } - } - if err != nil { - return errors.New(fmt.Sprintf("Write error %s", err.Error())) - } - if dumpErr != nil { - return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error())) - } - err = w.Flush() - if err != nil { - return err - } - err = file.Close() - file = nil - if err != nil { - return err - } - return nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go 
---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go deleted file mode 100644 index ea214be..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bufio" - "encoding/json" - "errors" - "fmt" - "io" - "org/apache/htrace/common" - "os" -) - -// A file used for input. -// Transparently supports using stdin for input. -type InputFile struct { - *os.File - path string -} - -// Open an input file. Stdin will be used when path is - -func OpenInputFile(path string) (*InputFile, error) { - if path == "-" { - return &InputFile{File: os.Stdin, path: path}, nil - } - file, err := os.Open(path) - if err != nil { - return nil, err - } - return &InputFile{File: file, path: path}, nil -} - -func (file *InputFile) Close() { - if file.path != "-" { - file.File.Close() - } -} - -// A file used for output. -// Transparently supports using stdout for output. -type OutputFile struct { - *os.File - path string -} - -// Create an output file. 
Stdout will be used when path is - -func CreateOutputFile(path string) (*OutputFile, error) { - if path == "-" { - return &OutputFile{File: os.Stdout, path: path}, nil - } - file, err := os.Create(path) - if err != nil { - return nil, err - } - return &OutputFile{File: file, path: path}, nil -} - -func (file *OutputFile) Close() error { - if file.path != "-" { - return file.File.Close() - } - return nil -} - -// FailureDeferringWriter is a writer which allows us to call Printf multiple -// times and then check if all the printfs succeeded at the very end, rather -// than checking after each call. We will not attempt to write more data -// after the first write failure. -type FailureDeferringWriter struct { - io.Writer - err error -} - -func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter { - return &FailureDeferringWriter{writer, nil} -} - -func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) { - if w.err != nil { - return - } - str := fmt.Sprintf(format, v...) - _, err := w.Writer.Write([]byte(str)) - if err != nil { - w.err = err - } -} - -func (w *FailureDeferringWriter) Error() error { - return w.err -} - -// Read a file full of whitespace-separated span JSON into a slice of spans. -func readSpansFile(path string) (common.SpanSlice, error) { - file, err := OpenInputFile(path) - if err != nil { - return nil, err - } - defer file.Close() - return readSpans(bufio.NewReader(file)) -} - -// Read whitespace-separated span JSON into a slice of spans. 
-func readSpans(reader io.Reader) (common.SpanSlice, error) { - spans := make(common.SpanSlice, 0) - dec := json.NewDecoder(reader) - for { - var span common.Span - err := dec.Decode(&span) - if err != nil { - if err != io.EOF { - return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+ - "span(s): %s", len(spans), err.Error())) - } - break - } - spans = append(spans, &span) - } - return spans, nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go deleted file mode 100644 index b6f9cac..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package main - -import ( - "errors" - "io" - "io/ioutil" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "org/apache/htrace/test" - "os" - "strings" - "testing" -) - -func TestInputFileAndOutputFile(t *testing.T) { - tdir, err := ioutil.TempDir(os.TempDir(), "TestInputFileAndOutputFile") - if err != nil { - t.Fatalf("failed to create TempDir: %s\n", err.Error()) - } - defer os.RemoveAll(tdir) - tpath := tdir + conf.PATH_SEP + "test" - var ofile *OutputFile - ofile, err = CreateOutputFile(tpath) - if err != nil { - t.Fatalf("failed to create OutputFile at %s: %s\n", tpath, err.Error()) - } - defer func() { - if ofile != nil { - ofile.Close() - } - }() - w := NewFailureDeferringWriter(ofile) - w.Printf("Hello, world!\n") - w.Printf("2 + 2 = %d\n", 4) - if w.Error() != nil { - t.Fatalf("got unexpected error writing to %s: %s\n", tpath, w.Error().Error()) - } - err = ofile.Close() - ofile = nil - if err != nil { - t.Fatalf("error on closing OutputFile for %s: %s\n", tpath, err.Error()) - } - var ifile *InputFile - ifile, err = OpenInputFile(tpath) - defer ifile.Close() - expected := "Hello, world!\n2 + 2 = 4\n" - buf := make([]byte, len(expected)) - _, err = io.ReadAtLeast(ifile, buf, len(buf)) - if err != nil { - t.Fatalf("unexpected error on reading %s: %s\n", tpath, err.Error()) - } - str := string(buf) - if str != expected { - t.Fatalf("Could not read back what we wrote to %s.\n"+ - "Got:\n%s\nExpected:\n%s\n", tpath, str, expected) - } -} - -type LimitedBufferWriter struct { - buf []byte - off int -} - -const LIMITED_BUFFER_MESSAGE = "There isn't enough buffer to go around!" 
- -func (w *LimitedBufferWriter) Write(p []byte) (int, error) { - var nwritten int - for i := range p { - if w.off >= len(w.buf) { - return nwritten, errors.New(LIMITED_BUFFER_MESSAGE) - } - w.buf[w.off] = p[i] - w.off = w.off + 1 - nwritten++ - } - return nwritten, nil -} - -func TestFailureDeferringWriter(t *testing.T) { - lw := LimitedBufferWriter{buf: make([]byte, 20), off: 0} - w := NewFailureDeferringWriter(&lw) - w.Printf("Zippity do dah #%d\n", 1) - w.Printf("Zippity do dah #%d\n", 2) - if w.Error() == nil { - t.Fatalf("expected FailureDeferringWriter to experience a failure due to " + - "limited buffer size, but it did not.") - } - if w.Error().Error() != LIMITED_BUFFER_MESSAGE { - t.Fatalf("expected FailureDeferringWriter to have the error message %s, but "+ - "the message was %s\n", LIMITED_BUFFER_MESSAGE, w.Error().Error()) - } - expected := "Zippity do dah #1\nZi" - if string(lw.buf) != expected { - t.Fatalf("expected LimitedBufferWriter to contain %s, but it contained %s "+ - "instead.\n", expected, string(lw.buf)) - } -} - -func TestReadSpans(t *testing.T) { - SPAN_TEST_STR := `{"i":"bdd6d4ee48de59bf","s":"c0681027d3ea4928",` + - `"b":1424736225037,"e":1424736225901,"d":"ClientNamenodeProtocol#getFileInfo",` + - `"r":"FsShell","p":["60538dfb4df91418"]} -{"i":"bdd6d4ee48de59bf","s":"60538dfb4df91418","b":1424736224969,` + - `"e":1424736225960,"d":"getFileInfo","r":"FsShell","p":[],"n":{"path":"/"}} -` - r := strings.NewReader(SPAN_TEST_STR) - spans, err := readSpans(r) - if err != nil { - t.Fatalf("Failed to read spans from string via readSpans: %s\n", err.Error()) - } - SPAN_TEST_EXPECTED := common.SpanSlice{ - &common.Span{ - Id: test.SpanId("c0681027d3ea4928"), - SpanData: common.SpanData{ - TraceId: test.SpanId("bdd6d4ee48de59bf"), - Begin: 1424736225037, - End: 1424736225901, - Description: "ClientNamenodeProtocol#getFileInfo", - ProcessId: "FsShell", - Parents: []common.SpanId{test.SpanId("60538dfb4df91418")}, - }, - }, - &common.Span{ - Id: 
test.SpanId("60538dfb4df91418"), - SpanData: common.SpanData{ - TraceId: test.SpanId("bdd6d4ee48de59bf"), - Begin: 1424736224969, - End: 1424736225960, - Description: "getFileInfo", - ProcessId: "FsShell", - Parents: []common.SpanId{}, - Info: common.TraceInfoMap{ - "path": "/", - }, - }, - }, - } - if len(spans) != len(SPAN_TEST_EXPECTED) { - t.Fatalf("Expected %d spans, but got %d\n", - len(SPAN_TEST_EXPECTED), len(spans)) - } - for i := range SPAN_TEST_EXPECTED { - common.ExpectSpansEqual(t, spans[i], SPAN_TEST_EXPECTED[i]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go deleted file mode 100644 index dabf2df..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bufio" - "errors" - "fmt" - "io" - "org/apache/htrace/common" - "os" - "sort" -) - -// Create a dotfile from a json file. 
-func jsonSpanFileToDotFile(jsonFile string, dotFile string) error { - spans, err := readSpansFile(jsonFile) - if err != nil { - return errors.New(fmt.Sprintf("error reading %s: %s", - jsonFile, err.Error())) - } - var file *OutputFile - file, err = CreateOutputFile(dotFile) - if err != nil { - return errors.New(fmt.Sprintf("error opening %s for write: %s", - dotFile, err.Error())) - } - defer func() { - if file != nil { - file.Close() - } - }() - writer := bufio.NewWriter(file) - err = spansToDot(spans, writer) - if err != nil { - return err - } - err = writer.Flush() - if err != nil { - return err - } - err = file.Close() - file = nil - return err -} - -// Create output in dotfile format from a set of spans. -func spansToDot(spans common.SpanSlice, writer io.Writer) error { - sort.Sort(spans) - idMap := make(map[common.SpanId]*common.Span) - for i := range spans { - span := spans[i] - if idMap[span.Id] != nil { - fmt.Fprintf(os.Stderr, "There were multiple spans listed which "+ - "had ID %s.\nFirst:%s\nOther:%s\n", span.Id.String(), - idMap[span.Id].ToJson(), span.ToJson()) - } else { - idMap[span.Id] = span - } - } - childMap := make(map[common.SpanId]common.SpanSlice) - for i := range spans { - child := spans[i] - for j := range child.Parents { - parent := idMap[child.Parents[j]] - if parent == nil { - fmt.Fprintf(os.Stderr, "Can't find parent id %s for %s\n", - child.Parents[j].String(), child.ToJson()) - } else { - children := childMap[parent.Id] - if children == nil { - children = make(common.SpanSlice, 0) - } - children = append(children, child) - childMap[parent.Id] = children - } - } - } - w := NewFailureDeferringWriter(writer) - w.Printf("digraph spans {\n") - // Write out the nodes with their descriptions. - for i := range spans { - w.Printf(fmt.Sprintf(` "%s" [label="%s"];`+"\n", - spans[i].Id.String(), spans[i].Description)) - } - // Write out the edges between nodes... 
the parent/children relationships - for i := range spans { - children := childMap[spans[i].Id] - sort.Sort(children) - if children != nil { - for c := range children { - w.Printf(fmt.Sprintf(` "%s" -> "%s";`+"\n", - spans[i].Id.String(), children[c].Id)) - } - } - } - w.Printf("}\n") - return w.Error() -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go deleted file mode 100644 index 8698a98..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package main - -import ( - "bytes" - "org/apache/htrace/common" - "org/apache/htrace/test" - "testing" -) - -func TestSpansToDot(t *testing.T) { - TEST_SPANS := common.SpanSlice{ - &common.Span{ - Id: test.SpanId("6af3cc058e5d829d"), - SpanData: common.SpanData{ - TraceId: test.SpanId("0e4716fe911244de"), - Begin: 1424813349020, - End: 1424813349134, - Description: "newDFSInputStream", - ProcessId: "FsShell", - Parents: []common.SpanId{}, - Info: common.TraceInfoMap{ - "path": "/", - }, - }, - }, - &common.Span{ - Id: test.SpanId("75d16cc5b2c07d8a"), - SpanData: common.SpanData{ - TraceId: test.SpanId("0e4716fe911244de"), - Begin: 1424813349025, - End: 1424813349133, - Description: "getBlockLocations", - ProcessId: "FsShell", - Parents: []common.SpanId{test.SpanId("6af3cc058e5d829d")}, - }, - }, - &common.Span{ - Id: test.SpanId("e2c7273efb280a8c"), - SpanData: common.SpanData{ - TraceId: test.SpanId("0e4716fe911244de"), - Begin: 1424813349027, - End: 1424813349073, - Description: "ClientNamenodeProtocol#getBlockLocations", - ProcessId: "FsShell", - Parents: []common.SpanId{test.SpanId("75d16cc5b2c07d8a")}, - }, - }, - } - w := bytes.NewBuffer(make([]byte, 0, 2048)) - err := spansToDot(TEST_SPANS, w) - if err != nil { - t.Fatalf("spansToDot failed: error %s\n", err.Error()) - } - EXPECTED_STR := `digraph spans { - "6af3cc058e5d829d" [label="newDFSInputStream"]; - "75d16cc5b2c07d8a" [label="getBlockLocations"]; - "e2c7273efb280a8c" [label="ClientNamenodeProtocol#getBlockLocations"]; - "6af3cc058e5d829d" -> "75d16cc5b2c07d8a"; - "75d16cc5b2c07d8a" -> "e2c7273efb280a8c"; -} -` - if w.String() != EXPECTED_STR { - t.Fatalf("Expected to get:\n%s\nGot:\n%s\n", EXPECTED_STR, w.String()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go ---------------------------------------------------------------------- diff --git 
a/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go deleted file mode 100644 index 4ff246c..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "encoding/json" - "errors" - "fmt" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "strings" - "unicode" -) - -// Convert a string into a whitespace-separated sequence of strings. -func tokenize(str string) []string { - prevQuote := rune(0) - f := func(c rune) bool { - switch { - case c == prevQuote: - prevQuote = rune(0) - return false - case prevQuote != rune(0): - return false - case unicode.In(c, unicode.Quotation_Mark): - prevQuote = c - return false - default: - return unicode.IsSpace(c) - } - } - return strings.FieldsFunc(str, f) -} - -// Parses a query string in the format of a series of -// [TYPE] [OPERATOR] [CONST] tuples, joined by AND statements. 
-type predicateParser struct { - tokens []string - curToken int -} - -func (ps *predicateParser) Parse() (*common.Predicate, error) { - if ps.curToken > len(ps.tokens) { - return nil, nil - } - if ps.curToken > 0 { - if strings.ToLower(ps.tokens[ps.curToken]) != "and" { - return nil, errors.New(fmt.Sprintf("Error parsing on token %d: "+ - "expected predicates to be joined by 'and', but found '%s'", - ps.curToken, ps.tokens[ps.curToken])) - } - ps.curToken++ - if ps.curToken > len(ps.tokens) { - return nil, errors.New(fmt.Sprintf("Nothing found after 'and' at "+ - "token %d", ps.curToken)) - } - } - field := common.Field(ps.tokens[ps.curToken]) - if !field.IsValid() { - return nil, errors.New(fmt.Sprintf("Invalid field specifier at token %d. "+ - "Can't understand %s. Valid field specifiers are %v", ps.curToken, - ps.tokens[ps.curToken], common.ValidFields())) - } - ps.curToken++ - if ps.curToken > len(ps.tokens) { - return nil, errors.New(fmt.Sprintf("Nothing found after field specifier "+ - "at token %d", ps.curToken)) - } - op := common.Op(ps.tokens[ps.curToken]) - if !op.IsValid() { - return nil, errors.New(fmt.Sprintf("Invalid operation specifier at token %d. "+ - "Can't understand %s. Valid operation specifiers are %v", ps.curToken, - ps.tokens[ps.curToken], common.ValidOps())) - } - ps.curToken++ - if ps.curToken > len(ps.tokens) { - return nil, errors.New(fmt.Sprintf("Nothing found after field specifier "+ - "at token %d", ps.curToken)) - } - val := ps.tokens[ps.curToken] - return &common.Predicate{Op: op, Field: field, Val: val}, nil -} - -func parseQueryString(str string) ([]common.Predicate, error) { - ps := predicateParser{tokens: tokenize(str)} - preds := make([]common.Predicate, 0) - for { - pred, err := ps.Parse() - if pred == nil { - break - } - if err != nil { - return nil, err - } - } - if len(preds) == 0 { - return nil, errors.New("Empty query string") - } - return preds, nil -} - -// Send a query from a query string. 
-func doQueryFromString(hcl *htrace.Client, str string, lim int) error { - query := &common.Query{Lim: lim} - var err error - query.Predicates, err = parseQueryString(str) - if err != nil { - return err - } - return doQuery(hcl, query) -} - -// Send a query from a raw JSON string. -func doRawQuery(hcl *htrace.Client, str string) error { - jsonBytes := []byte(str) - var query common.Query - err := json.Unmarshal(jsonBytes, &query) - if err != nil { - return errors.New(fmt.Sprintf("Error parsing provided JSON: %s\n", err.Error())) - } - return doQuery(hcl, &query) -} - -// Send a query. -func doQuery(hcl *htrace.Client, query *common.Query) error { - if *verbose { - qbytes, err := json.Marshal(*query) - if err != nil { - qbytes = []byte("marshaling error: " + err.Error()) - } - fmt.Printf("Sending query: %s\n", string(qbytes)) - } - spans, err := hcl.Query(query) - if err != nil { - return err - } - if *verbose { - fmt.Printf("%d results...\n", len(spans)) - } - for i := range spans { - fmt.Printf("%s\n", spans[i].ToJson()) - } - return nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go deleted file mode 100644 index 218c1c8..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "fmt" - "math/rand" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "org/apache/htrace/test" - "sort" - "testing" - "time" -) - -func TestClientGetServerInfo(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo", - DataDirs: make([]string, 2)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - _, err = hcl.GetServerInfo() - if err != nil { - t.Fatalf("failed to call GetServerInfo: %s", err.Error()) - } -} - -func createRandomTestSpans(amount int) common.SpanSlice { - rnd := rand.New(rand.NewSource(2)) - allSpans := make(common.SpanSlice, amount) - allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0]) - for i := 1; i < amount; i++ { - allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i]) - } - allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)} - return allSpans -} - -func TestClientOperations(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", - DataDirs: make([]string, 2)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - - // Create some random trace spans. 
- NUM_TEST_SPANS := 30 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - - // Write half of the spans to htraced via the client. - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans[0 : NUM_TEST_SPANS/2], - }) - if err != nil { - t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2, - err.Error()) - } - - // Look up the first half of the spans. They should be found. - var span *common.Span - for i := 0; i < NUM_TEST_SPANS/2; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } - - // Look up the second half of the spans. They should not be found. - for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - if span != nil { - t.Fatalf("Unexpectedly found a span we never write to "+ - "the server: FindSpan(%d) succeeded\n", i) - } - } - - // Test FindChildren - childSpan := allSpans[1] - parentId := childSpan.Parents[0] - var children []common.SpanId - children, err = hcl.FindChildren(parentId, 1) - if err != nil { - t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error()) - } - if len(children) != 1 { - t.Fatalf("FindChildren(%s) returned an invalid number of "+ - "children: expected %d, got %d\n", parentId, 1, len(children)) - } - if children[0] != childSpan.Id { - t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+ - " got %s\n", parentId, childSpan.Id, children[0]) - } - - // Test FindChildren on a span that has no children - childlessSpan := allSpans[NUM_TEST_SPANS/2] - children, err = hcl.FindChildren(childlessSpan.Id, 10) - if err != nil { - t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error()) - } - if len(children) != 0 { - t.Fatalf("FindChildren(%d) returned an invalid number of "+ - "children: expected %d, got %d\n", childlessSpan.Id, 0, 
len(children)) - } - - // Test Query - var query common.Query - query = common.Query{Lim: 10} - spans, err := hcl.Query(&query) - if err != nil { - t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error()) - } - if len(spans) != 10 { - t.Fatalf("Query({lim: %d}) returned an invalid number of "+ - "children: expected %d, got %d\n", 10, 10, len(spans)) - } -} - -func TestDumpAll(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", - DataDirs: make([]string, 2)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - - NUM_TEST_SPANS := 100 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - sort.Sort(allSpans) - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans, - }) - if err != nil { - t.Fatalf("WriteSpans failed: %s\n", err.Error()) - } - out := make(chan *common.Span, 50) - var dumpErr error - go func() { - dumpErr = hcl.DumpAll(3, out) - }() - var numSpans int - nextLogTime := time.Now().Add(time.Millisecond * 5) - for { - span, channelOpen := <-out - if !channelOpen { - break - } - common.ExpectSpansEqual(t, allSpans[numSpans], span) - numSpans++ - if testing.Verbose() { - now := time.Now() - if !now.Before(nextLogTime) { - nextLogTime = now - nextLogTime = nextLogTime.Add(time.Millisecond * 5) - fmt.Printf("read back %d span(s)...\n", numSpans) - } - } - } - if numSpans != len(allSpans) { - t.Fatalf("expected to read %d spans... 
but only read %d\n", - len(allSpans), numSpans) - } - if dumpErr != nil { - t.Fatalf("got dump error %s\n", dumpErr.Error()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go deleted file mode 100644 index 0742555..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go +++ /dev/null @@ -1,999 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bytes" - "encoding/gob" - "errors" - "fmt" - "github.com/jmhodges/levigo" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "os" - "strconv" - "strings" - "sync/atomic" -) - -// -// The data store code for HTraced. -// -// This code stores the trace spans. We use levelDB here so that we don't have to store everything -// in memory at all times. The data is sharded across multiple levelDB databases in multiple -// directories. Normally, these multiple directories will be on multiple disk drives. 
//
// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
// coming from many daemons. Durability is not as big a concern as in some data stores, since
// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package
// for serialization. We assume that there will be many more writes than reads.
//
// Schema
// v -> dataStoreVersion
// s[8-byte-big-endian-sid] -> SpanData
// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
//
// Note that span IDs are unsigned 64-bit numbers.
// Begin times, end times, and durations are signed 64-bit numbers.
// In order to get LevelDB to properly compare the signed 64-bit quantities,
// we flip the highest bit. This way, we can get leveldb to view negative
// quantities as less than non-negative ones. This also means that we can do
// all queries using unsigned 64-bit math, rather than having to special-case
// the signed fields.
//

// Layout versions of the on-disk data store.
const (
	UNKNOWN_LAYOUT_VERSION = 0
	CURRENT_LAYOUT_VERSION = 2
)

// The empty value stored under every secondary index key.
var EMPTY_BYTE_BUF = []byte{}

// Single-byte key prefixes, one per section of the schema above.
const (
	VERSION_KEY             = 'v'
	SPAN_ID_INDEX_PREFIX    = 's'
	BEGIN_TIME_INDEX_PREFIX = 'b'
	END_TIME_INDEX_PREFIX   = 'e'
	DURATION_INDEX_PREFIX   = 'd'
	PARENT_ID_INDEX_PREFIX  = 'p'
	INVALID_INDEX_PREFIX    = 0
)

// Counters describing data store activity. The fields are updated and read
// with atomic operations.
type Statistics struct {
	NumSpansWritten uint64
}

// Atomically add one to the count of written spans.
func (stats *Statistics) IncrementWrittenSpans() {
	atomic.AddUint64(&stats.NumSpansWritten, 1)
}

// Take a snapshot of the statistics structure, reading each counter
// atomically. The returned structure is independent of the original.
func (stats *Statistics) Copy() *Statistics {
	snapshot := new(Statistics)
	snapshot.NumSpansWritten = atomic.LoadUint64(&stats.NumSpansWritten)
	return snapshot
}

// Translate an 8-byte value into a leveldb key.
-func makeKey(tag byte, val uint64) []byte { - return []byte{ - tag, - byte(0xff & (val >> 56)), - byte(0xff & (val >> 48)), - byte(0xff & (val >> 40)), - byte(0xff & (val >> 32)), - byte(0xff & (val >> 24)), - byte(0xff & (val >> 16)), - byte(0xff & (val >> 8)), - byte(0xff & (val >> 0)), - } -} - -func keyToInt(key []byte) uint64 { - var id uint64 - id = (uint64(key[0]) << 56) | - (uint64(key[1]) << 48) | - (uint64(key[2]) << 40) | - (uint64(key[3]) << 32) | - (uint64(key[4]) << 24) | - (uint64(key[5]) << 16) | - (uint64(key[6]) << 8) | - (uint64(key[7]) << 0) - return id -} - -func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte { - return []byte{ - tag, - byte(0xff & (fir >> 56)), - byte(0xff & (fir >> 48)), - byte(0xff & (fir >> 40)), - byte(0xff & (fir >> 32)), - byte(0xff & (fir >> 24)), - byte(0xff & (fir >> 16)), - byte(0xff & (fir >> 8)), - byte(0xff & (fir >> 0)), - byte(0xff & (sec >> 56)), - byte(0xff & (sec >> 48)), - byte(0xff & (sec >> 40)), - byte(0xff & (sec >> 32)), - byte(0xff & (sec >> 24)), - byte(0xff & (sec >> 16)), - byte(0xff & (sec >> 8)), - byte(0xff & (sec >> 0)), - } -} - -// A single directory containing a levelDB instance. -type shard struct { - // The data store that this shard is part of - store *dataStore - - // The LevelDB instance. - ldb *levigo.DB - - // The path to the leveldb directory this shard is managing. - path string - - // Incoming requests to write Spans. - incoming chan *common.Span - - // The channel we will send a bool to when we exit. - exited chan bool -} - -// Process incoming spans for a shard. 
-func (shd *shard) processIncoming() { - lg := shd.store.lg - for { - span := <-shd.incoming - if span == nil { - lg.Infof("Shard processor for %s exiting.\n", shd.path) - shd.exited <- true - return - } - err := shd.writeSpan(span) - if err != nil { - lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error()) - } else { - lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson()) - } - } -} - -// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the -// highest bit, so that negative input values map to unsigned numbers which are -// less than non-negative input values. -func s2u64(val int64) uint64 { - ret := uint64(val) - ret ^= 0x8000000000000000 - return ret -} - -func (shd *shard) writeSpan(span *common.Span) error { - batch := levigo.NewWriteBatch() - defer batch.Close() - - // Add SpanData to batch. - spanDataBuf := new(bytes.Buffer) - spanDataEnc := gob.NewEncoder(spanDataBuf) - err := spanDataEnc.Encode(span.SpanData) - if err != nil { - return err - } - batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes()) - - // Add this to the parent index. - for parentIdx := range span.Parents { - batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX, - span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF) - } - - // Add to the other secondary indices. 
- batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin), - span.Id.Val()), EMPTY_BYTE_BUF) - batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End), - span.Id.Val()), EMPTY_BYTE_BUF) - batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, s2u64(span.Duration()), - span.Id.Val()), EMPTY_BYTE_BUF) - - err = shd.ldb.Write(shd.store.writeOpts, batch) - if err != nil { - return err - } - shd.store.stats.IncrementWrittenSpans() - if shd.store.WrittenSpans != nil { - shd.store.WrittenSpans <- span - } - return nil -} - -func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId, - lim int32) ([]common.SpanId, int32, error) { - searchKey := makeKey('p', sid.Val()) - iter := shd.ldb.NewIterator(shd.store.readOpts) - defer iter.Close() - iter.Seek(searchKey) - for { - if !iter.Valid() { - break - } - if lim == 0 { - break - } - key := iter.Key() - if !bytes.HasPrefix(key, searchKey) { - break - } - id := common.SpanId(keyToInt(key[9:])) - childIds = append(childIds, id) - lim-- - iter.Next() - } - return childIds, lim, nil -} - -// Close a shard. -func (shd *shard) Close() { - lg := shd.store.lg - shd.incoming <- nil - lg.Infof("Waiting for %s to exit...\n", shd.path) - if shd.exited != nil { - <-shd.exited - } - shd.ldb.Close() - lg.Infof("Closed %s...\n", shd.path) -} - -// The Data Store. -type dataStore struct { - lg *common.Logger - - // The shards which manage our LevelDB instances. - shards []*shard - - // I/O statistics for all shards. - stats Statistics - - // The read options to use for LevelDB. - readOpts *levigo.ReadOptions - - // The write options to use for LevelDB. - writeOpts *levigo.WriteOptions - - // If non-null, a channel we will send spans to once we finish writing them. This is only used - // for testing. - WrittenSpans chan *common.Span -} - -func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) { - // Get the configuration. 
- clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR) - dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) - dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) - - var err error - lg := common.NewLogger("datastore", cnf) - store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans} - - // If we return an error, close the store. - defer func() { - if err != nil { - store.Close() - store = nil - } - }() - - store.readOpts = levigo.NewReadOptions() - store.readOpts.SetFillCache(true) - store.writeOpts = levigo.NewWriteOptions() - store.writeOpts.SetSync(false) - - // Open all shards - for idx := range dirs { - path := dirs[idx] + conf.PATH_SEP + "db" - var shd *shard - shd, err = CreateShard(store, cnf, path, clearStored) - if err != nil { - lg.Errorf("Error creating shard %s: %s\n", path, err.Error()) - return nil, err - } - store.shards = append(store.shards, shd) - } - for idx := range store.shards { - shd := store.shards[idx] - shd.exited = make(chan bool, 1) - go shd.processIncoming() - } - return store, nil -} - -func CreateShard(store *dataStore, cnf *conf.Config, path string, - clearStored bool) (*shard, error) { - lg := store.lg - if clearStored { - fi, err := os.Stat(path) - if err != nil && !os.IsNotExist(err) { - lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) - return nil, err - } - if fi != nil { - err = os.RemoveAll(path) - if err != nil { - lg.Errorf("Failed to clear existing datastore directory %s: %s\n", - path, err.Error()) - return nil, err - } - lg.Infof("Cleared existing datastore directory %s\n", path) - } - } - err := os.MkdirAll(path, 0777) - if err != nil { - lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error()) - return nil, err - } - var shd *shard - openOpts := levigo.NewOptions() - defer openOpts.Close() - newlyCreated := false - ldb, err := levigo.Open(path, openOpts) - if err == nil { - store.lg.Infof("LevelDB opened %s\n", path) - } else { - store.lg.Debugf("LevelDB failed to open %s: %s\n", 
path, err.Error()) - openOpts.SetCreateIfMissing(true) - ldb, err = levigo.Open(path, openOpts) - if err != nil { - store.lg.Errorf("LevelDB failed to create %s: %s\n", path, err.Error()) - return nil, err - } - store.lg.Infof("Created new LevelDB instance in %s\n", path) - newlyCreated = true - } - defer func() { - if shd == nil { - ldb.Close() - } - }() - lv, err := readLayoutVersion(store, ldb) - if err != nil { - store.lg.Errorf("Got error while reading datastore version for %s: %s\n", - path, err.Error()) - return nil, err - } - if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) { - err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION) - if err != nil { - store.lg.Errorf("Got error while writing datastore version for %s: %s\n", - path, err.Error()) - return nil, err - } - store.lg.Tracef("Wrote layout version %d to shard at %s.\n", - CURRENT_LAYOUT_VERSION, path) - } else if lv != CURRENT_LAYOUT_VERSION { - versionName := "unknown" - if lv != UNKNOWN_LAYOUT_VERSION { - versionName = fmt.Sprintf("%d", lv) - } - store.lg.Errorf("Can't read old datastore. Its layout version is %s, but this "+ - "software is at layout version %d. Please set %s to clear the datastore "+ - "on startup, or clear it manually.\n", versionName, - CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR) - return nil, errors.New(fmt.Sprintf("Invalid layout version: got %s, expected %d.", - versionName, CURRENT_LAYOUT_VERSION)) - } else { - store.lg.Tracef("Found layout version %d in %s.\n", lv, path) - } - spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) - shd = &shard{store: store, ldb: ldb, path: path, - incoming: make(chan *common.Span, spanBufferSize)} - return shd, nil -} - -// Read the datastore version of a leveldb instance. 
-func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) { - buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY}) - if err != nil { - return 0, err - } - if len(buf) == 0 { - return 0, nil - } - r := bytes.NewBuffer(buf) - decoder := gob.NewDecoder(r) - var v uint32 - err = decoder.Decode(&v) - if err != nil { - return 0, err - } - return v, nil -} - -// Write the datastore version to a shard. -func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error { - w := new(bytes.Buffer) - encoder := gob.NewEncoder(w) - err := encoder.Encode(&v) - if err != nil { - return err - } - return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes()) -} - -func (store *dataStore) GetStatistics() *Statistics { - return store.stats.Copy() -} - -// Close the DataStore. -func (store *dataStore) Close() { - for idx := range store.shards { - store.shards[idx].Close() - store.shards[idx] = nil - } - if store.readOpts != nil { - store.readOpts.Close() - store.readOpts = nil - } - if store.writeOpts != nil { - store.writeOpts.Close() - store.writeOpts = nil - } - if store.lg != nil { - store.lg.Close() - store.lg = nil - } -} - -// Get the index of the shard which stores the given spanId. 
-func (store *dataStore) getShardIndex(sid common.SpanId) int { - return int(sid.Val() % uint64(len(store.shards))) -} - -func (store *dataStore) WriteSpan(span *common.Span) { - store.shards[store.getShardIndex(span.Id)].incoming <- span -} - -func (store *dataStore) FindSpan(sid common.SpanId) *common.Span { - return store.shards[store.getShardIndex(sid)].FindSpan(sid) -} - -func (shd *shard) FindSpan(sid common.SpanId) *common.Span { - lg := shd.store.lg - buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val())) - if err != nil { - if strings.Index(err.Error(), "NotFound:") != -1 { - return nil - } - lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n", - shd.path, sid.String(), err.Error()) - return nil - } - var span *common.Span - span, err = shd.decodeSpan(sid, buf) - if err != nil { - lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n", - shd.path, sid.String(), err.Error()) - return nil - } - return span -} - -func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) { - r := bytes.NewBuffer(buf) - decoder := gob.NewDecoder(r) - data := common.SpanData{} - err := decoder.Decode(&data) - if err != nil { - return nil, err - } - // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with - // non-nil slices. - if data.Parents == nil { - data.Parents = []common.SpanId{} - } - return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil -} - -// Find the children of a given span id. 
-func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId { - childIds := make([]common.SpanId, 0) - var err error - - startIdx := store.getShardIndex(sid) - idx := startIdx - numShards := len(store.shards) - for { - if lim == 0 { - break - } - shd := store.shards[idx] - childIds, lim, err = shd.FindChildren(sid, childIds, lim) - if err != nil { - store.lg.Errorf("Shard(%s): FindChildren(%s) error: %s\n", - shd.path, sid.String(), err.Error()) - } - idx++ - if idx >= numShards { - idx = 0 - } - if idx == startIdx { - break - } - } - return childIds -} - -type predicateData struct { - *common.Predicate - uintKey uint64 - strKey string -} - -func loadPredicateData(pred *common.Predicate) (*predicateData, error) { - p := predicateData{Predicate: pred} - - // Parse the input value given to make sure it matches up with the field - // type. - switch pred.Field { - case common.SPAN_ID: - // Span IDs are sent as hex strings. - var id common.SpanId - if err := id.FromString(pred.Val); err != nil { - return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s", - pred.Val, err.Error())) - } - p.uintKey = id.Val() - break - case common.DESCRIPTION: - // Any string is valid for a description. - p.strKey = pred.Val - break - case common.BEGIN_TIME, common.END_TIME, common.DURATION: - // Parse a base-10 signed numeric field. - v, err := strconv.ParseInt(pred.Val, 10, 64) - if err != nil { - return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s", - pred.Field, pred.Val, err.Error())) - } - p.uintKey = s2u64(v) - break - default: - return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field)) - } - - // Validate the predicate operation. 
- switch pred.Op { - case common.EQUALS, common.LESS_THAN_OR_EQUALS, - common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN: - break - case common.CONTAINS: - if p.fieldIsNumeric() { - return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+ - "numeric field like '%s'", pred.Field)) - } - default: - return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'", - pred.Op)) - } - - return &p, nil -} - -// Get the index prefix for this predicate, or 0 if it is not indexed. -func (pred *predicateData) getIndexPrefix() byte { - switch pred.Field { - case common.SPAN_ID: - return SPAN_ID_INDEX_PREFIX - case common.BEGIN_TIME: - return BEGIN_TIME_INDEX_PREFIX - case common.END_TIME: - return END_TIME_INDEX_PREFIX - case common.DURATION: - return DURATION_INDEX_PREFIX - default: - return INVALID_INDEX_PREFIX - } -} - -// Returns true if the predicate type is numeric. -func (pred *predicateData) fieldIsNumeric() bool { - switch pred.Field { - case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION: - return true - default: - return false - } -} - -// Get the values that this predicate cares about for a given span. -func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, string) { - switch pred.Field { - case common.SPAN_ID: - return span.Id.Val(), "" - case common.DESCRIPTION: - return 0, span.Description - case common.BEGIN_TIME: - return s2u64(span.Begin), "" - case common.END_TIME: - return s2u64(span.End), "" - case common.DURATION: - return s2u64(span.Duration()), "" - default: - panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field)) - } -} - -func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool { - // nil is after everything. - if a == nil { - if b == nil { - return false - } - return false - } else if b == nil { - return true - } - // Compare the spans according to this predicate. 
- aInt, aStr := pred.extractRelevantSpanData(a) - bInt, bStr := pred.extractRelevantSpanData(b) - if pred.fieldIsNumeric() { - if pred.Op.IsDescending() { - return aInt > bInt - } else { - return aInt < bInt - } - } else { - if pred.Op.IsDescending() { - return aStr > bStr - } else { - return aStr < bStr - } - } -} - -// Returns true if the predicate is satisfied by the given span. -func (pred *predicateData) satisfiedBy(span *common.Span) bool { - intVal, strVal := pred.extractRelevantSpanData(span) - if pred.fieldIsNumeric() { - switch pred.Op { - case common.EQUALS: - return intVal == pred.uintKey - case common.LESS_THAN_OR_EQUALS: - return intVal <= pred.uintKey - case common.GREATER_THAN_OR_EQUALS: - return intVal >= pred.uintKey - case common.GREATER_THAN: - return intVal > pred.uintKey - default: - panic(fmt.Sprintf("unknown Op type %s should have been caught "+ - "during normalization", pred.Op)) - } - } else { - switch pred.Op { - case common.CONTAINS: - return strings.Contains(strVal, pred.strKey) - case common.EQUALS: - return strVal == pred.strKey - case common.LESS_THAN_OR_EQUALS: - return strVal <= pred.strKey - case common.GREATER_THAN_OR_EQUALS: - return strVal >= pred.strKey - case common.GREATER_THAN: - return strVal > pred.strKey - default: - panic(fmt.Sprintf("unknown Op type %s should have been caught "+ - "during normalization", pred.Op)) - } - } -} - -func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) { - var ret *source - src := source{store: store, - pred: pred, - iters: make([]*levigo.Iterator, 0, len(store.shards)), - nexts: make([]*common.Span, len(store.shards)), - numRead: make([]int, len(store.shards)), - keyPrefix: pred.getIndexPrefix(), - } - if src.keyPrefix == INVALID_INDEX_PREFIX { - return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+ - "predicate on field %s", pred.Field)) - } - defer func() { - if ret == nil { - src.Close() - } - }() - for shardIdx := range 
store.shards { - shd := store.shards[shardIdx] - src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) - } - var searchKey []byte - lg := store.lg - if prev != nil { - // If prev != nil, this query RPC is the continuation of a previous - // one. The final result returned the last time is 'prev'. - // - // To avoid returning the same results multiple times, we adjust the - // predicate here. If the predicate is on the span id field, we - // simply manipulate the span ID we're looking for. - // - // If the predicate is on a secondary index, we also use span ID, but - // in a slightly different way. Since the secondary indices are - // organized as [type-code][8b-secondary-key][8b-span-id], elements - // with the same secondary index field are ordered by span ID. So we - // create a 17-byte key incorporating the span ID from 'prev.' - var startId common.SpanId - switch pred.Op { - case common.EQUALS: - if pred.Field == common.SPAN_ID { - // This is an annoying corner case. There can only be one - // result each time we do an EQUALS search for a span id. - // Span id is the primary key for all our spans. - // But for some reason someone is asking for another result. - // We modify the query to search for the illegal 0 span ID, - // which will never be present. - lg.Debugf("Attempted to use a continuation token with an EQUALS "+ - "SPAN_ID query. %s. Setting search id = 0", - pred.Predicate.String()) - startId = 0 - } else { - // When doing an EQUALS search on a secondary index, the - // results are sorted by span id. - startId = prev.Id + 1 - } - case common.LESS_THAN_OR_EQUALS: - // Subtract one from the previous span id. Since the previous - // start ID will never be 0 (0 is an illegal span id), we'll never - // wrap around when doing this. - startId = prev.Id - 1 - case common.GREATER_THAN_OR_EQUALS: - // We can't add one to the span id, since the previous span ID - // might be the maximum value. So just switch over to using - // GREATER_THAN. 
- pred.Op = common.GREATER_THAN - startId = prev.Id - case common.GREATER_THAN: - // This one is easy. - startId = prev.Id - default: - str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String()) - lg.Error(str + "\n") - panic(str) - } - if pred.Field == common.SPAN_ID { - pred.uintKey = uint64(startId) - searchKey = makeKey(src.keyPrefix, uint64(startId)) - } else { - // Start where the previous query left off. This means adjusting - // our uintKey. - pred.uintKey, _ = pred.extractRelevantSpanData(prev) - searchKey = makeSecondaryKey(src.keyPrefix, pred.uintKey, uint64(startId)) - } - if lg.TraceEnabled() { - lg.Tracef("Handling continuation token %s for %s. startId=%d, "+ - "pred.uintKey=%d\n", prev, pred.Predicate.String(), startId, - pred.uintKey) - } - } else { - searchKey = makeKey(src.keyPrefix, pred.uintKey) - } - for i := range src.iters { - src.iters[i].Seek(searchKey) - } - ret = &src - return ret, nil -} - -// A source of spans. -type source struct { - store *dataStore - pred *predicateData - iters []*levigo.Iterator - nexts []*common.Span - numRead []int - keyPrefix byte -} - -// Return true if this operation may require skipping the first result we get back from leveldb. -func mayRequireOneSkip(op common.Op) bool { - switch op { - // When dealing with descending predicates, the first span we read might not satisfy - // the predicate, even though subsequent ones will. This is because the iter.Seek() - // function "moves the iterator the position of the key given or, if the key doesn't - // exist, the next key that does exist in the database." So if we're on that "next - // key" it will not satisfy the predicate, but the keys previous to it might. - case common.LESS_THAN_OR_EQUALS: - return true - // iter.Seek basically takes us to the key which is "greater than or equal to" some - // value. Since we want greater than (not greater than or equal to) we may have to - // skip the first key. 
- case common.GREATER_THAN: - return true - } - return false -} - -// Fill in the entry in the 'next' array for a specific shard. -func (src *source) populateNextFromShard(shardIdx int) { - lg := src.store.lg - var err error - iter := src.iters[shardIdx] - if iter == nil { - lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx) - return // There are no more entries in this shard. - } - if src.nexts[shardIdx] != nil { - lg.Debugf("No need to populate shard %d\n", shardIdx) - return // We already have a valid entry for this shard. - } - for { - if !iter.Valid() { - lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx) - break // Can't read past end of DB - } - src.numRead[shardIdx]++ - key := iter.Key() - if !bytes.HasPrefix(key, []byte{src.keyPrefix}) { - lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n", - shardIdx, string(src.keyPrefix)) - break // Can't read past end of indexed section - } - var span *common.Span - var sid common.SpanId - if src.keyPrefix == SPAN_ID_INDEX_PREFIX { - // The span id maps to the span itself. - sid = common.SpanId(keyToInt(key[1:])) - span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value()) - if err != nil { - lg.Debugf("Internal error decoding span %s in shard %d: %s\n", - sid.String(), shardIdx, err.Error()) - break - } - } else { - // With a secondary index, we have to look up the span by id. 
- sid = common.SpanId(keyToInt(key[9:])) - span = src.store.shards[shardIdx].FindSpan(sid) - if span == nil { - lg.Debugf("Internal error rehydrating span %s in shard %d\n", - sid.String(), shardIdx) - break - } - } - if src.pred.Op.IsDescending() { - iter.Prev() - } else { - iter.Next() - } - if src.pred.satisfiedBy(span) { - lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx) - src.nexts[shardIdx] = span // Found valid entry - return - } else { - lg.Debugf("Span %s from shard %d does not satisfy the predicate.\n", - sid.String(), shardIdx) - if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) { - continue - } - // This and subsequent entries don't satisfy predicate - break - } - } - lg.Debugf("Closing iterator for shard %d.\n", shardIdx) - iter.Close() - src.iters[shardIdx] = nil -} - -func (src *source) next() *common.Span { - for shardIdx := range src.iters { - src.populateNextFromShard(shardIdx) - } - var best *common.Span - bestIdx := -1 - for shardIdx := range src.iters { - span := src.nexts[shardIdx] - if src.pred.spanPtrIsBefore(span, best) { - best = span - bestIdx = shardIdx - } - } - if bestIdx >= 0 { - src.nexts[bestIdx] = nil - } - return best -} - -func (src *source) Close() { - for i := range src.iters { - if src.iters[i] != nil { - src.iters[i].Close() - } - } - src.iters = nil -} - -func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) { - // Read spans from the first predicate that is indexed. - p := *preds - for i := range p { - pred := p[i] - if pred.getIndexPrefix() != INVALID_INDEX_PREFIX { - *preds = append(p[0:i], p[i+1:]...) - return pred.createSource(store, span) - } - } - // If there are no predicates that are indexed, read rows in order of span id. 
- spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: "0000000000000000", - } - spanIdPredData, err := loadPredicateData(&spanIdPred) - if err != nil { - return nil, err - } - return spanIdPredData.createSource(store, span) -} - -func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) { - lg := store.lg - // Parse predicate data. - var err error - preds := make([]*predicateData, len(query.Predicates)) - for i := range query.Predicates { - preds[i], err = loadPredicateData(&query.Predicates[i]) - if err != nil { - return nil, err - } - } - // Get a source of rows. - var src *source - src, err = store.obtainSource(&preds, query.Prev) - if err != nil { - return nil, err - } - defer src.Close() - lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src) - - // Filter the spans through the remaining predicates. - ret := make([]*common.Span, 0, 32) - for { - if len(ret) >= query.Lim { - break // we hit the result size limit - } - span := src.next() - if span == nil { - break // the source has no more spans to give - } - if lg.DebugEnabled() { - lg.Debugf("src.next returned span %s\n", span.ToJson()) - } - satisfied := true - for predIdx := range preds { - if !preds[predIdx].satisfiedBy(span) { - satisfied = false - break - } - } - if satisfied { - ret = append(ret, span) - } - } - return ret, nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go deleted file mode 100644 index 4696547..0000000 --- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bytes" - "encoding/json" - "math/rand" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "org/apache/htrace/test" - "os" - "sort" - "strings" - "testing" -) - -// Test creating and tearing down a datastore. 
-func TestCreateDatastore(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", - DataDirs: make([]string, 3)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() -} - -var SIMPLE_TEST_SPANS []common.Span = []common.Span{ - common.Span{Id: 1, - SpanData: common.SpanData{ - Begin: 123, - End: 456, - Description: "getFileDescriptors", - TraceId: 999, - Parents: []common.SpanId{}, - ProcessId: "firstd", - }}, - common.Span{Id: 2, - SpanData: common.SpanData{ - Begin: 125, - End: 200, - Description: "openFd", - TraceId: 999, - Parents: []common.SpanId{1}, - ProcessId: "secondd", - }}, - common.Span{Id: 3, - SpanData: common.SpanData{ - Begin: 200, - End: 456, - Description: "passFd", - TraceId: 999, - Parents: []common.SpanId{1}, - ProcessId: "thirdd", - }}, -} - -func createSpans(spans []common.Span, store *dataStore) { - for idx := range spans { - store.WriteSpan(&spans[idx]) - } - // Wait the spans to be created - for i := 0; i < 3; i++ { - <-store.WrittenSpans - } -} - -// Test creating a datastore and adding some spans. 
-func TestDatastoreWriteAndRead(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", - WrittenSpans: make(chan *common.Span, 100)} - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } - span := ht.Store.FindSpan(1) - if span == nil { - t.Fatal() - } - if span.Id != 1 { - t.Fatal() - } - common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span) - children := ht.Store.FindChildren(1, 1) - if len(children) != 1 { - t.Fatalf("expected 1 child, but got %d\n", len(children)) - } - children = ht.Store.FindChildren(1, 2) - if len(children) != 2 { - t.Fatalf("expected 2 children, but got %d\n", len(children)) - } - sort.Sort(common.SpanIdSlice(children)) - if children[0] != 2 { - t.Fatal() - } - if children[1] != 3 { - t.Fatal() - } -} - -func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, - expectedSpans []common.Span) { - spans, err := ht.Store.HandleQuery(query) - if err != nil { - t.Fatalf("First query failed: %s\n", err.Error()) - } - expectedBuf := new(bytes.Buffer) - dec := json.NewEncoder(expectedBuf) - err = dec.Encode(expectedSpans) - if err != nil { - t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error()) - } - spansBuf := new(bytes.Buffer) - dec = json.NewEncoder(spansBuf) - err = dec.Encode(spans) - if err != nil { - t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error()) - } - t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans), - len(expectedSpans)) - common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes())) -} - -// Test queries on the datastore. 
-func TestSimpleQuery(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", - WrittenSpans: make(chan *common.Span, 100)} - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "125", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) -} - -func TestQueries2(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", - WrittenSpans: make(chan *common.Span, 100)} - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "125", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "125", - }, - common.Predicate{ - Op: common.EQUALS, - Field: common.DESCRIPTION, - Val: "getFileDescriptors", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[0]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.EQUALS, - Field: common.DESCRIPTION, - Val: "getFileDescriptors", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[0]}) -} - -func TestQueries3(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", - WrittenSpans: make(chan *common.Span, 100)} - ht, err := 
htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.CONTAINS, - Field: common.DESCRIPTION, - Val: "Fd", - }, - common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "100", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: "0", - }, - }, - Lim: 200, - }, []common.Span{}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: "2", - }, - }, - Lim: 200, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) -} - -func TestQueries4(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries4", - WrittenSpans: make(chan *common.Span, 100)} - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.BEGIN_TIME, - Val: "125", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[2]}) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.DESCRIPTION, - Val: "openFd", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: 
common.DESCRIPTION, - Val: "openFd", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[2]}) -} - -func BenchmarkDatastoreWrites(b *testing.B) { - htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", - WrittenSpans: make(chan *common.Span, b.N)} - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - rnd := rand.New(rand.NewSource(1)) - allSpans := make([]*common.Span, b.N) - // Write many random spans. - for n := 0; n < b.N; n++ { - span := test.NewRandomSpan(rnd, allSpans[0:n]) - ht.Store.WriteSpan(span) - allSpans[n] = span - } - // Wait for all the spans to be written. - for n := 0; n < b.N; n++ { - <-ht.Store.WrittenSpans - } - spansWritten := ht.Store.GetStatistics().NumSpansWritten - if spansWritten < uint64(b.N) { - b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d", - b.N, spansWritten) - } -} - -func TestReloadDataStore(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", - DataDirs: make([]string, 2), KeepDataDirsOnClose: true} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - dataDirs := make([]string, len(ht.DataDirs)) - copy(dataDirs, ht.DataDirs) - defer func() { - if ht != nil { - ht.Close() - } - for i := range dataDirs { - os.RemoveAll(dataDirs[i]) - } - }() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - - // Create some random trace spans. - NUM_TEST_SPANS := 5 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - err = hcl.WriteSpans(&common.WriteSpansReq{ - Spans: allSpans, - }) - if err != nil { - t.Fatalf("WriteSpans failed: %s\n", err.Error()) - } - - // Look up the spans we wrote. 
- var span *common.Span - for i := 0; i < NUM_TEST_SPANS; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } - - ht.Close() - ht = nil - - htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2", - DataDirs: dataDirs, KeepDataDirsOnClose: true} - ht, err = htraceBld.Build() - if err != nil { - t.Fatalf("failed to re-create datastore: %s", err.Error()) - } - hcl, err = htrace.NewClient(ht.ClientConf()) - if err != nil { - t.Fatalf("failed to re-create client: %s", err.Error()) - } - - // Look up the spans we wrote earlier. - for i := 0; i < NUM_TEST_SPANS; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } - - // Set an old datastore version number. - for i := range ht.Store.shards { - shard := ht.Store.shards[i] - writeDataStoreVersion(ht.Store, shard.ldb, CURRENT_LAYOUT_VERSION-1) - } - ht.Close() - ht = nil - - htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3", - DataDirs: dataDirs, KeepDataDirsOnClose: true} - ht, err = htraceBld.Build() - if err == nil { - t.Fatalf("expected the datastore to fail to load after setting an " + - "incorrect version.\n") - } - if !strings.Contains(err.Error(), "Invalid layout version") { - t.Fatal(`expected the loading error to contain "invalid layout version"` + "\n") - } - - // It should work with data.store.clear set. - htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4", - DataDirs: dataDirs, KeepDataDirsOnClose: true, - Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}} - ht, err = htraceBld.Build() - if err != nil { - t.Fatalf("expected the datastore loading to succeed after setting an "+ - "incorrect version. 
But it failed with error %s\n", err.Error()) - } -} - -func TestQueriesWithContinuationTokens1(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", - WrittenSpans: make(chan *common.Span, 100)} - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } - // Adding a prev value to this query excludes the first result that we - // would normally get. - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.BEGIN_TIME, - Val: "120", - }, - }, - Lim: 5, - Prev: &SIMPLE_TEST_SPANS[0], - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) - - // There is only one result from an EQUALS query on SPAN_ID. - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.EQUALS, - Field: common.SPAN_ID, - Val: "1", - }, - }, - Lim: 100, - Prev: &SIMPLE_TEST_SPANS[0], - }, []common.Span{}) - - // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the - // span we pass as a continuation token. (Primary index edition). - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: "2", - }, - }, - Lim: 100, - Prev: &SIMPLE_TEST_SPANS[1], - }, []common.Span{SIMPLE_TEST_SPANS[0]}) - - // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the - // span we pass as a continuation token. (Secondary index edition). - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.DURATION, - Val: "0", - }, - }, - Lim: 100, - Prev: &SIMPLE_TEST_SPANS[1], - }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]}) -}
