http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/conf/config_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/conf/config_test.go 
b/htrace-htraced/go/src/htrace/conf/config_test.go
new file mode 100644
index 0000000..bdab187
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/conf/config_test.go
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package conf
+
+import (
+       "bytes"
+       "os"
+       "strings"
+       "testing"
+)
+
+// Test that parsing command-line arguments of the form -Dfoo=bar works.
+func TestParseArgV(t *testing.T) {
+       t.Parallel()
+       argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode", "-Dlog.path="}
+       bld := &Builder{Argv: argv,
+               Defaults: map[string]string{
+                       "log.path": "/log/path/default",
+               }}
+       cnf, err := bld.Build()
+       if err != nil {
+               t.Fatal()
+       }
+       if "bar" != cnf.Get("foo") {
+               t.Fatal()
+       }
+       if 123 != cnf.GetInt("baz") {
+               t.Fatal()
+       }
+       if !cnf.GetBool("sillyMode") {
+               t.Fatal()
+       }
+       if cnf.GetBool("otherSillyMode") {
+               t.Fatal()
+       }
+       if "" != cnf.Get("log.path") {
+               t.Fatal()
+       }
+}
+
+// Test that default values work.
+// Defaults are used only when the configuration option is not present or 
can't be parsed.
+func TestDefaults(t *testing.T) {
+       t.Parallel()
+       argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"}
+       defaults := map[string]string{
+               "foo":  "notbar",
+               "baz":  "456",
+               "foo2": "4611686018427387904",
+       }
+       bld := &Builder{Argv: argv, Defaults: defaults}
+       cnf, err := bld.Build()
+       if err != nil {
+               t.Fatal()
+       }
+       if "bar" != cnf.Get("foo") {
+               t.Fatal()
+       }
+       if 456 != cnf.GetInt("baz") {
+               t.Fatal()
+       }
+       if 4611686018427387904 != cnf.GetInt64("foo2") {
+               t.Fatal()
+       }
+}
+
+// Test that we can parse our XML configuration file.
+func TestXmlConfigurationFile(t *testing.T) {
+       t.Parallel()
+       xml := `
+<?xml version="1.0"?>
+<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>
+<configuration>
+  <property>
+    <name>foo.bar</name>
+    <value>123</value>
+  </property>
+  <property>
+    <name>foo.baz</name>
+    <value>xmlValue</value>
+  </property>
+  <!--<property>
+    <name>commented.out</name>
+    <value>stuff</value>
+  </property>-->
+</configuration>
+`
+       xmlReader := strings.NewReader(xml)
+       argv := []string{"-Dfoo.bar=456"}
+       defaults := map[string]string{
+               "foo.bar":     "789",
+               "cmdline.opt": "4611686018427387904",
+       }
+       bld := &Builder{Argv: argv, Defaults: defaults, Reader: xmlReader}
+       cnf, err := bld.Build()
+       if err != nil {
+               t.Fatal()
+       }
+       // The command-line argument takes precedence over the XML and the 
defaults.
+       if 456 != cnf.GetInt("foo.bar") {
+               t.Fatal()
+       }
+       if "xmlValue" != cnf.Get("foo.baz") {
+               t.Fatalf("foo.baz = %s", cnf.Get("foo.baz"))
+       }
+       if "" != cnf.Get("commented.out") {
+               t.Fatal()
+       }
+       if 4611686018427387904 != cnf.GetInt64("cmdline.opt") {
+               t.Fatal()
+       }
+}
+
+// Test our handling of the HTRACE_CONF_DIR environment variable.
+func TestGetHTracedConfDirs(t *testing.T) {
+       os.Setenv("HTRACED_CONF_DIR", "")
+       dlog := new(bytes.Buffer)
+       dirs := getHTracedConfDirs(dlog)
+       if len(dirs) != 1 || dirs[0] != getDefaultHTracedConfDir() {
+               t.Fatal()
+       }
+       os.Setenv("HTRACED_CONF_DIR", "/foo/bar:/baz")
+       dirs = getHTracedConfDirs(dlog)
+       if len(dirs) != 2 || dirs[0] != "/foo/bar" || dirs[1] != "/baz" {
+               t.Fatal()
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/conf/xml.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/conf/xml.go 
b/htrace-htraced/go/src/htrace/conf/xml.go
new file mode 100644
index 0000000..de14bc5
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/conf/xml.go
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package conf
+
+import (
+       "encoding/xml"
+       "io"
+       "log"
+)
+
+type configuration struct {
+       Properties []propertyXml `xml:"property"`
+}
+
+type propertyXml struct {
+       Name  string `xml:"name"`
+       Value string `xml:"value"`
+}
+
+// Parse an XML configuration file.
+func parseXml(reader io.Reader, m map[string]string) error {
+       dec := xml.NewDecoder(reader)
+       configurationXml := configuration{}
+       err := dec.Decode(&configurationXml)
+       if err != nil {
+               return err
+       }
+       props := configurationXml.Properties
+       for p := range props {
+               key := props[p].Name
+               value := props[p].Value
+               if key == "" {
+                       log.Println("Warning: ignoring element with missing or 
empty <name>.")
+                       continue
+               }
+               if value == "" {
+                       log.Println("Warning: ignoring element with key " + key 
+ " with missing or empty <value>.")
+                       continue
+               }
+               //log.Printf("setting %s to %s\n", key, value)
+               m[key] = value
+       }
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/client_test.go 
b/htrace-htraced/go/src/htrace/htraced/client_test.go
new file mode 100644
index 0000000..6b50097
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/client_test.go
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "github.com/ugorji/go/codec"
+       htrace "htrace/client"
+       "htrace/common"
+       "htrace/conf"
+       "htrace/test"
+       "math"
+       "math/rand"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+)
+
+func TestClientGetServerVersion(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerVersion",
+               DataDirs: make([]string, 2)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       defer hcl.Close()
+       _, err = hcl.GetServerVersion()
+       if err != nil {
+               t.Fatalf("failed to call GetServerVersion: %s", err.Error())
+       }
+}
+
+func TestClientGetServerDebugInfo(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerDebugInfo",
+               DataDirs: make([]string, 2)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       defer hcl.Close()
+       debugInfo, err := hcl.GetServerDebugInfo()
+       if err != nil {
+               t.Fatalf("failed to call GetServerDebugInfo: %s", err.Error())
+       }
+       if debugInfo.StackTraces == "" {
+               t.Fatalf(`debugInfo.StackTraces == ""`)
+       }
+       if debugInfo.GCStats == "" {
+               t.Fatalf(`debugInfo.GCStats == ""`)
+       }
+}
+
// Create the given number of random test spans.
// Uses a fixed random seed, so the output is deterministic from run to run.
func createRandomTestSpans(amount int) common.SpanSlice {
	rnd := rand.New(rand.NewSource(2))
	allSpans := make(common.SpanSlice, amount)
	// The first span is created with an empty slice of prior spans; later
	// spans are given allSpans[1:i].  Presumably NewRandomSpan draws
	// potential parents from that slice -- TODO confirm against htrace/test.
	allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
	for i := 1; i < amount; i++ {
		allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
	}
	// Force span 1 to be a child of span 0, so callers always have at least
	// one known parent/child relationship to query for.
	allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)}
	return allSpans
}
+
// Exercise the basic client span operations against a mini htraced:
// WriteSpans, FindSpan, FindChildren, and Query.
func TestClientOperations(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
		DataDirs:     make([]string, 2),
		WrittenSpans: common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	defer hcl.Close()

	// Create some random trace spans.
	NUM_TEST_SPANS := 30
	allSpans := createRandomTestSpans(NUM_TEST_SPANS)

	// Write half of the spans to htraced via the client.
	err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
	if err != nil {
		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
			err.Error())
	}
	// Block until the server has persisted everything we just wrote.
	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS / 2))

	// Look up the first half of the spans.  They should be found.
	var span *common.Span
	for i := 0; i < NUM_TEST_SPANS/2; i++ {
		span, err = hcl.FindSpan(allSpans[i].Id)
		if err != nil {
			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
		}
		common.ExpectSpansEqual(t, allSpans[i], span)
	}

	// Look up the second half of the spans.  They should not be found,
	// since they were never written.
	for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
		span, err = hcl.FindSpan(allSpans[i].Id)
		if err != nil {
			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
		}
		if span != nil {
			t.Fatalf("Unexpectedly found a span we never write to "+
				"the server: FindSpan(%d) succeeded\n", i)
		}
	}

	// Test FindChildren.  createRandomTestSpans made span 1 a child of
	// span 0, so looking up the children of span 1's parent should return
	// exactly span 1.
	childSpan := allSpans[1]
	parentId := childSpan.Parents[0]
	var children []common.SpanId
	children, err = hcl.FindChildren(parentId, 1)
	if err != nil {
		t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
	}
	if len(children) != 1 {
		t.Fatalf("FindChildren(%s) returned an invalid number of "+
			"children: expected %d, got %d\n", parentId, 1, len(children))
	}
	if !children[0].Equal(childSpan.Id) {
		t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
			" got %s\n", parentId, childSpan.Id, children[0])
	}

	// Test FindChildren on a span that has no children
	childlessSpan := allSpans[NUM_TEST_SPANS/2]
	children, err = hcl.FindChildren(childlessSpan.Id, 10)
	if err != nil {
		t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error())
	}
	if len(children) != 0 {
		t.Fatalf("FindChildren(%d) returned an invalid number of "+
			"children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
	}

	// Test Query with a limit of 10; exactly 10 of the 15 stored spans
	// should come back.
	var query common.Query
	query = common.Query{Lim: 10}
	spans, err := hcl.Query(&query)
	if err != nil {
		t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
	}
	if len(spans) != 10 {
		t.Fatalf("Query({lim: %d}) returned an invalid number of "+
			"children: expected %d, got %d\n", 10, 10, len(spans))
	}
}
+
// Test that DumpAll streams back every span stored on the server, and that
// the spans come back in sorted order.
func TestDumpAll(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
		DataDirs:     make([]string, 2),
		WrittenSpans: common.NewSemaphore(0),
		Cnf: map[string]string{
			conf.HTRACE_LOG_LEVEL: "INFO",
		},
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	defer hcl.Close()

	NUM_TEST_SPANS := 100
	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
	// Sort our expected spans so they can be compared one-by-one against
	// the DumpAll output below.
	sort.Sort(allSpans)
	err = hcl.WriteSpans(allSpans)
	if err != nil {
		t.Fatalf("WriteSpans failed: %s\n", err.Error())
	}
	// Block until the server has persisted all the spans.
	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
	out := make(chan *common.Span, NUM_TEST_SPANS)
	var dumpErr error
	// Run DumpAll in the background; we consume its output channel here.
	go func() {
		dumpErr = hcl.DumpAll(3, out)
	}()
	var numSpans int
	nextLogTime := time.Now().Add(time.Millisecond * 5)
	for {
		span, channelOpen := <-out
		if !channelOpen {
			break
		}
		common.ExpectSpansEqual(t, allSpans[numSpans], span)
		numSpans++
		// In verbose mode, log progress at most once every 5 ms.
		if testing.Verbose() {
			now := time.Now()
			if !now.Before(nextLogTime) {
				nextLogTime = now
				nextLogTime = nextLogTime.Add(time.Millisecond * 5)
				fmt.Printf("read back %d span(s)...\n", numSpans)
			}
		}
	}
	if numSpans != len(allSpans) {
		t.Fatalf("expected to read %d spans... but only read %d\n",
			len(allSpans), numSpans)
	}
	if dumpErr != nil {
		t.Fatalf("got dump error %s\n", dumpErr.Error())
	}
}
+
// Key and value used by TestClientGetServerConf to verify that the server
// reports its configuration back to clients.
const EXAMPLE_CONF_KEY = "example.conf.key"
const EXAMPLE_CONF_VALUE = "foo.bar.baz"
+
+func TestClientGetServerConf(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
+               Cnf: map[string]string{
+                       EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
+               },
+               DataDirs: make([]string, 2)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       defer hcl.Close()
+       serverCnf, err2 := hcl.GetServerConf()
+       if err2 != nil {
+               t.Fatalf("failed to call GetServerConf: %s", err2.Error())
+       }
+       if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
+               t.Fatalf("unexpected value for %s: %s",
+                       EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
+       }
+}
+
// The number of HRPC handler goroutines configured by the admissions
// control and I/O timeout tests below.
const TEST_NUM_HRPC_HANDLERS = 2

