http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go 
b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..9157965
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "htrace/common"
+       "htrace/conf"
+       "testing"
+       "time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       lg := common.NewLogger("heartbeater", cnf)
+       hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+       if hb.String() != "ExampleHeartbeater" {
+               t.Fatalf("hb.String() returned %s instead of %s\n", 
hb.String(), "ExampleHeartbeater")
+       }
+       hb.Shutdown()
+}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       lg := common.NewLogger("heartbeater", cnf)
+       // The minimum amount of time which the heartbeater test should take
+       MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * 
HEARTBEATER_PERIOD)
+       duration := MINIMUM_TEST_DURATION
+       for duration <= MINIMUM_TEST_DURATION {
+               start := time.Now()
+               testHeartbeaterSendsHeartbeatsImpl(t, lg)
+               end := time.Now()
+               duration = end.Sub(start)
+               lg.Debugf("Measured duration: %v; minimum expected duration: 
%v\n",
+                       duration, MINIMUM_TEST_DURATION)
+       }
+}
+
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+       hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+       if hb.String() != "ExampleHeartbeater" {
+               t.Fatalf("hb.String() returned %s instead of %s\n", 
hb.String(), "ExampleHeartbeater")
+       }
+       testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+       gotAllHeartbeats := make(chan bool)
+       hb.AddHeartbeatTarget(&HeartbeatTarget{
+               name:       "ExampleHeartbeatTarget",
+               targetChan: testChan,
+       })
+       go func() {
+               for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+                       <-testChan
+               }
+               gotAllHeartbeats <- true
+               for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+                       _, open := <-testChan
+                       if !open {
+                               return
+                       }
+               }
+       }()
+       <-gotAllHeartbeats
+       hb.Shutdown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..8b5a728
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/hrpc.go
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "bytes"
+       "encoding/binary"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "github.com/ugorji/go/codec"
+       "htrace/common"
+       "htrace/conf"
+       "io"
+       "net"
+       "net/rpc"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+const MAX_HRPC_HANDLERS = 32765
+
+// Handles HRPC calls
+type HrpcHandler struct {
+       lg    *common.Logger
+       store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+       *rpc.Server
+       hand *HrpcHandler
+
+       // The listener we are using to accept new connections.
+       listener net.Listener
+
+       // A WaitGroup used to block until the HRPC server has exited.
+       exited sync.WaitGroup
+
+       // A channel containing server codecs to use.  This channel is fully
+       // buffered.  The number of entries it initially contains determines how
+       // many concurrent codecs we will have running at once.
+       cdcs chan *HrpcServerCodec
+
+       // Used to shut down
+       shutdown chan interface{}
+
+       // The I/O timeout to use when reading requests or sending responses.  
This
+       // timeout does not apply to the time we spend processing the message.
+       ioTimeo time.Duration
+
+       // A count of all I/O errors that we have encountered since the server
+       // started.  This counts errors like improperly formatted message 
frames,
+       // but not errors like properly formatted but invalid messages.
+       // This count is updated from multiple goroutines via sync/atomic.
+       ioErrorCount uint64
+
+       // The test hooks to use, or nil during normal operation.
+       testHooks *hrpcTestHooks
+}
+
+type hrpcTestHooks struct {
+       // A callback we make right after calling Accept() but before reading 
from
+       // the new connection.
+       HandleAdmission func()
+}
+
+// A codec which encodes HRPC data via JSON.  This structure holds the context
+// for a particular client connection.
+type HrpcServerCodec struct {
+       lg *common.Logger
+
+       // The current connection.
+       conn net.Conn
+
+       // The HrpcServer which this connection is part of.
+       hsv *HrpcServer
+
+       // The message length we read from the header.
+       length uint32
+
+       // The number of messages this connection has handled.
+       numHandled int
+
+       // The buffer for reading requests.  These buffers are reused for 
multiple
+       // requests to avoid allocating memory.
+       buf []byte
+
+       // Configuration for msgpack decoding
+       msgpackHandle codec.MsgpackHandle
+}
+
+func asJson(val interface{}) string {
+       js, err := json.Marshal(val)
+       if err != nil {
+               return "encoding error: " + err.Error()
+       }
+       return string(js)
+}
+
+func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
+       return newIoError(cdc, val, common.WARN)
+}
+
+func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
+       if cdc.lg.LevelEnabled(level) {
+               cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": 
"+val+"\n")
+       }
+       if level >= common.INFO {
+               atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
+       }
+       return errors.New(val)
+}
+
+func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
+       hdr := common.HrpcRequestHeader{}
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("%s: Reading HRPC request header.\n", 
cdc.conn.RemoteAddr())
+       }
+       cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
+       err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
+       if err != nil {
+               if err == io.EOF && cdc.numHandled > 0 {
+                       return newIoError(cdc, fmt.Sprintf("Remote closed 
connection "+
+                               "after writing %d message(s)", cdc.numHandled), 
common.DEBUG)
+               }
+               return newIoError(cdc,
+                       fmt.Sprintf("Error reading request header: %s", 
err.Error()), common.WARN)
+       }
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("%s: Read HRPC request header %s\n",
+                       cdc.conn.RemoteAddr(), asJson(&hdr))
+       }
+       if hdr.Magic != common.HRPC_MAGIC {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: 
expected "+
+                       "magic number of 0x%04x, but got 0x%04x", 
common.HRPC_MAGIC, hdr.Magic))
+       }
+       if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too 
long.  "+
+                       "Maximum length is %d, but we got %d.", 
common.MAX_HRPC_BODY_LENGTH,
+                       hdr.Length))
+       }
+       req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+       if req.ServiceMethod == "" {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 
0x%04x",
+                       hdr.MethodId))
+       }
+       req.Seq = hdr.Seq
+       cdc.length = hdr.Length
+       return nil
+}
+
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+       remoteAddr := cdc.conn.RemoteAddr().String()
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
+                       remoteAddr, cdc.length)
+       }
+       if cap(cdc.buf) < int(cdc.length) {
+               var pow uint
+               for pow = 0; (1 << pow) < int(cdc.length); pow++ {
+               }
+               cdc.buf = make([]byte, 0, 1<<pow)
+       }
+       _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
+       if err != nil {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte 
"+
+                       "request body: %s", cdc.length, err.Error()))
+       }
+       var zeroTime time.Time
+       cdc.conn.SetDeadline(zeroTime)
+
+       dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
+       err = dec.Decode(body)
+       if cdc.lg.TraceEnabled() {
+               cdc.lg.Tracef("%s: read HRPC message: %s\n",
+                       remoteAddr, asJson(&body))
+       }
+       req := body.(*common.WriteSpansReq)
+       if req == nil {
+               return nil
+       }
+       // We decode WriteSpans requests in a streaming fashion, to avoid 
overloading the garbage
+       // collector with a ton of trace spans all at once.
+       startTime := time.Now()
+       client, _, err := net.SplitHostPort(remoteAddr)
+       if err != nil {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host 
and port "+
+                       "for %s: %s\n", remoteAddr, err.Error()))
+       }
+       hand := cdc.hsv.hand
+       ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+       for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
+               var span *common.Span
+               err := dec.Decode(&span)
+               if err != nil {
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to 
decode span %d "+
+                               "out of %d: %s\n", spanIdx, req.NumSpans, 
err.Error()))
+               }
+               ing.IngestSpan(span)
+       }
+       ing.Close(startTime)
+       return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) 
error {
+       cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
+       var err error
+       buf := EMPTY
+       if msg != nil {
+               w := bytes.NewBuffer(make([]byte, 0, 128))
+               enc := codec.NewEncoder(w, &cdc.msgpackHandle)
+               err := enc.Encode(msg)
+               if err != nil {
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to 
marshal "+
+                               "response message: %s", err.Error()))
+               }
+               buf = w.Bytes()
+       }
+       hdr := common.HrpcResponseHeader{}
+       hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+       hdr.Seq = resp.Seq
+       hdr.ErrLength = uint32(len(resp.Error))
+       hdr.Length = uint32(len(buf))
+       writer := bufio.NewWriterSize(cdc.conn, 256)
+       err = binary.Write(writer, binary.LittleEndian, &hdr)
+       if err != nil {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
response "+
+                       "header: %s", err.Error()))
+       }
+       if hdr.ErrLength > 0 {
+               _, err = io.WriteString(writer, resp.Error)
+               if err != nil {
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
error "+
+                               "string: %s", err.Error()))
+               }
+       }
+       if hdr.Length > 0 {
+               var length int
+               length, err = writer.Write(buf)
+               if err != nil {
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
response "+
+                               "message: %s", err.Error()))
+               }
+               if uint32(length) != hdr.Length {
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
all of "+
+                               "response message: %s", err.Error()))
+               }
+       }
+       err = writer.Flush()
+       if err != nil {
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the 
response "+
+                       "bytes: %s", err.Error()))
+       }
+       cdc.numHandled++
+       return nil
+}
+
+func (cdc *HrpcServerCodec) Close() error {
+       err := cdc.conn.Close()
+       cdc.conn = nil
+       cdc.length = 0
+       cdc.numHandled = 0
+       cdc.hsv.cdcs <- cdc
+       return err
+}
+
+func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
+       resp *common.WriteSpansResp) (err error) {
+       // Nothing to do here; WriteSpans is handled in ReadRequestBody.
+       return nil
+}
+
+func CreateHrpcServer(cnf *conf.Config, store *dataStore,
+       testHooks *hrpcTestHooks) (*HrpcServer, error) {
+       lg := common.NewLogger("hrpc", cnf)
+       numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
+       if numHandlers < 1 {
+               lg.Warnf("%s must be positive: using 1 handler.\n", 
conf.HTRACE_NUM_HRPC_HANDLERS)
+               numHandlers = 1
+       }
+       if numHandlers > MAX_HRPC_HANDLERS {
+               lg.Warnf("%s cannot be more than %d: using %d handlers\n",
+                       conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, 
MAX_HRPC_HANDLERS)
+               numHandlers = MAX_HRPC_HANDLERS
+       }
+       hsv := &HrpcServer{
+               Server: rpc.NewServer(),
+               hand: &HrpcHandler{
+                       lg:    lg,
+                       store: store,
+               },
+               cdcs:     make(chan *HrpcServerCodec, numHandlers),
+               shutdown: make(chan interface{}),
+               ioTimeo: time.Millisecond *
+                       
time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
+               testHooks: testHooks,
+       }
+       for i := 0; i < numHandlers; i++ {
+               hsv.cdcs <- &HrpcServerCodec{
+                       lg:  lg,
+                       hsv: hsv,
+                       msgpackHandle: codec.MsgpackHandle{
+                               WriteExt: true,
+                       },
+               }
+       }
+       var err error
+       hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
+       if err != nil {
+               return nil, err
+       }
+       hsv.Server.Register(hsv.hand)
+       hsv.exited.Add(1)
+       go hsv.run()
+       lg.Infof("Started HRPC server on %s with %d handler routines. "+
+               "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
+               hsv.ioTimeo.String())
+       return hsv, nil
+}
+
+func (hsv *HrpcServer) run() {
+       lg := hsv.hand.lg
+       srvAddr := hsv.listener.Addr().String()
+       defer func() {
+               lg.Infof("HrpcServer on %s exiting\n", srvAddr)
+               hsv.exited.Done()
+       }()
+       for {
+               select {
+               case cdc := <-hsv.cdcs:
+                       conn, err := hsv.listener.Accept()
+                       if err != nil {
+                               lg.Errorf("HrpcServer on %s got accept error: 
%s\n", srvAddr, err.Error())
+                               hsv.cdcs <- cdc // never blocks; there is 
always sufficient buffer space
+                               continue
+                       }
+                       if lg.TraceEnabled() {
+                               lg.Tracef("%s: Accepted HRPC connection.\n", 
conn.RemoteAddr())
+                       }
+                       cdc.conn = conn
+                       cdc.numHandled = 0
+                       if hsv.testHooks != nil && 
hsv.testHooks.HandleAdmission != nil {
+                               hsv.testHooks.HandleAdmission()
+                       }
+                       go hsv.ServeCodec(cdc)
+               case <-hsv.shutdown:
+                       return
+               }
+       }
+}
+
+func (hsv *HrpcServer) Addr() net.Addr {
+       return hsv.listener.Addr()
+}
+
+func (hsv *HrpcServer) GetNumIoErrors() uint64 {
+       return atomic.LoadUint64(&hsv.ioErrorCount)
+}
+
+func (hsv *HrpcServer) Close() {
+       close(hsv.shutdown)
+       hsv.listener.Close()
+       hsv.exited.Wait()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/htraced.go 
b/htrace-htraced/go/src/htrace/htraced/htraced.go
new file mode 100644
index 0000000..0d41e0d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/htraced.go
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "encoding/json"
+       "fmt"
+       "github.com/alecthomas/kingpin"
+       "github.com/jmhodges/levigo"
+       "htrace/common"
+       "htrace/conf"
+       "net"
+       "os"
+       "runtime"
+       "time"
+)
+
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+const USAGE = `htraced: the HTrace server daemon.
+
+htraced receives trace spans sent from HTrace clients.  It exposes a REST
+interface which others can query.  It also runs a web server with a graphical
+user interface.  htraced stores its span data in levelDB files on the local
+disks.
+
+Usage:
+--help: this help message
+
+-Dk=v: set configuration key 'k' to value 'v'
+For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost,
+port 8080.  -Dlog.level=DEBUG will set the default log level to DEBUG.
+
+-Dk: set configuration key 'k' to 'true'
+
+Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME 
+ `
+configuration file.  We find this file by searching the paths in the
+` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate 
way
+of setting configuration when launching the daemon.
+`
+
+func main() {
+       // Load the htraced configuration.
+       // This also parses the -Dfoo=bar command line arguments and removes 
them
+       // from os.Argv.
+       cnf, cnfLog := conf.LoadApplicationConfig("htraced.")
+
+       // Parse the remaining command-line arguments.
+       app := kingpin.New(os.Args[0], USAGE)
+       version := app.Command("version", "Print server version and exit.")
+       cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+       // Handle the "version" command-line argument.
+       if cmd == version.FullCommand() {
+               fmt.Printf("Running htraced %s [%s].\n", RELEASE_VERSION, 
GIT_VERSION)
+               os.Exit(0)
+       }
+
+       // Open the HTTP port.
+       // We want to do this first, before initializing the datastore or 
setting up
+       // logging.  That way, if someone accidentally starts two daemons with 
the
+       // same config file, the second invocation will exit with a "port in 
use"
+       // error rather than potentially disrupting the first invocation.
+       rstListener, listenErr := net.Listen("tcp", 
cnf.Get(conf.HTRACE_WEB_ADDRESS))
+       if listenErr != nil {
+               fmt.Fprintf(os.Stderr, "Error opening HTTP port: %s\n",
+                       listenErr.Error())
+               os.Exit(1)
+       }
+
+       // Print out the startup banner and information about the daemon
+       // configuration.
+       lg := common.NewLogger("main", cnf)
+       defer lg.Close()
+       lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, 
GIT_VERSION)
+       scanner := bufio.NewScanner(cnfLog)
+       for scanner.Scan() {
+               lg.Infof(scanner.Text() + "\n")
+       }
+       common.InstallSignalHandlers(cnf)
+       if runtime.GOMAXPROCS(0) == 1 {
+               ncpu := runtime.NumCPU()
+               runtime.GOMAXPROCS(ncpu)
+               lg.Infof("setting GOMAXPROCS=%d\n", ncpu)
+       } else {
+               lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0))
+       }
+       lg.Infof("leveldb version=%d.%d\n",
+               levigo.GetLevelDBMajorVersion(), 
levigo.GetLevelDBMinorVersion())
+
+       // Initialize the datastore.
+       store, err := CreateDataStore(cnf, nil)
+       if err != nil {
+               lg.Errorf("Error creating datastore: %s\n", err.Error())
+               os.Exit(1)
+       }
+       var rsv *RestServer
+       rsv, err = CreateRestServer(cnf, store, rstListener)
+       if err != nil {
+               lg.Errorf("Error creating REST server: %s\n", err.Error())
+               os.Exit(1)
+       }
+       var hsv *HrpcServer
+       if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+               hsv, err = CreateHrpcServer(cnf, store, nil)
+               if err != nil {
+                       lg.Errorf("Error creating HRPC server: %s\n", 
err.Error())
+                       os.Exit(1)
+               }
+       } else {
+               lg.Infof("Not starting HRPC server because no value was given 
for %s.\n",
+                       conf.HTRACE_HRPC_ADDRESS)
+       }
+       naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
+       if naddr != "" {
+               notif := StartupNotification{
+                       HttpAddr:  rsv.Addr().String(),
+                       ProcessId: os.Getpid(),
+               }
+               if hsv != nil {
+                       notif.HrpcAddr = hsv.Addr().String()
+               }
+               err = sendStartupNotification(naddr, &notif)
+               if err != nil {
+                       fmt.Fprintf(os.Stderr, "Failed to send startup 
notification: "+
+                               "%s\n", err.Error())
+                       os.Exit(1)
+               }
+       }
+       for {
+               time.Sleep(time.Duration(10) * time.Hour)
+       }
+}
+
+// A startup notification message that we optionally send on startup.
+// Used by unit tests.
+type StartupNotification struct {
+       HttpAddr  string
+       HrpcAddr  string
+       ProcessId int
+}
+
+func sendStartupNotification(naddr string, notif *StartupNotification) error {
+       conn, err := net.Dial("tcp", naddr)
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if conn != nil {
+                       conn.Close()
+               }
+       }()
+       var buf []byte
+       buf, err = json.Marshal(notif)
+       if err != nil {
+               return err
+       }
+       _, err = conn.Write(buf)
+       conn.Close()
+       conn = nil
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/loader.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/loader.go 
b/htrace-htraced/go/src/htrace/htraced/loader.go
new file mode 100644
index 0000000..95c5c3e
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/loader.go
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "github.com/jmhodges/levigo"
+       "github.com/ugorji/go/codec"
+       "htrace/common"
+       "htrace/conf"
+       "io"
+       "math"
+       "math/rand"
+       "os"
+       "strings"
+       "syscall"
+       "time"
+)
+
+// Routines for loading the datastore.
+
+// The leveldb key which has information about the shard.
+const SHARD_INFO_KEY = 'w'
+
+// A constant signifying that we don't know what the layout version is.
+const UNKNOWN_LAYOUT_VERSION = 0
+
+// The current layout version.  We cannot read layout versions newer than this.
+// We may sometimes be able to read older versions, but only by doing an
+// upgrade.
+const CURRENT_LAYOUT_VERSION = 3
+
+type DataStoreLoader struct {
+       // The dataStore logger.
+       lg *common.Logger
+
+       // True if we should clear the stored data.
+       ClearStored bool
+
+       // The shards that we're loading
+       shards []*ShardLoader
+
+       // The options to use for opening datastores in LevelDB.
+       openOpts *levigo.Options
+
+       // The read options to use for LevelDB.
+       readOpts *levigo.ReadOptions
+
+       // The write options to use for LevelDB.
+       writeOpts *levigo.WriteOptions
+}
+
+// Information about a Shard.
+type ShardInfo struct {
+       // The layout version of the datastore.
+       // We should always keep this field so that old software can recognize 
new
+       // layout versions, even if it can't read them.
+       LayoutVersion uint64
+
+       // A random number identifying this daemon.
+       DaemonId uint64
+
+       // The total number of shards in this datastore.
+       TotalShards uint32
+
+       // The index of this shard within the datastore.
+       ShardIndex uint32
+}
+
+// Create a new datastore loader.
+// Initializes the loader, but does not load any leveldb instances.
+func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader {
+       dld := &DataStoreLoader{
+               lg:          common.NewLogger("datastore", cnf),
+               ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR),
+       }
+       dld.readOpts = levigo.NewReadOptions()
+       dld.readOpts.SetFillCache(true)
+       dld.readOpts.SetVerifyChecksums(false)
+       dld.writeOpts = levigo.NewWriteOptions()
+       dld.writeOpts.SetSync(false)
+       dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
+       rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
+       // Filter out empty entries
+       dirs := make([]string, 0, len(rdirs))
+       for i := range rdirs {
+               if strings.TrimSpace(rdirs[i]) != "" {
+                       dirs = append(dirs, rdirs[i])
+               }
+       }
+       dld.shards = make([]*ShardLoader, len(dirs))
+       for i := range dirs {
+               dld.shards[i] = &ShardLoader{
+                       dld:  dld,
+                       path: dirs[i] + conf.PATH_SEP + "db",
+               }
+       }
+       dld.openOpts = levigo.NewOptions()
+       cacheSize := cnf.GetInt(conf.HTRACE_LEVELDB_CACHE_SIZE)
+       dld.openOpts.SetCache(levigo.NewLRUCache(cacheSize))
+       dld.openOpts.SetParanoidChecks(false)
+       writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE)
+       if writeBufferSize > 0 {
+               dld.openOpts.SetWriteBufferSize(writeBufferSize)
+       }
+       maxFdPerShard := dld.calculateMaxOpenFilesPerShard()
+       if maxFdPerShard > 0 {
+               dld.openOpts.SetMaxOpenFiles(maxFdPerShard)
+       }
+       return dld
+}
+
+func (dld *DataStoreLoader) Close() {
+       if dld.lg != nil {
+               dld.lg.Close()
+               dld.lg = nil
+       }
+       if dld.openOpts != nil {
+               dld.openOpts.Close()
+               dld.openOpts = nil
+       }
+       if dld.readOpts != nil {
+               dld.readOpts.Close()
+               dld.readOpts = nil
+       }
+       if dld.writeOpts != nil {
+               dld.writeOpts.Close()
+               dld.writeOpts = nil
+       }
+       if dld.shards != nil {
+               for i := range dld.shards {
+                       if dld.shards[i] != nil {
+                               dld.shards[i].Close()
+                       }
+               }
+               dld.shards = nil
+       }
+}
+
+func (dld *DataStoreLoader) DisownResources() {
+       dld.lg = nil
+       dld.openOpts = nil
+       dld.readOpts = nil
+       dld.writeOpts = nil
+       dld.shards = nil
+}
+
+// The maximum number of file descriptors we'll use on non-datastore things.
+const NON_DATASTORE_FD_MAX = 300
+
+// The minimum number of file descriptors per shard we will set.  Setting fewer
+// than this number could trigger a bug in some early versions of leveldb.
+const MIN_FDS_PER_SHARD = 80
+
+func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int {
+       var rlim syscall.Rlimit
+       err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim)
+       if err != nil {
+               dld.lg.Warnf("Unable to calculate maximum open files per shard: 
"+
+                       "getrlimit failed: %s\n", err.Error())
+               return 0
+       }
+       // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems,
+       // but there's no harm in being careful.  'int' in golang always holds 
at
+       // least 32 bits.
+       var maxFd int
+       if rlim.Cur > uint64(math.MaxInt32) {
+               maxFd = math.MaxInt32
+       } else {
+               maxFd = int(rlim.Cur)
+       }
+       if len(dld.shards) == 0 {
+               dld.lg.Warnf("Unable to calculate maximum open files per shard, 
" +
+                       "since there are 0 shards configured.\n")
+               return 0
+       }
+       fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards)
+       if fdsPerShard < MIN_FDS_PER_SHARD {
+               dld.lg.Warnf("Expected to be able to use at least %d "+
+                       "fds per shard, but we have %d shards and %d total fds 
to allocate, "+
+                       "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD,
+                       len(dld.shards), maxFd-NON_DATASTORE_FD_MAX, 
fdsPerShard)
+               return 0
+       }
+       dld.lg.Infof("maxFd = %d.  Setting maxFdPerShard = %d\n",
+               maxFd, fdsPerShard)
+       return fdsPerShard
+}
+
+// Load information about all shards.
+func (dld *DataStoreLoader) LoadShards() {
+       for i := range dld.shards {
+               shd := dld.shards[i]
+               shd.load()
+       }
+}
+
+// Verify that the shard infos are consistent.
+// Reorders the shardInfo structures based on their ShardIndex.
+func (dld *DataStoreLoader) VerifyShardInfos() error {
+       if len(dld.shards) < 1 {
+               return errors.New("No shard directories found.")
+       }
+       // Make sure no shards had errors.
+       for i := range dld.shards {
+               shd := dld.shards[i]
+               if shd.infoErr != nil {
+                       return shd.infoErr
+               }
+       }
+       // Make sure that if any shards are empty, all shards are empty.
+       emptyShards := ""
+       prefix := ""
+       for i := range dld.shards {
+               if dld.shards[i].info == nil {
+                       emptyShards = prefix + dld.shards[i].path
+                       prefix = ", "
+               }
+       }
+       if emptyShards != "" {
+               for i := range dld.shards {
+                       if dld.shards[i].info != nil {
+                               return errors.New(fmt.Sprintf("Shards %s were 
empty, but "+
+                                       "the other shards had data.", 
emptyShards))
+                       }
+               }
+               // All shards are empty.
+               return nil
+       }
+       // Make sure that all shards have the same layout version, daemonId, 
and number of total
+       // shards.
+       layoutVersion := dld.shards[0].info.LayoutVersion
+       daemonId := dld.shards[0].info.DaemonId
+       totalShards := dld.shards[0].info.TotalShards
+       for i := 1; i < len(dld.shards); i++ {
+               shd := dld.shards[i]
+               if layoutVersion != shd.info.LayoutVersion {
+                       return errors.New(fmt.Sprintf("Layout version mismatch. 
 Shard "+
+                               "%s has layout version 0x%016x, but shard %s 
has layout "+
+                               "version 0x%016x.",
+                               dld.shards[0].path, layoutVersion, shd.path, 
shd.info.LayoutVersion))
+               }
+               if daemonId != shd.info.DaemonId {
+                       return errors.New(fmt.Sprintf("DaemonId mismatch. Shard 
%s has "+
+                               "daemonId 0x%016x, but shard %s has daemonId 
0x%016x.",
+                               dld.shards[0].path, daemonId, shd.path, 
shd.info.DaemonId))
+               }
+               if totalShards != shd.info.TotalShards {
+                       return errors.New(fmt.Sprintf("TotalShards mismatch.  
Shard %s has "+
+                               "TotalShards = %d, but shard %s has TotalShards 
= %d.",
+                               dld.shards[0].path, totalShards, shd.path, 
shd.info.TotalShards))
+               }
+               if shd.info.ShardIndex >= totalShards {
+                       return errors.New(fmt.Sprintf("Invalid ShardIndex.  
Shard %s has "+
+                               "ShardIndex = %d, but TotalShards = %d.",
+                               shd.path, shd.info.ShardIndex, 
shd.info.TotalShards))
+               }
+       }
+       if layoutVersion != CURRENT_LAYOUT_VERSION {
+               return errors.New(fmt.Sprintf("The layout version of all shards 
"+
+                       "is %d, but we only support version %d.",
+                       layoutVersion, CURRENT_LAYOUT_VERSION))
+       }
+       if totalShards != uint32(len(dld.shards)) {
+               return errors.New(fmt.Sprintf("The TotalShards field of all 
shards "+
+                       "is %d, but we have %d shards.", totalShards, 
len(dld.shards)))
+       }
+       // Reorder shards in order of their ShardIndex.
+       reorderedShards := make([]*ShardLoader, len(dld.shards))
+       for i := 0; i < len(dld.shards); i++ {
+               shd := dld.shards[i]
+               shardIdx := shd.info.ShardIndex
+               if reorderedShards[shardIdx] != nil {
+                       return errors.New(fmt.Sprintf("Both shard %s and "+
+                               "shard %s have ShardIndex %d.", shd.path,
+                               reorderedShards[shardIdx].path, shardIdx))
+               }
+               reorderedShards[shardIdx] = shd
+       }
+       dld.shards = reorderedShards
+       return nil
+}
+
+func (dld *DataStoreLoader) Load() error {
+       var err error
+       // If data.store.clear was set, clear existing data.
+       if dld.ClearStored {
+               err = dld.clearStored()
+               if err != nil {
+                       return err
+               }
+       }
+       // Make sure the shard directories exist in all cases, with a mkdir -p
+       for i := range dld.shards {
+               err := os.MkdirAll(dld.shards[i].path, 0777)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): 
%s",
+                               dld.shards[i].path, err.Error()))
+               }
+       }
+       // Get information about each shard, and verify them.
+       dld.LoadShards()
+       err = dld.VerifyShardInfos()
+       if err != nil {
+               return err
+       }
+       if dld.shards[0].ldb != nil {
+               dld.lg.Infof("Loaded %d leveldb instances with "+
+                       "DaemonId of 0x%016x\n", len(dld.shards),
+                       dld.shards[0].info.DaemonId)
+       } else {
+               // Create leveldb instances if needed.
+               rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+               daemonId := uint64(rnd.Int63())
+               dld.lg.Infof("Initializing %d leveldb instances with a new "+
+                       "DaemonId of 0x%016x\n", len(dld.shards), daemonId)
+               dld.openOpts.SetCreateIfMissing(true)
+               for i := range dld.shards {
+                       shd := dld.shards[i]
+                       shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+                       if err != nil {
+                               return errors.New(fmt.Sprintf("levigo.Open(%s) 
failed to "+
+                                       "create the shard: %s", shd.path, 
err.Error()))
+                       }
+                       info := &ShardInfo{
+                               LayoutVersion: CURRENT_LAYOUT_VERSION,
+                               DaemonId:      daemonId,
+                               TotalShards:   uint32(len(dld.shards)),
+                               ShardIndex:    uint32(i),
+                       }
+                       err = shd.writeShardInfo(info)
+                       if err != nil {
+                               return errors.New(fmt.Sprintf("levigo.Open(%s) 
failed to "+
+                                       "write shard info: %s", shd.path, 
err.Error()))
+                       }
+                       dld.lg.Infof("Shard %s initialized with ShardInfo %s 
\n",
+                               shd.path, asJson(info))
+               }
+       }
+       return nil
+}
+
+func (dld *DataStoreLoader) clearStored() error {
+       for i := range dld.shards {
+               path := dld.shards[i].path
+               fi, err := os.Stat(path)
+               if err != nil && !os.IsNotExist(err) {
+                       dld.lg.Errorf("Failed to stat %s: %s\n", path, 
err.Error())
+                       return err
+               }
+               if fi != nil {
+                       err = os.RemoveAll(path)
+                       if err != nil {
+                               dld.lg.Errorf("Failed to clear existing 
datastore directory %s: %s\n",
+                                       path, err.Error())
+                               return err
+                       }
+                       dld.lg.Infof("Cleared existing datastore directory 
%s\n", path)
+               }
+       }
+       return nil
+}
+
+type ShardLoader struct {
+       // The parent DataStoreLoader
+       dld *DataStoreLoader
+
+       // Path to the shard
+       path string
+
+       // Leveldb instance of the shard
+       ldb *levigo.DB
+
+       // Information about the shard
+       info *ShardInfo
+
+       // If non-null, the error we encountered trying to load the shard info.
+       infoErr error
+}
+
+func (shd *ShardLoader) Close() {
+       if shd.ldb != nil {
+               shd.ldb.Close()
+               shd.ldb = nil
+       }
+}
+
+// Load information about a particular shard.
+func (shd *ShardLoader) load() {
+       shd.info = nil
+       fi, err := os.Stat(shd.path)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       shd.infoErr = nil
+                       return
+               }
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "stat() error on leveldb directory "+
+                               "%s: %s", shd.path, err.Error()))
+               return
+       }
+       if !fi.Mode().IsDir() {
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "stat() error on leveldb directory "+
+                               "%s: inode is not directory.", shd.path))
+               return
+       }
+       var dbDir *os.File
+       dbDir, err = os.Open(shd.path)
+       if err != nil {
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "open() error on leveldb directory "+
+                               "%s: %s.", shd.path, err.Error()))
+               return
+       }
+       defer func() {
+               if dbDir != nil {
+                       dbDir.Close()
+               }
+       }()
+       _, err = dbDir.Readdirnames(1)
+       if err != nil {
+               if err == io.EOF {
+                       // The db directory is empty.
+                       shd.infoErr = nil
+                       return
+               }
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "Readdirnames() error on leveldb directory "+
+                               "%s: %s.", shd.path, err.Error()))
+               return
+       }
+       dbDir.Close()
+       dbDir = nil
+       shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+       if err != nil {
+               shd.ldb = nil
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "levigo.Open() error on leveldb directory "+
+                               "%s: %s.", shd.path, err.Error()))
+               return
+       }
+       shd.info, err = shd.readShardInfo()
+       if err != nil {
+               shd.infoErr = err
+               return
+       }
+       shd.infoErr = nil
+}
+
+func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) {
+       buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY})
+       if err != nil {
+               return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed 
to "+
+                       "read shard info key: %s", shd.path, err.Error()))
+       }
+       if len(buf) == 0 {
+               return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got 
zero-"+
+                       "length value for shard info key.", shd.path))
+       }
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       r := bytes.NewBuffer(buf)
+       decoder := codec.NewDecoder(r, mh)
+       shardInfo := &ShardInfo{
+               LayoutVersion: UNKNOWN_LAYOUT_VERSION,
+       }
+       err = decoder.Decode(shardInfo)
+       if err != nil {
+               return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack 
"+
+                       "decoding failed for shard info key: %s", shd.path, 
err.Error()))
+       }
+       return shardInfo, nil
+}
+
+func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error {
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       w := new(bytes.Buffer)
+       enc := codec.NewEncoder(w, mh)
+       err := enc.Encode(info)
+       if err != nil {
+               return errors.New(fmt.Sprintf("msgpack encoding error: %s",
+                       err.Error()))
+       }
+       err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes())
+       if err != nil {
+               return errors.New(fmt.Sprintf("leveldb write error: %s",
+                       err.Error()))
+       }
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/metrics.go 
b/htrace-htraced/go/src/htrace/htraced/metrics.go
new file mode 100644
index 0000000..d2feca8
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/metrics.go
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "htrace/common"
+       "htrace/conf"
+       "math"
+       "sync"
+       "time"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manager 
htraced.  In particular, we
+// need to know what rate we are receiving spans at, the main places spans 
came from.  If spans
+// were dropped because of a high sampling rates, we need to know which part 
of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+const LATENCY_CIRC_BUF_SIZE = 4096
+
+type MetricsSink struct {
+       // The metrics sink logger.
+       lg *common.Logger
+
+       // The maximum number of entries we shuld allow in the HostSpanMetrics 
map.
+       maxMtx int
+
+       // The total number of spans ingested by the server (counting dropped 
spans)
+       IngestedSpans uint64
+
+       // The total number of spans written to leveldb since the server 
started.
+       WrittenSpans uint64
+
+       // The total number of spans dropped by the server.
+       ServerDropped uint64
+
+       // Per-host Span Metrics
+       HostSpanMetrics common.SpanMetricsMap
+
+       // The last few writeSpan latencies
+       wsLatencyCircBuf *CircBufU32
+
+       // Lock protecting all metrics
+       lock sync.Mutex
+}
+
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+       return &MetricsSink{
+               lg:               common.NewLogger("metrics", cnf),
+               maxMtx:           
cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+               HostSpanMetrics:  make(common.SpanMetricsMap),
+               wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
+       }
+}
+
+// Update the total number of spans which were ingested, as well as other
+// metrics that get updated during span ingest.
+func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
+       serverDropped int, wsLatency time.Duration) {
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       msink.IngestedSpans += uint64(totalIngested)
+       msink.ServerDropped += uint64(serverDropped)
+       msink.updateSpanMetrics(addr, 0, serverDropped)
+       wsLatencyMs := wsLatency.Nanoseconds() / 1000000
+       var wsLatency32 uint32
+       if wsLatencyMs > math.MaxUint32 {
+               wsLatency32 = math.MaxUint32
+       } else {
+               wsLatency32 = uint32(wsLatencyMs)
+       }
+       msink.wsLatencyCircBuf.Append(wsLatency32)
+}
+
+// Update the per-host span metrics.  Must be called with the lock held.
+func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
+       serverDropped int) {
+       mtx, found := msink.HostSpanMetrics[addr]
+       if !found {
+               // Ensure that the per-host span metrics map doesn't grow too 
large.
+               if len(msink.HostSpanMetrics) >= msink.maxMtx {
+                       // Delete a random entry
+                       for k := range msink.HostSpanMetrics {
+                               msink.lg.Warnf("Evicting metrics entry for addr 
%s "+
+                                       "because there are more than %d 
addrs.\n", k, msink.maxMtx)
+                               delete(msink.HostSpanMetrics, k)
+                               break
+                       }
+               }
+               mtx = &common.SpanMetrics{}
+               msink.HostSpanMetrics[addr] = mtx
+       }
+       mtx.Written += uint64(numWritten)
+       mtx.ServerDropped += uint64(serverDropped)
+}
+
+// Update the total number of spans which were persisted to disk.
+func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
+       serverDropped int) {
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       msink.WrittenSpans += uint64(totalWritten)
+       msink.ServerDropped += uint64(serverDropped)
+       msink.updateSpanMetrics(addr, totalWritten, serverDropped)
+}
+
+// Read the server stats.
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       stats.IngestedSpans = msink.IngestedSpans
+       stats.WrittenSpans = msink.WrittenSpans
+       stats.ServerDroppedSpans = msink.ServerDropped
+       stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
+       stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
+       stats.HostSpanMetrics = make(common.SpanMetricsMap)
+       for k, v := range msink.HostSpanMetrics {
+               stats.HostSpanMetrics[k] = &common.SpanMetrics{
+                       Written:       v.Written,
+                       ServerDropped: v.ServerDropped,
+               }
+       }
+}
+
+// A circular buffer of uint32s which supports appending and taking the
+// average, and some other things.
+type CircBufU32 struct {
+       // The next slot to fill
+       slot int
+
+       // The number of slots which are in use.  This number only ever
+       // increases until the buffer is full.
+       slotsUsed int
+
+       // The buffer
+       buf []uint32
+}
+
+func NewCircBufU32(size int) *CircBufU32 {
+       return &CircBufU32{
+               slotsUsed: -1,
+               buf:       make([]uint32, size),
+       }
+}
+
+func (cbuf *CircBufU32) Max() uint32 {
+       var max uint32
+       for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+               if cbuf.buf[bufIdx] > max {
+                       max = cbuf.buf[bufIdx]
+               }
+       }
+       return max
+}
+
+func (cbuf *CircBufU32) Average() uint32 {
+       var total uint64
+       for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+               total += uint64(cbuf.buf[bufIdx])
+       }
+       return uint32(total / uint64(cbuf.slotsUsed))
+}
+
+func (cbuf *CircBufU32) Append(val uint32) {
+       cbuf.buf[cbuf.slot] = val
+       cbuf.slot++
+       if cbuf.slotsUsed < cbuf.slot {
+               cbuf.slotsUsed = cbuf.slot
+       }
+       if cbuf.slot >= len(cbuf.buf) {
+               cbuf.slot = 0
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/metrics_test.go 
b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..4f27ffd
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       htrace "htrace/client"
+       "htrace/common"
+       "htrace/conf"
+       "reflect"
+       "testing"
+       "time"
+)
+
+func compareTotals(a, b common.SpanMetricsMap) bool {
+       for k, v := range a {
+               if !reflect.DeepEqual(v, b[k]) {
+                       return false
+               }
+       }
+       for k, v := range b {
+               if !reflect.DeepEqual(v, a[k]) {
+                       return false
+               }
+       }
+       return true
+}
+
+type Fatalfer interface {
+       Fatalf(format string, args ...interface{})
+}
+
+func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
+       expectedNumWritten int) {
+       var sstats common.ServerStats
+       msink.PopulateServerStats(&sstats)
+       if sstats.WrittenSpans != uint64(expectedNumWritten) {
+               t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
+                       sstats.WrittenSpans, len(SIMPLE_TEST_SPANS))
+       }
+       if sstats.HostSpanMetrics["127.0.0.1"] == nil {
+               t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] 
found.")
+       }
+       if sstats.HostSpanMetrics["127.0.0.1"].Written !=
+               uint64(expectedNumWritten) {
+               t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but "+
+                       "expected %d\n", 
sstats.HostSpanMetrics["127.0.0.1"].Written,
+                       len(SIMPLE_TEST_SPANS))
+       }
+}
+
+func TestMetricsSinkPerHostEviction(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       msink := NewMetricsSink(cnf)
+       msink.UpdatePersisted("192.168.0.100", 20, 10)
+       msink.UpdatePersisted("192.168.0.101", 20, 10)
+       msink.UpdatePersisted("192.168.0.102", 20, 10)
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       if len(msink.HostSpanMetrics) != 2 {
+               for k, v := range msink.HostSpanMetrics {
+                       fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v)
+               }
+               t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got 
%d\n",
+                       len(msink.HostSpanMetrics))
+       }
+}
+
+func TestIngestedSpansMetricsRest(t *testing.T) {
+       testIngestedSpansMetricsImpl(t, false)
+}
+
+func TestIngestedSpansMetricsPacked(t *testing.T) {
+       testIngestedSpansMetricsImpl(t, true)
+}
+
+func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics",
+               DataDirs: make([]string, 2),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks{
+               HrpcDisabled: !usePacked,
+       })
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+
+       NUM_TEST_SPANS := 12
+       allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+       err = hcl.WriteSpans(allSpans)
+       if err != nil {
+               t.Fatalf("WriteSpans failed: %s\n", err.Error())
+       }
+       for {
+               var stats *common.ServerStats
+               stats, err = hcl.GetServerStats()
+               if err != nil {
+                       t.Fatalf("GetServerStats failed: %s\n", err.Error())
+               }
+               if stats.IngestedSpans == uint64(NUM_TEST_SPANS) {
+                       break
+               }
+               time.Sleep(1 * time.Millisecond)
+       }
+}
+
+func TestCircBuf32(t *testing.T) {
+       cbuf := NewCircBufU32(3)
+       // We arbitrarily define that empty circular buffers have an average of 
0.
+       if cbuf.Average() != 0 {
+               t.Fatalf("expected empty CircBufU32 to have an average of 0.\n")
+       }
+       if cbuf.Max() != 0 {
+               t.Fatalf("expected empty CircBufU32 to have a max of 0.\n")
+       }
+       cbuf.Append(2)
+       if cbuf.Average() != 2 {
+               t.Fatalf("expected one-element CircBufU32 to have an average of 
2.\n")
+       }
+       cbuf.Append(10)
+       if cbuf.Average() != 6 {
+               t.Fatalf("expected two-element CircBufU32 to have an average of 
6.\n")
+       }
+       cbuf.Append(12)
+       if cbuf.Average() != 8 {
+               t.Fatalf("expected three-element CircBufU32 to have an average 
of 8.\n")
+       }
+       cbuf.Append(14)
+       // The 14 overwrites the original 2 element.
+       if cbuf.Average() != 12 {
+               t.Fatalf("expected three-element CircBufU32 to have an average 
of 12.\n")
+       }
+       cbuf.Append(1)
+       // The 1 overwrites the original 10 element.
+       if cbuf.Average() != 9 {
+               t.Fatalf("expected three-element CircBufU32 to have an average 
of 12.\n")
+       }
+       if cbuf.Max() != 14 {
+               t.Fatalf("expected three-element CircBufU32 to have a max of 
14.\n")
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/mini_htraced.go 
b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
new file mode 100644
index 0000000..af8d379
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "htrace/common"
+       "htrace/conf"
+       "io/ioutil"
+       "net"
+       "os"
+       "strings"
+)
+
+//
+// MiniHTraceD is used in unit tests to set up a daemon with certain settings.
+// It takes care of things like creating and cleaning up temporary directories.
+//
+
+// The default number of managed data directories to use.
+const DEFAULT_NUM_DATA_DIRS = 2
+
+// Builds a MiniHTraced object.
+type MiniHTracedBuilder struct {
+       // The name of the MiniHTraced to build.  This shows up in the test 
directory name and some
+       // other places.
+       Name string
+
+       // The configuration values to use for the MiniHTraced.
+       // If ths is nil, we use the default configuration for everything.
+       Cnf map[string]string
+
+       // The DataDirs to use.  Empty entries will turn into random names.
+       DataDirs []string
+
+       // If true, we will keep the data dirs around after MiniHTraced#Close
+       KeepDataDirsOnClose bool
+
+       // If non-null, the WrittenSpans semaphore to use when creating the 
DataStore.
+       WrittenSpans *common.Semaphore
+
+       // The test hooks to use for the HRPC server
+       HrpcTestHooks *hrpcTestHooks
+}
+
+type MiniHTraced struct {
+       Name                string
+       Cnf                 *conf.Config
+       DataDirs            []string
+       Store               *dataStore
+       Rsv                 *RestServer
+       Hsv                 *HrpcServer
+       Lg                  *common.Logger
+       KeepDataDirsOnClose bool
+}
+
+func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
+       var err error
+       var store *dataStore
+       var rsv *RestServer
+       var hsv *HrpcServer
+       if bld.Name == "" {
+               bld.Name = "HTraceTest"
+       }
+       if bld.Cnf == nil {
+               bld.Cnf = make(map[string]string)
+       }
+       if bld.DataDirs == nil {
+               bld.DataDirs = make([]string, 2)
+       }
+       for idx := range bld.DataDirs {
+               if bld.DataDirs[idx] == "" {
+                       bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(),
+                               fmt.Sprintf("%s%d", bld.Name, idx+1))
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       }
+       // Copy the default test configuration values.
+       for k, v := range conf.TEST_VALUES() {
+               _, hasVal := bld.Cnf[k]
+               if !hasVal {
+                       bld.Cnf[k] = v
+               }
+       }
+       bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
+               strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
+       cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               return nil, err
+       }
+       lg := common.NewLogger("mini.htraced", cnf)
+       defer func() {
+               if err != nil {
+                       if store != nil {
+                               store.Close()
+                       }
+                       for idx := range bld.DataDirs {
+                               if !bld.KeepDataDirsOnClose {
+                                       if bld.DataDirs[idx] != "" {
+                                               os.RemoveAll(bld.DataDirs[idx])
+                                       }
+                               }
+                       }
+                       if rsv != nil {
+                               rsv.Close()
+                       }
+                       lg.Infof("Failed to create MiniHTraced %s: %s\n", 
bld.Name, err.Error())
+                       lg.Close()
+               }
+       }()
+       store, err = CreateDataStore(cnf, bld.WrittenSpans)
+       if err != nil {
+               return nil, err
+       }
+       rstListener, listenErr := net.Listen("tcp", 
cnf.Get(conf.HTRACE_WEB_ADDRESS))
+       if listenErr != nil {
+               return nil, listenErr
+       }
+       defer func() {
+               if rstListener != nil {
+                       rstListener.Close()
+               }
+       }()
+       rsv, err = CreateRestServer(cnf, store, rstListener)
+       if err != nil {
+               return nil, err
+       }
+       rstListener = nil
+       hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
+       if err != nil {
+               return nil, err
+       }
+
+       lg.Infof("Created MiniHTraced %s\n", bld.Name)
+       return &MiniHTraced{
+               Name:                bld.Name,
+               Cnf:                 cnf,
+               DataDirs:            bld.DataDirs,
+               Store:               store,
+               Rsv:                 rsv,
+               Hsv:                 hsv,
+               Lg:                  lg,
+               KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
+       }, nil
+}
+
+// Return a Config object that clients can use to connect to this MiniHTraceD.
+func (ht *MiniHTraced) ClientConf() *conf.Config {
+       return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+               conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
+}
+
+// Return a Config object that clients can use to connect to this MiniHTraceD
+// by HTTP only (no HRPC).
+func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
+       return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+               conf.HTRACE_HRPC_ADDRESS, "")
+}
+
+func (ht *MiniHTraced) Close() {
+       ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
+       ht.Rsv.Close()
+       ht.Hsv.Close()
+       ht.Store.Close()
+       if !ht.KeepDataDirsOnClose {
+               for idx := range ht.DataDirs {
+                       ht.Lg.Infof("Removing %s...\n", ht.DataDirs[idx])
+                       os.RemoveAll(ht.DataDirs[idx])
+               }
+       }
+       ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name)
+       ht.Lg.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/reaper_test.go 
b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
new file mode 100644
index 0000000..af11e38
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "htrace/common"
+       "htrace/conf"
+       "htrace/test"
+       "math/rand"
+       "testing"
+       "time"
+)
+
+func TestReapingOldSpans(t *testing.T) {
+       const NUM_TEST_SPANS = 20
+       testSpans := make([]*common.Span, NUM_TEST_SPANS)
+       rnd := rand.New(rand.NewSource(2))
+       now := common.TimeToUnixMs(time.Now().UTC())
+       for i := range testSpans {
+               testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
+               testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
+               testSpans[i].Description = fmt.Sprintf("Span%02d", i)
+       }
+       htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
+               Cnf: map[string]string{
+                       conf.HTRACE_SPAN_EXPIRY_MS:                
fmt.Sprintf("%d", 60*60*1000),
+                       conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:    "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
+               },
+               WrittenSpans: common.NewSemaphore(0),
+               DataDirs:     make([]string, 2),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create mini htraced cluster: %s\n", 
err.Error())
+       }
+       ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+       for spanIdx := range testSpans {
+               ing.IngestSpan(testSpans[spanIdx])
+       }
+       ing.Close(time.Now())
+       // Wait the spans to be created
+       ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
+       // Set a reaper date that will remove all the spans except final one.
+       ht.Store.rpr.SetReaperDate(now)
+
+       common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
+               for i := 0; i < NUM_TEST_SPANS-1; i++ {
+                       span := ht.Store.FindSpan(testSpans[i].Id)
+                       if span != nil {
+                               ht.Store.lg.Debugf("Waiting for %s to be 
removed...\n",
+                                       testSpans[i].Description)
+                               return false
+                       }
+               }
+               span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
+               if span == nil {
+                       ht.Store.lg.Debugf("Did not expect %s to be removed\n",
+                               testSpans[NUM_TEST_SPANS-1].Description)
+                       return false
+               }
+               return true
+       })
+       defer ht.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/rest.go 
b/htrace-htraced/go/src/htrace/htraced/rest.go
new file mode 100644
index 0000000..1ba4791
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/rest.go
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "github.com/gorilla/mux"
+       "htrace/common"
+       "htrace/conf"
+       "net"
+       "net/http"
+       "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "time"
+)
+
+// Set the response headers.
+func setResponseHeaders(hdr http.Header) {
+       hdr.Set("Content-Type", "application/json")
+}
+
+// Write a JSON error response.
+func writeError(lg *common.Logger, w http.ResponseWriter, errCode int,
+       errStr string) {
+       str := strings.Replace(errStr, `"`, `'`, -1)
+       lg.Info(str + "\n")
+       w.WriteHeader(errCode)
+       w.Write([]byte(`{ "error" : "` + str + `"}`))
+}
+
+type serverVersionHandler struct {
+       lg *common.Logger
+}
+
+func (hand *serverVersionHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       setResponseHeaders(w.Header())
+       version := common.ServerVersion{ReleaseVersion: RELEASE_VERSION,
+               GitVersion: GIT_VERSION}
+       buf, err := json.Marshal(&version)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("error marshalling ServerVersion: %s\n", 
err.Error()))
+               return
+       }
+       if hand.lg.DebugEnabled() {
+               hand.lg.Debugf("Returned ServerVersion %s\n", string(buf))
+       }
+       w.Write(buf)
+}
+
+type serverDebugInfoHandler struct {
+       lg *common.Logger
+}
+
+func (hand *serverDebugInfoHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       setResponseHeaders(w.Header())
+       buf := make([]byte, 1<<20)
+       common.GetStackTraces(&buf)
+       resp := common.ServerDebugInfo{
+               StackTraces: string(buf),
+               GCStats:     common.GetGCStats(),
+       }
+       buf, err := json.Marshal(&resp)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("error marshalling ServerDebugInfo: %s\n", 
err.Error()))
+               return
+       }
+       w.Write(buf)
+       hand.lg.Info("Returned ServerDebugInfo\n")
+}
+
+type serverStatsHandler struct {
+       dataStoreHandler
+}
+
+func (hand *serverStatsHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       setResponseHeaders(w.Header())
+       hand.lg.Debugf("serverStatsHandler\n")
+       stats := hand.store.ServerStats()
+       buf, err := json.Marshal(&stats)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("error marshalling ServerStats: %s\n", 
err.Error()))
+               return
+       }
+       hand.lg.Debugf("Returned ServerStats %s\n", string(buf))
+       w.Write(buf)
+}
+
+type serverConfHandler struct {
+       cnf *conf.Config
+       lg  *common.Logger
+}
+
+func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       setResponseHeaders(w.Header())
+       hand.lg.Debugf("serverConfHandler\n")
+       cnfMap := hand.cnf.Export()
+       buf, err := json.Marshal(&cnfMap)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("error marshalling serverConf: %s\n", 
err.Error()))
+               return
+       }
+       hand.lg.Debugf("Returned server configuration %s\n", string(buf))
+       w.Write(buf)
+}
+
+type dataStoreHandler struct {
+       lg    *common.Logger
+       store *dataStore
+}
+
+func (hand *dataStoreHandler) parseSid(w http.ResponseWriter,
+       str string) (common.SpanId, bool) {
+       var id common.SpanId
+       err := id.FromString(str)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Failed to parse span ID %s: %s", str, 
err.Error()))
+               w.Write([]byte("Error parsing : " + err.Error()))
+               return common.INVALID_SPAN_ID, false
+       }
+       return id, true
+}
+
+func (hand *dataStoreHandler) getReqField32(fieldName string, w 
http.ResponseWriter,
+       req *http.Request) (int32, bool) {
+       str := req.FormValue(fieldName)
+       if str == "" {
+               writeError(hand.lg, w, http.StatusBadRequest, fmt.Sprintf("No 
%s specified.", fieldName))
+               return -1, false
+       }
+       val, err := strconv.ParseUint(str, 16, 32)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Error parsing %s: %s.", fieldName, 
err.Error()))
+               return -1, false
+       }
+       return int32(val), true
+}
+
+type findSidHandler struct {
+       dataStoreHandler
+}
+
+func (hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       setResponseHeaders(w.Header())
+       req.ParseForm()
+       vars := mux.Vars(req)
+       stringSid := vars["id"]
+       sid, ok := hand.parseSid(w, stringSid)
+       if !ok {
+               return
+       }
+       hand.lg.Debugf("findSidHandler(sid=%s)\n", sid.String())
+       span := hand.store.FindSpan(sid)
+       if span == nil {
+               writeError(hand.lg, w, http.StatusNoContent,
+                       fmt.Sprintf("No such span as %s\n", sid.String()))
+               return
+       }
+       w.Write(span.ToJson())
+}
+
+type findChildrenHandler struct {
+       dataStoreHandler
+}
+
+func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       setResponseHeaders(w.Header())
+       req.ParseForm()
+       vars := mux.Vars(req)
+       stringSid := vars["id"]
+       sid, ok := hand.parseSid(w, stringSid)
+       if !ok {
+               return
+       }
+       var lim int32
+       lim, ok = hand.getReqField32("lim", w, req)
+       if !ok {
+               return
+       }
+       hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", sid.String(), 
lim)
+       children := hand.store.FindChildren(sid, lim)
+       jbytes, err := json.Marshal(children)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("Error marshalling children: %s", 
err.Error()))
+               return
+       }
+       w.Write(jbytes)
+}
+
+type writeSpansHandler struct {
+       dataStoreHandler
+}
+
+func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       startTime := time.Now()
+       setResponseHeaders(w.Header())
+       client, _, serr := net.SplitHostPort(req.RemoteAddr)
+       if serr != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Failed to split host and port for %s: 
%s\n",
+                               req.RemoteAddr, serr.Error()))
+               return
+       }
+       dec := json.NewDecoder(req.Body)
+       var msg common.WriteSpansReq
+       err := dec.Decode(&msg)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Error parsing WriteSpansReq: %s", 
err.Error()))
+               return
+       }
+       if hand.lg.TraceEnabled() {
+               hand.lg.Tracef("%s: read WriteSpans REST message: %s\n",
+                       req.RemoteAddr, asJson(&msg))
+       }
+       ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
+       for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ {
+               var span *common.Span
+               err := dec.Decode(&span)
+               if err != nil {
+                       writeError(hand.lg, w, http.StatusBadRequest,
+                               fmt.Sprintf("Failed to decode span %d out of 
%d: ",
+                                       spanIdx, msg.NumSpans, err.Error()))
+                       return
+               }
+               ing.IngestSpan(span)
+       }
+       ing.Close(startTime)
+       return
+}
+
+type queryHandler struct {
+       lg *common.Logger
+       dataStoreHandler
+}
+
+func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       setResponseHeaders(w.Header())
+       queryString := req.FormValue("query")
+       if queryString == "" {
+               writeError(hand.lg, w, http.StatusBadRequest, "No query 
provided.\n")
+               return
+       }
+       var query common.Query
+       reader := bytes.NewBufferString(queryString)
+       dec := json.NewDecoder(reader)
+       err := dec.Decode(&query)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Error parsing query '%s': %s", 
queryString, err.Error()))
+               return
+       }
+       var results []*common.Span
+       results, err, _ = hand.store.HandleQuery(&query)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("Internal error processing query %s: %s",
+                               query.String(), err.Error()))
+               return
+       }
+       var jbytes []byte
+       jbytes, err = json.Marshal(results)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("Error marshalling results: %s", 
err.Error()))
+               return
+       }
+       w.Write(jbytes)
+}
+
+type logErrorHandler struct {
+       lg *common.Logger
+}
+
+func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       hand.lg.Errorf("Got unknown request %s\n", req.RequestURI)
+       writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.")
+}
+
+type RestServer struct {
+       http.Server
+       listener net.Listener
+       lg       *common.Logger
+}
+
+func CreateRestServer(cnf *conf.Config, store *dataStore,
+       listener net.Listener) (*RestServer, error) {
+       var err error
+       rsv := &RestServer{}
+       rsv.lg = common.NewLogger("rest", cnf)
+
+       r := mux.NewRouter().StrictSlash(false)
+
+       r.Handle("/server/info", &serverVersionHandler{lg: 
rsv.lg}).Methods("GET")
+       r.Handle("/server/version", &serverVersionHandler{lg: 
rsv.lg}).Methods("GET")
+       r.Handle("/server/debugInfo", &serverDebugInfoHandler{lg: 
rsv.lg}).Methods("GET")
+
+       serverStatsH := &serverStatsHandler{dataStoreHandler: dataStoreHandler{
+               store: store, lg: rsv.lg}}
+       r.Handle("/server/stats", serverStatsH).Methods("GET")
+
+       serverConfH := &serverConfHandler{cnf: cnf, lg: rsv.lg}
+       r.Handle("/server/conf", serverConfH).Methods("GET")
+
+       writeSpansH := &writeSpansHandler{dataStoreHandler: dataStoreHandler{
+               store: store, lg: rsv.lg}}
+       r.Handle("/writeSpans", writeSpansH).Methods("POST")
+
+       queryH := &queryHandler{lg: rsv.lg, dataStoreHandler: 
dataStoreHandler{store: store}}
+       r.Handle("/query", queryH).Methods("GET")
+
+       span := r.PathPrefix("/span").Subrouter()
+       findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: 
store, lg: rsv.lg}}
+       span.Handle("/{id}", findSidH).Methods("GET")
+
+       findChildrenH := &findChildrenHandler{dataStoreHandler: 
dataStoreHandler{store: store,
+               lg: rsv.lg}}
+       span.Handle("/{id}/children", findChildrenH).Methods("GET")
+
+       // Default Handler. This will serve requests for static requests.
+       webdir := os.Getenv("HTRACED_WEB_DIR")
+       if webdir == "" {
+               webdir, err = 
filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "..", "web"))
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       rsv.lg.Infof(`Serving static files from "%s"`+"\n", webdir)
+       
r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET")
+
+       // Log an error message for unknown non-GET requests.
+       r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
+
+       rsv.listener = listener
+       rsv.Handler = r
+       rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO)
+       go rsv.Serve(rsv.listener)
+       rsv.lg.Infof("Started REST server on %s\n", 
rsv.listener.Addr().String())
+       return rsv, nil
+}
+
+func (rsv *RestServer) Addr() net.Addr {
+       return rsv.listener.Addr()
+}
+
+func (rsv *RestServer) Close() {
+       rsv.listener.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/cmd.go 
b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
new file mode 100644
index 0000000..65b67e5
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "bytes"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "github.com/alecthomas/kingpin"
+       htrace "htrace/client"
+       "htrace/common"
+       "htrace/conf"
+       "io"
+       "os"
+       "sort"
+       "strings"
+       "text/tabwriter"
+       "time"
+)
+
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+const EXIT_SUCCESS = 0
+const EXIT_FAILURE = 1
+
+var verbose bool
+
+const USAGE = `The Apache HTrace command-line tool.  This tool retrieves and 
modifies settings and
+other data on a running htraced daemon.
+
+If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of 
directories
+specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; 
otherwise,
+the defaults will be used.
+`
+
+func main() {
+       // Load htraced configuration
+       cnf, cnfLog := conf.LoadApplicationConfig("htrace.tool.")
+       lg := common.NewLogger("conf", cnf)
+       defer lg.Close()
+       scanner := bufio.NewScanner(cnfLog)
+       for scanner.Scan() {
+               lg.Debugf(scanner.Text() + "\n")
+       }
+
+       // Parse argv
+       app := kingpin.New(os.Args[0], USAGE)
+       app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'.  
Replace 'my.key' "+
+               "with any key you want to set.").Default("my.value").String()
+       addr := app.Flag("addr", "Server address.").String()
+       verbose = *app.Flag("verbose", "Verbose.").Default("false").Bool()
+       version := app.Command("version", "Print the version of this program.")
+       serverVersion := app.Command("serverVersion", "Print the version of the 
htraced server.")
+       serverStats := app.Command("serverStats", "Print statistics retrieved 
from the htraced server.")
+       serverStatsJson := serverStats.Flag("json", "Display statistics as raw 
JSON.").Default("false").Bool()
+       serverDebugInfo := app.Command("serverDebugInfo", "Print the debug info 
of the htraced server.")
+       serverConf := app.Command("serverConf", "Print the server configuration 
retrieved from the htraced server.")
+       findSpan := app.Command("findSpan", "Print information about a trace 
span with a given ID.")
+       findSpanId := findSpan.Arg("id", "Span ID to find. Example: 
be305e54-4534-2110-a0b2-e06b9effe112").Required().String()
+       findChildren := app.Command("findChildren", "Print out the span IDs 
that are children of a given span ID.")
+       parentSpanId := findChildren.Arg("id", "Span ID to print children for. 
Example: be305e54-4534-2110-a0b2-e06b9effe112").
+               Required().String()
+       childLim := findChildren.Flag("lim", "Maximum number of child IDs to 
print.").Default("20").Int()
+       loadFile := app.Command("loadFile", "Write whitespace-separated JSON 
spans from a file to the server.")
+       loadFilePath := loadFile.Arg("path",
+               "A file containing whitespace-separated span 
JSON.").Required().String()
+       loadJson := app.Command("load", "Write JSON spans from the command-line 
to the server.")
+       loadJsonArg := loadJson.Arg("json", "A JSON span to write to the 
server.").Required().String()
+       dumpAll := app.Command("dumpAll", "Dump all spans from the htraced 
daemon.")
+       dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans 
to.").Default("-").String()
+       dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from 
the server at once.").
+               Default("100").Int()
+       graph := app.Command("graph", "Visualize span JSON as a graph.")
+       graphJsonFile := graph.Arg("input", "The JSON file to 
load").Required().String()
+       graphDotFile := graph.Flag("output",
+               "The path to write a GraphViz dotfile to.  This file can be 
used as input to "+
+                       "GraphViz, in order to generate a pretty picture.  See 
graphviz.org for more "+
+                       "information about generating pictures of 
graphs.").Default("-").String()
+       query := app.Command("query", "Send a query to htraced.")
+       queryLim := query.Flag("lim", "Maximum number of spans to 
retrieve.").Default("20").Int()
+       queryArg := query.Arg("query", "The query string to send.  Query 
strings have the format "+
+               "[TYPE] [OPERATOR] [CONST], joined by AND 
statements.").Required().String()
+       rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.")
+       rawQueryArg := rawQuery.Arg("json", "The query JSON to 
send.").Required().String()
+       cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+       // Add the command-line settings into the configuration.
+       if *addr != "" {
+               cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr)
+       }
+
+       // Handle commands that don't require an HTrace client.
+       switch cmd {
+       case version.FullCommand():
+               os.Exit(printVersion())
+       case graph.FullCommand():
+               err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile)
+               if err != nil {
+                       fmt.Printf("graphing error: %s\n", err.Error())
+                       os.Exit(EXIT_FAILURE)
+               }
+               os.Exit(EXIT_SUCCESS)
+       }
+
+       // Create HTrace client
+       hcl, err := htrace.NewClient(cnf, nil)
+       if err != nil {
+               fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
+               os.Exit(EXIT_FAILURE)
+       }
+
+       // Handle commands that require an HTrace client.
+       switch cmd {
+       case version.FullCommand():
+               os.Exit(printVersion())
+       case serverVersion.FullCommand():
+               os.Exit(printServerVersion(hcl))
+       case serverStats.FullCommand():
+               if *serverStatsJson {
+                       os.Exit(printServerStatsJson(hcl))
+               } else {
+                       os.Exit(printServerStats(hcl))
+               }
+       case serverDebugInfo.FullCommand():
+               os.Exit(printServerDebugInfo(hcl))
+       case serverConf.FullCommand():
+               os.Exit(printServerConfJson(hcl))
+       case findSpan.FullCommand():
+               var id *common.SpanId
+               id.FromString(*findSpanId)
+               os.Exit(doFindSpan(hcl, *id))
+       case findChildren.FullCommand():
+               var id *common.SpanId
+               id.FromString(*parentSpanId)
+               os.Exit(doFindChildren(hcl, *id, *childLim))
+       case loadJson.FullCommand():
+               os.Exit(doLoadSpanJson(hcl, *loadJsonArg))
+       case loadFile.FullCommand():
+               os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath))
+       case dumpAll.FullCommand():
+               err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim)
+               if err != nil {
+                       fmt.Printf("dumpAll error: %s\n", err.Error())
+                       os.Exit(EXIT_FAILURE)
+               }
+               os.Exit(EXIT_SUCCESS)
+       case query.FullCommand():
+               err := doQueryFromString(hcl, *queryArg, *queryLim)
+               if err != nil {
+                       fmt.Printf("query error: %s\n", err.Error())
+                       os.Exit(EXIT_FAILURE)
+               }
+               os.Exit(EXIT_SUCCESS)
+       case rawQuery.FullCommand():
+               err := doRawQuery(hcl, *rawQueryArg)
+               if err != nil {
+                       fmt.Printf("raw query error: %s\n", err.Error())
+                       os.Exit(EXIT_FAILURE)
+               }
+               os.Exit(EXIT_SUCCESS)
+       }
+
+       app.UsageErrorf(os.Stderr, "You must supply a command to run.")
+}
+
+// Print the version of the htrace binary.
+func printVersion() int {
+       fmt.Printf("Running htracedTool %s [%s].\n", RELEASE_VERSION, 
GIT_VERSION)
+       return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/info
+func printServerVersion(hcl *htrace.Client) int {
+       ver, err := hcl.GetServerVersion()
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       fmt.Printf("HTraced server version %s (%s)\n", ver.ReleaseVersion, 
ver.GitVersion)
+       return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/info
+func printServerStats(hcl *htrace.Client) int {
+       stats, err := hcl.GetServerStats()
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       w := new(tabwriter.Writer)
+       w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+       fmt.Fprintf(w, "HTRACED SERVER STATS\n")
+       fmt.Fprintf(w, "Datastore Start\t%s\n",
+               common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+       fmt.Fprintf(w, "Server Time\t%s\n",
+               common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+       fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+       fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
+       fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans)
+       fmt.Fprintf(w, "Spans dropped by server\t%d\n", 
stats.ServerDroppedSpans)
+       dur := time.Millisecond * 
time.Duration(stats.AverageWriteSpansLatencyMs)
+       fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
+       dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
+       fmt.Fprintf(w, "Maximum WriteSpan Latency\t%s\n", dur.String())
+       fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+       w.Flush()
+       fmt.Println("")
+       for i := range stats.Dirs {
+               dir := stats.Dirs[i]
+               fmt.Printf("==== %s ===\n", dir.Path)
+               fmt.Printf("Approximate number of bytes: %d\n", 
dir.ApproximateBytes)
+               stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
+               fmt.Printf("%s\n", stats)
+       }
+       w = new(tabwriter.Writer)
+       w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+       fmt.Fprintf(w, "HOST SPAN METRICS\n")
+       mtxMap := stats.HostSpanMetrics
+       keys := make(sort.StringSlice, len(mtxMap))
+       i := 0
+       for k, _ := range mtxMap {
+               keys[i] = k
+               i++
+       }
+       sort.Sort(keys)
+       for k := range keys {
+               mtx := mtxMap[keys[k]]
+               fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\n",
+                       keys[k], mtx.Written, mtx.ServerDropped)
+       }
+       w.Flush()
+       return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/info as JSON
+func printServerStatsJson(hcl *htrace.Client) int {
+       stats, err := hcl.GetServerStats()
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       buf, err := json.MarshalIndent(stats, "", "  ")
+       if err != nil {
+               fmt.Printf("Error marshalling server stats: %s", err.Error())
+               return EXIT_FAILURE
+       }
+       fmt.Printf("%s\n", string(buf))
+       return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/debugInfo
+func printServerDebugInfo(hcl *htrace.Client) int {
+       stats, err := hcl.GetServerDebugInfo()
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       fmt.Println("=== GOROUTINE STACKS ===")
+       fmt.Print(stats.StackTraces)
+       fmt.Println("=== END GOROUTINE STACKS ===")
+       fmt.Println("=== GC STATISTICS ===")
+       fmt.Print(stats.GCStats)
+       fmt.Println("=== END GC STATISTICS ===")
+       return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/conf as JSON
+func printServerConfJson(hcl *htrace.Client) int {
+       cnf, err := hcl.GetServerConf()
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       buf, err := json.MarshalIndent(cnf, "", "  ")
+       if err != nil {
+               fmt.Printf("Error marshalling server conf: %s", err.Error())
+               return EXIT_FAILURE
+       }
+       fmt.Printf("%s\n", string(buf))
+       return EXIT_SUCCESS
+}
+
+// Print information about a trace span.
+func doFindSpan(hcl *htrace.Client, sid common.SpanId) int {
+       span, err := hcl.FindSpan(sid)
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       if span == nil {
+               fmt.Printf("Span ID not found.\n")
+               return EXIT_FAILURE
+       }
+       pbuf, err := json.MarshalIndent(span, "", "  ")
+       if err != nil {
+               fmt.Printf("Error: error pretty-printing span to JSON: %s\n", 
err.Error())
+               return EXIT_FAILURE
+       }
+       fmt.Printf("%s\n", string(pbuf))
+       return EXIT_SUCCESS
+}
+
+func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int {
+       if spanFile == "" {
+               fmt.Printf("You must specify the json file to load.\n")
+               return EXIT_FAILURE
+       }
+       file, err := OpenInputFile(spanFile)
+       if err != nil {
+               fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error())
+               return EXIT_FAILURE
+       }
+       defer file.Close()
+       return doLoadSpans(hcl, bufio.NewReader(file))
+}
+
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
+       return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
+       dec := json.NewDecoder(reader)
+       spans := make([]*common.Span, 0, 32)
+       var err error
+       for {
+               var span common.Span
+               if err = dec.Decode(&span); err != nil {
+                       if err == io.EOF {
+                               break
+                       }
+                       fmt.Printf("Failed to decode JSON: %s\n", err.Error())
+                       return EXIT_FAILURE
+               }
+               spans = append(spans, &span)
+       }
+       if verbose {
+               fmt.Printf("Writing ")
+               prefix := ""
+               for i := range spans {
+                       fmt.Printf("%s%s", prefix, spans[i].ToJson())
+                       prefix = ", "
+               }
+               fmt.Printf("\n")
+       }
+       err = hcl.WriteSpans(spans)
+       if err != nil {
+               fmt.Println(err.Error())
+               return EXIT_FAILURE
+       }
+       return EXIT_SUCCESS
+}
+
+// Find information about the children of a span.
+func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int {
+       spanIds, err := hcl.FindChildren(sid, lim)
+       if err != nil {
+               fmt.Printf("%s\n", err.Error())
+               return EXIT_FAILURE
+       }
+       pbuf, err := json.MarshalIndent(spanIds, "", "  ")
+       if err != nil {
+               fmt.Println("Error: error pretty-printing span IDs to JSON: 
%s", err.Error())
+               return 1
+       }
+       fmt.Printf("%s\n", string(pbuf))
+       return 0
+}
+
+// Dump all spans from the htraced daemon.
+func doDumpAll(hcl *htrace.Client, outPath string, lim int) error {
+       file, err := CreateOutputFile(outPath)
+       if err != nil {
+               return err
+       }
+       w := bufio.NewWriter(file)
+       defer func() {
+               if file != nil {
+                       w.Flush()
+                       file.Close()
+               }
+       }()
+       out := make(chan *common.Span, 50)
+       var dumpErr error
+       go func() {
+               dumpErr = hcl.DumpAll(lim, out)
+       }()
+       var numSpans int64
+       nextLogTime := time.Now().Add(time.Second * 5)
+       for {
+               span, channelOpen := <-out
+               if !channelOpen {
+                       break
+               }
+               if err == nil {
+                       _, err = fmt.Fprintf(w, "%s\n", span.ToJson())
+               }
+               if verbose {
+                       numSpans++
+                       now := time.Now()
+                       if !now.Before(nextLogTime) {
+                               nextLogTime = now.Add(time.Second * 5)
+                               fmt.Printf("received %d span(s)...\n", numSpans)
+                       }
+               }
+       }
+       if err != nil {
+               return errors.New(fmt.Sprintf("Write error %s", err.Error()))
+       }
+       if dumpErr != nil {
+               return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error()))
+       }
+       err = w.Flush()
+       if err != nil {
+               return err
+       }
+       err = file.Close()
+       file = nil
+       if err != nil {
+               return err
+       }
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/file.go 
b/htrace-htraced/go/src/htrace/htracedTool/file.go
new file mode 100644
index 0000000..ca9c18d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "htrace/common"
+       "io"
+       "os"
+)
+
+// A file used for input.
+// Transparently supports using stdin for input.
+type InputFile struct {
+       *os.File
+       path string
+}
+
+// Open an input file.  Stdin will be used when path is -
+func OpenInputFile(path string) (*InputFile, error) {
+       if path == "-" {
+               return &InputFile{File: os.Stdin, path: path}, nil
+       }
+       file, err := os.Open(path)
+       if err != nil {
+               return nil, err
+       }
+       return &InputFile{File: file, path: path}, nil
+}
+
+func (file *InputFile) Close() {
+       if file.path != "-" {
+               file.File.Close()
+       }
+}
+
+// A file used for output.
+// Transparently supports using stdout for output.
+type OutputFile struct {
+       *os.File
+       path string
+}
+
+// Create an output file.  Stdout will be used when path is -
+func CreateOutputFile(path string) (*OutputFile, error) {
+       if path == "-" {
+               return &OutputFile{File: os.Stdout, path: path}, nil
+       }
+       file, err := os.Create(path)
+       if err != nil {
+               return nil, err
+       }
+       return &OutputFile{File: file, path: path}, nil
+}
+
+func (file *OutputFile) Close() error {
+       if file.path != "-" {
+               return file.File.Close()
+       }
+       return nil
+}
+
+// FailureDeferringWriter is a writer which allows us to call Printf multiple
+// times and then check if all the printfs succeeded at the very end, rather
+// than checking after each call.   We will not attempt to write more data
+// after the first write failure.
+type FailureDeferringWriter struct {
+       io.Writer
+       err error
+}
+
+func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter {
+       return &FailureDeferringWriter{writer, nil}
+}
+
+func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) {
+       if w.err != nil {
+               return
+       }
+       str := fmt.Sprintf(format, v...)
+       _, err := w.Writer.Write([]byte(str))
+       if err != nil {
+               w.err = err
+       }
+}
+
+func (w *FailureDeferringWriter) Error() error {
+       return w.err
+}
+
+// Read a file full of whitespace-separated span JSON into a slice of spans.
+func readSpansFile(path string) (common.SpanSlice, error) {
+       file, err := OpenInputFile(path)
+       if err != nil {
+               return nil, err
+       }
+       defer file.Close()
+       return readSpans(bufio.NewReader(file))
+}
+
+// Read whitespace-separated span JSON into a slice of spans.
+func readSpans(reader io.Reader) (common.SpanSlice, error) {
+       spans := make(common.SpanSlice, 0)
+       dec := json.NewDecoder(reader)
+       for {
+               var span common.Span
+               err := dec.Decode(&span)
+               if err != nil {
+                       if err != io.EOF {
+                               return nil, errors.New(fmt.Sprintf("Decode 
error after decoding %d "+
+                                       "span(s): %s", len(spans), err.Error()))
+                       }
+                       break
+               }
+               spans = append(spans, &span)
+       }
+       return spans, nil
+}


Reply via email to