http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/file.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/file.go
new file mode 100644
index 0000000..ea214be
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "org/apache/htrace/common"
+       "os"
+)
+
+// A file used for input.
+// Transparently supports using stdin for input.
+type InputFile struct {
+       *os.File
+       path string
+}
+
+// Open an input file.  Stdin will be used when path is -
+func OpenInputFile(path string) (*InputFile, error) {
+       if path == "-" {
+               return &InputFile{File: os.Stdin, path: path}, nil
+       }
+       file, err := os.Open(path)
+       if err != nil {
+               return nil, err
+       }
+       return &InputFile{File: file, path: path}, nil
+}
+
+func (file *InputFile) Close() {
+       if file.path != "-" {
+               file.File.Close()
+       }
+}
+
+// A file used for output.
+// Transparently supports using stdout for output.
+type OutputFile struct {
+       *os.File
+       path string
+}
+
+// Create an output file.  Stdout will be used when path is -
+func CreateOutputFile(path string) (*OutputFile, error) {
+       if path == "-" {
+               return &OutputFile{File: os.Stdout, path: path}, nil
+       }
+       file, err := os.Create(path)
+       if err != nil {
+               return nil, err
+       }
+       return &OutputFile{File: file, path: path}, nil
+}
+
+func (file *OutputFile) Close() error {
+       if file.path != "-" {
+               return file.File.Close()
+       }
+       return nil
+}
+
+// FailureDeferringWriter is a writer which allows us to call Printf multiple
+// times and then check if all the printfs succeeded at the very end, rather
+// than checking after each call.   We will not attempt to write more data
+// after the first write failure.
+type FailureDeferringWriter struct {
+       io.Writer
+       err error
+}
+
+func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter {
+       return &FailureDeferringWriter{writer, nil}
+}
+
+func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) {
+       if w.err != nil {
+               return
+       }
+       str := fmt.Sprintf(format, v...)
+       _, err := w.Writer.Write([]byte(str))
+       if err != nil {
+               w.err = err
+       }
+}
+
+func (w *FailureDeferringWriter) Error() error {
+       return w.err
+}
+
+// Read a file full of whitespace-separated span JSON into a slice of spans.
+func readSpansFile(path string) (common.SpanSlice, error) {
+       file, err := OpenInputFile(path)
+       if err != nil {
+               return nil, err
+       }
+       defer file.Close()
+       return readSpans(bufio.NewReader(file))
+}
+
+// Read whitespace-separated span JSON into a slice of spans.
+func readSpans(reader io.Reader) (common.SpanSlice, error) {
+       spans := make(common.SpanSlice, 0)
+       dec := json.NewDecoder(reader)
+       for {
+               var span common.Span
+               err := dec.Decode(&span)
+               if err != nil {
+                       if err != io.EOF {
+                               return nil, errors.New(fmt.Sprintf("Decode 
error after decoding %d "+
+                                       "span(s): %s", len(spans), err.Error()))
+                       }
+                       break
+               }
+               spans = append(spans, &span)
+       }
+       return spans, nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go
new file mode 100644
index 0000000..b6f9cac
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/file_test.go
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "errors"
+       "io"
+       "io/ioutil"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "org/apache/htrace/test"
+       "os"
+       "strings"
+       "testing"
+)
+
+func TestInputFileAndOutputFile(t *testing.T) {
+       tdir, err := ioutil.TempDir(os.TempDir(), "TestInputFileAndOutputFile")
+       if err != nil {
+               t.Fatalf("failed to create TempDir: %s\n", err.Error())
+       }
+       defer os.RemoveAll(tdir)
+       tpath := tdir + conf.PATH_SEP + "test"
+       var ofile *OutputFile
+       ofile, err = CreateOutputFile(tpath)
+       if err != nil {
+               t.Fatalf("failed to create OutputFile at %s: %s\n", tpath, 
err.Error())
+       }
+       defer func() {
+               if ofile != nil {
+                       ofile.Close()
+               }
+       }()
+       w := NewFailureDeferringWriter(ofile)
+       w.Printf("Hello, world!\n")
+       w.Printf("2 + 2 = %d\n", 4)
+       if w.Error() != nil {
+               t.Fatalf("got unexpected error writing to %s: %s\n", tpath, 
w.Error().Error())
+       }
+       err = ofile.Close()
+       ofile = nil
+       if err != nil {
+               t.Fatalf("error on closing OutputFile for %s: %s\n", tpath, 
err.Error())
+       }
+       var ifile *InputFile
+       ifile, err = OpenInputFile(tpath)
+       defer ifile.Close()
+       expected := "Hello, world!\n2 + 2 = 4\n"
+       buf := make([]byte, len(expected))
+       _, err = io.ReadAtLeast(ifile, buf, len(buf))
+       if err != nil {
+               t.Fatalf("unexpected error on reading %s: %s\n", tpath, 
err.Error())
+       }
+       str := string(buf)
+       if str != expected {
+               t.Fatalf("Could not read back what we wrote to %s.\n"+
+                       "Got:\n%s\nExpected:\n%s\n", tpath, str, expected)
+       }
+}
+
+type LimitedBufferWriter struct {
+       buf []byte
+       off int
+}
+
+const LIMITED_BUFFER_MESSAGE = "There isn't enough buffer to go around!"
+
+func (w *LimitedBufferWriter) Write(p []byte) (int, error) {
+       var nwritten int
+       for i := range p {
+               if w.off >= len(w.buf) {
+                       return nwritten, errors.New(LIMITED_BUFFER_MESSAGE)
+               }
+               w.buf[w.off] = p[i]
+               w.off = w.off + 1
+               nwritten++
+       }
+       return nwritten, nil
+}
+
+func TestFailureDeferringWriter(t *testing.T) {
+       lw := LimitedBufferWriter{buf: make([]byte, 20), off: 0}
+       w := NewFailureDeferringWriter(&lw)
+       w.Printf("Zippity do dah #%d\n", 1)
+       w.Printf("Zippity do dah #%d\n", 2)
+       if w.Error() == nil {
+               t.Fatalf("expected FailureDeferringWriter to experience a 
failure due to " +
+                       "limited buffer size, but it did not.")
+       }
+       if w.Error().Error() != LIMITED_BUFFER_MESSAGE {
+               t.Fatalf("expected FailureDeferringWriter to have the error 
message %s, but "+
+                       "the message was %s\n", LIMITED_BUFFER_MESSAGE, 
w.Error().Error())
+       }
+       expected := "Zippity do dah #1\nZi"
+       if string(lw.buf) != expected {
+               t.Fatalf("expected LimitedBufferWriter to contain %s, but it 
contained %s "+
+                       "instead.\n", expected, string(lw.buf))
+       }
+}
+
+func TestReadSpans(t *testing.T) {
+       SPAN_TEST_STR := `{"i":"bdd6d4ee48de59bf","s":"c0681027d3ea4928",` +
+               
`"b":1424736225037,"e":1424736225901,"d":"ClientNamenodeProtocol#getFileInfo",` 
+
+               `"r":"FsShell","p":["60538dfb4df91418"]}
+{"i":"bdd6d4ee48de59bf","s":"60538dfb4df91418","b":1424736224969,` +
+               
`"e":1424736225960,"d":"getFileInfo","r":"FsShell","p":[],"n":{"path":"/"}}
+`
+       r := strings.NewReader(SPAN_TEST_STR)
+       spans, err := readSpans(r)
+       if err != nil {
+               t.Fatalf("Failed to read spans from string via readSpans: 
%s\n", err.Error())
+       }
+       SPAN_TEST_EXPECTED := common.SpanSlice{
+               &common.Span{
+                       Id: test.SpanId("c0681027d3ea4928"),
+                       SpanData: common.SpanData{
+                               TraceId:     test.SpanId("bdd6d4ee48de59bf"),
+                               Begin:       1424736225037,
+                               End:         1424736225901,
+                               Description: 
"ClientNamenodeProtocol#getFileInfo",
+                               ProcessId:   "FsShell",
+                               Parents:     
[]common.SpanId{test.SpanId("60538dfb4df91418")},
+                       },
+               },
+               &common.Span{
+                       Id: test.SpanId("60538dfb4df91418"),
+                       SpanData: common.SpanData{
+                               TraceId:     test.SpanId("bdd6d4ee48de59bf"),
+                               Begin:       1424736224969,
+                               End:         1424736225960,
+                               Description: "getFileInfo",
+                               ProcessId:   "FsShell",
+                               Parents:     []common.SpanId{},
+                               Info: common.TraceInfoMap{
+                                       "path": "/",
+                               },
+                       },
+               },
+       }
+       if len(spans) != len(SPAN_TEST_EXPECTED) {
+               t.Fatalf("Expected %d spans, but got %d\n",
+                       len(SPAN_TEST_EXPECTED), len(spans))
+       }
+       for i := range SPAN_TEST_EXPECTED {
+               common.ExpectSpansEqual(t, spans[i], SPAN_TEST_EXPECTED[i])
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go
new file mode 100644
index 0000000..dabf2df
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/graph.go
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "errors"
+       "fmt"
+       "io"
+       "org/apache/htrace/common"
+       "os"
+       "sort"
+)
+
+// Create a dotfile from a json file.
+func jsonSpanFileToDotFile(jsonFile string, dotFile string) error {
+       spans, err := readSpansFile(jsonFile)
+       if err != nil {
+               return errors.New(fmt.Sprintf("error reading %s: %s",
+                       jsonFile, err.Error()))
+       }
+       var file *OutputFile
+       file, err = CreateOutputFile(dotFile)
+       if err != nil {
+               return errors.New(fmt.Sprintf("error opening %s for write: %s",
+                       dotFile, err.Error()))
+       }
+       defer func() {
+               if file != nil {
+                       file.Close()
+               }
+       }()
+       writer := bufio.NewWriter(file)
+       err = spansToDot(spans, writer)
+       if err != nil {
+               return err
+       }
+       err = writer.Flush()
+       if err != nil {
+               return err
+       }
+       err = file.Close()
+       file = nil
+       return err
+}
+
+// Create output in dotfile format from a set of spans.
+func spansToDot(spans common.SpanSlice, writer io.Writer) error {
+       sort.Sort(spans)
+       idMap := make(map[common.SpanId]*common.Span)
+       for i := range spans {
+               span := spans[i]
+               if idMap[span.Id] != nil {
+                       fmt.Fprintf(os.Stderr, "There were multiple spans 
listed which "+
+                               "had ID %s.\nFirst:%s\nOther:%s\n", 
span.Id.String(),
+                               idMap[span.Id].ToJson(), span.ToJson())
+               } else {
+                       idMap[span.Id] = span
+               }
+       }
+       childMap := make(map[common.SpanId]common.SpanSlice)
+       for i := range spans {
+               child := spans[i]
+               for j := range child.Parents {
+                       parent := idMap[child.Parents[j]]
+                       if parent == nil {
+                               fmt.Fprintf(os.Stderr, "Can't find parent id %s 
for %s\n",
+                                       child.Parents[j].String(), 
child.ToJson())
+                       } else {
+                               children := childMap[parent.Id]
+                               if children == nil {
+                                       children = make(common.SpanSlice, 0)
+                               }
+                               children = append(children, child)
+                               childMap[parent.Id] = children
+                       }
+               }
+       }
+       w := NewFailureDeferringWriter(writer)
+       w.Printf("digraph spans {\n")
+       // Write out the nodes with their descriptions.
+       for i := range spans {
+               w.Printf(fmt.Sprintf(`  "%s" [label="%s"];`+"\n",
+                       spans[i].Id.String(), spans[i].Description))
+       }
+       // Write out the edges between nodes... the parent/children 
relationships
+       for i := range spans {
+               children := childMap[spans[i].Id]
+               sort.Sort(children)
+               if children != nil {
+                       for c := range children {
+                               w.Printf(fmt.Sprintf(`  "%s" -> "%s";`+"\n",
+                                       spans[i].Id.String(), children[c].Id))
+                       }
+               }
+       }
+       w.Printf("}\n")
+       return w.Error()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go
new file mode 100644
index 0000000..8698a98
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/graph_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "org/apache/htrace/common"
+       "org/apache/htrace/test"
+       "testing"
+)
+
+func TestSpansToDot(t *testing.T) {
+       TEST_SPANS := common.SpanSlice{
+               &common.Span{
+                       Id: test.SpanId("6af3cc058e5d829d"),
+                       SpanData: common.SpanData{
+                               TraceId:     test.SpanId("0e4716fe911244de"),
+                               Begin:       1424813349020,
+                               End:         1424813349134,
+                               Description: "newDFSInputStream",
+                               ProcessId:   "FsShell",
+                               Parents:     []common.SpanId{},
+                               Info: common.TraceInfoMap{
+                                       "path": "/",
+                               },
+                       },
+               },
+               &common.Span{
+                       Id: test.SpanId("75d16cc5b2c07d8a"),
+                       SpanData: common.SpanData{
+                               TraceId:     test.SpanId("0e4716fe911244de"),
+                               Begin:       1424813349025,
+                               End:         1424813349133,
+                               Description: "getBlockLocations",
+                               ProcessId:   "FsShell",
+                               Parents:     
[]common.SpanId{test.SpanId("6af3cc058e5d829d")},
+                       },
+               },
+               &common.Span{
+                       Id: test.SpanId("e2c7273efb280a8c"),
+                       SpanData: common.SpanData{
+                               TraceId:     test.SpanId("0e4716fe911244de"),
+                               Begin:       1424813349027,
+                               End:         1424813349073,
+                               Description: 
"ClientNamenodeProtocol#getBlockLocations",
+                               ProcessId:   "FsShell",
+                               Parents:     
[]common.SpanId{test.SpanId("75d16cc5b2c07d8a")},
+                       },
+               },
+       }
+       w := bytes.NewBuffer(make([]byte, 0, 2048))
+       err := spansToDot(TEST_SPANS, w)
+       if err != nil {
+               t.Fatalf("spansToDot failed: error %s\n", err.Error())
+       }
+       EXPECTED_STR := `digraph spans {
+  "6af3cc058e5d829d" [label="newDFSInputStream"];
+  "75d16cc5b2c07d8a" [label="getBlockLocations"];
+  "e2c7273efb280a8c" [label="ClientNamenodeProtocol#getBlockLocations"];
+  "6af3cc058e5d829d" -> "75d16cc5b2c07d8a";
+  "75d16cc5b2c07d8a" -> "e2c7273efb280a8c";
+}
+`
+       if w.String() != EXPECTED_STR {
+               t.Fatalf("Expected to get:\n%s\nGot:\n%s\n", EXPECTED_STR, 
w.String())
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go
new file mode 100644
index 0000000..4ff246c
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/queries.go
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       htrace "org/apache/htrace/client"
+       "org/apache/htrace/common"
+       "strings"
+       "unicode"
+)
+
+// Convert a string into a whitespace-separated sequence of strings.
+func tokenize(str string) []string {
+       prevQuote := rune(0)
+       f := func(c rune) bool {
+               switch {
+               case c == prevQuote:
+                       prevQuote = rune(0)
+                       return false
+               case prevQuote != rune(0):
+                       return false
+               case unicode.In(c, unicode.Quotation_Mark):
+                       prevQuote = c
+                       return false
+               default:
+                       return unicode.IsSpace(c)
+               }
+       }
+       return strings.FieldsFunc(str, f)
+}
+
+// Parses a query string in the format of a series of
+// [TYPE] [OPERATOR] [CONST] tuples, joined by AND statements.
+type predicateParser struct {
+       tokens   []string
+       curToken int
+}
+
+func (ps *predicateParser) Parse() (*common.Predicate, error) {
+       if ps.curToken > len(ps.tokens) {
+               return nil, nil
+       }
+       if ps.curToken > 0 {
+               if strings.ToLower(ps.tokens[ps.curToken]) != "and" {
+                       return nil, errors.New(fmt.Sprintf("Error parsing on 
token %d: "+
+                               "expected predicates to be joined by 'and', but 
found '%s'",
+                               ps.curToken, ps.tokens[ps.curToken]))
+               }
+               ps.curToken++
+               if ps.curToken > len(ps.tokens) {
+                       return nil, errors.New(fmt.Sprintf("Nothing found after 
'and' at "+
+                               "token %d", ps.curToken))
+               }
+       }
+       field := common.Field(ps.tokens[ps.curToken])
+       if !field.IsValid() {
+               return nil, errors.New(fmt.Sprintf("Invalid field specifier at 
token %d.  "+
+                       "Can't understand %s.  Valid field specifiers are %v", 
ps.curToken,
+                       ps.tokens[ps.curToken], common.ValidFields()))
+       }
+       ps.curToken++
+       if ps.curToken > len(ps.tokens) {
+               return nil, errors.New(fmt.Sprintf("Nothing found after field 
specifier "+
+                       "at token %d", ps.curToken))
+       }
+       op := common.Op(ps.tokens[ps.curToken])
+       if !op.IsValid() {
+               return nil, errors.New(fmt.Sprintf("Invalid operation specifier 
at token %d.  "+
+                       "Can't understand %s.  Valid operation specifiers are 
%v", ps.curToken,
+                       ps.tokens[ps.curToken], common.ValidOps()))
+       }
+       ps.curToken++
+       if ps.curToken > len(ps.tokens) {
+               return nil, errors.New(fmt.Sprintf("Nothing found after field 
specifier "+
+                       "at token %d", ps.curToken))
+       }
+       val := ps.tokens[ps.curToken]
+       return &common.Predicate{Op: op, Field: field, Val: val}, nil
+}
+
+func parseQueryString(str string) ([]common.Predicate, error) {
+       ps := predicateParser{tokens: tokenize(str)}
+       preds := make([]common.Predicate, 0)
+       for {
+               pred, err := ps.Parse()
+               if pred == nil {
+                       break
+               }
+               if err != nil {
+                       return nil, err
+               }
+       }
+       if len(preds) == 0 {
+               return nil, errors.New("Empty query string")
+       }
+       return preds, nil
+}
+
+// Send a query from a query string.
+func doQueryFromString(hcl *htrace.Client, str string, lim int) error {
+       query := &common.Query{Lim: lim}
+       var err error
+       query.Predicates, err = parseQueryString(str)
+       if err != nil {
+               return err
+       }
+       return doQuery(hcl, query)
+}
+
+// Send a query from a raw JSON string.
+func doRawQuery(hcl *htrace.Client, str string) error {
+       jsonBytes := []byte(str)
+       var query common.Query
+       err := json.Unmarshal(jsonBytes, &query)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Error parsing provided JSON: 
%s\n", err.Error()))
+       }
+       return doQuery(hcl, &query)
+}
+
+// Send a query.
+func doQuery(hcl *htrace.Client, query *common.Query) error {
+       if *verbose {
+               qbytes, err := json.Marshal(*query)
+               if err != nil {
+                       qbytes = []byte("marshaling error: " + err.Error())
+               }
+               fmt.Printf("Sending query: %s\n", string(qbytes))
+       }
+       spans, err := hcl.Query(query)
+       if err != nil {
+               return err
+       }
+       if *verbose {
+               fmt.Printf("%d results...\n", len(spans))
+       }
+       for i := range spans {
+               fmt.Printf("%s\n", spans[i].ToJson())
+       }
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
new file mode 100644
index 0000000..218c1c8
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "math/rand"
+       htrace "org/apache/htrace/client"
+       "org/apache/htrace/common"
+       "org/apache/htrace/test"
+       "sort"
+       "testing"
+       "time"
+)
+
+func TestClientGetServerInfo(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo",
+               DataDirs: make([]string, 2)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf())
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       _, err = hcl.GetServerInfo()
+       if err != nil {
+               t.Fatalf("failed to call GetServerInfo: %s", err.Error())
+       }
+}
+
+func createRandomTestSpans(amount int) common.SpanSlice {
+       rnd := rand.New(rand.NewSource(2))
+       allSpans := make(common.SpanSlice, amount)
+       allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
+       for i := 1; i < amount; i++ {
+               allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
+       }
+       allSpans[1].SpanData.Parents = 
[]common.SpanId{common.SpanId(allSpans[0].Id)}
+       return allSpans
+}
+
+func TestClientOperations(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
+               DataDirs: make([]string, 2)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf())
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+
+       // Create some random trace spans.
+       NUM_TEST_SPANS := 30
+       allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+
+       // Write half of the spans to htraced via the client.
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans[0 : NUM_TEST_SPANS/2],
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
+                       err.Error())
+       }
+
+       // Look up the first half of the spans.  They should be found.
+       var span *common.Span
+       for i := 0; i < NUM_TEST_SPANS/2; i++ {
+               span, err = hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+
+       // Look up the second half of the spans.  They should not be found.
+       for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
+               span, err = hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               if span != nil {
+                       t.Fatalf("Unexpectedly found a span we never write to "+
+                               "the server: FindSpan(%d) succeeded\n", i)
+               }
+       }
+
+       // Test FindChildren
+       childSpan := allSpans[1]
+       parentId := childSpan.Parents[0]
+       var children []common.SpanId
+       children, err = hcl.FindChildren(parentId, 1)
+       if err != nil {
+               t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
+       }
+       if len(children) != 1 {
+               t.Fatalf("FindChildren(%s) returned an invalid number of "+
+                       "children: expected %d, got %d\n", parentId, 1, 
len(children))
+       }
+       if children[0] != childSpan.Id {
+               t.Fatalf("FindChildren(%s) returned an invalid child id: 
expected %s, "+
+                       " got %s\n", parentId, childSpan.Id, children[0])
+       }
+
+       // Test FindChildren on a span that has no children
+       childlessSpan := allSpans[NUM_TEST_SPANS/2]
+       children, err = hcl.FindChildren(childlessSpan.Id, 10)
+       if err != nil {
+               t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, 
err.Error())
+       }
+       if len(children) != 0 {
+               t.Fatalf("FindChildren(%d) returned an invalid number of "+
+                       "children: expected %d, got %d\n", childlessSpan.Id, 0, 
len(children))
+       }
+
+       // Test Query
+       var query common.Query
+       query = common.Query{Lim: 10}
+       spans, err := hcl.Query(&query)
+       if err != nil {
+               t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
+       }
+       if len(spans) != 10 {
+               t.Fatalf("Query({lim: %d}) returned an invalid number of "+
+                       "children: expected %d, got %d\n", 10, 10, len(spans))
+       }
+}
+
+func TestDumpAll(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
+               DataDirs: make([]string, 2)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf())
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+
+       NUM_TEST_SPANS := 100
+       allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+       sort.Sort(allSpans)
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans,
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans failed: %s\n", err.Error())
+       }
+       out := make(chan *common.Span, 50)
+       var dumpErr error
+       go func() {
+               dumpErr = hcl.DumpAll(3, out)
+       }()
+       var numSpans int
+       nextLogTime := time.Now().Add(time.Millisecond * 5)
+       for {
+               span, channelOpen := <-out
+               if !channelOpen {
+                       break
+               }
+               common.ExpectSpansEqual(t, allSpans[numSpans], span)
+               numSpans++
+               if testing.Verbose() {
+                       now := time.Now()
+                       if !now.Before(nextLogTime) {
+                               nextLogTime = now
+                               nextLogTime = nextLogTime.Add(time.Millisecond 
* 5)
+                               fmt.Printf("read back %d span(s)...\n", 
numSpans)
+                       }
+               }
+       }
+       if numSpans != len(allSpans) {
+               t.Fatalf("expected to read %d spans... but only read %d\n",
+                       len(allSpans), numSpans)
+       }
+       if dumpErr != nil {
+               t.Fatalf("got dump error %s\n", dumpErr.Error())
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
new file mode 100644
index 0000000..0742555
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/gob"
+       "errors"
+       "fmt"
+       "github.com/jmhodges/levigo"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "os"
+       "strconv"
+       "strings"
+       "sync/atomic"
+)
+
+//
+// The data store code for HTraced.
+//
+// This code stores the trace spans.  We use levelDB here so that we don't 
have to store everything
+// in memory at all times.  The data is sharded across multiple levelDB 
databases in multiple
+// directories.  Normally, these multiple directories will be on multiple disk 
drives.
+//
+// The main emphasis in the HTraceD data store is on quickly and efficiently 
storing trace span data
+// coming from many daemons.  Durability is not as big a concern as in some 
data stores, since
+// losing a little bit of trace data if htraced goes down is not critical.  We 
use the "gob" package
+// for serialization.  We assume that there will be many more writes than 
reads.
+//
+// Schema
+// m -> dataStoreVersion
+// s[8-byte-big-endian-sid] -> SpanData
+// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
+// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
+// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
+// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
+//
+// Note that span IDs are unsigned 64-bit numbers.
+// Begin times, end times, and durations are signed 64-bit numbers.
+// In order to get LevelDB to properly compare the signed 64-bit quantities,
+// we flip the highest bit.  This way, we can get leveldb to view negative
+// quantities as less than non-negative ones.  This also means that we can do
+// all queries using unsigned 64-bit math, rather than having to special-case
+// the signed fields.
+//
+
+const UNKNOWN_LAYOUT_VERSION = 0
+const CURRENT_LAYOUT_VERSION = 2
+
+var EMPTY_BYTE_BUF []byte = []byte{}
+
+const VERSION_KEY = 'v'
+const SPAN_ID_INDEX_PREFIX = 's'
+const BEGIN_TIME_INDEX_PREFIX = 'b'
+const END_TIME_INDEX_PREFIX = 'e'
+const DURATION_INDEX_PREFIX = 'd'
+const PARENT_ID_INDEX_PREFIX = 'p'
+const INVALID_INDEX_PREFIX = 0
+
+type Statistics struct {
+       NumSpansWritten uint64
+}
+
+func (stats *Statistics) IncrementWrittenSpans() {
+       atomic.AddUint64(&stats.NumSpansWritten, 1)
+}
+
+// Make a copy of the statistics structure, using atomic operations.
+func (stats *Statistics) Copy() *Statistics {
+       return &Statistics{
+               NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
+       }
+}
+
+// Translate an 8-byte value into a leveldb key.
+func makeKey(tag byte, val uint64) []byte {
+       return []byte{
+               tag,
+               byte(0xff & (val >> 56)),
+               byte(0xff & (val >> 48)),
+               byte(0xff & (val >> 40)),
+               byte(0xff & (val >> 32)),
+               byte(0xff & (val >> 24)),
+               byte(0xff & (val >> 16)),
+               byte(0xff & (val >> 8)),
+               byte(0xff & (val >> 0)),
+       }
+}
+
+func keyToInt(key []byte) uint64 {
+       var id uint64
+       id = (uint64(key[0]) << 56) |
+               (uint64(key[1]) << 48) |
+               (uint64(key[2]) << 40) |
+               (uint64(key[3]) << 32) |
+               (uint64(key[4]) << 24) |
+               (uint64(key[5]) << 16) |
+               (uint64(key[6]) << 8) |
+               (uint64(key[7]) << 0)
+       return id
+}
+
+func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte {
+       return []byte{
+               tag,
+               byte(0xff & (fir >> 56)),
+               byte(0xff & (fir >> 48)),
+               byte(0xff & (fir >> 40)),
+               byte(0xff & (fir >> 32)),
+               byte(0xff & (fir >> 24)),
+               byte(0xff & (fir >> 16)),
+               byte(0xff & (fir >> 8)),
+               byte(0xff & (fir >> 0)),
+               byte(0xff & (sec >> 56)),
+               byte(0xff & (sec >> 48)),
+               byte(0xff & (sec >> 40)),
+               byte(0xff & (sec >> 32)),
+               byte(0xff & (sec >> 24)),
+               byte(0xff & (sec >> 16)),
+               byte(0xff & (sec >> 8)),
+               byte(0xff & (sec >> 0)),
+       }
+}
+
+// A single directory containing a levelDB instance.
+type shard struct {
+       // The data store that this shard is part of
+       store *dataStore
+
+       // The LevelDB instance.
+       ldb *levigo.DB
+
+       // The path to the leveldb directory this shard is managing.
+       path string
+
+       // Incoming requests to write Spans.
+       incoming chan *common.Span
+
+       // The channel we will send a bool to when we exit.
+       exited chan bool
+}
+
+// Process incoming spans for a shard.
+func (shd *shard) processIncoming() {
+       lg := shd.store.lg
+       for {
+               span := <-shd.incoming
+               if span == nil {
+                       lg.Infof("Shard processor for %s exiting.\n", shd.path)
+                       shd.exited <- true
+                       return
+               }
+               err := shd.writeSpan(span)
+               if err != nil {
+                       lg.Errorf("Shard processor for %s got fatal error 
%s.\n", shd.path, err.Error())
+               } else {
+                       lg.Tracef("Shard processor for %s wrote span %s.\n", 
shd.path, span.ToJson())
+               }
+       }
+}
+
+// Convert a signed 64-bit number into an unsigned 64-bit number.  We flip the
+// highest bit, so that negative input values map to unsigned numbers which are
+// less than non-negative input values.
+func s2u64(val int64) uint64 {
+       ret := uint64(val)
+       ret ^= 0x8000000000000000
+       return ret
+}
+
+func (shd *shard) writeSpan(span *common.Span) error {
+       batch := levigo.NewWriteBatch()
+       defer batch.Close()
+
+       // Add SpanData to batch.
+       spanDataBuf := new(bytes.Buffer)
+       spanDataEnc := gob.NewEncoder(spanDataBuf)
+       err := spanDataEnc.Encode(span.SpanData)
+       if err != nil {
+               return err
+       }
+       batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), 
spanDataBuf.Bytes())
+
+       // Add this to the parent index.
+       for parentIdx := range span.Parents {
+               batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
+                       span.Parents[parentIdx].Val(), span.Id.Val()), 
EMPTY_BYTE_BUF)
+       }
+
+       // Add to the other secondary indices.
+       batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin),
+               span.Id.Val()), EMPTY_BYTE_BUF)
+       batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End),
+               span.Id.Val()), EMPTY_BYTE_BUF)
+       batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, 
s2u64(span.Duration()),
+               span.Id.Val()), EMPTY_BYTE_BUF)
+
+       err = shd.ldb.Write(shd.store.writeOpts, batch)
+       if err != nil {
+               return err
+       }
+       shd.store.stats.IncrementWrittenSpans()
+       if shd.store.WrittenSpans != nil {
+               shd.store.WrittenSpans <- span
+       }
+       return nil
+}
+
+func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
+       lim int32) ([]common.SpanId, int32, error) {
+       searchKey := makeKey('p', sid.Val())
+       iter := shd.ldb.NewIterator(shd.store.readOpts)
+       defer iter.Close()
+       iter.Seek(searchKey)
+       for {
+               if !iter.Valid() {
+                       break
+               }
+               if lim == 0 {
+                       break
+               }
+               key := iter.Key()
+               if !bytes.HasPrefix(key, searchKey) {
+                       break
+               }
+               id := common.SpanId(keyToInt(key[9:]))
+               childIds = append(childIds, id)
+               lim--
+               iter.Next()
+       }
+       return childIds, lim, nil
+}
+
+// Close a shard.
+func (shd *shard) Close() {
+       lg := shd.store.lg
+       shd.incoming <- nil
+       lg.Infof("Waiting for %s to exit...\n", shd.path)
+       if shd.exited != nil {
+               <-shd.exited
+       }
+       shd.ldb.Close()
+       lg.Infof("Closed %s...\n", shd.path)
+}
+
+// The Data Store.
+type dataStore struct {
+       lg *common.Logger
+
+       // The shards which manage our LevelDB instances.
+       shards []*shard
+
+       // I/O statistics for all shards.
+       stats Statistics
+
+       // The read options to use for LevelDB.
+       readOpts *levigo.ReadOptions
+
+       // The write options to use for LevelDB.
+       writeOpts *levigo.WriteOptions
+
+       // If non-null, a channel we will send spans to once we finish writing 
them.  This is only used
+       // for testing.
+       WrittenSpans chan *common.Span
+}
+
+func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) 
(*dataStore, error) {
+       // Get the configuration.
+       clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
+       dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
+       dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
+
+       var err error
+       lg := common.NewLogger("datastore", cnf)
+       store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: 
writtenSpans}
+
+       // If we return an error, close the store.
+       defer func() {
+               if err != nil {
+                       store.Close()
+                       store = nil
+               }
+       }()
+
+       store.readOpts = levigo.NewReadOptions()
+       store.readOpts.SetFillCache(true)
+       store.writeOpts = levigo.NewWriteOptions()
+       store.writeOpts.SetSync(false)
+
+       // Open all shards
+       for idx := range dirs {
+               path := dirs[idx] + conf.PATH_SEP + "db"
+               var shd *shard
+               shd, err = CreateShard(store, cnf, path, clearStored)
+               if err != nil {
+                       lg.Errorf("Error creating shard %s: %s\n", path, 
err.Error())
+                       return nil, err
+               }
+               store.shards = append(store.shards, shd)
+       }
+       for idx := range store.shards {
+               shd := store.shards[idx]
+               shd.exited = make(chan bool, 1)
+               go shd.processIncoming()
+       }
+       return store, nil
+}
+
+func CreateShard(store *dataStore, cnf *conf.Config, path string,
+       clearStored bool) (*shard, error) {
+       lg := store.lg
+       if clearStored {
+               fi, err := os.Stat(path)
+               if err != nil && !os.IsNotExist(err) {
+                       lg.Errorf("Failed to stat %s: %s\n", path, err.Error())
+                       return nil, err
+               }
+               if fi != nil {
+                       err = os.RemoveAll(path)
+                       if err != nil {
+                               lg.Errorf("Failed to clear existing datastore 
directory %s: %s\n",
+                                       path, err.Error())
+                               return nil, err
+                       }
+                       lg.Infof("Cleared existing datastore directory %s\n", 
path)
+               }
+       }
+       err := os.MkdirAll(path, 0777)
+       if err != nil {
+               lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error())
+               return nil, err
+       }
+       var shd *shard
+       openOpts := levigo.NewOptions()
+       defer openOpts.Close()
+       newlyCreated := false
+       ldb, err := levigo.Open(path, openOpts)
+       if err == nil {
+               store.lg.Infof("LevelDB opened %s\n", path)
+       } else {
+               store.lg.Debugf("LevelDB failed to open %s: %s\n", path, 
err.Error())
+               openOpts.SetCreateIfMissing(true)
+               ldb, err = levigo.Open(path, openOpts)
+               if err != nil {
+                       store.lg.Errorf("LevelDB failed to create %s: %s\n", 
path, err.Error())
+                       return nil, err
+               }
+               store.lg.Infof("Created new LevelDB instance in %s\n", path)
+               newlyCreated = true
+       }
+       defer func() {
+               if shd == nil {
+                       ldb.Close()
+               }
+       }()
+       lv, err := readLayoutVersion(store, ldb)
+       if err != nil {
+               store.lg.Errorf("Got error while reading datastore version for 
%s: %s\n",
+                       path, err.Error())
+               return nil, err
+       }
+       if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) {
+               err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION)
+               if err != nil {
+                       store.lg.Errorf("Got error while writing datastore 
version for %s: %s\n",
+                               path, err.Error())
+                       return nil, err
+               }
+               store.lg.Tracef("Wrote layout version %d to shard at %s.\n",
+                       CURRENT_LAYOUT_VERSION, path)
+       } else if lv != CURRENT_LAYOUT_VERSION {
+               versionName := "unknown"
+               if lv != UNKNOWN_LAYOUT_VERSION {
+                       versionName = fmt.Sprintf("%d", lv)
+               }
+               store.lg.Errorf("Can't read old datastore.  Its layout version 
is %s, but this "+
+                       "software is at layout version %d.  Please set %s to 
clear the datastore "+
+                       "on startup, or clear it manually.\n", versionName,
+                       CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR)
+               return nil, errors.New(fmt.Sprintf("Invalid layout version: got 
%s, expected %d.",
+                       versionName, CURRENT_LAYOUT_VERSION))
+       } else {
+               store.lg.Tracef("Found layout version %d in %s.\n", lv, path)
+       }
+       spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
+       shd = &shard{store: store, ldb: ldb, path: path,
+               incoming: make(chan *common.Span, spanBufferSize)}
+       return shd, nil
+}
+
+// Read the datastore version of a leveldb instance.
+func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) {
+       buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY})
+       if err != nil {
+               return 0, err
+       }
+       if len(buf) == 0 {
+               return 0, nil
+       }
+       r := bytes.NewBuffer(buf)
+       decoder := gob.NewDecoder(r)
+       var v uint32
+       err = decoder.Decode(&v)
+       if err != nil {
+               return 0, err
+       }
+       return v, nil
+}
+
+// Write the datastore version to a shard.
+func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
+       w := new(bytes.Buffer)
+       encoder := gob.NewEncoder(w)
+       err := encoder.Encode(&v)
+       if err != nil {
+               return err
+       }
+       return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
+}
+
+func (store *dataStore) GetStatistics() *Statistics {
+       return store.stats.Copy()
+}
+
+// Close the DataStore.
+func (store *dataStore) Close() {
+       for idx := range store.shards {
+               store.shards[idx].Close()
+               store.shards[idx] = nil
+       }
+       if store.readOpts != nil {
+               store.readOpts.Close()
+               store.readOpts = nil
+       }
+       if store.writeOpts != nil {
+               store.writeOpts.Close()
+               store.writeOpts = nil
+       }
+       if store.lg != nil {
+               store.lg.Close()
+               store.lg = nil
+       }
+}
+
+// Get the index of the shard which stores the given spanId.
+func (store *dataStore) getShardIndex(sid common.SpanId) int {
+       return int(sid.Val() % uint64(len(store.shards)))
+}
+
+func (store *dataStore) WriteSpan(span *common.Span) {
+       store.shards[store.getShardIndex(span.Id)].incoming <- span
+}
+
+func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
+       return store.shards[store.getShardIndex(sid)].FindSpan(sid)
+}
+
+func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
+       lg := shd.store.lg
+       buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val()))
+       if err != nil {
+               if strings.Index(err.Error(), "NotFound:") != -1 {
+                       return nil
+               }
+               lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n",
+                       shd.path, sid.String(), err.Error())
+               return nil
+       }
+       var span *common.Span
+       span, err = shd.decodeSpan(sid, buf)
+       if err != nil {
+               lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n",
+                       shd.path, sid.String(), err.Error())
+               return nil
+       }
+       return span
+}
+
+func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, 
error) {
+       r := bytes.NewBuffer(buf)
+       decoder := gob.NewDecoder(r)
+       data := common.SpanData{}
+       err := decoder.Decode(&data)
+       if err != nil {
+               return nil, err
+       }
+       // Gob encoding translates empty slices to nil.  Reverse this so that 
we're always dealing with
+       // non-nil slices.
+       if data.Parents == nil {
+               data.Parents = []common.SpanId{}
+       }
+       return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
+}
+
+// Find the children of a given span id.
+func (store *dataStore) FindChildren(sid common.SpanId, lim int32) 
[]common.SpanId {
+       childIds := make([]common.SpanId, 0)
+       var err error
+
+       startIdx := store.getShardIndex(sid)
+       idx := startIdx
+       numShards := len(store.shards)
+       for {
+               if lim == 0 {
+                       break
+               }
+               shd := store.shards[idx]
+               childIds, lim, err = shd.FindChildren(sid, childIds, lim)
+               if err != nil {
+                       store.lg.Errorf("Shard(%s): FindChildren(%s) error: 
%s\n",
+                               shd.path, sid.String(), err.Error())
+               }
+               idx++
+               if idx >= numShards {
+                       idx = 0
+               }
+               if idx == startIdx {
+                       break
+               }
+       }
+       return childIds
+}
+
+type predicateData struct {
+       *common.Predicate
+       uintKey uint64
+       strKey  string
+}
+
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+       p := predicateData{Predicate: pred}
+
+       // Parse the input value given to make sure it matches up with the field
+       // type.
+       switch pred.Field {
+       case common.SPAN_ID:
+               // Span IDs are sent as hex strings.
+               var id common.SpanId
+               if err := id.FromString(pred.Val); err != nil {
+                       return nil, errors.New(fmt.Sprintf("Unable to parse 
span id '%s': %s",
+                               pred.Val, err.Error()))
+               }
+               p.uintKey = id.Val()
+               break
+       case common.DESCRIPTION:
+               // Any string is valid for a description.
+               p.strKey = pred.Val
+               break
+       case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+               // Parse a base-10 signed numeric field.
+               v, err := strconv.ParseInt(pred.Val, 10, 64)
+               if err != nil {
+                       return nil, errors.New(fmt.Sprintf("Unable to parse %s 
'%s': %s",
+                               pred.Field, pred.Val, err.Error()))
+               }
+               p.uintKey = s2u64(v)
+               break
+       default:
+               return nil, errors.New(fmt.Sprintf("Unknown field %s", 
pred.Field))
+       }
+
+       // Validate the predicate operation.
+       switch pred.Op {
+       case common.EQUALS, common.LESS_THAN_OR_EQUALS,
+               common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
+               break
+       case common.CONTAINS:
+               if p.fieldIsNumeric() {
+                       return nil, errors.New(fmt.Sprintf("Can't use CONTAINS 
on a "+
+                               "numeric field like '%s'", pred.Field))
+               }
+       default:
+               return nil, errors.New(fmt.Sprintf("Unknown predicate operation 
'%s'",
+                       pred.Op))
+       }
+
+       return &p, nil
+}
+
+// Get the index prefix for this predicate, or 0 if it is not indexed.
+func (pred *predicateData) getIndexPrefix() byte {
+       switch pred.Field {
+       case common.SPAN_ID:
+               return SPAN_ID_INDEX_PREFIX
+       case common.BEGIN_TIME:
+               return BEGIN_TIME_INDEX_PREFIX
+       case common.END_TIME:
+               return END_TIME_INDEX_PREFIX
+       case common.DURATION:
+               return DURATION_INDEX_PREFIX
+       default:
+               return INVALID_INDEX_PREFIX
+       }
+}
+
+// Returns true if the predicate type is numeric.
+func (pred *predicateData) fieldIsNumeric() bool {
+       switch pred.Field {
+       case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, 
common.DURATION:
+               return true
+       default:
+               return false
+       }
+}
+
+// Get the values that this predicate cares about for a given span.
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, 
string) {
+       switch pred.Field {
+       case common.SPAN_ID:
+               return span.Id.Val(), ""
+       case common.DESCRIPTION:
+               return 0, span.Description
+       case common.BEGIN_TIME:
+               return s2u64(span.Begin), ""
+       case common.END_TIME:
+               return s2u64(span.End), ""
+       case common.DURATION:
+               return s2u64(span.Duration()), ""
+       default:
+               panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", 
pred.Field))
+       }
+}
+
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) 
bool {
+       // nil is after everything.
+       if a == nil {
+               if b == nil {
+                       return false
+               }
+               return false
+       } else if b == nil {
+               return true
+       }
+       // Compare the spans according to this predicate.
+       aInt, aStr := pred.extractRelevantSpanData(a)
+       bInt, bStr := pred.extractRelevantSpanData(b)
+       if pred.fieldIsNumeric() {
+               if pred.Op.IsDescending() {
+                       return aInt > bInt
+               } else {
+                       return aInt < bInt
+               }
+       } else {
+               if pred.Op.IsDescending() {
+                       return aStr > bStr
+               } else {
+                       return aStr < bStr
+               }
+       }
+}
+
+// Returns true if the predicate is satisfied by the given span.
+func (pred *predicateData) satisfiedBy(span *common.Span) bool {
+       intVal, strVal := pred.extractRelevantSpanData(span)
+       if pred.fieldIsNumeric() {
+               switch pred.Op {
+               case common.EQUALS:
+                       return intVal == pred.uintKey
+               case common.LESS_THAN_OR_EQUALS:
+                       return intVal <= pred.uintKey
+               case common.GREATER_THAN_OR_EQUALS:
+                       return intVal >= pred.uintKey
+               case common.GREATER_THAN:
+                       return intVal > pred.uintKey
+               default:
+                       panic(fmt.Sprintf("unknown Op type %s should have been 
caught "+
+                               "during normalization", pred.Op))
+               }
+       } else {
+               switch pred.Op {
+               case common.CONTAINS:
+                       return strings.Contains(strVal, pred.strKey)
+               case common.EQUALS:
+                       return strVal == pred.strKey
+               case common.LESS_THAN_OR_EQUALS:
+                       return strVal <= pred.strKey
+               case common.GREATER_THAN_OR_EQUALS:
+                       return strVal >= pred.strKey
+               case common.GREATER_THAN:
+                       return strVal > pred.strKey
+               default:
+                       panic(fmt.Sprintf("unknown Op type %s should have been 
caught "+
+                               "during normalization", pred.Op))
+               }
+       }
+}
+
+func (pred *predicateData) createSource(store *dataStore, prev *common.Span) 
(*source, error) {
+       var ret *source
+       src := source{store: store,
+               pred:      pred,
+               iters:     make([]*levigo.Iterator, 0, len(store.shards)),
+               nexts:     make([]*common.Span, len(store.shards)),
+               numRead:   make([]int, len(store.shards)),
+               keyPrefix: pred.getIndexPrefix(),
+       }
+       if src.keyPrefix == INVALID_INDEX_PREFIX {
+               return nil, errors.New(fmt.Sprintf("Can't create source from 
unindexed "+
+                       "predicate on field %s", pred.Field))
+       }
+       defer func() {
+               if ret == nil {
+                       src.Close()
+               }
+       }()
+       for shardIdx := range store.shards {
+               shd := store.shards[shardIdx]
+               src.iters = append(src.iters, 
shd.ldb.NewIterator(store.readOpts))
+       }
+       var searchKey []byte
+       lg := store.lg
+       if prev != nil {
+               // If prev != nil, this query RPC is the continuation of a 
previous
+               // one.  The final result returned the last time is 'prev'.
+               //
+               // To avoid returning the same results multiple times, we 
adjust the
+               // predicate here.  If the predicate is on the span id field, we
+               // simply manipulate the span ID we're looking for.
+               //
+               // If the predicate is on a secondary index, we also use span 
ID, but
+               // in a slightly different way.  Since the secondary indices are
+               // organized as [type-code][8b-secondary-key][8b-span-id], 
elements
+               // with the same secondary index field are ordered by span ID.  
So we
+               // create a 17-byte key incorporating the span ID from 'prev.'
+               var startId common.SpanId
+               switch pred.Op {
+               case common.EQUALS:
+                       if pred.Field == common.SPAN_ID {
+                               // This is an annoying corner case.  There can 
only be one
+                               // result each time we do an EQUALS search for 
a span id.
+                               // Span id is the primary key for all our spans.
+                               // But for some reason someone is asking for 
another result.
+                               // We modify the query to search for the 
illegal 0 span ID,
+                               // which will never be present.
+                               lg.Debugf("Attempted to use a continuation 
token with an EQUALS "+
+                                       "SPAN_ID query. %s.  Setting search id 
= 0",
+                                       pred.Predicate.String())
+                               startId = 0
+                       } else {
+                               // When doing an EQUALS search on a secondary 
index, the
+                               // results are sorted by span id.
+                               startId = prev.Id + 1
+                       }
+               case common.LESS_THAN_OR_EQUALS:
+                       // Subtract one from the previous span id.  Since the 
previous
+                       // start ID will never be 0 (0 is an illegal span id), 
we'll never
+                       // wrap around when doing this.
+                       startId = prev.Id - 1
+               case common.GREATER_THAN_OR_EQUALS:
+                       // We can't add one to the span id, since the previous 
span ID
+                       // might be the maximum value.  So just switch over to 
using
+                       // GREATER_THAN.
+                       pred.Op = common.GREATER_THAN
+                       startId = prev.Id
+               case common.GREATER_THAN:
+                       // This one is easy.
+                       startId = prev.Id
+               default:
+                       str := fmt.Sprintf("Can't use a %v predicate as a 
source.", pred.Predicate.String())
+                       lg.Error(str + "\n")
+                       panic(str)
+               }
+               if pred.Field == common.SPAN_ID {
+                       pred.uintKey = uint64(startId)
+                       searchKey = makeKey(src.keyPrefix, uint64(startId))
+               } else {
+                       // Start where the previous query left off.  This means 
adjusting
+                       // our uintKey.
+                       pred.uintKey, _ = pred.extractRelevantSpanData(prev)
+                       searchKey = makeSecondaryKey(src.keyPrefix, 
pred.uintKey, uint64(startId))
+               }
+               if lg.TraceEnabled() {
+                       lg.Tracef("Handling continuation token %s for %s.  
startId=%d, "+
+                               "pred.uintKey=%d\n", prev, 
pred.Predicate.String(), startId,
+                               pred.uintKey)
+               }
+       } else {
+               searchKey = makeKey(src.keyPrefix, pred.uintKey)
+       }
+       for i := range src.iters {
+               src.iters[i].Seek(searchKey)
+       }
+       ret = &src
+       return ret, nil
+}
+
+// A source of spans.
+type source struct {
+       store     *dataStore
+       pred      *predicateData
+       iters     []*levigo.Iterator
+       nexts     []*common.Span
+       numRead   []int
+       keyPrefix byte
+}
+
+// Return true if this operation may require skipping the first result we get 
back from leveldb.
+func mayRequireOneSkip(op common.Op) bool {
+       switch op {
+       // When dealing with descending predicates, the first span we read 
might not satisfy
+       // the predicate, even though subsequent ones will.  This is because 
the iter.Seek()
+       // function "moves the iterator the position of the key given or, if 
the key doesn't
+       // exist, the next key that does exist in the database."  So if we're 
on that "next
+       // key" it will not satisfy the predicate, but the keys previous to it 
might.
+       case common.LESS_THAN_OR_EQUALS:
+               return true
+       // iter.Seek basically takes us to the key which is "greater than or 
equal to" some
+       // value.  Since we want greater than (not greater than or equal to) we 
may have to
+       // skip the first key.
+       case common.GREATER_THAN:
+               return true
+       }
+       return false
+}
+
+// Fill in the entry in the 'next' array for a specific shard.
+func (src *source) populateNextFromShard(shardIdx int) {
+       lg := src.store.lg
+       var err error
+       iter := src.iters[shardIdx]
+       if iter == nil {
+               lg.Debugf("Can't populate: No more entries in shard %d\n", 
shardIdx)
+               return // There are no more entries in this shard.
+       }
+       if src.nexts[shardIdx] != nil {
+               lg.Debugf("No need to populate shard %d\n", shardIdx)
+               return // We already have a valid entry for this shard.
+       }
+       for {
+               if !iter.Valid() {
+                       lg.Debugf("Can't populate: Iterator for shard %d is no 
longer valid.\n", shardIdx)
+                       break // Can't read past end of DB
+               }
+               src.numRead[shardIdx]++
+               key := iter.Key()
+               if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
+                       lg.Debugf("Can't populate: Iterator for shard %d does 
not have prefix %s\n",
+                               shardIdx, string(src.keyPrefix))
+                       break // Can't read past end of indexed section
+               }
+               var span *common.Span
+               var sid common.SpanId
+               if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
+                       // The span id maps to the span itself.
+                       sid = common.SpanId(keyToInt(key[1:]))
+                       span, err = src.store.shards[shardIdx].decodeSpan(sid, 
iter.Value())
+                       if err != nil {
+                               lg.Debugf("Internal error decoding span %s in 
shard %d: %s\n",
+                                       sid.String(), shardIdx, err.Error())
+                               break
+                       }
+               } else {
+                       // With a secondary index, we have to look up the span 
by id.
+                       sid = common.SpanId(keyToInt(key[9:]))
+                       span = src.store.shards[shardIdx].FindSpan(sid)
+                       if span == nil {
+                               lg.Debugf("Internal error rehydrating span %s 
in shard %d\n",
+                                       sid.String(), shardIdx)
+                               break
+                       }
+               }
+               if src.pred.Op.IsDescending() {
+                       iter.Prev()
+               } else {
+                       iter.Next()
+               }
+               if src.pred.satisfiedBy(span) {
+                       lg.Debugf("Populated valid span %v from shard %d.\n", 
sid, shardIdx)
+                       src.nexts[shardIdx] = span // Found valid entry
+                       return
+               } else {
+                       lg.Debugf("Span %s from shard %d does not satisfy the 
predicate.\n",
+                               sid.String(), shardIdx)
+                       if src.numRead[shardIdx] <= 1 && 
mayRequireOneSkip(src.pred.Op) {
+                               continue
+                       }
+                       // This and subsequent entries don't satisfy predicate
+                       break
+               }
+       }
+       lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+       iter.Close()
+       src.iters[shardIdx] = nil
+}
+
+func (src *source) next() *common.Span {
+       for shardIdx := range src.iters {
+               src.populateNextFromShard(shardIdx)
+       }
+       var best *common.Span
+       bestIdx := -1
+       for shardIdx := range src.iters {
+               span := src.nexts[shardIdx]
+               if src.pred.spanPtrIsBefore(span, best) {
+                       best = span
+                       bestIdx = shardIdx
+               }
+       }
+       if bestIdx >= 0 {
+               src.nexts[bestIdx] = nil
+       }
+       return best
+}
+
+func (src *source) Close() {
+       for i := range src.iters {
+               if src.iters[i] != nil {
+                       src.iters[i].Close()
+               }
+       }
+       src.iters = nil
+}
+
+func (store *dataStore) obtainSource(preds *[]*predicateData, span 
*common.Span) (*source, error) {
+       // Read spans from the first predicate that is indexed.
+       p := *preds
+       for i := range p {
+               pred := p[i]
+               if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
+                       *preds = append(p[0:i], p[i+1:]...)
+                       return pred.createSource(store, span)
+               }
+       }
+       // If there are no predicates that are indexed, read rows in order of 
span id.
+       spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
+               Field: common.SPAN_ID,
+               Val:   "0000000000000000",
+       }
+       spanIdPredData, err := loadPredicateData(&spanIdPred)
+       if err != nil {
+               return nil, err
+       }
+       return spanIdPredData.createSource(store, span)
+}
+
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, 
error) {
+       lg := store.lg
+       // Parse predicate data.
+       var err error
+       preds := make([]*predicateData, len(query.Predicates))
+       for i := range query.Predicates {
+               preds[i], err = loadPredicateData(&query.Predicates[i])
+               if err != nil {
+                       return nil, err
+               }
+       }
+       // Get a source of rows.
+       var src *source
+       src, err = store.obtainSource(&preds, query.Prev)
+       if err != nil {
+               return nil, err
+       }
+       defer src.Close()
+       lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
+
+       // Filter the spans through the remaining predicates.
+       ret := make([]*common.Span, 0, 32)
+       for {
+               if len(ret) >= query.Lim {
+                       break // we hit the result size limit
+               }
+               span := src.next()
+               if span == nil {
+                       break // the source has no more spans to give
+               }
+               if lg.DebugEnabled() {
+                       lg.Debugf("src.next returned span %s\n", span.ToJson())
+               }
+               satisfied := true
+               for predIdx := range preds {
+                       if !preds[predIdx].satisfiedBy(span) {
+                               satisfied = false
+                               break
+                       }
+               }
+               if satisfied {
+                       ret = append(ret, span)
+               }
+       }
+       return ret, nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
new file mode 100644
index 0000000..4696547
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "math/rand"
+       htrace "org/apache/htrace/client"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "org/apache/htrace/test"
+       "os"
+       "sort"
+       "strings"
+       "testing"
+)
+
+// Test creating and tearing down a datastore.
+func TestCreateDatastore(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore",
+               DataDirs: make([]string, 3)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+}
+
+var SIMPLE_TEST_SPANS []common.Span = []common.Span{
+       common.Span{Id: 1,
+               SpanData: common.SpanData{
+                       Begin:       123,
+                       End:         456,
+                       Description: "getFileDescriptors",
+                       TraceId:     999,
+                       Parents:     []common.SpanId{},
+                       ProcessId:   "firstd",
+               }},
+       common.Span{Id: 2,
+               SpanData: common.SpanData{
+                       Begin:       125,
+                       End:         200,
+                       Description: "openFd",
+                       TraceId:     999,
+                       Parents:     []common.SpanId{1},
+                       ProcessId:   "secondd",
+               }},
+       common.Span{Id: 3,
+               SpanData: common.SpanData{
+                       Begin:       200,
+                       End:         456,
+                       Description: "passFd",
+                       TraceId:     999,
+                       Parents:     []common.SpanId{1},
+                       ProcessId:   "thirdd",
+               }},
+}
+
+func createSpans(spans []common.Span, store *dataStore) {
+       for idx := range spans {
+               store.WriteSpan(&spans[idx])
+       }
+       // Wait the spans to be created
+       for i := 0; i < 3; i++ {
+               <-store.WrittenSpans
+       }
+}
+
+// Test creating a datastore and adding some spans.
+func TestDatastoreWriteAndRead(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       span := ht.Store.FindSpan(1)
+       if span == nil {
+               t.Fatal()
+       }
+       if span.Id != 1 {
+               t.Fatal()
+       }
+       common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span)
+       children := ht.Store.FindChildren(1, 1)
+       if len(children) != 1 {
+               t.Fatalf("expected 1 child, but got %d\n", len(children))
+       }
+       children = ht.Store.FindChildren(1, 2)
+       if len(children) != 2 {
+               t.Fatalf("expected 2 children, but got %d\n", len(children))
+       }
+       sort.Sort(common.SpanIdSlice(children))
+       if children[0] != 2 {
+               t.Fatal()
+       }
+       if children[1] != 3 {
+               t.Fatal()
+       }
+}
+
+func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
+       expectedSpans []common.Span) {
+       spans, err := ht.Store.HandleQuery(query)
+       if err != nil {
+               t.Fatalf("First query failed: %s\n", err.Error())
+       }
+       expectedBuf := new(bytes.Buffer)
+       dec := json.NewEncoder(expectedBuf)
+       err = dec.Encode(expectedSpans)
+       if err != nil {
+               t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", 
err.Error())
+       }
+       spansBuf := new(bytes.Buffer)
+       dec = json.NewEncoder(spansBuf)
+       err = dec.Encode(spans)
+       if err != nil {
+               t.Fatalf("Failed to encode result spans to JSON: %s\n", 
err.Error())
+       }
+       t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
+               len(expectedSpans))
+       common.ExpectStrEqual(t, string(expectedBuf.Bytes()), 
string(spansBuf.Bytes()))
+}
+
+// Test queries on the datastore.
+func TestSimpleQuery(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+}
+
+func TestQueries2(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.DESCRIPTION,
+                               Val:   "getFileDescriptors",
+                       },
+               },
+               Lim: 2,
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.DESCRIPTION,
+                               Val:   "getFileDescriptors",
+                       },
+               },
+               Lim: 2,
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+}
+
+func TestQueries3(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.CONTAINS,
+                               Field: common.DESCRIPTION,
+                               Val:   "Fd",
+                       },
+                       common.Predicate{
+                               Op:    common.GREATER_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "100",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "0",
+                       },
+               },
+               Lim: 200,
+       }, []common.Span{})
+
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "2",
+                       },
+               },
+               Lim: 200,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+}
+
+func TestQueries4(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[2]})
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN_OR_EQUALS,
+                               Field: common.DESCRIPTION,
+                               Val:   "openFd",
+                       },
+               },
+               Lim: 2,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.DESCRIPTION,
+                               Val:   "openFd",
+                       },
+               },
+               Lim: 2,
+       }, []common.Span{SIMPLE_TEST_SPANS[2]})
+}
+
+func BenchmarkDatastoreWrites(b *testing.B) {
+       htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
+               WrittenSpans: make(chan *common.Span, b.N)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       rnd := rand.New(rand.NewSource(1))
+       allSpans := make([]*common.Span, b.N)
+       // Write many random spans.
+       for n := 0; n < b.N; n++ {
+               span := test.NewRandomSpan(rnd, allSpans[0:n])
+               ht.Store.WriteSpan(span)
+               allSpans[n] = span
+       }
+       // Wait for all the spans to be written.
+       for n := 0; n < b.N; n++ {
+               <-ht.Store.WrittenSpans
+       }
+       spansWritten := ht.Store.GetStatistics().NumSpansWritten
+       if spansWritten < uint64(b.N) {
+               b.Fatal("incorrect statistics: expected %d spans to be written, 
but only got %d",
+                       b.N, spansWritten)
+       }
+}
+
+func TestReloadDataStore(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
+               DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       dataDirs := make([]string, len(ht.DataDirs))
+       copy(dataDirs, ht.DataDirs)
+       defer func() {
+               if ht != nil {
+                       ht.Close()
+               }
+               for i := range dataDirs {
+                       os.RemoveAll(dataDirs[i])
+               }
+       }()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf())
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+
+       // Create some random trace spans.
+       NUM_TEST_SPANS := 5
+       allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans,
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans failed: %s\n", err.Error())
+       }
+
+       // Look up the spans we wrote.
+       var span *common.Span
+       for i := 0; i < NUM_TEST_SPANS; i++ {
+               span, err = hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+
+       ht.Close()
+       ht = nil
+
+       htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2",
+               DataDirs: dataDirs, KeepDataDirsOnClose: true}
+       ht, err = htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to re-create datastore: %s", err.Error())
+       }
+       hcl, err = htrace.NewClient(ht.ClientConf())
+       if err != nil {
+               t.Fatalf("failed to re-create client: %s", err.Error())
+       }
+
+       // Look up the spans we wrote earlier.
+       for i := 0; i < NUM_TEST_SPANS; i++ {
+               span, err = hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+
+       // Set an old datastore version number.
+       for i := range ht.Store.shards {
+               shard := ht.Store.shards[i]
+               writeDataStoreVersion(ht.Store, shard.ldb, 
CURRENT_LAYOUT_VERSION-1)
+       }
+       ht.Close()
+       ht = nil
+
+       htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3",
+               DataDirs: dataDirs, KeepDataDirsOnClose: true}
+       ht, err = htraceBld.Build()
+       if err == nil {
+               t.Fatalf("expected the datastore to fail to load after setting 
an " +
+                       "incorrect version.\n")
+       }
+       if !strings.Contains(err.Error(), "Invalid layout version") {
+               t.Fatal(`expected the loading error to contain "invalid layout 
version"` + "\n")
+       }
+
+       // It should work with data.store.clear set.
+       htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4",
+               DataDirs: dataDirs, KeepDataDirsOnClose: true,
+               Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}}
+       ht, err = htraceBld.Build()
+       if err != nil {
+               t.Fatalf("expected the datastore loading to succeed after 
setting an "+
+                       "incorrect version.  But it failed with error %s\n", 
err.Error())
+       }
+}
+
+func TestQueriesWithContinuationTokens1(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: 
"TestQueriesWithContinuationTokens1",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       // Adding a prev value to this query excludes the first result that we
+       // would normally get.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.BEGIN_TIME,
+                               Val:   "120",
+                       },
+               },
+               Lim:  5,
+               Prev: &SIMPLE_TEST_SPANS[0],
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+       // There is only one result from an EQUALS query on SPAN_ID.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "1",
+                       },
+               },
+               Lim:  100,
+               Prev: &SIMPLE_TEST_SPANS[0],
+       }, []common.Span{})
+
+       // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the
+       // span we pass as a continuation token. (Primary index edition).
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "2",
+                       },
+               },
+               Lim:  100,
+               Prev: &SIMPLE_TEST_SPANS[1],
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+       // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back 
the
+       // span we pass as a continuation token. (Secondary index edition).
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.DURATION,
+                               Val:   "0",
+                       },
+               },
+               Lim:  100,
+               Prev: &SIMPLE_TEST_SPANS[1],
+       }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..a53380e
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "bytes"
+       "encoding/binary"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "github.com/ugorji/go/codec"
+       "io"
+       "net"
+       "net/rpc"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+)
+
+// Handles HRPC calls
+type HrpcHandler struct {
+       lg    *common.Logger
+       store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+       *rpc.Server
+       hand     *HrpcHandler
+       listener net.Listener
+}
+
+// Codec which encodes HRPC data via JSON
+type HrpcServerCodec struct {
+       lg     *common.Logger
+       conn   net.Conn
+       length uint32
+}
+
+func asJson(val interface{}) string {
+       js, err := json.Marshal(val)
+       if err != nil {
+               return "encoding error: " + err.Error()
+       }
+       return string(js)
+}
+
+func createErrAndWarn(lg *common.Logger, val string) error {
+       return createErrAndLog(lg, val, common.WARN)
+}
+
+func createErrAndLog(lg *common.Logger, val string, level common.Level) error {
+       lg.Write(level, val+"\n")
+       return errors.New(val)
+}
+
+func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
+       hdr := common.HrpcRequestHeader{}
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("Reading HRPC request header from %s\n", 
cdc.conn.RemoteAddr())
+       }
+       err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
+       if err != nil {
+               level := common.WARN
+               if err == io.EOF {
+                       level = common.DEBUG
+               }
+               return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading 
header bytes: %s",
+                       err.Error()), level)
+       }
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("Read HRPC request header %s from %s\n",
+                       asJson(&hdr), cdc.conn.RemoteAddr())
+       }
+       if hdr.Magic != common.HRPC_MAGIC {
+               return createErrAndWarn(cdc.lg, fmt.Sprintf("Invalid request 
header: expected "+
+                       "magic number of 0x%04x, but got 0x%04x", 
common.HRPC_MAGIC, hdr.Magic))
+       }
+       if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
+               return createErrAndWarn(cdc.lg, fmt.Sprintf("Length prefix was 
too long.  "+
+                       "Maximum length is %d, but we got %d.", 
common.MAX_HRPC_BODY_LENGTH,
+                       hdr.Length))
+       }
+       req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+       if req.ServiceMethod == "" {
+               return createErrAndWarn(cdc.lg, fmt.Sprintf("Unknown MethodID 
code 0x%04x",
+                       hdr.MethodId))
+       }
+       req.Seq = hdr.Seq
+       cdc.length = hdr.Length
+       return nil
+}
+
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
+                       cdc.length, cdc.conn.RemoteAddr())
+       }
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
+       err := dec.Decode(body)
+       if err != nil {
+               return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read 
request "+
+                       "body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+       }
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("Read body from %s: %s\n",
+                       cdc.conn.RemoteAddr(), asJson(&body))
+       }
+       return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) 
error {
+       var err error
+       buf := EMPTY
+       if msg != nil {
+               mh := new(codec.MsgpackHandle)
+               mh.WriteExt = true
+               w := bytes.NewBuffer(make([]byte, 0, 128))
+               enc := codec.NewEncoder(w, mh)
+               err := enc.Encode(msg)
+               if err != nil {
+                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
marshal "+
+                               "response message: %s", err.Error()))
+               }
+               buf = w.Bytes()
+       }
+       hdr := common.HrpcResponseHeader{}
+       hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+       hdr.Seq = resp.Seq
+       hdr.ErrLength = uint32(len(resp.Error))
+       hdr.Length = uint32(len(buf))
+       writer := bufio.NewWriterSize(cdc.conn, 256)
+       err = binary.Write(writer, binary.LittleEndian, &hdr)
+       if err != nil {
+               return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write 
response "+
+                       "header: %s", err.Error()))
+       }
+       if hdr.ErrLength > 0 {
+               _, err = io.WriteString(writer, resp.Error)
+               if err != nil {
+                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
write error "+
+                               "string: %s", err.Error()))
+               }
+       }
+       if hdr.Length > 0 {
+               var length int
+               length, err = writer.Write(buf)
+               if err != nil {
+                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
write response "+
+                               "message: %s", err.Error()))
+               }
+               if uint32(length) != hdr.Length {
+                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
write all of "+
+                               "response message: %s", err.Error()))
+               }
+       }
+       err = writer.Flush()
+       if err != nil {
+               return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write 
the response "+
+                       "bytes: %s", err.Error()))
+       }
+       return nil
+}
+
+func (cdc *HrpcServerCodec) Close() error {
+       return cdc.conn.Close()
+}
+
+func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
+       resp *common.WriteSpansResp) (err error) {
+       hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s).  "+
+               "defaultPid = %s\n", len(req.Spans), req.DefaultPid)
+       for i := range req.Spans {
+               span := req.Spans[i]
+               if span.ProcessId == "" {
+                       span.ProcessId = req.DefaultPid
+               }
+               if hand.lg.TraceEnabled() {
+                       hand.lg.Tracef("writing span %d: %s\n", i, 
span.ToJson())
+               }
+               hand.store.WriteSpan(span)
+       }
+       return nil
+}
+
+func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) 
{
+       lg := common.NewLogger("hrpc", cnf)
+       hsv := &HrpcServer{
+               Server: rpc.NewServer(),
+               hand: &HrpcHandler{
+                       lg:    lg,
+                       store: store,
+               },
+       }
+       var err error
+       hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
+       if err != nil {
+               return nil, err
+       }
+       hsv.Server.Register(hsv.hand)
+       go hsv.run()
+       lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+       return hsv, nil
+}
+
+func (hsv *HrpcServer) run() {
+       lg := hsv.hand.lg
+       for {
+               conn, err := hsv.listener.Accept()
+               if err != nil {
+                       lg.Errorf("HRPC Accept error: %s\n", err.Error())
+                       continue
+               }
+               if lg.TraceEnabled() {
+                       lg.Tracef("Accepted HRPC connection from %s\n", 
conn.RemoteAddr())
+               }
+               go hsv.ServeCodec(&HrpcServerCodec{
+                       lg:   lg,
+                       conn: conn,
+               })
+       }
+}
+
+func (hsv *HrpcServer) Addr() net.Addr {
+       return hsv.listener.Addr()
+}
+
+func (hsv *HrpcServer) Close() {
+       hsv.listener.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/42b2f6a2/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
new file mode 100644
index 0000000..64da457
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "net"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "os"
+       "strings"
+       "time"
+)
+
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+const USAGE = `htraced: the HTrace server daemon.
+
+htraced receives trace spans sent from HTrace clients.  It exposes a REST
+interface which others can query.  It also runs a web server with a graphical
+user interface.  htraced stores its span data in levelDB files on the local
+disks.
+
+Usage:
+--help: this help message
+
+-Dk=v: set configuration key 'k' to value 'v'
+For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost,
+port 8080.
+
+-Dk: set configuration key 'k' to 'true'
+
+Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME 
+ `
+configuration file.  We find this file by searching the paths in the 
+` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate 
way
+of setting configuration when launching the daemon.
+`
+
+func main() {
+       for idx := range os.Args {
+               arg := os.Args[idx]
+               if strings.HasPrefix(arg, "--h") || strings.HasPrefix(arg, 
"-h") {
+                       fmt.Fprintf(os.Stderr, USAGE)
+                       os.Exit(0)
+               }
+       }
+       cnf := common.LoadApplicationConfig()
+       common.InstallSignalHandlers(cnf)
+       lg := common.NewLogger("main", cnf)
+       defer lg.Close()
+       store, err := CreateDataStore(cnf, nil)
+       if err != nil {
+               lg.Errorf("Error creating datastore: %s\n", err.Error())
+               os.Exit(1)
+       }
+       var rsv *RestServer
+       rsv, err = CreateRestServer(cnf, store)
+       if err != nil {
+               lg.Errorf("Error creating REST server: %s\n", err.Error())
+               os.Exit(1)
+       }
+       var hsv *HrpcServer
+       if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+               hsv, err = CreateHrpcServer(cnf, store)
+               if err != nil {
+                       lg.Errorf("Error creating HRPC server: %s\n", 
err.Error())
+                       os.Exit(1)
+               }
+       } else {
+               lg.Infof("Not starting HRPC server because no value was given 
for %s.\n",
+                       conf.HTRACE_HRPC_ADDRESS)
+       }
+       naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
+       if naddr != "" {
+               notif := StartupNotification{
+                       HttpAddr:  rsv.Addr().String(),
+                       ProcessId: os.Getpid(),
+               }
+               if hsv != nil {
+                       notif.HrpcAddr = hsv.Addr().String()
+               }
+               err = sendStartupNotification(naddr, &notif)
+               if err != nil {
+                       fmt.Fprintf(os.Stderr, "Failed to send startup 
notification: "+
+                               "%s\n", err.Error())
+                       os.Exit(1)
+               }
+       }
+       for {
+               time.Sleep(time.Duration(10) * time.Hour)
+       }
+}
+
+// A startup notification message that we optionally send on startup.
+// Used by unit tests.
+type StartupNotification struct {
+       HttpAddr  string
+       HrpcAddr  string
+       ProcessId int
+}
+
+func sendStartupNotification(naddr string, notif *StartupNotification) error {
+       conn, err := net.Dial("tcp", naddr)
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if conn != nil {
+                       conn.Close()
+               }
+       }()
+       var buf []byte
+       buf, err = json.Marshal(notif)
+       if err != nil {
+               return err
+       }
+       _, err = conn.Write(buf)
+       conn.Close()
+       conn = nil
+       return nil
+}

Reply via email to