// The number of concurrent WriteSpans calls those tests issue.
const TEST_NUM_WRITESPANS = 4
+
+// Tests that HRPC limits the number of simultaneous connections being 
processed.
+func TestHrpcAdmissionsControl(t *testing.T) {
+       var wg sync.WaitGroup
+       wg.Add(TEST_NUM_WRITESPANS)
+       var numConcurrentHrpcCalls int32
+       testHooks := &hrpcTestHooks{
+               HandleAdmission: func() {
+                       defer wg.Done()
+                       n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
+                       if n > TEST_NUM_HRPC_HANDLERS {
+                               t.Fatalf("The number of concurrent HRPC calls 
went above "+
+                                       "%d: it's at %d\n", 
TEST_NUM_HRPC_HANDLERS, n)
+                       }
+                       time.Sleep(1 * time.Millisecond)
+                       n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
+                       if n >= TEST_NUM_HRPC_HANDLERS {
+                               t.Fatalf("The number of concurrent HRPC calls 
went above "+
+                                       "%d: it was at %d\n", 
TEST_NUM_HRPC_HANDLERS, n+1)
+                       }
+               },
+       }
+       htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
+               DataDirs: make([]string, 2),
+               Cnf: map[string]string{
+                       conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", 
TEST_NUM_HRPC_HANDLERS),
+               },
+               WrittenSpans:  common.NewSemaphore(0),
+               HrpcTestHooks: testHooks,
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       // Create some random trace spans.
+       allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+       for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+               go func(i int) {
+                       err = hcl.WriteSpans(allSpans[i : i+1])
+                       if err != nil {
+                               t.Fatalf("WriteSpans failed: %s\n", err.Error())
+                       }
+               }(iter)
+       }
+       wg.Wait()
+       ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
+}
+
// Tests that HRPC I/O timeouts work.
func TestHrpcIoTimeout(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
		DataDirs: make([]string, 2),
		Cnf: map[string]string{
			conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
			// A 1 ms timeout, so the stalled clients below trip it quickly.
			conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
		},
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	finishClient := make(chan interface{})
	defer func() {
		// Close the finishClient channel, if it hasn't already been closed.
		// (recover absorbs the panic from a double close.)
		defer func() { recover() }()
		close(finishClient)
	}()
	// Stall every write request body until finishClient is closed, giving
	// the server-side I/O timeout a chance to fire.
	testHooks := &htrace.TestHooks{
		HandleWriteRequestBody: func() {
			<-finishClient
		},
	}
	hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	// Create some random trace spans.
	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
	var wg sync.WaitGroup
	wg.Add(TEST_NUM_WRITESPANS)
	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
		go func(i int) {
			defer wg.Done()
			// Ignore the error return because there are internal retries in
			// the client which will make this succeed eventually, usually.
			// Keep in mind that we only block until we have seen
			// TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
			// we let requests through so that the test can exit cleanly.
			hcl.WriteSpans(allSpans[i : i+1])
		}(iter)
	}
	// Busy-wait (with 1 us sleeps) until the server has seen enough I/O
	// errors, then unblock the stalled clients.
	for {
		if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS {
			break
		}
		time.Sleep(1000 * time.Nanosecond)
	}
	close(finishClient)
	wg.Wait()
}
+
+func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
+       htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
+               Cnf: map[string]string{
+                       conf.HTRACE_LOG_LEVEL:         "INFO",
+                       conf.HTRACE_NUM_HRPC_HANDLERS: "20",
+               },
+               WrittenSpans: common.NewSemaphore(int64(1 - N)),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       rnd := rand.New(rand.NewSource(1))
+       allSpans := make([]*common.Span, N)
+       for n := 0; n < N; n++ {
+               allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
+       }
+       // Determine how many calls to WriteSpans we should make.  Each 
writeSpans
+       // message should be small enough so that it doesn't exceed the max RPC
+       // body length limit.  TODO: a production-quality golang client would do
+       // this internally rather than needing us to do it here in the unit 
test.
+       bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
+       reqs := make([][]*common.Span, 0, 4)
+       curReq := -1
+       curReqLen := bodyLen
+       var curReqSpans uint32
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       var mbuf [8192]byte
+       buf := mbuf[:0]
+       enc := codec.NewEncoderBytes(&buf, mh)
+       for n := 0; n < N; n++ {
+               span := allSpans[n]
+               if (curReqSpans >= maxSpansPerRpc) ||
+                       (curReqLen >= bodyLen) {
+                       reqs = append(reqs, make([]*common.Span, 0, 16))
+                       curReqLen = 0
+                       curReq++
+                       curReqSpans = 0
+               }
+               buf = mbuf[:0]
+               enc.ResetBytes(&buf)
+               err := enc.Encode(span)
+               if err != nil {
+                       panic(fmt.Sprintf("Error encoding span %s: %s\n",
+                               span.String(), err.Error()))
+               }
+               bufLen := len(buf)
+               if bufLen > (bodyLen / 5) {
+                       panic(fmt.Sprintf("Span too long at %d bytes\n", 
bufLen))
+               }
+               curReqLen += bufLen
+               reqs[curReq] = append(reqs[curReq], span)
+               curReqSpans++
+       }
+       ht.Store.lg.Infof("num spans: %d.  num WriteSpansReq calls: %d\n", N, 
len(reqs))
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               panic(fmt.Sprintf("failed to create client: %s", err.Error()))
+       }
+       defer hcl.Close()
+
+       // Reset the timer to avoid including the time required to create new
+       // random spans in the benchmark total.
+       if b != nil {
+               b.ResetTimer()
+       }
+
+       // Write many random spans.
+       for reqIdx := range reqs {
+               go func(i int) {
+                       err = hcl.WriteSpans(reqs[i])
+                       if err != nil {
+                               panic(fmt.Sprintf("failed to send WriteSpans 
request %d: %s",
+                                       i, err.Error()))
+                       }
+               }(reqIdx)
+       }
+       // Wait for all the spans to be written.
+       ht.Store.WrittenSpans.Wait()
+}
+
// This is a test of how quickly we can create new spans via WriteSpans RPCs.
// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
// Unlike that benchmark, it sends the spans via RPC.
// Suggested flags for running this:
// -tags unsafe -cpu 16 -benchtime=1m
func BenchmarkWriteSpans(b *testing.B) {
	// math.MaxUint32 effectively disables the per-RPC span-count cap;
	// only the RPC body-length limit inside doWriteSpans splits requests.
	doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
}
+
// Run the WriteSpans RPC path as a plain test: 3000 spans, capped at
// 1000 spans per RPC, with no benchmark timing (b == nil).
func TestWriteSpansRpcs(t *testing.T) {
	doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/htrace/htraced/datastore.go
new file mode 100644
index 0000000..26531af
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/datastore.go
@@ -0,0 +1,1339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/hex"
+       "errors"
+       "fmt"
+       "github.com/jmhodges/levigo"
+       "github.com/ugorji/go/codec"
+       "htrace/common"
+       "htrace/conf"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+//
+// The data store code for HTraced.
+//
+// This code stores the trace spans.  We use levelDB here so that we don't 
have to store everything
+// in memory at all times.  The data is sharded across multiple levelDB 
databases in multiple
+// directories.  Normally, these multiple directories will be on multiple disk 
drives.
+//
+// The main emphasis in the HTraceD data store is on quickly and efficiently 
storing trace span data
+// coming from many daemons.  Durability is not as big a concern as in some 
data stores, since
+// losing a little bit of trace data if htraced goes down is not critical.  We 
use msgpack
+// for serialization.  We assume that there will be many more writes than 
reads.
+//
+// Schema
// w -> ShardInfo
// s[16-byte-sid] -> SpanData
// b[8-byte-big-endian-begin-time][16-byte-child-sid] -> {}
// e[8-byte-big-endian-end-time][16-byte-child-sid] -> {}
// d[8-byte-big-endian-duration][16-byte-child-sid] -> {}
// p[16-byte-parent-sid][16-byte-child-sid] -> {}
//
// Note that span IDs are 128-bit (16-byte) values.
+// Begin times, end times, and durations are signed 64-bit numbers.
+// In order to get LevelDB to properly compare the signed 64-bit quantities,
+// we flip the highest bit.  This way, we can get leveldb to view negative
+// quantities as less than non-negative ones.  This also means that we can do
+// all queries using unsigned 64-bit math, rather than having to special-case
+// the signed fields.
+//
+
// Value stored under index-only keys; the key itself carries all the data.
var EMPTY_BYTE_BUF []byte = []byte{}

// One-byte key prefixes identifying which index a LevelDB key belongs to
// (see the schema comment above).
const SPAN_ID_INDEX_PREFIX = 's'
const BEGIN_TIME_INDEX_PREFIX = 'b'
const END_TIME_INDEX_PREFIX = 'e'
const DURATION_INDEX_PREFIX = 'd'
const PARENT_ID_INDEX_PREFIX = 'p'

// Sentinel meaning "this field has no index" (see getIndexPrefix).
const INVALID_INDEX_PREFIX = 0

// The maximum span expiry time, in milliseconds.
// For all practical purposes this is "never" since it's more than a million years.
const MAX_SPAN_EXPIRY_MS = 0x7ffffffffffffff
+
// IncomingSpan is a span received over RPC, bundled with the address it
// came from and its pre-serialized bytes so the shard goroutine can
// write it without re-encoding.
type IncomingSpan struct {
	// The address that the span was sent from.
	Addr string

	// The span.
	*common.Span

	// Serialized span data (msgpack); stored verbatim as the value of
	// the span's primary LevelDB record.
	SpanDataBytes []byte
}
+
// A single directory containing a levelDB instance.
type shard struct {
	// The data store that this shard is part of
	store *dataStore

	// The LevelDB instance.
	ldb *levigo.DB

	// The path to the leveldb directory this shard is managing.
	path string

	// Incoming requests to write Spans.  Sending a nil batch tells the
	// shard goroutine to exit (see Close / processIncoming).
	incoming chan []*IncomingSpan

	// A channel for incoming heartbeats; each heartbeat triggers a
	// pruneExpired pass.
	heartbeats chan interface{}

	// Tracks whether the shard goroutine has exited.
	exited sync.WaitGroup
}
+
// Process incoming spans for a shard.
//
// This is the shard's single writer goroutine.  It drains span batches
// from the incoming channel, writing each span to LevelDB, and services
// heartbeats by pruning expired spans.  A nil batch is the shutdown
// signal sent by Close.
func (shd *shard) processIncoming() {
	lg := shd.store.lg
	defer func() {
		lg.Infof("Shard processor for %s exiting.\n", shd.path)
		shd.exited.Done()
	}()
	for {
		select {
		case spans := <-shd.incoming:
			if spans == nil {
				// Shutdown signal from Close.
				return
			}
			totalWritten := 0
			totalDropped := 0
			for spanIdx := range spans {
				err := shd.writeSpan(spans[spanIdx])
				if err != nil {
					// A failed write drops the span but does not stop
					// the goroutine; remaining spans are still tried.
					lg.Errorf("Shard processor for %s got fatal error %s.\n",
						shd.path, err.Error())
					totalDropped++
				} else {
					if lg.TraceEnabled() {
						lg.Tracef("Shard processor for %s wrote span %s.\n",
							shd.path, spans[spanIdx].ToJson())
					}
					totalWritten++
				}
			}
			// Batches are built per-ingestor, so every span in a batch
			// came from the same remote address.
			shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped)
			if shd.store.WrittenSpans != nil {
				// Test-only hook; counts all processed spans, written
				// or dropped.
				lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans))
				shd.store.WrittenSpans.Posts(int64(len(spans)))
			}
		case <-shd.heartbeats:
			lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
			shd.pruneExpired()
		}
	}
}
+
// pruneExpired deletes spans from this shard whose begin time falls
// before the reaper date, stopping at the first span that is new enough
// to keep.  The early exit assumes the reaper source yields spans in
// ascending begin-time order — presumably by scanning the begin-time
// index; confirm against CreateReaperSource.
func (shd *shard) pruneExpired() {
	lg := shd.store.rpr.lg
	src, err := CreateReaperSource(shd)
	if err != nil {
		lg.Errorf("Error creating reaper source for shd(%s): %s\n",
			shd.path, err.Error())
		return
	}
	var totalReaped uint64
	defer func() {
		src.Close()
		// Published atomically; ReapedSpans may be read concurrently.
		if totalReaped > 0 {
			atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped)
		}
	}()
	// Order-preserving unsigned encoding of the cutoff date, so begin
	// times can be compared with unsigned math (see s2u64).
	urdate := s2u64(shd.store.rpr.GetReaperDate())
	for {
		span := src.next()
		if span == nil {
			lg.Debugf("After reaping %d span(s), no more found in shard %s "+
				"to reap.\n", totalReaped, shd.path)
			return
		}
		begin := s2u64(span.Begin)
		if begin >= urdate {
			lg.Debugf("After reaping %d span(s), the remaining spans in "+
				"shard %s are new enough to be kept\n",
				totalReaped, shd.path)
			return
		}
		err = shd.DeleteSpan(span)
		if err != nil {
			// Give up on this pass; the next heartbeat will retry.
			lg.Errorf("Error deleting span %s from shd(%s): %s\n",
				span.String(), shd.path, err.Error())
			return
		}
		if lg.TraceEnabled() {
			lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path)
		}
		totalReaped++
	}
}
+
+// Delete a span from the shard.  Note that leveldb may retain the data until
+// compaction(s) remove it.
+func (shd *shard) DeleteSpan(span *common.Span) error {
+       batch := levigo.NewWriteBatch()
+       defer batch.Close()
+       primaryKey :=
+               append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
+       batch.Delete(primaryKey)
+       for parentIdx := range span.Parents {
+               key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
+                       span.Parents[parentIdx].Val()...), span.Id.Val()...)
+               batch.Delete(key)
+       }
+       beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
+               u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
+       batch.Delete(beginTimeKey)
+       endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
+               u64toSlice(s2u64(span.End))...), span.Id.Val()...)
+       batch.Delete(endTimeKey)
+       durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
+               u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
+       batch.Delete(durationKey)
+       err := shd.ldb.Write(shd.store.writeOpts, batch)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
// s2u64 maps a signed 64-bit value onto the unsigned 64-bit space while
// preserving order: flipping the sign bit makes every negative input
// compare below every non-negative input under unsigned comparison,
// which is what LevelDB's byte-wise key ordering needs.
func s2u64(val int64) uint64 {
	return uint64(val) ^ (1 << 63)
}
+
// u64toSlice serializes val as 8 big-endian bytes.  Big-endian ordering
// makes LevelDB's lexicographic key comparison double as numeric
// comparison for the time/duration index keys.  Uses the standard
// library's encoding instead of hand-rolled shifting.
func u64toSlice(val uint64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, val)
	return buf
}
+
+func (shd *shard) writeSpan(ispan *IncomingSpan) error {
+       batch := levigo.NewWriteBatch()
+       defer batch.Close()
+       span := ispan.Span
+       primaryKey :=
+               append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
+       batch.Put(primaryKey, ispan.SpanDataBytes)
+
+       // Add this to the parent index.
+       for parentIdx := range span.Parents {
+               key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
+                       span.Parents[parentIdx].Val()...), span.Id.Val()...)
+               batch.Put(key, EMPTY_BYTE_BUF)
+       }
+
+       // Add to the other secondary indices.
+       beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
+               u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
+       batch.Put(beginTimeKey, EMPTY_BYTE_BUF)
+       endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
+               u64toSlice(s2u64(span.End))...), span.Id.Val()...)
+       batch.Put(endTimeKey, EMPTY_BYTE_BUF)
+       durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
+               u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
+       batch.Put(durationKey, EMPTY_BYTE_BUF)
+
+       err := shd.ldb.Write(shd.store.writeOpts, batch)
+       if err != nil {
+               shd.store.lg.Errorf("Error writing span %s to leveldb at %s: 
%s\n",
+                       span.String(), shd.path, err.Error())
+               return err
+       }
+       return nil
+}
+
+func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
+       lim int32) ([]common.SpanId, int32, error) {
+       searchKey := append([]byte{PARENT_ID_INDEX_PREFIX}, sid.Val()...)
+       iter := shd.ldb.NewIterator(shd.store.readOpts)
+       defer iter.Close()
+       iter.Seek(searchKey)
+       for {
+               if !iter.Valid() {
+                       break
+               }
+               if lim == 0 {
+                       break
+               }
+               key := iter.Key()
+               if !bytes.HasPrefix(key, searchKey) {
+                       break
+               }
+               id := common.SpanId(key[17:])
+               childIds = append(childIds, id)
+               lim--
+               iter.Next()
+       }
+       return childIds, lim, nil
+}
+
// Close a shard.
//
// Sends the nil shutdown sentinel to the writer goroutine, waits for it
// to exit, then closes the LevelDB handle.
func (shd *shard) Close() {
	lg := shd.store.lg
	// nil is the shutdown signal understood by processIncoming.
	shd.incoming <- nil
	lg.Infof("Waiting for %s to exit...\n", shd.path)
	shd.exited.Wait()
	shd.ldb.Close()
	lg.Infof("Closed %s...\n", shd.path)
}
+
// Reaper tracks the cutoff date before which spans are deleted, and
// advances it on each heartbeat.  Shards consult it in pruneExpired.
type Reaper struct {
	// The logger used by the reaper
	lg *common.Logger

	// The number of milliseconds to keep spans around.
	spanExpiryMs int64

	// The oldest date (UTC ms since the epoch) for which we'll keep spans.
	// Protected by lock.
	reaperDate int64

	// A channel used to send heartbeats to the reaper
	heartbeats chan interface{}

	// Tracks whether the reaper goroutine has exited
	exited sync.WaitGroup

	// The lock protecting reaper data (reaperDate).
	lock sync.Mutex

	// The reaper heartbeater
	hb *Heartbeater

	// The total number of spans which have been reaped.
	// Updated atomically (see pruneExpired).
	ReapedSpans uint64
}
+
+func NewReaper(cnf *conf.Config) *Reaper {
+       rpr := &Reaper{
+               lg:           common.NewLogger("reaper", cnf),
+               spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
+               heartbeats:   make(chan interface{}, 1),
+       }
+       if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
+               rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
+       } else if rpr.spanExpiryMs <= 0 {
+               rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
+       }
+       rpr.hb = NewHeartbeater("ReaperHeartbeater",
+               cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
+       rpr.exited.Add(1)
+       go rpr.run()
+       rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
+               name:       "reaper",
+               targetChan: rpr.heartbeats,
+       })
+       var when string
+       if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
+               when = "never"
+       } else {
+               when = "after " + time.Duration(rpr.spanExpiryMs).String()
+       }
+       rpr.lg.Infof("Initializing span reaper: span time out = %s.\n", when)
+       return rpr
+}
+
+func (rpr *Reaper) run() {
+       defer func() {
+               rpr.lg.Info("Exiting Reaper goroutine.\n")
+               rpr.exited.Done()
+       }()
+
+       for {
+               _, isOpen := <-rpr.heartbeats
+               if !isOpen {
+                       return
+               }
+               rpr.handleHeartbeat()
+       }
+}
+
// handleHeartbeat advances the reaper date to now minus the expiry
// period.  The date only moves forward: if reaperDate is already newer
// (e.g. pushed ahead via SetReaperDate), it is left alone.
func (rpr *Reaper) handleHeartbeat() {
	// TODO: check dataStore fullness
	now := common.TimeToUnixMs(time.Now().UTC())
	// The closure scopes the lock so the logging below happens after
	// the mutex is released.
	d, updated := func() (int64, bool) {
		rpr.lock.Lock()
		defer rpr.lock.Unlock()
		newReaperDate := now - rpr.spanExpiryMs
		if newReaperDate > rpr.reaperDate {
			rpr.reaperDate = newReaperDate
			return rpr.reaperDate, true
		} else {
			return rpr.reaperDate, false
		}
	}()
	if rpr.lg.DebugEnabled() {
		if updated {
			rpr.lg.Debugf("Updating UTC reaper date to %s.\n",
				common.UnixMsToTime(d).Format(time.RFC3339))
		} else {
			rpr.lg.Debugf("Not updating previous reaperDate of %s.\n",
				common.UnixMsToTime(d).Format(time.RFC3339))
		}
	}
}
+
+func (rpr *Reaper) GetReaperDate() int64 {
+       rpr.lock.Lock()
+       defer rpr.lock.Unlock()
+       return rpr.reaperDate
+}
+
+func (rpr *Reaper) SetReaperDate(rdate int64) {
+       rpr.lock.Lock()
+       defer rpr.lock.Unlock()
+       rpr.reaperDate = rdate
+}
+
// Shutdown stops the reaper: halts its heartbeater, then closes the
// heartbeat channel, which makes the run goroutine exit.
// NOTE(review): this does not Wait on rpr.exited, so the goroutine may
// still be finishing when Shutdown returns — confirm callers don't rely
// on synchronous teardown.
func (rpr *Reaper) Shutdown() {
	rpr.hb.Shutdown()
	close(rpr.heartbeats)
}
+
// The Data Store.
//
// Owns the LevelDB shards plus the supporting machinery: metrics sink,
// heartbeater, and span reaper.
type dataStore struct {
	lg *common.Logger

	// The shards which manage our LevelDB instances.
	shards []*shard

	// The read options to use for LevelDB (shared by all shards).
	readOpts *levigo.ReadOptions

	// The write options to use for LevelDB (shared by all shards).
	writeOpts *levigo.WriteOptions

	// If non-null, a semaphore we will increment once for each span we receive.
	// Used for testing.
	WrittenSpans *common.Semaphore

	// The metrics sink.
	msink *MetricsSink

	// The heartbeater which periodically asks shards to update the MetricsSink.
	hb *Heartbeater

	// The reaper for this datastore
	rpr *Reaper

	// When this datastore was started (in UTC milliseconds since the epoch)
	startMs int64
}
+
// CreateDataStore loads the LevelDB shards from disk and starts one
// writer goroutine per shard, plus the store's heartbeater and reaper.
// writtenSpans, if non-nil, is a test hook posted once per processed
// span (see processIncoming).
func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) {
	dld := NewDataStoreLoader(cnf)
	defer dld.Close()
	err := dld.Load()
	if err != nil {
		dld.lg.Errorf("Error loading datastore: %s\n", err.Error())
		return nil, err
	}
	store := &dataStore{
		lg:           dld.lg,
		shards:       make([]*shard, len(dld.shards)),
		readOpts:     dld.readOpts,
		writeOpts:    dld.writeOpts,
		WrittenSpans: writtenSpans,
		msink:        NewMetricsSink(cnf),
		hb: NewHeartbeater("DatastoreHeartbeater",
			cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg),
		rpr:     NewReaper(cnf),
		startMs: common.TimeToUnixMs(time.Now().UTC()),
	}
	spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
	for shdIdx := range store.shards {
		shd := &shard{
			store:      store,
			ldb:        dld.shards[shdIdx].ldb,
			path:       dld.shards[shdIdx].path,
			incoming:   make(chan []*IncomingSpan, spanBufferSize),
			heartbeats: make(chan interface{}, 1),
		}
		shd.exited.Add(1)
		go shd.processIncoming()
		store.shards[shdIdx] = shd
		store.hb.AddHeartbeatTarget(&HeartbeatTarget{
			name:       fmt.Sprintf("shard(%s)", shd.path),
			targetChan: shd.heartbeats,
		})
	}
	// The store now owns the loader's LevelDB handles and options;
	// prevent the deferred dld.Close from releasing them.
	dld.DisownResources()
	return store, nil
}
+
// Close the DataStore.
//
// Teardown order matters: stop the heartbeater first so no new work is
// sent to shards, then shut down the shard goroutines, then the reaper,
// and only then release the LevelDB option objects the shards were
// using.  Each field is nil'd after release, making Close idempotent.
func (store *dataStore) Close() {
	if store.hb != nil {
		store.hb.Shutdown()
		store.hb = nil
	}
	for idx := range store.shards {
		if store.shards[idx] != nil {
			store.shards[idx].Close()
			store.shards[idx] = nil
		}
	}
	if store.rpr != nil {
		store.rpr.Shutdown()
		store.rpr = nil
	}
	if store.readOpts != nil {
		store.readOpts.Close()
		store.readOpts = nil
	}
	if store.writeOpts != nil {
		store.writeOpts.Close()
		store.writeOpts = nil
	}
	if store.lg != nil {
		store.lg.Close()
		store.lg = nil
	}
}
+
+// Get the index of the shard which stores the given spanId.
+func (store *dataStore) getShardIndex(sid common.SpanId) int {
+       return int(sid.Hash32() % uint32(len(store.shards)))
+}
+
// Capacity of each per-shard batch accumulated before handing spans to
// the shard's writer goroutine (see IngestSpan for the flush rule).
const WRITESPANS_BATCH_SIZE = 128

