http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/file.go b/htrace-htraced/go/src/org/apache/htrace/htrace/file.go
new file mode 100644
index 0000000..ea214be
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"org/apache/htrace/common"
+	"os"
+)
+
+// InputFile is a file used for input.
+// When its path is "-" it transparently wraps stdin instead of a real file.
+type InputFile struct {
+	*os.File
+	path string
+}
+
+// OpenInputFile opens path for reading. Stdin is used when path is "-".
+func OpenInputFile(path string) (*InputFile, error) {
+	if path == "-" {
+		return &InputFile{File: os.Stdin, path: path}, nil
+	}
+	file, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	return &InputFile{File: file, path: path}, nil
+}
+
+// Close closes the underlying file and reports any error from doing so.
+// Stdin is never closed: when the path was "-", Close does nothing and
+// returns nil. This mirrors OutputFile.Close; callers that do not care
+// about the error (e.g. "defer f.Close()") may still discard it.
+func (file *InputFile) Close() error {
+	if file.path == "-" {
+		return nil
+	}
+	return file.File.Close()
+}
+
+// A file used for output.
+// Transparently supports using stdout for output when the path is "-".
+type OutputFile struct {
+	*os.File
+	path string
+}
+
+// CreateOutputFile creates (or truncates) the file at path for writing.
+// The path "-" selects stdout instead of a real file.
+func CreateOutputFile(path string) (*OutputFile, error) {
+	if path != "-" {
+		f, err := os.Create(path)
+		if err != nil {
+			return nil, err
+		}
+		return &OutputFile{File: f, path: path}, nil
+	}
+	return &OutputFile{File: os.Stdout, path: path}, nil
+}
+
+// Close closes the underlying file and returns its error. Stdout is left
+// open, so Close is a no-op returning nil when the path was "-".
+func (file *OutputFile) Close() error {
+	if file.path == "-" {
+		return nil
+	}
+	return file.File.Close()
+}
+
+// FailureDeferringWriter lets callers issue many Printf calls and check
+// for failure only once at the end via Error, instead of after every call.
+// After the first failed write, later Printf calls write nothing.
+type FailureDeferringWriter struct {
+	io.Writer
+	err error
+}
+
+// NewFailureDeferringWriter wraps writer; no error has been recorded yet.
+func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter {
+	return &FailureDeferringWriter{Writer: writer}
+}
+
+// Printf formats according to format and writes the result to the wrapped
+// writer in a single Write call, remembering the first error it sees. It
+// does nothing once an error has been recorded.
+func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) {
+	if w.err == nil {
+		if _, err := fmt.Fprintf(w.Writer, format, v...); err != nil {
+			w.err = err
+		}
+	}
+}
+
+// Error returns the first write error encountered, or nil if none.
+func (w *FailureDeferringWriter) Error() error {
+	return w.err
+}
+
+// readSpansFile reads the file at path (stdin for "-"), which holds
+// whitespace-separated span JSON, and returns the decoded spans.
+func readSpansFile(path string) (common.SpanSlice, error) {
+	in, err := OpenInputFile(path)
+	if err != nil {
+		return nil, err
+	}
+	defer in.Close()
+	return readSpans(bufio.NewReader(in))
+}
+
+// Read whitespace-separated span JSON into a slice of spans.
+// Decoding stops cleanly at end of input. On a malformed object, the error
+// reports how many spans were successfully decoded before it.
+func readSpans(reader io.Reader) (common.SpanSlice, error) {
+	decoder := json.NewDecoder(reader)
+	result := make(common.SpanSlice, 0)
+	for {
+		span := new(common.Span)
+		switch err := decoder.Decode(span); {
+		case err == io.EOF:
+			return result, nil
+		case err != nil:
+			return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+
+				"span(s): %s", len(result), err.Error()))
+		}
+		result = append(result, span)
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go b/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go new file mode 100644 index 0000000..b6f9cac --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "errors" + "io" + "io/ioutil" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "org/apache/htrace/test" + "os" + "strings" + "testing" +) + +func TestInputFileAndOutputFile(t *testing.T) { + tdir, err := ioutil.TempDir(os.TempDir(), "TestInputFileAndOutputFile") + if err != nil { + t.Fatalf("failed to create TempDir: %s\n", err.Error()) + } + defer os.RemoveAll(tdir) + tpath := tdir + conf.PATH_SEP + "test" + var ofile *OutputFile + ofile, err = CreateOutputFile(tpath) + if err != nil { + t.Fatalf("failed to create OutputFile at %s: %s\n", tpath, err.Error()) + } + defer func() { + if ofile != nil { + ofile.Close() + } + }() + w := NewFailureDeferringWriter(ofile) + w.Printf("Hello, world!\n") + w.Printf("2 + 2 = %d\n", 4) + if w.Error() != nil { + t.Fatalf("got unexpected error writing to %s: %s\n", tpath, w.Error().Error()) + } + err = ofile.Close() + ofile = nil + if err != nil { + t.Fatalf("error on closing OutputFile for %s: %s\n", tpath, err.Error()) + } + var ifile *InputFile + ifile, err = OpenInputFile(tpath) + defer ifile.Close() + expected := "Hello, world!\n2 + 2 = 4\n" + buf := make([]byte, len(expected)) + _, err = io.ReadAtLeast(ifile, buf, len(buf)) + if err != nil { + t.Fatalf("unexpected error on reading %s: %s\n", tpath, err.Error()) + } + str := string(buf) + if str != expected { + t.Fatalf("Could not read back what we wrote to %s.\n"+ + "Got:\n%s\nExpected:\n%s\n", tpath, str, expected) + } +} + +type LimitedBufferWriter struct { + buf []byte + off int +} + +const LIMITED_BUFFER_MESSAGE = "There isn't enough buffer to go around!" 
+// Write copies as much of p as still fits into the buffer. If p does not
+// fit entirely, it returns the number of bytes stored together with an
+// error carrying LIMITED_BUFFER_MESSAGE.
+func (w *LimitedBufferWriter) Write(p []byte) (int, error) {
+	n := copy(w.buf[w.off:], p)
+	w.off += n
+	if n < len(p) {
+		return n, errors.New(LIMITED_BUFFER_MESSAGE)
+	}
+	return n, nil
+}
+
+// TestFailureDeferringWriter checks that the first failing write is
+// remembered, and that output stops exactly where the buffer ran out.
+func TestFailureDeferringWriter(t *testing.T) {
+	lw := LimitedBufferWriter{buf: make([]byte, 20)}
+	w := NewFailureDeferringWriter(&lw)
+	for i := 1; i <= 2; i++ {
+		w.Printf("Zippity do dah #%d\n", i)
+	}
+	werr := w.Error()
+	if werr == nil {
+		t.Fatalf("expected FailureDeferringWriter to experience a failure due to " +
+			"limited buffer size, but it did not.")
+	}
+	if werr.Error() != LIMITED_BUFFER_MESSAGE {
+		t.Fatalf("expected FailureDeferringWriter to have the error message %s, but "+
+			"the message was %s\n", LIMITED_BUFFER_MESSAGE, werr.Error())
+	}
+	expected := "Zippity do dah #1\nZi"
+	if got := string(lw.buf); got != expected {
+		t.Fatalf("expected LimitedBufferWriter to contain %s, but it contained %s "+
+			"instead.\n", expected, got)
+	}
+}
+
+// TestReadSpans decodes two newline-separated span objects and compares
+// them field by field with the expected spans.
+func TestReadSpans(t *testing.T) {
+	input := `{"i":"bdd6d4ee48de59bf","s":"c0681027d3ea4928",` +
+		`"b":1424736225037,"e":1424736225901,"d":"ClientNamenodeProtocol#getFileInfo",` +
+		`"r":"FsShell","p":["60538dfb4df91418"]}
+{"i":"bdd6d4ee48de59bf","s":"60538dfb4df91418","b":1424736224969,` +
+		`"e":1424736225960,"d":"getFileInfo","r":"FsShell","p":[],"n":{"path":"/"}}
+`
+	spans, err := readSpans(strings.NewReader(input))
+	if err != nil {
+		t.Fatalf("Failed to read spans from string via readSpans: %s\n", err.Error())
+	}
+	want := common.SpanSlice{
+		&common.Span{
+			Id: test.SpanId("c0681027d3ea4928"),
+			SpanData: common.SpanData{
+				TraceId:     test.SpanId("bdd6d4ee48de59bf"),
+				Begin:       1424736225037,
+				End:         1424736225901,
+				Description: "ClientNamenodeProtocol#getFileInfo",
+				ProcessId:   "FsShell",
+				Parents:     []common.SpanId{test.SpanId("60538dfb4df91418")},
+			},
+		},
+		&common.Span{
+			Id: test.SpanId("60538dfb4df91418"),
+			SpanData: common.SpanData{
+				TraceId:     test.SpanId("bdd6d4ee48de59bf"),
+				Begin:       1424736224969,
+				End:         1424736225960,
+				Description: "getFileInfo",
+				ProcessId:   "FsShell",
+				Parents:     []common.SpanId{},
+				Info: common.TraceInfoMap{
+					"path": "/",
+				},
+			},
+		},
+	}
+	if len(spans) != len(want) {
+		t.Fatalf("Expected %d spans, but got %d\n",
+			len(want), len(spans))
+	}
+	for i, expected := range want {
+		common.ExpectSpansEqual(t, spans[i], expected)
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go b/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go
new file mode 100644
index 0000000..dabf2df
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"io"
+	"org/apache/htrace/common"
+	"os"
+	"sort"
+)
+
+// Create a dotfile from a json file.
+// The JSON file holds whitespace-separated span objects (as read by
+// readSpansFile). Either path may be "-" to use stdin or stdout. Errors
+// from reading, formatting, flushing, or closing the output are returned.
+func jsonSpanFileToDotFile(jsonFile string, dotFile string) error {
+	spans, err := readSpansFile(jsonFile)
+	if err != nil {
+		return errors.New(fmt.Sprintf("error reading %s: %s",
+			jsonFile, err.Error()))
+	}
+	var file *OutputFile
+	file, err = CreateOutputFile(dotFile)
+	if err != nil {
+		return errors.New(fmt.Sprintf("error opening %s for write: %s",
+			dotFile, err.Error()))
+	}
+	// Close the file on early-return paths. On success, file is set to nil
+	// after the explicit Close below. That avoids a double close and lets
+	// the Close error reach the caller.
+	defer func() {
+		if file != nil {
+			file.Close()
+		}
+	}()
+	writer := bufio.NewWriter(file)
+	err = spansToDot(spans, writer)
+	if err != nil {
+		return err
+	}
+	err = writer.Flush()
+	if err != nil {
+		return err
+	}
+	err = file.Close()
+	file = nil
+	return err
+}
+
+// spansToDot writes spans to writer as a Graphviz "dot" digraph. It emits
+// one node per span, labeled with its description, and one edge from each
+// parent span to each of its children.
+//
+// Note that spans is sorted in place. Duplicate span IDs, and parent IDs
+// that match no span in the input, are reported on stderr and otherwise
+// skipped. The first write error, if any, is returned.
+func spansToDot(spans common.SpanSlice, writer io.Writer) error {
+	sort.Sort(spans)
+	idMap := make(map[common.SpanId]*common.Span)
+	for _, span := range spans {
+		if first := idMap[span.Id]; first != nil {
+			fmt.Fprintf(os.Stderr, "There were multiple spans listed which "+
+				"had ID %s.\nFirst:%s\nOther:%s\n", span.Id.String(),
+				first.ToJson(), span.ToJson())
+			continue
+		}
+		idMap[span.Id] = span
+	}
+	childMap := make(map[common.SpanId]common.SpanSlice)
+	for _, child := range spans {
+		for _, parentId := range child.Parents {
+			parent := idMap[parentId]
+			if parent == nil {
+				fmt.Fprintf(os.Stderr, "Can't find parent id %s for %s\n",
+					parentId.String(), child.ToJson())
+				continue
+			}
+			// append on a missing (nil) entry allocates a new slice.
+			childMap[parent.Id] = append(childMap[parent.Id], child)
+		}
+	}
+	w := NewFailureDeferringWriter(writer)
+	w.Printf("digraph spans {\n")
+	// Write out the nodes with their descriptions. The span data is passed
+	// as Printf arguments rather than pre-formatted into the format string,
+	// so a '%' inside a description is printed literally instead of being
+	// interpreted as a formatting verb.
+	for _, span := range spans {
+		w.Printf(` "%s" [label="%s"];`+"\n", span.Id.String(), span.Description)
+	}
+	// Write out the edges between nodes: the parent/children relationships.
+	for _, span := range spans {
+		children := childMap[span.Id]
+		sort.Sort(children)
+		for _, child := range children {
+			w.Printf(` "%s" -> "%s";`+"\n", span.Id.String(), child.Id)
+		}
+	}
+	w.Printf("}\n")
+	return w.Error()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go b/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go
new file mode 100644
index 0000000..8698a98
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package main + +import ( + "bytes" + "org/apache/htrace/common" + "org/apache/htrace/test" + "testing" +) + +func TestSpansToDot(t *testing.T) { + TEST_SPANS := common.SpanSlice{ + &common.Span{ + Id: test.SpanId("6af3cc058e5d829d"), + SpanData: common.SpanData{ + TraceId: test.SpanId("0e4716fe911244de"), + Begin: 1424813349020, + End: 1424813349134, + Description: "newDFSInputStream", + ProcessId: "FsShell", + Parents: []common.SpanId{}, + Info: common.TraceInfoMap{ + "path": "/", + }, + }, + }, + &common.Span{ + Id: test.SpanId("75d16cc5b2c07d8a"), + SpanData: common.SpanData{ + TraceId: test.SpanId("0e4716fe911244de"), + Begin: 1424813349025, + End: 1424813349133, + Description: "getBlockLocations", + ProcessId: "FsShell", + Parents: []common.SpanId{test.SpanId("6af3cc058e5d829d")}, + }, + }, + &common.Span{ + Id: test.SpanId("e2c7273efb280a8c"), + SpanData: common.SpanData{ + TraceId: test.SpanId("0e4716fe911244de"), + Begin: 1424813349027, + End: 1424813349073, + Description: "ClientNamenodeProtocol#getBlockLocations", + ProcessId: "FsShell", + Parents: []common.SpanId{test.SpanId("75d16cc5b2c07d8a")}, + }, + }, + } + w := bytes.NewBuffer(make([]byte, 0, 2048)) + err := spansToDot(TEST_SPANS, w) + if err != nil { + t.Fatalf("spansToDot failed: error %s\n", err.Error()) + } + EXPECTED_STR := `digraph spans { + "6af3cc058e5d829d" [label="newDFSInputStream"]; + "75d16cc5b2c07d8a" [label="getBlockLocations"]; + "e2c7273efb280a8c" [label="ClientNamenodeProtocol#getBlockLocations"]; + "6af3cc058e5d829d" -> "75d16cc5b2c07d8a"; + "75d16cc5b2c07d8a" -> "e2c7273efb280a8c"; +} +` + if w.String() != EXPECTED_STR { + t.Fatalf("Expected to get:\n%s\nGot:\n%s\n", EXPECTED_STR, w.String()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go new file mode 100644 index 0000000..4ff246c --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "encoding/json" + "errors" + "fmt" + htrace "org/apache/htrace/client" + "org/apache/htrace/common" + "strings" + "unicode" +) + +// Convert a string into a whitespace-separated sequence of strings. +func tokenize(str string) []string { + prevQuote := rune(0) + f := func(c rune) bool { + switch { + case c == prevQuote: + prevQuote = rune(0) + return false + case prevQuote != rune(0): + return false + case unicode.In(c, unicode.Quotation_Mark): + prevQuote = c + return false + default: + return unicode.IsSpace(c) + } + } + return strings.FieldsFunc(str, f) +} + +// Parses a query string in the format of a series of +// [TYPE] [OPERATOR] [CONST] tuples, joined by AND statements. 
+type predicateParser struct {
+	tokens   []string
+	curToken int
+}
+
+// Parse consumes the next predicate from the token stream and returns it.
+// It returns (nil, nil) once every token has been consumed. Predicates
+// after the first must be preceded by the token "and" (case-insensitive).
+// An error is returned for an unknown field or operation, or when the
+// input ends in the middle of a predicate.
+func (ps *predicateParser) Parse() (*common.Predicate, error) {
+	if ps.curToken >= len(ps.tokens) {
+		return nil, nil
+	}
+	if ps.curToken > 0 {
+		if strings.ToLower(ps.tokens[ps.curToken]) != "and" {
+			return nil, fmt.Errorf("Error parsing on token %d: "+
+				"expected predicates to be joined by 'and', but found '%s'",
+				ps.curToken, ps.tokens[ps.curToken])
+		}
+		ps.curToken++
+		if ps.curToken >= len(ps.tokens) {
+			return nil, fmt.Errorf("Nothing found after 'and' at "+
+				"token %d", ps.curToken)
+		}
+	}
+	field := common.Field(ps.tokens[ps.curToken])
+	if !field.IsValid() {
+		return nil, fmt.Errorf("Invalid field specifier at token %d. "+
+			"Can't understand %s. Valid field specifiers are %v", ps.curToken,
+			ps.tokens[ps.curToken], common.ValidFields())
+	}
+	ps.curToken++
+	if ps.curToken >= len(ps.tokens) {
+		return nil, fmt.Errorf("Nothing found after field specifier "+
+			"at token %d", ps.curToken)
+	}
+	op := common.Op(ps.tokens[ps.curToken])
+	if !op.IsValid() {
+		return nil, fmt.Errorf("Invalid operation specifier at token %d. "+
+			"Can't understand %s. Valid operation specifiers are %v", ps.curToken,
+			ps.tokens[ps.curToken], common.ValidOps())
+	}
+	ps.curToken++
+	if ps.curToken >= len(ps.tokens) {
+		return nil, fmt.Errorf("Nothing found after operation specifier "+
+			"at token %d", ps.curToken)
+	}
+	val := ps.tokens[ps.curToken]
+	// Step past the value so the next call starts at the "and" separator
+	// (or at the end of input).
+	ps.curToken++
+	return &common.Predicate{Op: op, Field: field, Val: val}, nil
+}
+
+// parseQueryString tokenizes str and parses it into a list of predicates.
+// A parse error is returned as-is. An input with no predicates at all is
+// rejected with "Empty query string".
+func parseQueryString(str string) ([]common.Predicate, error) {
+	ps := predicateParser{tokens: tokenize(str)}
+	preds := make([]common.Predicate, 0)
+	for {
+		pred, err := ps.Parse()
+		// Check the error first: Parse returns a nil predicate together
+		// with its error, so testing pred first would swallow the error.
+		if err != nil {
+			return nil, err
+		}
+		if pred == nil {
+			break
+		}
+		preds = append(preds, *pred)
+	}
+	if len(preds) == 0 {
+		return nil, errors.New("Empty query string")
+	}
+	return preds, nil
+}
+
+// Send a query from a query string.
+// The predicates are parsed from str (see parseQueryString), the result
+// count is capped at lim, and the matching spans are printed by doQuery.
+func doQueryFromString(hcl *htrace.Client, str string, lim int) error {
+	query := &common.Query{Lim: lim}
+	var err error
+	query.Predicates, err = parseQueryString(str)
+	if err != nil {
+		return err
+	}
+	return doQuery(hcl, query)
+}
+
+// doRawQuery sends a query given as a raw JSON-encoded common.Query.
+func doRawQuery(hcl *htrace.Client, str string) error {
+	var query common.Query
+	if err := json.Unmarshal([]byte(str), &query); err != nil {
+		// No trailing newline: callers add their own when printing errors.
+		return fmt.Errorf("Error parsing provided JSON: %s", err.Error())
+	}
+	return doQuery(hcl, &query)
+}
+
+// doQuery sends query to htraced and prints each resulting span as JSON, one
+// per line, on stdout. When the verbose flag is set, it also echoes the
+// query and the result count.
+func doQuery(hcl *htrace.Client, query *common.Query) error {
+	if *verbose {
+		qbytes, err := json.Marshal(*query)
+		if err != nil {
+			qbytes = []byte("marshaling error: " + err.Error())
+		}
+		fmt.Printf("Sending query: %s\n", string(qbytes))
+	}
+	spans, err := hcl.Query(query)
+	if err != nil {
+		return err
+	}
+	if *verbose {
+		fmt.Printf("%d results...\n", len(spans))
+	}
+	for i := range spans {
+		fmt.Printf("%s\n", spans[i].ToJson())
+	}
+	return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
new file mode 100644
index 0000000..218c1c8
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	htrace "org/apache/htrace/client"
+	"org/apache/htrace/common"
+	"org/apache/htrace/test"
+	"sort"
+	"testing"
+	"time"
+)
+
+// TestClientGetServerInfo starts a mini htraced and checks that the client
+// can fetch its server info.
+func TestClientGetServerInfo(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo",
+		DataDirs: make([]string, 2)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+	_, err = hcl.GetServerInfo()
+	if err != nil {
+		t.Fatalf("failed to call GetServerInfo: %s", err.Error())
+	}
+}
+
+// createRandomTestSpans builds amount random spans from a fixed seed, so
+// every run produces the same spans. Afterwards, span 1's parent is forced
+// to be span 0.
+// NOTE(review): span 0 is presumably excluded from the candidate parents
+// (allSpans[1:i]) so that span 1 is span 0's only child; the FindChildren
+// check in TestClientOperations depends on that. Confirm against
+// test.NewRandomSpan.
+func createRandomTestSpans(amount int) common.SpanSlice {
+	rnd := rand.New(rand.NewSource(2))
+	allSpans := make(common.SpanSlice, amount)
+	allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
+	for i := 1; i < amount; i++ {
+		allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
+	}
+	allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)}
+	return allSpans
+}
+
+// TestClientOperations writes half of a set of random spans. It then checks
+// FindSpan for written and unwritten spans, FindChildren with and without
+// children, and a limited Query.
+func TestClientOperations(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
+		DataDirs: make([]string, 2)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	// Create some random trace spans.
+	NUM_TEST_SPANS := 30
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+
+	// Write half of the spans to htraced via the client.
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans[0 : NUM_TEST_SPANS/2],
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
+			err.Error())
+	}
+
+	// Look up the first half of the spans. They should be found.
+	var span *common.Span
+	for i := 0; i < NUM_TEST_SPANS/2; i++ {
+		span, err = hcl.FindSpan(allSpans[i].Id)
+		if err != nil {
+			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+		}
+		common.ExpectSpansEqual(t, allSpans[i], span)
+	}
+
+	// Look up the second half of the spans. They should not be found.
+	for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
+		span, err = hcl.FindSpan(allSpans[i].Id)
+		if err != nil {
+			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+		}
+		if span != nil {
+			t.Fatalf("Unexpectedly found a span we never wrote to "+
+				"the server: FindSpan(%d) succeeded\n", i)
+		}
+	}
+
+	// Test FindChildren
+	childSpan := allSpans[1]
+	parentId := childSpan.Parents[0]
+	var children []common.SpanId
+	children, err = hcl.FindChildren(parentId, 1)
+	if err != nil {
+		t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
+	}
+	if len(children) != 1 {
+		t.Fatalf("FindChildren(%s) returned an invalid number of "+
+			"children: expected %d, got %d\n", parentId, 1, len(children))
+	}
+	if children[0] != childSpan.Id {
+		t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
+			"got %s\n", parentId, childSpan.Id, children[0])
+	}
+
+	// Test FindChildren on a span that has no children. Span IDs are
+	// formatted with %s, as in the messages above.
+	childlessSpan := allSpans[NUM_TEST_SPANS/2]
+	children, err = hcl.FindChildren(childlessSpan.Id, 10)
+	if err != nil {
+		t.Fatalf("FindChildren(%s) failed: %s\n", childlessSpan.Id, err.Error())
+	}
+	if len(children) != 0 {
+		t.Fatalf("FindChildren(%s) returned an invalid number of "+
+			"children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
+	}
+
+	// Test Query
+	query := common.Query{Lim: 10}
+	spans, err := hcl.Query(&query)
+	if err != nil {
+		t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
+	}
+	if len(spans) != 10 {
+		t.Fatalf("Query({lim: %d}) returned an invalid number of "+
+			"spans: expected %d, got %d\n", 10, 10, len(spans))
+	}
+}
+
+// TestDumpAll writes 100 spans and checks that DumpAll streams all of them
+// back in sorted order.
+func TestDumpAll(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
+		DataDirs: make([]string, 2)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	NUM_TEST_SPANS := 100
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+	sort.Sort(allSpans)
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans,
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
+	}
+	out := make(chan *common.Span, 50)
+	// Deliver DumpAll's result over a channel instead of assigning a shared
+	// variable from the goroutine. The read loop below ends as soon as out
+	// is closed, and nothing orders the goroutine's assignment before a
+	// later read of that variable, so the old pattern was a data race.
+	dumpErrCh := make(chan error, 1)
+	go func() {
+		dumpErrCh <- hcl.DumpAll(3, out)
+	}()
+	var numSpans int
+	nextLogTime := time.Now().Add(time.Millisecond * 5)
+	for {
+		span, channelOpen := <-out
+		if !channelOpen {
+			break
+		}
+		// Guard the index: extra spans must fail the test, not panic it.
+		if numSpans >= len(allSpans) {
+			t.Fatalf("DumpAll returned more than the %d span(s) we wrote\n",
+				len(allSpans))
+		}
+		common.ExpectSpansEqual(t, allSpans[numSpans], span)
+		numSpans++
+		if testing.Verbose() {
+			now := time.Now()
+			if !now.Before(nextLogTime) {
+				nextLogTime = now.Add(time.Millisecond * 5)
+				fmt.Printf("read back %d span(s)...\n", numSpans)
+			}
+		}
+	}
+	if numSpans != len(allSpans) {
+		t.Fatalf("expected to read %d spans... but only read %d\n",
+			len(allSpans), numSpans)
+	}
+	if dumpErr := <-dumpErrCh; dumpErr != nil {
+		t.Fatalf("got dump error %s\n", dumpErr.Error())
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
new file mode 100644
index 0000000..0742555
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bytes"
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"github.com/jmhodges/levigo"
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"os"
+	"strconv"
+	"strings"
+	"sync/atomic"
+)
+
+//
+// The data store code for HTraced.
+//
+// This code stores the trace spans. We use levelDB here so that we don't have to store everything
+// in memory at all times. The data is sharded across multiple levelDB databases in multiple
+// directories. Normally, these multiple directories will be on multiple disk drives.
+//
+// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
+// coming from many daemons. Durability is not as big a concern as in some data stores, since
+// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package
+// for serialization. We assume that there will be many more writes than reads.
+//
+// Schema
+// v -> dataStoreVersion (the key is VERSION_KEY)
+// s[8-byte-big-endian-sid] -> SpanData
+// b[8-byte-big-endian-begin-time][8-byte-big-endian-sid] -> {}
+// e[8-byte-big-endian-end-time][8-byte-big-endian-sid] -> {}
+// d[8-byte-big-endian-duration][8-byte-big-endian-sid] -> {}
+// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
+//
+// In the b, e and d indices the trailing sid is the ID of the span itself.
+// In the p index it is the ID of a child of the leading parent sid.
+//
+// Note that span IDs are unsigned 64-bit numbers.
+// Begin times, end times, and durations are signed 64-bit numbers.
+// In order to get LevelDB to properly compare the signed 64-bit quantities,
+// we flip the highest bit. This way, we can get leveldb to view negative
+// quantities as less than non-negative ones. This also means that we can do
+// all queries using unsigned 64-bit math, rather than having to special-case
+// the signed fields.
+//
+
+// On-disk layout versions.
+const (
+	UNKNOWN_LAYOUT_VERSION = 0
+	CURRENT_LAYOUT_VERSION = 2
+)
+
+// EMPTY_BYTE_BUF is the value stored for index-only keys, whose
+// information lives entirely in the key.
+var EMPTY_BYTE_BUF []byte = []byte{}
+
+// Leading key bytes that select the record type / index (see Schema above).
+const (
+	VERSION_KEY             = 'v'
+	SPAN_ID_INDEX_PREFIX    = 's'
+	BEGIN_TIME_INDEX_PREFIX = 'b'
+	END_TIME_INDEX_PREFIX   = 'e'
+	DURATION_INDEX_PREFIX   = 'd'
+	PARENT_ID_INDEX_PREFIX  = 'p'
+	INVALID_INDEX_PREFIX    = 0
+)
+
+// Statistics holds data store counters. Fields are updated and read with
+// atomic operations, so they must not be accessed directly while the store
+// is in use.
+type Statistics struct {
+	NumSpansWritten uint64
+}
+
+// IncrementWrittenSpans atomically adds one to NumSpansWritten.
+func (stats *Statistics) IncrementWrittenSpans() {
+	atomic.AddUint64(&stats.NumSpansWritten, 1)
+}
+
+// Make a copy of the statistics structure, using atomic operations.
+func (stats *Statistics) Copy() *Statistics {
+	return &Statistics{
+		NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
+	}
+}
+
+// Translate an 8-byte value into a leveldb key.
+// The result is 9 bytes: tag followed by val in big-endian order, so that
+// LevelDB's bytewise key order matches numeric order of val.
+func makeKey(tag byte, val uint64) []byte {
+	key := make([]byte, 9)
+	key[0] = tag
+	for i := 1; i < 9; i++ {
+		key[i] = byte(val >> uint(64-8*i))
+	}
+	return key
+}
+
+// keyToInt decodes the first 8 bytes of key as a big-endian uint64. It is
+// the inverse of the value encoding used by makeKey.
+func keyToInt(key []byte) uint64 {
+	var id uint64
+	for i := 0; i < 8; i++ {
+		id = id<<8 | uint64(key[i])
+	}
+	return id
+}
+
+// makeSecondaryKey builds a 17-byte index key: tag, then fir and then sec,
+// each as 8 big-endian bytes. Keys therefore sort by fir first, then sec.
+func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte {
+	key := make([]byte, 17)
+	key[0] = tag
+	for i := 1; i < 9; i++ {
+		shift := uint(64 - 8*i)
+		key[i] = byte(fir >> shift)
+		key[i+8] = byte(sec >> shift)
+	}
+	return key
+}
+
+// shard is one directory holding a LevelDB instance, plus the channels used
+// to feed its writer goroutine.
+type shard struct {
+	// The data store this shard belongs to.
+	store *dataStore
+
+	// The LevelDB instance.
+	ldb *levigo.DB
+
+	// The path of the LevelDB directory this shard manages.
+	path string
+
+	// Spans waiting to be written; a nil span asks the writer to exit.
+	incoming chan *common.Span
+
+	// Receives true when the writer goroutine exits.
+	exited chan bool
+}
+
+// Process incoming spans for a shard.
+func (shd *shard) processIncoming() { + lg := shd.store.lg + for { + span := <-shd.incoming + if span == nil { + lg.Infof("Shard processor for %s exiting.\n", shd.path) + shd.exited <- true + return + } + err := shd.writeSpan(span) + if err != nil { + lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error()) + } else { + lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson()) + } + } +} + +// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the +// highest bit, so that negative input values map to unsigned numbers which are +// less than non-negative input values. +func s2u64(val int64) uint64 { + ret := uint64(val) + ret ^= 0x8000000000000000 + return ret +} + +func (shd *shard) writeSpan(span *common.Span) error { + batch := levigo.NewWriteBatch() + defer batch.Close() + + // Add SpanData to batch. + spanDataBuf := new(bytes.Buffer) + spanDataEnc := gob.NewEncoder(spanDataBuf) + err := spanDataEnc.Encode(span.SpanData) + if err != nil { + return err + } + batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes()) + + // Add this to the parent index. + for parentIdx := range span.Parents { + batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX, + span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF) + } + + // Add to the other secondary indices. 
+ batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin), + span.Id.Val()), EMPTY_BYTE_BUF) + batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End), + span.Id.Val()), EMPTY_BYTE_BUF) + batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, s2u64(span.Duration()), + span.Id.Val()), EMPTY_BYTE_BUF) + + err = shd.ldb.Write(shd.store.writeOpts, batch) + if err != nil { + return err + } + shd.store.stats.IncrementWrittenSpans() + if shd.store.WrittenSpans != nil { + shd.store.WrittenSpans <- span + } + return nil +} + +func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId, + lim int32) ([]common.SpanId, int32, error) { + searchKey := makeKey('p', sid.Val()) + iter := shd.ldb.NewIterator(shd.store.readOpts) + defer iter.Close() + iter.Seek(searchKey) + for { + if !iter.Valid() { + break + } + if lim == 0 { + break + } + key := iter.Key() + if !bytes.HasPrefix(key, searchKey) { + break + } + id := common.SpanId(keyToInt(key[9:])) + childIds = append(childIds, id) + lim-- + iter.Next() + } + return childIds, lim, nil +} + +// Close a shard. +func (shd *shard) Close() { + lg := shd.store.lg + shd.incoming <- nil + lg.Infof("Waiting for %s to exit...\n", shd.path) + if shd.exited != nil { + <-shd.exited + } + shd.ldb.Close() + lg.Infof("Closed %s...\n", shd.path) +} + +// The Data Store. +type dataStore struct { + lg *common.Logger + + // The shards which manage our LevelDB instances. + shards []*shard + + // I/O statistics for all shards. + stats Statistics + + // The read options to use for LevelDB. + readOpts *levigo.ReadOptions + + // The write options to use for LevelDB. + writeOpts *levigo.WriteOptions + + // If non-null, a channel we will send spans to once we finish writing them. This is only used + // for testing. + WrittenSpans chan *common.Span +} + +func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) { + // Get the configuration. 
+ clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR) + dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) + dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) + + var err error + lg := common.NewLogger("datastore", cnf) + store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans} + + // If we return an error, close the store. + defer func() { + if err != nil { + store.Close() + store = nil + } + }() + + store.readOpts = levigo.NewReadOptions() + store.readOpts.SetFillCache(true) + store.writeOpts = levigo.NewWriteOptions() + store.writeOpts.SetSync(false) + + // Open all shards + for idx := range dirs { + path := dirs[idx] + conf.PATH_SEP + "db" + var shd *shard + shd, err = CreateShard(store, cnf, path, clearStored) + if err != nil { + lg.Errorf("Error creating shard %s: %s\n", path, err.Error()) + return nil, err + } + store.shards = append(store.shards, shd) + } + for idx := range store.shards { + shd := store.shards[idx] + shd.exited = make(chan bool, 1) + go shd.processIncoming() + } + return store, nil +} + +func CreateShard(store *dataStore, cnf *conf.Config, path string, + clearStored bool) (*shard, error) { + lg := store.lg + if clearStored { + fi, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) + return nil, err + } + if fi != nil { + err = os.RemoveAll(path) + if err != nil { + lg.Errorf("Failed to clear existing datastore directory %s: %s\n", + path, err.Error()) + return nil, err + } + lg.Infof("Cleared existing datastore directory %s\n", path) + } + } + err := os.MkdirAll(path, 0777) + if err != nil { + lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error()) + return nil, err + } + var shd *shard + openOpts := levigo.NewOptions() + defer openOpts.Close() + newlyCreated := false + ldb, err := levigo.Open(path, openOpts) + if err == nil { + store.lg.Infof("LevelDB opened %s\n", path) + } else { + store.lg.Debugf("LevelDB failed to open %s: %s\n", 
path, err.Error()) + openOpts.SetCreateIfMissing(true) + ldb, err = levigo.Open(path, openOpts) + if err != nil { + store.lg.Errorf("LevelDB failed to create %s: %s\n", path, err.Error()) + return nil, err + } + store.lg.Infof("Created new LevelDB instance in %s\n", path) + newlyCreated = true + } + defer func() { + if shd == nil { + ldb.Close() + } + }() + lv, err := readLayoutVersion(store, ldb) + if err != nil { + store.lg.Errorf("Got error while reading datastore version for %s: %s\n", + path, err.Error()) + return nil, err + } + if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) { + err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION) + if err != nil { + store.lg.Errorf("Got error while writing datastore version for %s: %s\n", + path, err.Error()) + return nil, err + } + store.lg.Tracef("Wrote layout version %d to shard at %s.\n", + CURRENT_LAYOUT_VERSION, path) + } else if lv != CURRENT_LAYOUT_VERSION { + versionName := "unknown" + if lv != UNKNOWN_LAYOUT_VERSION { + versionName = fmt.Sprintf("%d", lv) + } + store.lg.Errorf("Can't read old datastore. Its layout version is %s, but this "+ + "software is at layout version %d. Please set %s to clear the datastore "+ + "on startup, or clear it manually.\n", versionName, + CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR) + return nil, errors.New(fmt.Sprintf("Invalid layout version: got %s, expected %d.", + versionName, CURRENT_LAYOUT_VERSION)) + } else { + store.lg.Tracef("Found layout version %d in %s.\n", lv, path) + } + spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) + shd = &shard{store: store, ldb: ldb, path: path, + incoming: make(chan *common.Span, spanBufferSize)} + return shd, nil +} + +// Read the datastore version of a leveldb instance. 
+func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) { + buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY}) + if err != nil { + return 0, err + } + if len(buf) == 0 { + return 0, nil + } + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + var v uint32 + err = decoder.Decode(&v) + if err != nil { + return 0, err + } + return v, nil +} + +// Write the datastore version to a shard. +func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + err := encoder.Encode(&v) + if err != nil { + return err + } + return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes()) +} + +func (store *dataStore) GetStatistics() *Statistics { + return store.stats.Copy() +} + +// Close the DataStore. +func (store *dataStore) Close() { + for idx := range store.shards { + store.shards[idx].Close() + store.shards[idx] = nil + } + if store.readOpts != nil { + store.readOpts.Close() + store.readOpts = nil + } + if store.writeOpts != nil { + store.writeOpts.Close() + store.writeOpts = nil + } + if store.lg != nil { + store.lg.Close() + store.lg = nil + } +} + +// Get the index of the shard which stores the given spanId. 
+func (store *dataStore) getShardIndex(sid common.SpanId) int { + return int(sid.Val() % uint64(len(store.shards))) +} + +func (store *dataStore) WriteSpan(span *common.Span) { + store.shards[store.getShardIndex(span.Id)].incoming <- span +} + +func (store *dataStore) FindSpan(sid common.SpanId) *common.Span { + return store.shards[store.getShardIndex(sid)].FindSpan(sid) +} + +func (shd *shard) FindSpan(sid common.SpanId) *common.Span { + lg := shd.store.lg + buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val())) + if err != nil { + if strings.Index(err.Error(), "NotFound:") != -1 { + return nil + } + lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n", + shd.path, sid.String(), err.Error()) + return nil + } + var span *common.Span + span, err = shd.decodeSpan(sid, buf) + if err != nil { + lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n", + shd.path, sid.String(), err.Error()) + return nil + } + return span +} + +func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + data := common.SpanData{} + err := decoder.Decode(&data) + if err != nil { + return nil, err + } + // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with + // non-nil slices. + if data.Parents == nil { + data.Parents = []common.SpanId{} + } + return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil +} + +// Find the children of a given span id. 
+func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId { // Collects up to lim child ids of sid from all shards; a child's parent-index entry is written to the child's own shard, so every shard must be searched.
+	childIds := make([]common.SpanId, 0)
+	var err error
+
+	startIdx := store.getShardIndex(sid) // begin with the shard that owns sid, then visit every other shard once, wrapping around
+	idx := startIdx
+	numShards := len(store.shards)
+	for {
+		if lim == 0 { // each shard's FindChildren returns the still-unused part of lim
+			break
+		}
+		shd := store.shards[idx]
+		childIds, lim, err = shd.FindChildren(sid, childIds, lim)
+		if err != nil { // a failing shard is logged and skipped; the remaining shards are still searched
+			store.lg.Errorf("Shard(%s): FindChildren(%s) error: %s\n",
+				shd.path, sid.String(), err.Error())
+		}
+		idx++
+		if idx >= numShards {
+			idx = 0
+		}
+		if idx == startIdx {
+			break
+		}
+	}
+	return childIds
+}
+// predicateData is a query predicate plus its value pre-parsed into a comparison key: uintKey for numeric fields (span ids as-is; times and durations sign-flipped via s2u64), strKey for DESCRIPTION.
+type predicateData struct {
+	*common.Predicate
+	uintKey uint64
+	strKey  string
+}
+// loadPredicateData parses pred.Val according to pred.Field and validates pred.Op. It returns an error for an unparseable value, an unknown field, an unknown operation, or CONTAINS on a numeric field.
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+	p := predicateData{Predicate: pred} // embeds the caller's pointer, so later writes to p.Op (see createSource) also change *pred
+
+	// Parse the input value given to make sure it matches up with the field
+	// type, and convert it into the key form used by the indices.
+	switch pred.Field {
+	case common.SPAN_ID:
+		// Span IDs are sent as hex strings.
+		var id common.SpanId
+		if err := id.FromString(pred.Val); err != nil {
+			return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
+				pred.Val, err.Error()))
+		}
+		p.uintKey = id.Val()
+		break
+	case common.DESCRIPTION:
+		// Any string is valid for a description.
+		p.strKey = pred.Val
+		break
+	case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+		// Parse a base-10 signed numeric field, then flip the sign bit to match the index encoding.
+		v, err := strconv.ParseInt(pred.Val, 10, 64)
+		if err != nil {
+			return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
+				pred.Field, pred.Val, err.Error()))
+		}
+		p.uintKey = s2u64(v)
+		break
+	default:
+		return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
+	}
+
+	// Validate the predicate operation.
+	switch pred.Op {
+	case common.EQUALS, common.LESS_THAN_OR_EQUALS,
+		common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
+		break
+	case common.CONTAINS:
+		if p.fieldIsNumeric() { // CONTAINS is only meaningful for the string field
+			return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
+				"numeric field like '%s'", pred.Field))
+		}
+	default:
+		return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
+			pred.Op))
+	}
+
+	return &p, nil
+}
+
+// Get the index prefix for this predicate, or 0 (INVALID_INDEX_PREFIX) if it is not indexed. Span id, begin time, end time and duration are indexed; DESCRIPTION predicates can only be applied as filters.
+func (pred *predicateData) getIndexPrefix() byte {
+	switch pred.Field {
+	case common.SPAN_ID:
+		return SPAN_ID_INDEX_PREFIX
+	case common.BEGIN_TIME:
+		return BEGIN_TIME_INDEX_PREFIX
+	case common.END_TIME:
+		return END_TIME_INDEX_PREFIX
+	case common.DURATION:
+		return DURATION_INDEX_PREFIX
+	default:
+		return INVALID_INDEX_PREFIX
+	}
+}
+
+// Returns true if the predicate type is numeric (compared via uintKey rather than strKey).
+func (pred *predicateData) fieldIsNumeric() bool {
+	switch pred.Field {
+	case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
+		return true
+	default:
+		return false
+	}
+}
+
+// Get the values that this predicate cares about for a given span. Numeric fields are returned in the same encoding as pred.uintKey (times and durations sign-flipped via s2u64); DESCRIPTION is returned as the string. Panics on any other field.
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, string) {
+	switch pred.Field {
+	case common.SPAN_ID:
+		return span.Id.Val(), ""
+	case common.DESCRIPTION:
+		return 0, span.Description
+	case common.BEGIN_TIME:
+		return s2u64(span.Begin), ""
+	case common.END_TIME:
+		return s2u64(span.End), ""
+	case common.DURATION:
+		return s2u64(span.Duration()), ""
+	default:
+		panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field))
+	}
+}
+// spanPtrIsBefore reports whether span a should be returned before span b: ascending by the predicate's field, or descending when pred.Op.IsDescending(). A nil span sorts after everything.
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
+	// nil is after everything.
+	if a == nil {
+		if b == nil { // redundant check: both paths return false, since a nil a is never before anything
+			return false
+		}
+		return false
+	} else if b == nil {
+		return true
+	}
+	// Compare the spans according to this predicate.
+	aInt, aStr := pred.extractRelevantSpanData(a)
+	bInt, bStr := pred.extractRelevantSpanData(b)
+	if pred.fieldIsNumeric() {
+		if pred.Op.IsDescending() {
+			return aInt > bInt
+		} else {
+			return aInt < bInt
+		}
+	} else {
+		if pred.Op.IsDescending() {
+			return aStr > bStr
+		} else {
+			return aStr < bStr
+		}
+	}
+}
+
+// Returns true if the predicate is satisfied by the given span. Numeric fields compare the encoded uint64 values against uintKey; the string field compares against strKey and additionally supports CONTAINS.
+func (pred *predicateData) satisfiedBy(span *common.Span) bool {
+	intVal, strVal := pred.extractRelevantSpanData(span)
+	if pred.fieldIsNumeric() {
+		switch pred.Op {
+		case common.EQUALS:
+			return intVal == pred.uintKey
+		case common.LESS_THAN_OR_EQUALS:
+			return intVal <= pred.uintKey
+		case common.GREATER_THAN_OR_EQUALS:
+			return intVal >= pred.uintKey
+		case common.GREATER_THAN:
+			return intVal > pred.uintKey
+		default:
+			panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+				"during normalization", pred.Op))
+		}
+	} else {
+		switch pred.Op {
+		case common.CONTAINS:
+			return strings.Contains(strVal, pred.strKey)
+		case common.EQUALS:
+			return strVal == pred.strKey
+		case common.LESS_THAN_OR_EQUALS:
+			return strVal <= pred.strKey
+		case common.GREATER_THAN_OR_EQUALS:
+			return strVal >= pred.strKey
+		case common.GREATER_THAN:
+			return strVal > pred.strKey
+		default:
+			panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+				"during normalization", pred.Op))
+		}
+	}
+}
+// createSource builds a source that walks this predicate's index in every shard, starting at pred.uintKey or, when prev is non-nil, just after prev (the last span returned by a previous call). It fails for predicates on unindexed fields, and may modify pred (Op and uintKey) in order to resume after prev.
+func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) {
+	var ret *source
+	src := source{store: store,
+		pred:      pred,
+		iters:     make([]*levigo.Iterator, 0, len(store.shards)),
+		nexts:     make([]*common.Span, len(store.shards)),
+		numRead:   make([]int, len(store.shards)),
+		keyPrefix: pred.getIndexPrefix(),
+	}
+	if src.keyPrefix == INVALID_INDEX_PREFIX {
+		return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
+			"predicate on field %s", pred.Field))
+	}
+	defer func() {
+		if ret == nil { // any failure after this point (including the panic below) closes the iterators opened so far
+			src.Close()
+		}
+	}()
+	for shardIdx := range store.shards {
+		shd := store.shards[shardIdx]
+		src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
+	}
+	var searchKey []byte
+	lg := store.lg
+	if prev != nil {
+		// If prev != nil, this query RPC is the continuation of a previous
+		// one. The final result returned the last time is 'prev'.
+		//
+		// To avoid returning the same results multiple times, we adjust the
+		// predicate here. If the predicate is on the span id field, we
+		// simply manipulate the span ID we're looking for.
+		//
+		// If the predicate is on a secondary index, we also use span ID, but
+		// in a slightly different way. Since the secondary indices are
+		// organized as [type-code][8b-secondary-key][8b-span-id], elements
+		// with the same secondary index field are ordered by span ID. So we
+		// create a 17-byte key incorporating the span ID from 'prev.'
+		var startId common.SpanId
+		switch pred.Op {
+		case common.EQUALS:
+			if pred.Field == common.SPAN_ID {
+				// This is an annoying corner case. There can only be one
+				// result each time we do an EQUALS search for a span id.
+				// Span id is the primary key for all our spans.
+				// But for some reason someone is asking for another result.
+				// We modify the query to search for the illegal 0 span ID,
+				// which will never be present.
+				lg.Debugf("Attempted to use a continuation token with an EQUALS "+
+					"SPAN_ID query. %s. Setting search id = 0",
+					pred.Predicate.String())
+				startId = 0
+			} else {
+				// When doing an EQUALS search on a secondary index, the
+				// results are sorted by span id.
+				startId = prev.Id + 1 // NOTE(review): presumably wraps to 0 if prev.Id is the maximum span id, restarting the scan
+			}
+		case common.LESS_THAN_OR_EQUALS:
+			// Subtract one from the previous span id. Since the previous
+			// start ID will never be 0 (0 is an illegal span id), we'll never
+			// wrap around when doing this.
+			startId = prev.Id - 1
+		case common.GREATER_THAN_OR_EQUALS:
+			// We can't add one to the span id, since the previous span ID
+			// might be the maximum value. So just switch over to using
+			// GREATER_THAN.
+			pred.Op = common.GREATER_THAN // NOTE(review): pred.Predicate points into the caller's query (HandleQuery passes &query.Predicates[i]), so this also rewrites that query's Op
+			startId = prev.Id
+		case common.GREATER_THAN:
+			// This one is easy.
+			startId = prev.Id
+		default:
+			str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String())
+			lg.Error(str + "\n")
+			panic(str)
+		}
+		if pred.Field == common.SPAN_ID {
+			pred.uintKey = uint64(startId)
+			searchKey = makeKey(src.keyPrefix, uint64(startId))
+		} else {
+			// Start where the previous query left off. This means adjusting
+			// our uintKey.
+			pred.uintKey, _ = pred.extractRelevantSpanData(prev)
+			searchKey = makeSecondaryKey(src.keyPrefix, pred.uintKey, uint64(startId))
+		}
+		if lg.TraceEnabled() {
+			lg.Tracef("Handling continuation token %s for %s. startId=%d, "+
+				"pred.uintKey=%d\n", prev, pred.Predicate.String(), startId,
+				pred.uintKey)
+		}
+	} else {
+		searchKey = makeKey(src.keyPrefix, pred.uintKey) // 9-byte key: sorts before every 17-byte secondary entry with the same value
+	}
+	for i := range src.iters { // NOTE(review): for LESS_THAN_OR_EQUALS, Seek lands on the first key >= searchKey; if no key with this prefix is >= searchKey, populateNextFromShard appears to stop on its Valid/prefix checks before the one-skip logic runs, so smaller matching keys may never be returned -- TODO confirm
+		src.iters[i].Seek(searchKey) // NOTE(review): for a descending secondary-index scan from a 9-byte searchKey, entries equal to uintKey with span ids above the first one sit after the seek position and look unreachable -- TODO confirm
+	}
+	ret = &src
+	return ret, nil
+}
+
+// A source of spans: one LevelDB iterator per shard over a single index, merged by next() into one stream ordered by spanPtrIsBefore.
+type source struct {
+	store     *dataStore
+	pred      *predicateData
+	iters     []*levigo.Iterator // per shard; an entry becomes nil once that shard is exhausted
+	nexts     []*common.Span     // per-shard lookahead: the next matching span not yet returned, or nil
+	numRead   []int              // per-shard count of entries examined, used by the one-skip rule
+	keyPrefix byte
+}
+
+// Return true if this operation may require skipping the first result we get back from leveldb.
+func mayRequireOneSkip(op common.Op) bool {
+	switch op {
+	// When dealing with descending predicates, the first span we read might not satisfy
+	// the predicate, even though subsequent ones will. This is because the iter.Seek()
+	// function "moves the iterator the position of the key given or, if the key doesn't
+	// exist, the next key that does exist in the database." So if we're on that "next
+	// key" it will not satisfy the predicate, but the keys previous to it might.
+	case common.LESS_THAN_OR_EQUALS:
+		return true
+	// iter.Seek basically takes us to the key which is "greater than or equal to" some
+	// value. Since we want greater than (not greater than or equal to) we may have to
+	// skip the first key.
+	case common.GREATER_THAN:
+		return true
+	}
+	return false
+}
+
+// Fill in the entry in the 'next' array for a specific shard. If the shard has no further matching entry, its iterator is closed and set to nil.
+func (src *source) populateNextFromShard(shardIdx int) {
+	lg := src.store.lg
+	var err error
+	iter := src.iters[shardIdx]
+	if iter == nil {
+		lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
+		return // There are no more entries in this shard.
+	}
+	if src.nexts[shardIdx] != nil {
+		lg.Debugf("No need to populate shard %d\n", shardIdx)
+		return // We already have a valid entry for this shard.
+	}
+	for {
+		if !iter.Valid() {
+			lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
+			break // Can't read past end of DB
+		}
+		src.numRead[shardIdx]++
+		key := iter.Key()
+		if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
+			lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n",
+				shardIdx, string(src.keyPrefix))
+			break // Can't read past end of indexed section
+		}
+		var span *common.Span
+		var sid common.SpanId
+		if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
+			// The span id maps to the span itself.
+			sid = common.SpanId(keyToInt(key[1:]))
+			span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
+			if err != nil { // NOTE(review): decode/lookup failures end this shard's stream and are logged only at debug level
+				lg.Debugf("Internal error decoding span %s in shard %d: %s\n",
+					sid.String(), shardIdx, err.Error())
+				break
+			}
+		} else {
+			// With a secondary index, we have to look up the span by id. All index entries for a span are written to that span's own shard, so the lookup stays in this shard.
+			sid = common.SpanId(keyToInt(key[9:]))
+			span = src.store.shards[shardIdx].FindSpan(sid)
+			if span == nil {
+				lg.Debugf("Internal error rehydrating span %s in shard %d\n",
+					sid.String(), shardIdx)
+				break
+			}
+		}
+		if src.pred.Op.IsDescending() { // advance before testing the predicate, so a rejected first entry can be skipped with continue
+			iter.Prev()
+		} else {
+			iter.Next()
+		}
+		if src.pred.satisfiedBy(span) {
+			lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx)
+			src.nexts[shardIdx] = span // Found valid entry
+			return
+		} else {
+			lg.Debugf("Span %s from shard %d does not satisfy the predicate.\n",
+				sid.String(), shardIdx)
+			if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) { // only the very first entry read from this shard may be skipped
+				continue
+			}
+			// This and subsequent entries don't satisfy predicate
+			break
+		}
+	}
+	lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+	iter.Close()
+	src.iters[shardIdx] = nil
+}
+// next returns the first pending span across all shards in spanPtrIsBefore order and consumes it, or nil once every shard is exhausted.
+func (src *source) next() *common.Span {
+	for shardIdx := range src.iters {
+		src.populateNextFromShard(shardIdx)
+	}
+	var best *common.Span // nil sorts last, so the first non-nil candidate always replaces it
+	bestIdx := -1
+	for shardIdx := range src.iters {
+		span := src.nexts[shardIdx]
+		if src.pred.spanPtrIsBefore(span, best) {
+			best = span
+			bestIdx = shardIdx
+		}
+	}
+	if bestIdx >= 0 {
+		src.nexts[bestIdx] = nil // consumed; the shard is refilled on the next call
+	}
+	return best
+}
+// Close releases any shard iterators that are still open.
+func (src *source) Close() {
+	for i := range src.iters {
+		if src.iters[i] != nil {
+			src.iters[i].Close()
+		}
+	}
+	src.iters = nil
+}
+// obtainSource builds a source from the first indexed predicate in *preds and removes that predicate from *preds; if none is indexed, it scans every span in span-id order. span, if non-nil, is the continuation point passed to createSource.
+func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) {
+	// Read spans from the first predicate that is indexed.
+	p := *preds
+	for i := range p {
+		pred := p[i]
+		if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
+			*preds = append(p[0:i], p[i+1:]...) // the source enforces this predicate itself, so drop it from the remaining filters
+			return pred.createSource(store, span)
+		}
+	}
+	// If there are no predicates that are indexed, read rows in order of span id.
+	spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
+		Field: common.SPAN_ID,
+		Val:   "0000000000000000",
+	}
+	spanIdPredData, err := loadPredicateData(&spanIdPred)
+	if err != nil {
+		return nil, err
+	}
+	return spanIdPredData.createSource(store, span)
+}
+// HandleQuery returns up to query.Lim spans that satisfy every predicate in query, continuing after query.Prev when it is set. Results come out in the order of the source chosen by obtainSource.
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
+	lg := store.lg
+	// Parse predicate data.
+	var err error
+	preds := make([]*predicateData, len(query.Predicates))
+	for i := range query.Predicates {
+		preds[i], err = loadPredicateData(&query.Predicates[i])
+		if err != nil {
+			return nil, err
+		}
+	}
+	// Get a source of rows.
+	var src *source
+	src, err = store.obtainSource(&preds, query.Prev)
+	if err != nil {
+		return nil, err
+	}
+	defer src.Close()
+	lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
+
+	// Filter the spans through the remaining predicates.
+	ret := make([]*common.Span, 0, 32)
+	for {
+		if len(ret) >= query.Lim {
+			break // we hit the result size limit
+		}
+		span := src.next()
+		if span == nil {
+			break // the source has no more spans to give
+		}
+		if lg.DebugEnabled() {
+			lg.Debugf("src.next returned span %s\n", span.ToJson())
+		}
+		satisfied := true
+		for predIdx := range preds {
+			if !preds[predIdx].satisfiedBy(span) {
+				satisfied = false
+				break
+			}
+		}
+		if satisfied {
+			ret = append(ret, span)
+		}
+	}
+	return ret, nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
new file mode 100644
index 0000000..4696547
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "encoding/json" + "math/rand" + htrace "org/apache/htrace/client" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "org/apache/htrace/test" + "os" + "sort" + "strings" + "testing" +) + +// Test creating and tearing down a datastore. 
+func TestCreateDatastore(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", + DataDirs: make([]string, 3)} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() +} + +var SIMPLE_TEST_SPANS []common.Span = []common.Span{ + common.Span{Id: 1, + SpanData: common.SpanData{ + Begin: 123, + End: 456, + Description: "getFileDescriptors", + TraceId: 999, + Parents: []common.SpanId{}, + ProcessId: "firstd", + }}, + common.Span{Id: 2, + SpanData: common.SpanData{ + Begin: 125, + End: 200, + Description: "openFd", + TraceId: 999, + Parents: []common.SpanId{1}, + ProcessId: "secondd", + }}, + common.Span{Id: 3, + SpanData: common.SpanData{ + Begin: 200, + End: 456, + Description: "passFd", + TraceId: 999, + Parents: []common.SpanId{1}, + ProcessId: "thirdd", + }}, +} + +func createSpans(spans []common.Span, store *dataStore) { + for idx := range spans { + store.WriteSpan(&spans[idx]) + } + // Wait the spans to be created + for i := 0; i < 3; i++ { + <-store.WrittenSpans + } +} + +// Test creating a datastore and adding some spans. 
+func TestDatastoreWriteAndRead(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + span := ht.Store.FindSpan(1) + if span == nil { + t.Fatal() + } + if span.Id != 1 { + t.Fatal() + } + common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span) + children := ht.Store.FindChildren(1, 1) + if len(children) != 1 { + t.Fatalf("expected 1 child, but got %d\n", len(children)) + } + children = ht.Store.FindChildren(1, 2) + if len(children) != 2 { + t.Fatalf("expected 2 children, but got %d\n", len(children)) + } + sort.Sort(common.SpanIdSlice(children)) + if children[0] != 2 { + t.Fatal() + } + if children[1] != 3 { + t.Fatal() + } +} + +func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, + expectedSpans []common.Span) { + spans, err := ht.Store.HandleQuery(query) + if err != nil { + t.Fatalf("First query failed: %s\n", err.Error()) + } + expectedBuf := new(bytes.Buffer) + dec := json.NewEncoder(expectedBuf) + err = dec.Encode(expectedSpans) + if err != nil { + t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error()) + } + spansBuf := new(bytes.Buffer) + dec = json.NewEncoder(spansBuf) + err = dec.Encode(spans) + if err != nil { + t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error()) + } + t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans), + len(expectedSpans)) + common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes())) +} + +// Test queries on the datastore. 
+func TestSimpleQuery(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) +} + +func TestQueries2(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + common.Predicate{ + Op: common.EQUALS, + Field: common.DESCRIPTION, + Val: "getFileDescriptors", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[0]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.DESCRIPTION, + Val: "getFileDescriptors", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[0]}) +} + +func TestQueries3(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := 
htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.CONTAINS, + Field: common.DESCRIPTION, + Val: "Fd", + }, + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "100", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "0", + }, + }, + Lim: 200, + }, []common.Span{}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "2", + }, + }, + Lim: 200, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) +} + +func TestQueries4(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries4", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[2]}) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.DESCRIPTION, + Val: "openFd", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: 
common.DESCRIPTION, + Val: "openFd", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[2]}) +} + +func BenchmarkDatastoreWrites(b *testing.B) { + htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", + WrittenSpans: make(chan *common.Span, b.N)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + rnd := rand.New(rand.NewSource(1)) + allSpans := make([]*common.Span, b.N) + // Write many random spans. + for n := 0; n < b.N; n++ { + span := test.NewRandomSpan(rnd, allSpans[0:n]) + ht.Store.WriteSpan(span) + allSpans[n] = span + } + // Wait for all the spans to be written. + for n := 0; n < b.N; n++ { + <-ht.Store.WrittenSpans + } + spansWritten := ht.Store.GetStatistics().NumSpansWritten + if spansWritten < uint64(b.N) { + b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d", + b.N, spansWritten) + } +} + +func TestReloadDataStore(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", + DataDirs: make([]string, 2), KeepDataDirsOnClose: true} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + dataDirs := make([]string, len(ht.DataDirs)) + copy(dataDirs, ht.DataDirs) + defer func() { + if ht != nil { + ht.Close() + } + for i := range dataDirs { + os.RemoveAll(dataDirs[i]) + } + }() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf()) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + + // Create some random trace spans. + NUM_TEST_SPANS := 5 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans, + }) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + + // Look up the spans we wrote. 
+ var span *common.Span + for i := 0; i < NUM_TEST_SPANS; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + + ht.Close() + ht = nil + + htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2", + DataDirs: dataDirs, KeepDataDirsOnClose: true} + ht, err = htraceBld.Build() + if err != nil { + t.Fatalf("failed to re-create datastore: %s", err.Error()) + } + hcl, err = htrace.NewClient(ht.ClientConf()) + if err != nil { + t.Fatalf("failed to re-create client: %s", err.Error()) + } + + // Look up the spans we wrote earlier. + for i := 0; i < NUM_TEST_SPANS; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + + // Set an old datastore version number. + for i := range ht.Store.shards { + shard := ht.Store.shards[i] + writeDataStoreVersion(ht.Store, shard.ldb, CURRENT_LAYOUT_VERSION-1) + } + ht.Close() + ht = nil + + htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3", + DataDirs: dataDirs, KeepDataDirsOnClose: true} + ht, err = htraceBld.Build() + if err == nil { + t.Fatalf("expected the datastore to fail to load after setting an " + + "incorrect version.\n") + } + if !strings.Contains(err.Error(), "Invalid layout version") { + t.Fatal(`expected the loading error to contain "invalid layout version"` + "\n") + } + + // It should work with data.store.clear set. + htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4", + DataDirs: dataDirs, KeepDataDirsOnClose: true, + Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}} + ht, err = htraceBld.Build() + if err != nil { + t.Fatalf("expected the datastore loading to succeed after setting an "+ + "incorrect version. 
But it failed with error %s\n", err.Error()) + } +} + +func TestQueriesWithContinuationTokens1(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + // Adding a prev value to this query excludes the first result that we + // would normally get. + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.BEGIN_TIME, + Val: "120", + }, + }, + Lim: 5, + Prev: &SIMPLE_TEST_SPANS[0], + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + + // There is only one result from an EQUALS query on SPAN_ID. + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.SPAN_ID, + Val: "1", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[0], + }, []common.Span{}) + + // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the + // span we pass as a continuation token. (Primary index edition). + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "2", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[1], + }, []common.Span{SIMPLE_TEST_SPANS[0]}) + + // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the + // span we pass as a continuation token. (Secondary index edition). 
+ testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.DURATION, + Val: "0", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[1], + }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]}) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go new file mode 100644 index 0000000..a53380e --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "github.com/ugorji/go/codec" + "io" + "net" + "net/rpc" + "org/apache/htrace/common" + "org/apache/htrace/conf" +) + +// Handles HRPC calls +type HrpcHandler struct { + lg *common.Logger + store *dataStore +} + +// The HRPC server +type HrpcServer struct { + *rpc.Server + hand *HrpcHandler + listener net.Listener +} + +// Codec which encodes HRPC data via JSON +type HrpcServerCodec struct { + lg *common.Logger + conn net.Conn + length uint32 +} + +func asJson(val interface{}) string { + js, err := json.Marshal(val) + if err != nil { + return "encoding error: " + err.Error() + } + return string(js) +} + +func createErrAndWarn(lg *common.Logger, val string) error { + return createErrAndLog(lg, val, common.WARN) +} + +func createErrAndLog(lg *common.Logger, val string, level common.Level) error { + lg.Write(level, val+"\n") + return errors.New(val) +} + +func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { + hdr := common.HrpcRequestHeader{} + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr()) + } + err := binary.Read(cdc.conn, binary.LittleEndian, &hdr) + if err != nil { + level := common.WARN + if err == io.EOF { + level = common.DEBUG + } + return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s", + err.Error()), level) + } + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Read HRPC request header %s from %s\n", + asJson(&hdr), cdc.conn.RemoteAddr()) + } + if hdr.Magic != common.HRPC_MAGIC { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Invalid request header: expected "+ + "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic)) + } + if hdr.Length > common.MAX_HRPC_BODY_LENGTH { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Length prefix was too long. 
"+ + "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, + hdr.Length)) + } + req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) + if req.ServiceMethod == "" { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x", + hdr.MethodId)) + } + req.Seq = hdr.Seq + cdc.length = hdr.Length + return nil +} + +func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n", + cdc.length, cdc.conn.RemoteAddr()) + } + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh) + err := dec.Decode(body) + if err != nil { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+ + "body from %s: %s", cdc.conn.RemoteAddr(), err.Error())) + } + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Read body from %s: %s\n", + cdc.conn.RemoteAddr(), asJson(&body)) + } + return nil +} + +var EMPTY []byte = make([]byte, 0) + +func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error { + var err error + buf := EMPTY + if msg != nil { + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + w := bytes.NewBuffer(make([]byte, 0, 128)) + enc := codec.NewEncoder(w, mh) + err := enc.Encode(msg) + if err != nil { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to marshal "+ + "response message: %s", err.Error())) + } + buf = w.Bytes() + } + hdr := common.HrpcResponseHeader{} + hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod) + hdr.Seq = resp.Seq + hdr.ErrLength = uint32(len(resp.Error)) + hdr.Length = uint32(len(buf)) + writer := bufio.NewWriterSize(cdc.conn, 256) + err = binary.Write(writer, binary.LittleEndian, &hdr) + if err != nil { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+ + "header: %s", err.Error())) + } + if hdr.ErrLength > 0 { + _, err = io.WriteString(writer, resp.Error) + if err != nil { 
+ return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write error "+ + "string: %s", err.Error())) + } + } + if hdr.Length > 0 { + var length int + length, err = writer.Write(buf) + if err != nil { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+ + "message: %s", err.Error())) + } + if uint32(length) != hdr.Length { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write all of "+ + "response message: %s", err.Error())) + } + } + err = writer.Flush() + if err != nil { + return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write the response "+ + "bytes: %s", err.Error())) + } + return nil +} + +func (cdc *HrpcServerCodec) Close() error { + return cdc.conn.Close() +} + +func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, + resp *common.WriteSpansResp) (err error) { + hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+ + "defaultPid = %s\n", len(req.Spans), req.DefaultPid) + for i := range req.Spans { + span := req.Spans[i] + if span.ProcessId == "" { + span.ProcessId = req.DefaultPid + } + if hand.lg.TraceEnabled() { + hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson()) + } + hand.store.WriteSpan(span) + } + return nil +} + +func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) { + lg := common.NewLogger("hrpc", cnf) + hsv := &HrpcServer{ + Server: rpc.NewServer(), + hand: &HrpcHandler{ + lg: lg, + store: store, + }, + } + var err error + hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS)) + if err != nil { + return nil, err + } + hsv.Server.Register(hsv.hand) + go hsv.run() + lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String()) + return hsv, nil +} + +func (hsv *HrpcServer) run() { + lg := hsv.hand.lg + for { + conn, err := hsv.listener.Accept() + if err != nil { + lg.Errorf("HRPC Accept error: %s\n", err.Error()) + continue + } + if lg.TraceEnabled() { + lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr()) + } + go 
hsv.ServeCodec(&HrpcServerCodec{ + lg: lg, + conn: conn, + }) + } +} + +func (hsv *HrpcServer) Addr() net.Addr { + return hsv.listener.Addr() +} + +func (hsv *HrpcServer) Close() { + hsv.listener.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go new file mode 100644 index 0000000..64da457 --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "encoding/json" + "fmt" + "net" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "os" + "strings" + "time" +) + +var RELEASE_VERSION string +var GIT_VERSION string + +const USAGE = `htraced: the HTrace server daemon. + +htraced receives trace spans sent from HTrace clients. It exposes a REST +interface which others can query. It also runs a web server with a graphical +user interface. htraced stores its span data in levelDB files on the local +disks. 
+ +Usage: +--help: this help message + +-Dk=v: set configuration key 'k' to value 'v' +For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost, +port 8080. + +-Dk: set configuration key 'k' to 'true' + +Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + ` +configuration file. We find this file by searching the paths in the +` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way +of setting configuration when launching the daemon. +` + +func main() { + for idx := range os.Args { + arg := os.Args[idx] + if strings.HasPrefix(arg, "--h") || strings.HasPrefix(arg, "-h") { + fmt.Fprintf(os.Stderr, USAGE) + os.Exit(0) + } + } + cnf := common.LoadApplicationConfig() + common.InstallSignalHandlers(cnf) + lg := common.NewLogger("main", cnf) + defer lg.Close() + store, err := CreateDataStore(cnf, nil) + if err != nil { + lg.Errorf("Error creating datastore: %s\n", err.Error()) + os.Exit(1) + } + var rsv *RestServer + rsv, err = CreateRestServer(cnf, store) + if err != nil { + lg.Errorf("Error creating REST server: %s\n", err.Error()) + os.Exit(1) + } + var hsv *HrpcServer + if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { + hsv, err = CreateHrpcServer(cnf, store) + if err != nil { + lg.Errorf("Error creating HRPC server: %s\n", err.Error()) + os.Exit(1) + } + } else { + lg.Infof("Not starting HRPC server because no value was given for %s.\n", + conf.HTRACE_HRPC_ADDRESS) + } + naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS) + if naddr != "" { + notif := StartupNotification{ + HttpAddr: rsv.Addr().String(), + ProcessId: os.Getpid(), + } + if hsv != nil { + notif.HrpcAddr = hsv.Addr().String() + } + err = sendStartupNotification(naddr, ¬if) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+ + "%s\n", err.Error()) + os.Exit(1) + } + } + for { + time.Sleep(time.Duration(10) * time.Hour) + } +} + +// A startup notification message that we optionally send on startup. 
// Used by unit tests.
type StartupNotification struct {
	HttpAddr  string
	HrpcAddr  string
	ProcessId int
}

// sendStartupNotification dials naddr over TCP, writes notif to it as a single
// JSON object, and closes the connection.  Errors from dialing, marshalling,
// writing, or closing are all returned to the caller.  The write error in
// particular used to be dropped, so a failed notification was reported as
// success.
func sendStartupNotification(naddr string, notif *StartupNotification) error {
	conn, err := net.Dial("tcp", naddr)
	if err != nil {
		return err
	}
	buf, err := json.Marshal(notif)
	if err != nil {
		conn.Close()
		return err
	}
	if _, err = conn.Write(buf); err != nil {
		conn.Close()
		return err
	}
	return conn.Close()
}