// SpanIngestor is a class used internally to ingest spans from an RPC
// endpoint.  It groups spans destined for a particular shard into small
// batches, so that we can reduce the number of objects that need to be sent
// over the shard's "incoming" channel.  Since sending objects over a channel
// requires goroutine synchronization, this improves performance.
//
// SpanIngestor also allows us to reuse the same encoder object for many spans,
// rather than creating a new encoder per span.  This avoids re-doing the
// encoder setup for each span, and also generates less garbage.
type SpanIngestor struct {
	// The logger to use.
	lg *common.Logger

	// The dataStore we are ingesting spans into.
	store *dataStore

	// The remote address these spans are coming from.
	addr string

	// Default TracerId, applied to spans that arrive without one.
	defaultTrid string

	// The msgpack handle to use to serialize the spans.
	mh codec.MsgpackHandle

	// The msgpack encoder to use to serialize the spans.
	// Caching this avoids generating a lot of garbage and burning CPUs
	// creating new encoder objects for each span.
	enc *codec.Encoder

	// The buffer which codec.Encoder is currently serializing to.
	// We have to create a new buffer for each span because once we hand it
	// off to the shard, the shard manages the buffer lifecycle.
	spanDataBytes []byte

	// An array mapping shard index to span batch.
	batches []*SpanIngestorBatch

	// The total number of spans ingested.  Includes dropped spans.
	totalIngested int

	// The total number of spans the ingestor dropped because of a
	// server-side error.
	serverDropped int
}

// A batch of spans destined for a particular shard.
type SpanIngestorBatch struct {
	incoming []*IncomingSpan
}
+
+func (store *dataStore) NewSpanIngestor(lg *common.Logger,
+       addr string, defaultTrid string) *SpanIngestor {
+       ing := &SpanIngestor{
+               lg:            lg,
+               store:         store,
+               addr:          addr,
+               defaultTrid:   defaultTrid,
+               spanDataBytes: make([]byte, 0, 1024),
+               batches:       make([]*SpanIngestorBatch, len(store.shards)),
+       }
+       ing.mh.WriteExt = true
+       ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh)
+       for batchIdx := range ing.batches {
+               ing.batches[batchIdx] = &SpanIngestorBatch{
+                       incoming: make([]*IncomingSpan, 0, 
WRITESPANS_BATCH_SIZE),
+               }
+       }
+       return ing
+}
+
// IngestSpan validates, encodes, and batches one span for writing.
// Invalid or unencodable spans are dropped (counted in serverDropped)
// rather than reported as errors.  When the destination shard's batch
// fills, it is flushed to that shard's writer goroutine.
func (ing *SpanIngestor) IngestSpan(span *common.Span) {
	ing.totalIngested++
	// Make sure the span ID is valid.
	spanIdProblem := span.Id.FindProblem()
	if spanIdProblem != "" {
		// Can't print the invalid span ID because String() might fail.
		ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem)
		ing.serverDropped++
		return
	}

	// Set the default tracer id, if needed.
	if span.TracerId == "" {
		span.TracerId = ing.defaultTrid
	}

	// Encode the span data.  Doing the encoding here is better than doing it
	// in the shard goroutine, because we can achieve more parallelism.
	// There is one shard goroutine per shard, but potentially many more
	// ingestors per shard.
	err := ing.enc.Encode(span.SpanData)
	if err != nil {
		ing.lg.Warnf("Failed to encode span ID %s: %s\n",
			span.Id.String(), err.Error())
		ing.serverDropped++
		return
	}
	// Hand off the encoded bytes and point the encoder at a fresh
	// buffer; the shard owns spanDataBytes from here on.
	spanDataBytes := ing.spanDataBytes
	ing.spanDataBytes = make([]byte, 0, 1024)
	ing.enc.ResetBytes(&ing.spanDataBytes)

	// Determine which shard this span should go to.
	shardIdx := ing.store.getShardIndex(span.Id)
	batch := ing.batches[shardIdx]
	incomingLen := len(batch.incoming)
	if ing.lg.TraceEnabled() {
		ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, "+
			"incomingLen=%d, cap(batch.incoming)=%d\n",
			span.Id.String(), shardIdx, incomingLen, cap(batch.incoming))
	}
	// NOTE(review): this flushes once the batch holds cap-1 spans, so a
	// flushed batch carries WRITESPANS_BATCH_SIZE-1 entries and the new
	// span starts the next batch — confirm the off-by-one is intended.
	if incomingLen+1 == cap(batch.incoming) {
		if ing.lg.TraceEnabled() {
			ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for "+
				"shard %d\n", len(batch.incoming), shardIdx)
		}
		ing.store.WriteSpans(shardIdx, batch.incoming)
		batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE)
		incomingLen = 0
	} else {
		// Extend within capacity; slot incomingLen is filled below.
		batch.incoming = batch.incoming[0 : incomingLen+1]
	}
	batch.incoming[incomingLen] = &IncomingSpan{
		Addr:          ing.addr,
		Span:          span,
		SpanDataBytes: spanDataBytes,
	}
}
+
// Close flushes any remaining batched spans to their shards, logs a
// summary, and reports ingestion metrics.  startTime is when this
// ingestion round began; the elapsed time is reported to the sink.
func (ing *SpanIngestor) Close(startTime time.Time) {
	for shardIdx := range ing.batches {
		batch := ing.batches[shardIdx]
		if len(batch.incoming) > 0 {
			if ing.lg.TraceEnabled() {
				ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for "+
					"shard %d\n", len(batch.incoming), shardIdx)
			}
			ing.store.WriteSpans(shardIdx, batch.incoming)
		}
		// Drop the reference; the shard now owns the flushed slice.
		batch.incoming = nil
	}
	ing.lg.Debugf("Closed span ingestor for %s.  Ingested %d span(s); dropped "+
		"%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped)

	endTime := time.Now()
	ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested,
		ing.serverDropped, endTime.Sub(startTime))
}
+
// WriteSpans queues a batch of spans for the given shard's writer
// goroutine.  Blocks if the shard's incoming channel is full.
func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) {
	store.shards[shardIdx].incoming <- ispans
}
+
// FindSpan returns the span with the given id, or nil if not found.
// Only the single shard that owns the id needs to be consulted.
func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
	return store.shards[store.getShardIndex(sid)].FindSpan(sid)
}
+
+func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
+       lg := shd.store.lg
+       primaryKey := append([]byte{SPAN_ID_INDEX_PREFIX}, sid.Val()...)
+       buf, err := shd.ldb.Get(shd.store.readOpts, primaryKey)
+       if err != nil {
+               if strings.Index(err.Error(), "NotFound:") != -1 {
+                       return nil
+               }
+               lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n",
+                       shd.path, sid.String(), err.Error())
+               return nil
+       }
+       var span *common.Span
+       span, err = shd.decodeSpan(sid, buf)
+       if err != nil {
+               lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding 
[%s]\n",
+                       shd.path, sid.String(), err.Error(), 
hex.EncodeToString(buf))
+               return nil
+       }
+       return span
+}
+
+func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, 
error) {
+       r := bytes.NewBuffer(buf)
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       decoder := codec.NewDecoder(r, mh)
+       data := common.SpanData{}
+       err := decoder.Decode(&data)
+       if err != nil {
+               return nil, err
+       }
+       if data.Parents == nil {
+               data.Parents = []common.SpanId{}
+       }
+       return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
+}
+
+// Find the children of a given span id.
+func (store *dataStore) FindChildren(sid common.SpanId, lim int32) 
[]common.SpanId {
+       childIds := make([]common.SpanId, 0)
+       var err error
+
+       startIdx := store.getShardIndex(sid)
+       idx := startIdx
+       numShards := len(store.shards)
+       for {
+               if lim == 0 {
+                       break
+               }
+               shd := store.shards[idx]
+               childIds, lim, err = shd.FindChildren(sid, childIds, lim)
+               if err != nil {
+                       store.lg.Errorf("Shard(%s): FindChildren(%s) error: 
%s\n",
+                               shd.path, sid.String(), err.Error())
+               }
+               idx++
+               if idx >= numShards {
+                       idx = 0
+               }
+               if idx == startIdx {
+                       break
+               }
+       }
+       return childIds
+}
+
// predicateData is a common.Predicate augmented with the encoded key
// bytes that its value compares against in the relevant index
// (populated by loadPredicateData).
type predicateData struct {
	*common.Predicate
	key []byte
}
+
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+       p := predicateData{Predicate: pred}
+
+       // Parse the input value given to make sure it matches up with the field
+       // type.
+       switch pred.Field {
+       case common.SPAN_ID:
+               // Span IDs are sent as hex strings.
+               var id common.SpanId
+               if err := id.FromString(pred.Val); err != nil {
+                       return nil, errors.New(fmt.Sprintf("Unable to parse 
span id '%s': %s",
+                               pred.Val, err.Error()))
+               }
+               p.key = id.Val()
+               break
+       case common.DESCRIPTION:
+               // Any string is valid for a description.
+               p.key = []byte(pred.Val)
+               break
+       case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+               // Parse a base-10 signed numeric field.
+               v, err := strconv.ParseInt(pred.Val, 10, 64)
+               if err != nil {
+                       return nil, errors.New(fmt.Sprintf("Unable to parse %s 
'%s': %s",
+                               pred.Field, pred.Val, err.Error()))
+               }
+               p.key = u64toSlice(s2u64(v))
+               break
+       case common.TRACER_ID:
+               // Any string is valid for a tracer ID.
+               p.key = []byte(pred.Val)
+               break
+       default:
+               return nil, errors.New(fmt.Sprintf("Unknown field %s", 
pred.Field))
+       }
+
+       // Validate the predicate operation.
+       switch pred.Op {
+       case common.EQUALS, common.LESS_THAN_OR_EQUALS,
+               common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
+               break
+       case common.CONTAINS:
+               if p.fieldIsNumeric() {
+                       return nil, errors.New(fmt.Sprintf("Can't use CONTAINS 
on a "+
+                               "numeric field like '%s'", pred.Field))
+               }
+       default:
+               return nil, errors.New(fmt.Sprintf("Unknown predicate operation 
'%s'",
+                       pred.Op))
+       }
+
+       return &p, nil
+}
+
+// Get the index prefix for this predicate, or 0 if it is not indexed.
+func (pred *predicateData) getIndexPrefix() byte {
+       switch pred.Field {
+       case common.SPAN_ID:
+               return SPAN_ID_INDEX_PREFIX
+       case common.BEGIN_TIME:
+               return BEGIN_TIME_INDEX_PREFIX
+       case common.END_TIME:
+               return END_TIME_INDEX_PREFIX
+       case common.DURATION:
+               return DURATION_INDEX_PREFIX
+       default:
+               return INVALID_INDEX_PREFIX
+       }
+}
+
+// Returns true if the predicate type is numeric.
+func (pred *predicateData) fieldIsNumeric() bool {
+       switch pred.Field {
+       case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, 
common.DURATION:
+               return true
+       default:
+               return false
+       }
+}
+
// Get the values that this predicate cares about for a given span,
// encoded the same way the corresponding index keys are, so that byte
// comparison agrees with field comparison.
func (pred *predicateData) extractRelevantSpanData(span *common.Span) []byte {
	switch pred.Field {
	case common.SPAN_ID:
		return span.Id.Val()
	case common.DESCRIPTION:
		return []byte(span.Description)
	case common.BEGIN_TIME:
		return u64toSlice(s2u64(span.Begin))
	case common.END_TIME:
		return u64toSlice(s2u64(span.End))
	case common.DURATION:
		return u64toSlice(s2u64(span.Duration()))
	case common.TRACER_ID:
		return []byte(span.TracerId)
	default:
		// Unknown fields are rejected in loadPredicateData; reaching
		// this is a programming error.
		panic(fmt.Sprintf("Unknown field type %s.", pred.Field))
	}
}
+
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) 
bool {
+       // nil is after everything.
+       if a == nil {
+               if b == nil {
+                       return false
+               }
+               return false
+       } else if b == nil {
+               return true
+       }
+       // Compare the spans according to this predicate.
+       aVal := pred.extractRelevantSpanData(a)
+       bVal := pred.extractRelevantSpanData(b)
+       cmp := bytes.Compare(aVal, bVal)
+       if pred.Op.IsDescending() {
+               return cmp > 0
+       } else {
+               return cmp < 0
+       }
+}
+
// satisfiedByReturn classifies how an entry relates to a predicate during
// an ordered scan.
type satisfiedByReturn int

const (
	// NOT_SATISFIED: this entry and all subsequent ones fail the predicate.
	NOT_SATISFIED satisfiedByReturn = iota
	// NOT_YET_SATISFIED: this entry fails, but a later one may succeed.
	NOT_YET_SATISFIED
	// SATISFIED: this entry matches the predicate.
	SATISFIED
)

// String returns a human-readable name for the enum value.
// Note: the original declaration repeated "= iota", which made
// NOT_YET_SATISFIED and SATISFIED untyped constants instead of
// satisfiedByReturn; the idiomatic iota list above types all three.
func (r satisfiedByReturn) String() string {
	switch r {
	case NOT_SATISFIED:
		return "NOT_SATISFIED"
	case NOT_YET_SATISFIED:
		return "NOT_YET_SATISFIED"
	case SATISFIED:
		return "SATISFIED"
	default:
		return "(unknown)"
	}
}
+
+// Determine whether the predicate is satisfied by the given span.
+func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn {
+       val := pred.extractRelevantSpanData(span)
+       switch pred.Op {
+       case common.CONTAINS:
+               if bytes.Contains(val, pred.key) {
+                       return SATISFIED
+               } else {
+                       return NOT_SATISFIED
+               }
+       case common.EQUALS:
+               if bytes.Equal(val, pred.key) {
+                       return SATISFIED
+               } else {
+                       return NOT_SATISFIED
+               }
+       case common.LESS_THAN_OR_EQUALS:
+               if bytes.Compare(val, pred.key) <= 0 {
+                       return SATISFIED
+               } else {
+                       return NOT_YET_SATISFIED
+               }
+       case common.GREATER_THAN_OR_EQUALS:
+               if bytes.Compare(val, pred.key) >= 0 {
+                       return SATISFIED
+               } else {
+                       return NOT_SATISFIED
+               }
+       case common.GREATER_THAN:
+               cmp := bytes.Compare(val, pred.key)
+               if cmp <= 0 {
+                       return NOT_YET_SATISFIED
+               } else {
+                       return SATISFIED
+               }
+       default:
+               panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+                       "during normalization", pred.Op))
+       }
+}
+
+func (pred *predicateData) createSource(store *dataStore, prev *common.Span) 
(*source, error) {
+       var ret *source
+       src := source{store: store,
+               pred:      pred,
+               shards:    make([]*shard, len(store.shards)),
+               iters:     make([]*levigo.Iterator, 0, len(store.shards)),
+               nexts:     make([]*common.Span, len(store.shards)),
+               numRead:   make([]int, len(store.shards)),
+               keyPrefix: pred.getIndexPrefix(),
+       }
+       if src.keyPrefix == INVALID_INDEX_PREFIX {
+               return nil, errors.New(fmt.Sprintf("Can't create source from 
unindexed "+
+                       "predicate on field %s", pred.Field))
+       }
+       defer func() {
+               if ret == nil {
+                       src.Close()
+               }
+       }()
+       for shardIdx := range store.shards {
+               shd := store.shards[shardIdx]
+               src.shards[shardIdx] = shd
+               src.iters = append(src.iters, 
shd.ldb.NewIterator(store.readOpts))
+       }
+       var searchKey []byte
+       lg := store.lg
+       if prev != nil {
+               // If prev != nil, this query RPC is the continuation of a 
previous
+               // one.  The final result returned the last time is 'prev'.
+               //
+               // To avoid returning the same results multiple times, we 
adjust the
+               // predicate here.  If the predicate is on the span id field, we
+               // simply manipulate the span ID we're looking for.
+               //
+               // If the predicate is on a secondary index, we also use span 
ID, but
+               // in a slightly different way.  Since the secondary indices are
+               // organized as [type-code][8b-secondary-key][8b-span-id], 
elements
+               // with the same secondary index field are ordered by span ID.  
So we
+               // create a 17-byte key incorporating the span ID from 'prev.'
+               startId := common.INVALID_SPAN_ID
+               switch pred.Op {
+               case common.EQUALS:
+                       if pred.Field == common.SPAN_ID {
+                               // This is an annoying corner case.  There can 
only be one
+                               // result each time we do an EQUALS search for 
a span id.
+                               // Span id is the primary key for all our spans.
+                               // But for some reason someone is asking for 
another result.
+                               // We modify the query to search for the 
illegal 0 span ID,
+                               // which will never be present.
+                               if lg.DebugEnabled() {
+                                       lg.Debugf("Attempted to use a 
continuation token with an EQUALS "+
+                                               "SPAN_ID query. %s.  Setting 
search id = 0",
+                                               pred.Predicate.String())
+                               }
+                               startId = common.INVALID_SPAN_ID
+                       } else {
+                               // When doing an EQUALS search on a secondary 
index, the
+                               // results are sorted by span id.
+                               startId = prev.Id.Next()
+                       }
+               case common.LESS_THAN_OR_EQUALS:
+                       // Subtract one from the previous span id.  Since the 
previous
+                       // start ID will never be 0 (0 is an illegal span id), 
we'll never
+                       // wrap around when doing this.
+                       startId = prev.Id.Prev()
+               case common.GREATER_THAN_OR_EQUALS:
+                       // We can't add one to the span id, since the previous 
span ID
+                       // might be the maximum value.  So just switch over to 
using
+                       // GREATER_THAN.
+                       pred.Op = common.GREATER_THAN
+                       startId = prev.Id
+               case common.GREATER_THAN:
+                       // This one is easy.
+                       startId = prev.Id
+               default:
+                       str := fmt.Sprintf("Can't use a %v predicate as a 
source.", pred.Predicate.String())
+                       lg.Error(str + "\n")
+                       panic(str)
+               }
+               if pred.Field == common.SPAN_ID {
+                       pred.key = startId.Val()
+                       searchKey = append([]byte{src.keyPrefix}, 
startId.Val()...)
+               } else {
+                       // Start where the previous query left off.  This means 
adjusting
+                       // our uintKey.
+                       pred.key = pred.extractRelevantSpanData(prev)
+                       searchKey = append(append([]byte{src.keyPrefix}, 
pred.key...),
+                               startId.Val()...)
+               }
+               if lg.TraceEnabled() {
+                       lg.Tracef("Handling continuation token %s for %s.  
startId=%d, "+
+                               "pred.uintKey=%s\n", prev, 
pred.Predicate.String(), startId,
+                               hex.EncodeToString(pred.key))
+               }
+       } else {
+               searchKey = append([]byte{src.keyPrefix}, pred.key...)
+       }
+       for i := range src.iters {
+               src.iters[i].Seek(searchKey)
+       }
+       ret = &src
+       return ret, nil
+}
+
// A source of spans.  Reads index entries from one or more shards in
// parallel and merges them in predicate order.
type source struct {
	store *dataStore
	// The indexed predicate that drives the scan.
	pred *predicateData
	// The shards being scanned; shards, iters, nexts, and numRead are
	// parallel slices indexed by shard.
	shards []*shard
	// Per-shard leveldb iterators; an entry becomes nil once that shard
	// is exhausted.
	iters []*levigo.Iterator
	// Per-shard read-ahead slots holding the next candidate span, if any.
	nexts []*common.Span
	// Per-shard count of index entries examined (returned to callers as
	// the "numScanned" statistic).
	numRead []int
	// The key-prefix byte of the index being scanned.
	keyPrefix byte
}
+
// CreateReaperSource creates a source for the span reaper: an ascending
// scan over the BEGIN_TIME index of a single shard, starting from the
// lowest possible begin time, so spans come out oldest-first.
func CreateReaperSource(shd *shard) (*source, error) {
	store := shd.store
	// A GREATER_THAN_OR_EQUALS predicate on BEGIN_TIME with the minimum
	// value matches every span.  INVALID_SPAN_ID.String() is presumably
	// parsed to zero by loadPredicateData here — TODO confirm.
	p := &common.Predicate{
		Op:    common.GREATER_THAN_OR_EQUALS,
		Field: common.BEGIN_TIME,
		Val:   common.INVALID_SPAN_ID.String(),
	}
	pred, err := loadPredicateData(p)
	if err != nil {
		return nil, err
	}
	// Single-shard source: every parallel slice has length 1.
	src := &source{
		store:     store,
		pred:      pred,
		shards:    []*shard{shd},
		iters:     make([]*levigo.Iterator, 1),
		nexts:     make([]*common.Span, 1),
		numRead:   make([]int, 1),
		keyPrefix: pred.getIndexPrefix(),
	}
	iter := shd.ldb.NewIterator(store.readOpts)
	src.iters[0] = iter
	// NOTE(review): pred.key is appended twice here, unlike createSource
	// which appends [prefix][key][span-id].  Since pred.key is all zeroes
	// this still seeks to the start of the index, but confirm the doubled
	// append is intentional rather than a typo for startId bytes.
	searchKey := append(append([]byte{src.keyPrefix}, pred.key...),
		pred.key...)
	iter.Seek(searchKey)
	return src, nil
}
+
// Fill in the entry in the 'next' array for a specific shard.
// Advances the shard's iterator until it finds a span satisfying the
// source's predicate, exhausts the index section, or proves no further
// entry can match; in the latter cases the iterator is closed and the
// slot stays nil.
func (src *source) populateNextFromShard(shardIdx int) {
	lg := src.store.lg
	var err error
	iter := src.iters[shardIdx]
	shdPath := src.shards[shardIdx].path
	if iter == nil {
		lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath)
		return // There are no more entries in this shard.
	}
	if src.nexts[shardIdx] != nil {
		lg.Debugf("No need to populate shard %s\n", shdPath)
		return // We already have a valid entry for this shard.
	}
	for {
		if !iter.Valid() {
			lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath)
			break // Can't read past end of DB
		}
		src.numRead[shardIdx]++
		key := iter.Key()
		if len(key) < 1 {
			lg.Warnf("Encountered invalid zero-byte key in shard %s.\n", shdPath)
			break
		}
		// First classify by key prefix: are we inside the index we want?
		ret := src.checkKeyPrefix(key[0], iter)
		if ret == NOT_SATISFIED {
			break // Can't read past end of indexed section
		} else if ret == NOT_YET_SATISFIED {
			if src.pred.Op.IsDescending() {
				iter.Prev()
			} else {
				iter.Next()
			}
			continue // Try again because we are not yet at the indexed section.
		}
		var span *common.Span
		var sid common.SpanId
		if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
			// The span id maps to the span itself.
			// Primary index key layout: [1b prefix][16b span id].
			sid = common.SpanId(key[1:17])
			span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value())
			if err != nil {
				if lg.DebugEnabled() {
					lg.Debugf("Internal error decoding span %s in shard %s: %s\n",
						sid.String(), shdPath, err.Error())
				}
				break
			}
		} else {
			// With a secondary index, we have to look up the span by id.
			// Secondary index key layout: [1b prefix][8b field][16b span id].
			sid = common.SpanId(key[9:25])
			span = src.shards[shardIdx].FindSpan(sid)
			if span == nil {
				if lg.DebugEnabled() {
					lg.Debugf("Internal error rehydrating span %s in shard %s\n",
						sid.String(), shdPath)
				}
				break
			}
		}
		// Advance past this entry before deciding whether to keep it, so
		// the iterator is always positioned for the next call.
		if src.pred.Op.IsDescending() {
			iter.Prev()
		} else {
			iter.Next()
		}
		ret = src.pred.satisfiedBy(span)
		if ret == SATISFIED {
			if lg.DebugEnabled() {
				lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath)
			}
			src.nexts[shardIdx] = span // Found valid entry
			return
		}
		if ret == NOT_SATISFIED {
			// This and subsequent entries don't satisfy predicate
			break
		}
		// NOT_YET_SATISFIED: keep scanning.
	}
	// This shard is exhausted; release its iterator.
	lg.Debugf("Closing iterator for shard %s.\n", shdPath)
	iter.Close()
	src.iters[shardIdx] = nil
}
+
+// Check the key prefix against the key prefix of the query.
+func (src *source) checkKeyPrefix(kp byte, iter *levigo.Iterator) 
satisfiedByReturn {
+       if kp == src.keyPrefix {
+               return SATISFIED
+       } else if kp < src.keyPrefix {
+               if src.pred.Op.IsDescending() {
+                       return NOT_SATISFIED
+               } else {
+                       return NOT_YET_SATISFIED
+               }
+       } else {
+               if src.pred.Op.IsDescending() {
+                       return NOT_YET_SATISFIED
+               } else {
+                       return NOT_SATISFIED
+               }
+       }
+}
+
+func (src *source) next() *common.Span {
+       for shardIdx := range src.shards {
+               src.populateNextFromShard(shardIdx)
+       }
+       var best *common.Span
+       bestIdx := -1
+       for shardIdx := range src.iters {
+               span := src.nexts[shardIdx]
+               if src.pred.spanPtrIsBefore(span, best) {
+                       best = span
+                       bestIdx = shardIdx
+               }
+       }
+       if bestIdx >= 0 {
+               src.nexts[bestIdx] = nil
+       }
+       return best
+}
+
+func (src *source) Close() {
+       for i := range src.iters {
+               if src.iters[i] != nil {
+                       src.iters[i].Close()
+               }
+       }
+       src.iters = nil
+}
+
+func (src *source) getStats() string {
+       ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String())
+       prefix := ". "
+       for shardIdx := range src.shards {
+               next := fmt.Sprintf("%sRead %d spans from %s", prefix,
+                       src.numRead[shardIdx], src.shards[shardIdx].path)
+               prefix = ", "
+               ret = ret + next
+       }
+       return ret
+}
+
+func (store *dataStore) obtainSource(preds *[]*predicateData, span 
*common.Span) (*source, error) {
+       // Read spans from the first predicate that is indexed.
+       p := *preds
+       for i := range p {
+               pred := p[i]
+               if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
+                       *preds = append(p[0:i], p[i+1:]...)
+                       return pred.createSource(store, span)
+               }
+       }
+       // If there are no predicates that are indexed, read rows in order of 
span id.
+       spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
+               Field: common.SPAN_ID,
+               Val:   common.INVALID_SPAN_ID.String(),
+       }
+       spanIdPredData, err := loadPredicateData(&spanIdPred)
+       if err != nil {
+               return nil, err
+       }
+       return spanIdPredData.createSource(store, span)
+}
+
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, 
error, []int) {
+       lg := store.lg
+       // Parse predicate data.
+       var err error
+       preds := make([]*predicateData, len(query.Predicates))
+       for i := range query.Predicates {
+               preds[i], err = loadPredicateData(&query.Predicates[i])
+               if err != nil {
+                       return nil, err, nil
+               }
+       }
+       // Get a source of rows.
+       var src *source
+       src, err = store.obtainSource(&preds, query.Prev)
+       if err != nil {
+               return nil, err, nil
+       }
+       defer src.Close()
+       if lg.DebugEnabled() {
+               lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, 
preds, src)
+       }
+
+       // Filter the spans through the remaining predicates.
+       reserved := 32
+       if query.Lim < reserved {
+               reserved = query.Lim
+       }
+       ret := make([]*common.Span, 0, reserved)
+       for {
+               if len(ret) >= query.Lim {
+                       if lg.DebugEnabled() {
+                               lg.Debugf("HandleQuery %s: hit query limit 
after obtaining "+
+                                       "%d results. %s\n.", query, query.Lim, 
src.getStats())
+                       }
+                       break // we hit the result size limit
+               }
+               span := src.next()
+               if span == nil {
+                       if lg.DebugEnabled() {
+                               lg.Debugf("HandleQuery %s: found %d result(s), 
which are "+
+                                       "all that exist. %s\n", query, 
len(ret), src.getStats())
+                       }
+                       break // the source has no more spans to give
+               }
+               if lg.DebugEnabled() {
+                       lg.Debugf("src.next returned span %s\n", span.ToJson())
+               }
+               satisfied := true
+               for predIdx := range preds {
+                       if preds[predIdx].satisfiedBy(span) != SATISFIED {
+                               satisfied = false
+                               break
+                       }
+               }
+               if satisfied {
+                       ret = append(ret, span)
+               }
+       }
+       return ret, nil, src.numRead
+}
+
+func (store *dataStore) ServerStats() *common.ServerStats {
+       serverStats := common.ServerStats{
+               Dirs: make([]common.StorageDirectoryStats, len(store.shards)),
+       }
+       for shardIdx := range store.shards {
+               shard := store.shards[shardIdx]
+               serverStats.Dirs[shardIdx].Path = shard.path
+               r := levigo.Range{
+                       Start: []byte{0},
+                       Limit: []byte{0xff},
+               }
+               vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
+               serverStats.Dirs[shardIdx].ApproximateBytes = vals[0]
+               serverStats.Dirs[shardIdx].LevelDbStats =
+                       shard.ldb.PropertyValue("leveldb.stats")
+               store.msink.lg.Debugf("levedb.stats for %s: %s\n",
+                       shard.path, shard.ldb.PropertyValue("leveldb.stats"))
+       }
+       serverStats.LastStartMs = store.startMs
+       serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
+       serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
+       store.msink.PopulateServerStats(&serverStats)
+       return &serverStats
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/htrace/htraced/datastore_test.go
new file mode 100644
index 0000000..a7ecead
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/datastore_test.go
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       htrace "htrace/client"
+       "htrace/common"
+       "htrace/conf"
+       "htrace/test"
+       "math/rand"
+       "os"
+       "reflect"
+       "sort"
+       "testing"
+       "time"
+)
+
+// Test creating and tearing down a datastore.
+func TestCreateDatastore(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore",
+               DataDirs: make([]string, 3)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+}
+
// SIMPLE_TEST_SPANS is a tiny fixed trace used by several tests: span 1
// is the root, and spans 2 and 3 both list span 1 as their parent.
var SIMPLE_TEST_SPANS []common.Span = []common.Span{
	common.Span{Id: common.TestId("00000000000000000000000000000001"),
		SpanData: common.SpanData{
			Begin:       123,
			End:         456,
			Description: "getFileDescriptors",
			Parents:     []common.SpanId{},
			TracerId:    "firstd",
		}},
	common.Span{Id: common.TestId("00000000000000000000000000000002"),
		SpanData: common.SpanData{
			Begin:       125,
			End:         200,
			Description: "openFd",
			Parents:     []common.SpanId{common.TestId("00000000000000000000000000000001")},
			TracerId:    "secondd",
		}},
	common.Span{Id: common.TestId("00000000000000000000000000000003"),
		SpanData: common.SpanData{
			Begin:       200,
			End:         456,
			Description: "passFd",
			Parents:     []common.SpanId{common.TestId("00000000000000000000000000000001")},
			TracerId:    "thirdd",
		}},
}
+
+func createSpans(spans []common.Span, store *dataStore) {
+       ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "")
+       for idx := range spans {
+               ing.IngestSpan(&spans[idx])
+       }
+       ing.Close(time.Now())
+       store.WrittenSpans.Waits(int64(len(spans)))
+}
+
// Test creating a datastore and adding some spans.
// Verifies that a written span can be found by id, round-trips intact,
// and that parent/child links are queryable.
func TestDatastoreWriteAndRead(t *testing.T) {
	t.Parallel()
	htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
		Cnf: map[string]string{
			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
		},
		WrittenSpans: common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		panic(err)
	}
	defer ht.Close()
	createSpans(SIMPLE_TEST_SPANS, ht.Store)

	// The root span should be retrievable by its id and compare equal to
	// what we ingested.
	span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
	if span == nil {
		t.Fatal()
	}
	if !span.Id.Equal(common.TestId("00000000000000000000000000000001")) {
		t.Fatal()
	}
	common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span)
	// FindChildren should honor its limit argument (1, then 2).
	children := ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 1)
	if len(children) != 1 {
		t.Fatalf("expected 1 child, but got %d\n", len(children))
	}
	children = ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 2)
	if len(children) != 2 {
		t.Fatalf("expected 2 children, but got %d\n", len(children))
	}
	// Sort so the assertions don't depend on retrieval order.
	sort.Sort(common.SpanIdSlice(children))
	if !children[0].Equal(common.TestId("00000000000000000000000000000002")) {
		t.Fatal()
	}
	if !children[1].Equal(common.TestId("00000000000000000000000000000003")) {
		t.Fatal()
	}
}
+
// testQuery runs a query and checks that it returns exactly
// expectedSpans, without verifying per-shard scan counts.
func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
	expectedSpans []common.Span) {
	testQueryExt(t, ht, query, expectedSpans, nil)
}
+
+func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query,
+       expectedSpans []common.Span, expectedNumScanned []int) {
+       spans, err, numScanned := ht.Store.HandleQuery(query)
+       if err != nil {
+               t.Fatalf("Query %s failed: %s\n", query.String(), err.Error())
+       }
+       expectedBuf := new(bytes.Buffer)
+       dec := json.NewEncoder(expectedBuf)
+       err = dec.Encode(expectedSpans)
+       if err != nil {
+               t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", 
err.Error())
+       }
+       spansBuf := new(bytes.Buffer)
+       dec = json.NewEncoder(spansBuf)
+       err = dec.Encode(spans)
+       if err != nil {
+               t.Fatalf("Failed to encode result spans to JSON: %s\n", 
err.Error())
+       }
+       t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
+               len(expectedSpans))
+       common.ExpectStrEqual(t, string(expectedBuf.Bytes()), 
string(spansBuf.Bytes()))
+       if expectedNumScanned != nil {
+               if !reflect.DeepEqual(expectedNumScanned, numScanned) {
+                       t.Fatalf("Invalid values for numScanned: got %v, 
expected %v\n",
+                               expectedNumScanned, numScanned)
+               }
+       }
+}
+
// Test queries on the datastore.
// A single indexed GREATER_THAN_OR_EQUALS predicate on begin time.
func TestSimpleQuery(t *testing.T) {
	t.Parallel()
	htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
		Cnf: map[string]string{
			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
		},
		WrittenSpans: common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		panic(err)
	}
	defer ht.Close()
	createSpans(SIMPLE_TEST_SPANS, ht.Store)

	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))

	// Spans 2 and 3 begin at 125 and 200 respectively; span 1 (123) is
	// excluded.
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.GREATER_THAN_OR_EQUALS,
				Field: common.BEGIN_TIME,
				Val:   "125",
			},
		},
		Lim: 5,
	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
}
+
// TestQueries2 exercises a descending begin-time scan, alone and combined
// with description-equality predicates.
func TestQueries2(t *testing.T) {
	t.Parallel()
	htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
		Cnf: map[string]string{
			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
		},
		WrittenSpans: common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		panic(err)
	}
	defer ht.Close()
	createSpans(SIMPLE_TEST_SPANS, ht.Store)
	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
	// Begin time <= 125, descending: span 2 (125) then span 1 (123).
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.LESS_THAN_OR_EQUALS,
				Field: common.BEGIN_TIME,
				Val:   "125",
			},
		},
		Lim: 5,
	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})

	// Same scan, further filtered by an exact description match.
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.LESS_THAN_OR_EQUALS,
				Field: common.BEGIN_TIME,
				Val:   "125",
			},
			common.Predicate{
				Op:    common.EQUALS,
				Field: common.DESCRIPTION,
				Val:   "getFileDescriptors",
			},
		},
		Lim: 2,
	}, []common.Span{SIMPLE_TEST_SPANS[0]})

	// Description predicate on its own (DESCRIPTION has no index).
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.EQUALS,
				Field: common.DESCRIPTION,
				Val:   "getFileDescriptors",
			},
		},
		Lim: 2,
	}, []common.Span{SIMPLE_TEST_SPANS[0]})
}
+
// TestQueries3 exercises CONTAINS predicates on descriptions and
// LESS_THAN_OR_EQUALS range queries on span ids.
func TestQueries3(t *testing.T) {
	t.Parallel()
	htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
		Cnf: map[string]string{
			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
		},
		WrittenSpans: common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		panic(err)
	}
	defer ht.Close()
	createSpans(SIMPLE_TEST_SPANS, ht.Store)
	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
	// "Fd" appears in "openFd" and "passFd" (spans 2 and 3) but not in
	// "getFileDescriptors" (case-sensitive).
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.CONTAINS,
				Field: common.DESCRIPTION,
				Val:   "Fd",
			},
			common.Predicate{
				Op:    common.GREATER_THAN_OR_EQUALS,
				Field: common.BEGIN_TIME,
				Val:   "100",
			},
		},
		Lim: 5,
	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})

	// Nothing sorts at or below the all-zero (invalid) span id.
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.LESS_THAN_OR_EQUALS,
				Field: common.SPAN_ID,
				Val:   common.TestId("00000000000000000000000000000000").String(),
			},
		},
		Lim: 200,
	}, []common.Span{})

	// Span ids 1 and 2 are <= 2; the descending scan yields 2 first.
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.LESS_THAN_OR_EQUALS,
				Field: common.SPAN_ID,
				Val:   common.TestId("00000000000000000000000000000002").String(),
			},
		},
		Lim: 200,
	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
}
+
// TestQueries4 exercises strict GREATER_THAN predicates on both an
// indexed field (begin time) and an unindexed one (description).
func TestQueries4(t *testing.T) {
	t.Parallel()
	htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
		Cnf: map[string]string{
			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
		},
		WrittenSpans: common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		panic(err)
	}
	defer ht.Close()
	createSpans(SIMPLE_TEST_SPANS, ht.Store)

	// Only span 3 begins strictly after 125.
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.GREATER_THAN,
				Field: common.BEGIN_TIME,
				Val:   "125",
			},
		},
		Lim: 5,
	}, []common.Span{SIMPLE_TEST_SPANS[2]})
	// Descriptions >= "openFd": "openFd" and "passFd".
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.GREATER_THAN_OR_EQUALS,
				Field: common.DESCRIPTION,
				Val:   "openFd",
			},
		},
		Lim: 2,
	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
	// Descriptions strictly greater than "openFd": only "passFd".
	testQuery(t, ht, &common.Query{
		Predicates: []common.Predicate{
			common.Predicate{
				Op:    common.GREATER_THAN,
				Field: common.DESCRIPTION,
				Val:   "openFd",
			},
		},
		Lim: 2,
	}, []common.Span{SIMPLE_TEST_SPANS[2]})
}
+
+var TEST_QUERIES5_SPANS []common.Span = []common.Span{
+       common.Span{Id: common.TestId("10000000000000000000000000000001"),
+               SpanData: common.SpanData{
+                       Begin:       123,
+                       End:         456,
+                       Description: "span1",
+                       Parents:     []common.SpanId{},
+                       TracerId:    "myTracer",
+               }},
+       common.Span{Id: common.TestId("10000000000000000000000000000002"),
+               SpanData: common.SpanData{
+                       Begin:       123,
+                       End:         200,
+                       Description: "span2",
+                       Parents:     
[]common.SpanId{common.TestId("10000000000000000000000000000001")},
+                       TracerId:    "myTracer",
+               }},
+       common.Span{Id: common.TestId("10000000000000000000000000000003"),
+               SpanData: common.SpanData{
+                       Begin:       124,
+                       End:         457,
+                       Description: "span3",
+                       Parents:     
[]common.SpanId{common.TestId("10000000000000000000000000000001")},
+                       TracerId:    "myTracer",
+               }},
+}
+
+func TestQueries5(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueries5",
+               WrittenSpans: common.NewSemaphore(0),
+               DataDirs:     make([]string, 1),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(TEST_QUERIES5_SPANS, ht.Store)
+
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.BEGIN_TIME,
+                               Val:   "123",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{TEST_QUERIES5_SPANS[2]})
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.END_TIME,
+                               Val:   "200",
+                       },
+               },
+               Lim: 500,
+       }, []common.Span{TEST_QUERIES5_SPANS[0], TEST_QUERIES5_SPANS[2]})
+
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.END_TIME,
+                               Val:   "999",
+                       },
+               },
+               Lim: 500,
+       }, []common.Span{TEST_QUERIES5_SPANS[2],
+               TEST_QUERIES5_SPANS[0],
+               TEST_QUERIES5_SPANS[1],
+       })
+}
+
+func BenchmarkDatastoreWrites(b *testing.B) {
+       htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
+               Cnf: map[string]string{
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
+                       conf.HTRACE_LOG_LEVEL:                     "INFO",
+               },
+               WrittenSpans: common.NewSemaphore(0),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               b.Fatalf("Error creating MiniHTraced: %s\n", err.Error())
+       }
+       ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N)
+       defer func() {
+               if r := recover(); r != nil {
+                       ht.Store.lg.Infof("panic: %s\n", r.(error))
+               }
+               ht.Close()
+       }()
+       rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+       allSpans := make([]*common.Span, b.N)
+       for n := range allSpans {
+               allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
+       }
+
+       // Reset the timer to avoid including the time required to create new
+       // random spans in the benchmark total.
+       b.ResetTimer()
+
+       // Write many random spans.
+       ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+       for n := 0; n < b.N; n++ {
+               ing.IngestSpan(allSpans[n])
+       }
+       ing.Close(time.Now())
+       // Wait for all the spans to be written.
+       ht.Store.WrittenSpans.Waits(int64(b.N))
+       assertNumWrittenEquals(b, ht.Store.msink, b.N)
+}
+
+func verifySuccessfulLoad(t *testing.T, allSpans common.SpanSlice,
+       dataDirs []string) {
+       htraceBld := &MiniHTracedBuilder{
+               Name:                "TestReloadDataStore#verifySuccessfulLoad",
+               DataDirs:            dataDirs,
+               KeepDataDirsOnClose: true,
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       defer hcl.Close()
+       for i := 0; i < len(allSpans); i++ {
+               span, err := hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+       // Look up the spans we wrote.
+       var span *common.Span
+       for i := 0; i < len(allSpans); i++ {
+               span, err = hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+}
+
+func verifyFailedLoad(t *testing.T, dataDirs []string, expectedErr string) {
+       htraceBld := &MiniHTracedBuilder{
+               Name:                "TestReloadDataStore#verifyFailedLoad",
+               DataDirs:            dataDirs,
+               KeepDataDirsOnClose: true,
+       }
+       _, err := htraceBld.Build()
+       if err == nil {
+               t.Fatalf("expected failure to load, but the load succeeded.")
+       }
+       common.AssertErrContains(t, err, expectedErr)
+}
+
// TestReloadDataStore writes spans to a two-shard datastore, closes it,
// and then checks both that a valid reload succeeds and that several
// kinds of on-disk inconsistency (missing directory, duplicated
// directory, wrong DaemonId, wrong TotalShards, wrong LayoutVersion)
// are rejected at load time with the expected error messages.
// Not marked t.Parallel(): it reopens the same data directories many
// times in sequence.
func TestReloadDataStore(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
		Cnf: map[string]string{
			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
		},
		DataDirs:            make([]string, 2),
		KeepDataDirsOnClose: true,
		WrittenSpans:        common.NewSemaphore(0),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	// Remember the data directories so we can reload them after closing
	// the daemon, and remove them when the test finishes.
	dataDirs := make([]string, len(ht.DataDirs))
	copy(dataDirs, ht.DataDirs)
	defer func() {
		if ht != nil {
			ht.Close()
		}
		for i := range dataDirs {
			os.RemoveAll(dataDirs[i])
		}
	}()
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	// Keep a copy of the configuration for reopening the shards directly
	// via NewDataStoreLoader below.
	hcnf := ht.Cnf.Clone()

	// Create some random trace spans.
	NUM_TEST_SPANS := 5
	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
	err = hcl.WriteSpans(allSpans)
	if err != nil {
		t.Fatalf("WriteSpans failed: %s\n", err.Error())
	}
	// Block until all the spans have been persisted.
	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))

	// Look up the spans we wrote.
	var span *common.Span
	for i := 0; i < NUM_TEST_SPANS; i++ {
		span, err = hcl.FindSpan(allSpans[i].Id)
		if err != nil {
			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
		}
		common.ExpectSpansEqual(t, allSpans[i], span)
	}
	// Shut everything down before the reload experiments; nil out ht so
	// the deferred cleanup does not close it twice.
	hcl.Close()
	ht.Close()
	ht = nil

	// Verify that we can reload the datastore, even if we configure the data
	// directories in a different order.
	verifySuccessfulLoad(t, allSpans, []string{dataDirs[1], dataDirs[0]})

	// If we try to reload the datastore with only one directory, it won't work
	// (we need both).
	verifyFailedLoad(t, []string{dataDirs[1]},
		"The TotalShards field of all shards is 2, but we have 1 shards.")

	// Test that we give an intelligent error message when 0 directories are
	// configured.
	verifyFailedLoad(t, []string{}, "No shard directories found.")

	// Can't specify the same directory more than once... will get "lock
	// already held by process"
	verifyFailedLoad(t, []string{dataDirs[0], dataDirs[1], dataDirs[1]},
		" already held by process.")

	// Open the datastore and modify it to have the wrong DaemonId
	dld := NewDataStoreLoader(hcnf)
	defer func() {
		if dld != nil {
			dld.Close()
			dld = nil
		}
	}()
	dld.LoadShards()
	sinfo, err := dld.shards[0].readShardInfo()
	if err != nil {
		t.Fatalf("error reading shard info for shard %s: %s\n",
			dld.shards[0].path, err.Error())
	}
	newDaemonId := sinfo.DaemonId + 1
	dld.lg.Infof("Read %s from shard %s.  Changing daemonId to 0x%016x\n.",
		asJson(sinfo), dld.shards[0].path, newDaemonId)
	sinfo.DaemonId = newDaemonId
	err = dld.shards[0].writeShardInfo(sinfo)
	if err != nil {
		t.Fatalf("error writing shard info for shard %s: %s\n",
			dld.shards[0].path, err.Error())
	}
	dld.Close()
	dld = nil
	verifyFailedLoad(t, dataDirs, "DaemonId mismatch.")

	// Open the datastore and modify it to have the wrong TotalShards
	dld = NewDataStoreLoader(hcnf)
	dld.LoadShards()
	sinfo, err = dld.shards[0].readShardInfo()
	if err != nil {
		t.Fatalf("error reading shard info for shard %s: %s\n",
			dld.shards[0].path, err.Error())
	}
	// Undo the DaemonId change from the previous step so that only the
	// TotalShards field is inconsistent this time.
	newDaemonId = sinfo.DaemonId - 1
	dld.lg.Infof("Read %s from shard %s.  Changing daemonId to 0x%016x, "+
		"TotalShards to 3\n.",
		asJson(sinfo), dld.shards[0].path, newDaemonId)
	sinfo.DaemonId = newDaemonId
	sinfo.TotalShards = 3
	err = dld.shards[0].writeShardInfo(sinfo)
	if err != nil {
		t.Fatalf("error writing shard info for shard %s: %s\n",
			dld.shards[0].path, err.Error())
	}
	dld.Close()
	dld = nil
	verifyFailedLoad(t, dataDirs, "TotalShards mismatch.")

	// Open the datastore and modify it to have the wrong LayoutVersion
	dld = NewDataStoreLoader(hcnf)
	dld.LoadShards()
	for shardIdx := range dld.shards {
		sinfo, err = dld.shards[shardIdx].readShardInfo()
		if err != nil {
			t.Fatalf("error reading shard info for shard %s: %s\n",
				dld.shards[shardIdx].path, err.Error())
		}
		dld.lg.Infof("Read %s from shard %s.  Changing TotalShards to 2, "+
			"LayoutVersion to 2\n", asJson(sinfo), dld.shards[shardIdx].path)
		sinfo.TotalShards = 2
		sinfo.LayoutVersion = 2
		err = dld.shards[shardIdx].writeShardInfo(sinfo)
		if err != nil {
			t.Fatalf("error writing shard info for shard %s: %s\n",
				dld.shards[0].path, err.Error())
		}
	}
	dld.Close()
	dld = nil
	verifyFailedLoad(t, dataDirs, "The layout version of all shards is 2, "+
		"but we only support")

	// It should work with data.store.clear set.
	htraceBld = &MiniHTracedBuilder{
		Name:                "TestReloadDataStore#clear",
		DataDirs:            dataDirs,
		KeepDataDirsOnClose: true,
		Cnf:                 map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"},
	}
	ht, err = htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
}
+
+func TestQueriesWithContinuationTokens1(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: 
"TestQueriesWithContinuationTokens1",
+               Cnf: map[string]string{
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
+               },
+               WrittenSpans: common.NewSemaphore(0),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
+       // Adding a prev value to this query excludes the first result that we
+       // would normally get.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.BEGIN_TIME,
+                               Val:   "120",
+                       },
+               },
+               Lim:  5,
+               Prev: &SIMPLE_TEST_SPANS[0],
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+       // There is only one result from an EQUALS query on SPAN_ID.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   
common.TestId("00000000000000000000000000000001").String(),
+                       },
+               },
+               Lim:  100,
+               Prev: &SIMPLE_TEST_SPANS[0],
+       }, []common.Span{})
+
+       // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the
+       // span we pass as a continuation token. (Primary index edition).
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   
common.TestId("00000000000000000000000000000002").String(),
+                       },
+               },
+               Lim:  100,
+               Prev: &SIMPLE_TEST_SPANS[1],
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+       // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back 
the
+       // span we pass as a continuation token. (Secondary index edition).
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.DURATION,
+                               Val:   "0",
+                       },
+               },
+               Lim:  100,
+               Prev: &SIMPLE_TEST_SPANS[1],
+       }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
+}
+
+func TestQueryRowsScanned(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueryRowsScanned",
+               WrittenSpans: common.NewSemaphore(0),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
+       testQueryExt(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   
common.TestId("00000000000000000000000000000001").String(),
+                       },
+               },
+               Lim:  100,
+               Prev: nil,
+       }, []common.Span{SIMPLE_TEST_SPANS[0]},
+               []int{2, 1})
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/heartbeater.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/heartbeater.go 
b/htrace-htraced/go/src/htrace/htraced/heartbeater.go
new file mode 100644
index 0000000..3f4c951
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/heartbeater.go
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "htrace/common"
+       "sync"
+       "time"
+)
+
// Heartbeater periodically sends a no-op message to a set of registered
// target channels, waking up the goroutines that read from them.
type Heartbeater struct {
	// The name of this heartbeater (used in log messages).
	name string

	// How long to sleep between heartbeats, in milliseconds.
	periodMs int64

	// The logger to use.
	lg *common.Logger

	// The channels to send the heartbeat on.  Accessed only by the run
	// goroutine after construction.
	targets []HeartbeatTarget

	// Incoming requests to the heartbeater.  When this is closed, the
	// heartbeater will exit.
	req chan *HeartbeatTarget

	// Lets Shutdown wait for the run goroutine to finish.
	wg sync.WaitGroup
}
+
// HeartbeatTarget is a named channel that receives heartbeats.
type HeartbeatTarget struct {
	// The name of the heartbeat target (used in log messages).
	name string

	// The channel for the heartbeat target.  Heartbeats are delivered as
	// nil values, and are dropped rather than blocking if the channel is
	// full.
	targetChan chan interface{}
}
+
// String returns the name of this heartbeat target.
func (tgt *HeartbeatTarget) String() string {
	return tgt.name
}
+
+func NewHeartbeater(name string, periodMs int64, lg *common.Logger) 
*Heartbeater {
+       hb := &Heartbeater{
+               name:     name,
+               periodMs: periodMs,
+               lg:       lg,
+               targets:  make([]HeartbeatTarget, 0, 4),
+               req:      make(chan *HeartbeatTarget),
+       }
+       hb.wg.Add(1)
+       go hb.run()
+       return hb
+}
+
// AddHeartbeatTarget registers a new target to receive heartbeats.
// The req channel is unbuffered, so this blocks until the run goroutine
// accepts the registration.  Must not be called after Shutdown: sending
// on the closed channel would panic.
func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) {
	hb.req <- tgt
}
+
// Shutdown signals the run goroutine to exit by closing the request
// channel, then waits for it to finish.
func (hb *Heartbeater) Shutdown() {
	close(hb.req)
	hb.wg.Wait()
}
+
// String returns the name of this heartbeater.
func (hb *Heartbeater) String() string {
	return hb.name
}
+
// run is the heartbeater's main loop.  Each iteration sleeps for one
// period -- while also accepting new target registrations -- and then
// sends a heartbeat to every registered target.  It returns when the
// req channel is closed by Shutdown.
func (hb *Heartbeater) run() {
	defer func() {
		hb.lg.Debugf("%s: exiting.\n", hb.String())
		hb.wg.Done()
	}()
	period := time.Duration(hb.periodMs) * time.Millisecond
	for {
		// Inner loop: wait until periodEnd, waking up early to handle
		// registration requests without delaying the next heartbeat.
		periodEnd := time.Now().Add(period)
		for {
			timeToWait := periodEnd.Sub(time.Now())
			if timeToWait <= 0 {
				break
			} else if timeToWait > period {
				// Smooth over jitter or clock changes
				timeToWait = period
				periodEnd = time.Now().Add(period)
			}
			select {
			case tgt, open := <-hb.req:
				if !open {
					// Shutdown closed the request channel.
					return
				}
				hb.targets = append(hb.targets, *tgt)
				hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String())
			case <-time.After(timeToWait):
			}
		}
		// Send a (nil) heartbeat to each target, non-blocking.
		for targetIdx := range hb.targets {
			select {
			case hb.targets[targetIdx].targetChan <- nil:
			default:
				// We failed to send a heartbeat because the other goroutine was busy and
				// hasn't cleared the previous one from its channel.  This could indicate a
				// stuck goroutine.
				hb.lg.Infof("%s: could not send heartbeat to %s.\n",
					hb.String(), hb.targets[targetIdx])
			}
		}
	}
}

Reply via email to