Repository: incubator-htrace
Updated Branches:
  refs/heads/master ef46897ff -> 021e49144


HTRACE-294. htraced: fix some metrics issues (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/021e4914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/021e4914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/021e4914

Branch: refs/heads/master
Commit: 021e491446716a57eb7f37f9328fe72d103b9823
Parents: ef46897
Author: Colin P. Mccabe <[email protected]>
Authored: Thu Nov 12 12:39:36 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Thu Nov 12 12:39:36 2015 -0800

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/client/client.go   |  11 +-
 .../go/src/org/apache/htrace/common/log.go      |  32 ++++
 .../go/src/org/apache/htrace/common/rpc.go      |  20 ++-
 .../src/org/apache/htrace/htraced/datastore.go  |  29 ++--
 .../go/src/org/apache/htrace/htraced/hrpc.go    |  14 +-
 .../go/src/org/apache/htrace/htraced/htraced.go |   7 +-
 .../go/src/org/apache/htrace/htraced/metrics.go | 148 ++++++++++++++++---
 .../org/apache/htrace/htraced/metrics_test.go   |  90 ++++++++++-
 .../org/apache/htrace/htraced/mini_htraced.go   |   7 +
 .../go/src/org/apache/htrace/htraced/rest.go    |  22 ++-
 .../go/src/org/apache/htrace/htracedTool/cmd.go |   8 +-
 .../src/main/webapp/app/server_info_view.js     |  14 ++
 .../src/main/webapp/app/server_stats.js         |   5 +-
 htrace-webapp/src/main/webapp/index.html        |  19 +++
 14 files changed, 373 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go 
b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index fb46e62..28b9e29 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -47,9 +47,11 @@ type Client struct {
 
        // HRPC address of the htraced server.
        hrpcAddr string
+}
 
-       // The HRPC client, or null if it is not enabled.
-       hcr *hClient
+// DisableHrpc clears the HRPC address stored in this client.
+// NOTE(review): presumably the client then falls back to REST for WriteSpans;
+// the code that consults hrpcAddr is not shown here -- confirm.
+func (hcl *Client) DisableHrpc() {
+       hcl.hrpcAddr = ""
 }
 
 // Get the htraced server version information.
@@ -243,9 +245,6 @@ func (hcl *Client) DumpAll(lim int, out chan *common.Span) 
error {
 }
 
 func (hcl *Client) Close() {
-       if hcl.hcr != nil {
-               hcl.hcr.Close()
-       }
        hcl.restAddr = ""
-       hcl.hcr = nil
+       hcl.hrpcAddr = ""
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/common/log.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go 
b/htrace-htraced/go/src/org/apache/htrace/common/log.go
index 2e3e267..4066094 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/log.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go
@@ -22,6 +22,7 @@ package common
 import (
        "errors"
        "fmt"
+       "log"
        "org/apache/htrace/conf"
        "os"
        "path/filepath"
@@ -294,3 +295,34 @@ func (lg *Logger) Close() {
        lg.sink.Unref()
        lg.sink = nil
 }
+
+// WrappedLogger adapts an htrace logger so that it can be used as the output
+// of a golang standard logger.
+//
+// This is a bit messy because of the difference in interfaces between the
+// golang standard logger and the htrace logger.  The golang standard logger
+// doesn't support log levels directly, so you must choose up front what htrace
+// log level all messages should be treated as.  Golang standard loggers expect
+// to be able to write to an io.Writer, but make no guarantees about whether
+// they will break messages into multiple Write() calls (although this does
+// not seem to be a major problem in practice.)
+//
+// Despite these limitations, it's still useful to have this type to be able
+// to log things that come out of the go HTTP server and other standard library
+// systems.
+type WrappedLogger struct {
+       // The htrace logger which receives every message.
+       lg *Logger
+       // The htrace log level which every message is written at.
+       level Level
+}
+
+// Wrap returns a golang standard logger whose output is forwarded to this
+// htrace logger.  Every message is written at the given htrace level and is
+// preceded by prefix.  No standard-logger flags are set (the flags argument
+// is 0).
+func (lg *Logger) Wrap(prefix string, level Level) *log.Logger {
+       return log.New(&WrappedLogger{lg: lg, level: level}, prefix, 0)
+}
+
+// Write implements io.Writer.  The bytes are converted to a string and handed
+// to the wrapped htrace logger at the configured level.  It always reports
+// that the whole slice was consumed and never returns an error.
+func (wlg *WrappedLogger) Write(p []byte) (int, error) {
+       msg := string(p)
+       wlg.lg.Write(wlg.level, msg)
+       return len(p), nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go 
b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index f071e37..74008bc 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -124,14 +124,28 @@ type ServerStats struct {
 
        // The total number of spans which have been reaped.
        ReapedSpans uint64
+
+       // The total number of spans which have been ingested since the server started, by WriteSpans
+       // requests.  This number counts spans that didn't get written to persistent storage as well as
+       // those that did.
+       IngestedSpans uint64
+
+       // The total number of spans which have been dropped by clients since the server started,
+       // as reported by WriteSpans requests.
+       ClientDroppedSpans uint64
+
+       // The maximum latency of a writeSpans request, in milliseconds.
+       MaxWriteSpansLatencyMs uint32
+
+       // The average latency of a writeSpans request, in milliseconds.
+       AverageWriteSpansLatencyMs uint32
 }
 
 type StorageDirectoryStats struct {
        Path string
 
-       // The approximate number of spans present in this shard.  This may be 
an
-       // underestimate.
-       ApproxNumSpans uint64
+       // The approximate number of bytes on disk present in this shard.
+       ApproximateBytes uint64
 
        // leveldb.stats information
        LevelDbStats string

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index d0296c3..c676088 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -643,10 +643,6 @@ func writeDataStoreVersion(store *dataStore, ldb 
*levigo.DB, v uint32) error {
        return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
 }
 
-func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap {
-       return store.msink.AccessTotals()
-}
-
 // Close the DataStore.
 func (store *dataStore) Close() {
        if store.hb != nil {
@@ -1241,21 +1237,32 @@ func (store *dataStore) ServerStats() 
*common.ServerStats {
                shard := store.shards[shardIdx]
                serverStats.Dirs[shardIdx].Path = shard.path
                r := levigo.Range{
-                       Start: append([]byte{SPAN_ID_INDEX_PREFIX},
-                               common.INVALID_SPAN_ID.Val()...),
-                       Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1},
-                               common.INVALID_SPAN_ID.Val()...),
+                       Start: []byte{0},
+                       Limit: []byte{0xff},
                }
                vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
-               serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0]
+               serverStats.Dirs[shardIdx].ApproximateBytes = vals[0]
                serverStats.Dirs[shardIdx].LevelDbStats =
                        shard.ldb.PropertyValue("leveldb.stats")
-               store.lg.Infof("levedb.stats for %s: %s\n",
+               store.msink.lg.Debugf("levedb.stats for %s: %s\n",
                        shard.path, shard.ldb.PropertyValue("leveldb.stats"))
        }
-       serverStats.HostSpanMetrics = store.msink.AccessTotals()
        serverStats.LastStartMs = store.startMs
        serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
        serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
+       wsData := store.msink.wsm.GetData()
+       serverStats.HostSpanMetrics = store.msink.AccessServerTotals()
+       for k, v := range wsData.clientDroppedMap {
+               smtx := serverStats.HostSpanMetrics[k]
+               if smtx == nil {
+                       smtx = &common.SpanMetrics {}
+                       serverStats.HostSpanMetrics[k] = smtx
+               }
+               smtx.ClientDropped = v
+       }
+       serverStats.IngestedSpans = wsData.ingestedSpans
+       serverStats.ClientDroppedSpans = wsData.clientDroppedSpans
+       serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax
+       serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage
        return &serverStats
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 49587bb..0d72602 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -33,6 +33,7 @@ import (
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "reflect"
+       "time"
 )
 
 // Handles HRPC calls
@@ -195,9 +196,15 @@ func (cdc *HrpcServerCodec) Close() error {
 }
 
 func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
-       resp *common.WriteSpansResp) (err error) {
+               resp *common.WriteSpansResp) (err error) {
+       startTime := time.Now()
        hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s).  "+
                "defaultTrid = %s\n", len(req.Spans), req.DefaultTrid)
+       client, _, err := net.SplitHostPort(req.Addr)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Failed to split host and port " +
+                       "for %s: %s\n", req.Addr, err.Error()))
+       }
        for i := range req.Spans {
                span := req.Spans[i]
                spanIdProblem := span.Id.FindProblem()
@@ -211,10 +218,13 @@ func (hand *HrpcHandler) WriteSpans(req 
*common.WriteSpansReq,
                        hand.lg.Tracef("writing span %d: %s\n", i, 
span.ToJson())
                }
                hand.store.WriteSpan(&IncomingSpan{
-                       Addr: req.Addr,
+                       Addr: client,
                        Span: span,
                })
        }
+       endTime := time.Now()
+       hand.store.msink.Update(client, req.ClientDropped, len(req.Spans),
+                       endTime.Sub(startTime))
        return nil
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
index b482aa3..97b72ca 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
@@ -23,10 +23,12 @@ import (
        "bufio"
        "encoding/json"
        "fmt"
+       "github.com/jmhodges/levigo"
        "net"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "os"
+       "runtime"
        "strings"
        "time"
 )
@@ -84,12 +86,15 @@ func main() {
        // configuration.
        lg := common.NewLogger("main", cnf)
        defer lg.Close()
-       lg.Infof("*** Starting htraced ***\n")
+       lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, 
GIT_VERSION)
        scanner := bufio.NewScanner(cnfLog)
        for scanner.Scan() {
                lg.Infof(scanner.Text() + "\n")
        }
        common.InstallSignalHandlers(cnf)
+       lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0))
+       lg.Infof("leveldb version=%d.%d\n",
+               levigo.GetLevelDBMajorVersion(), 
levigo.GetLevelDBMinorVersion())
 
        // Initialize the datastore.
        store, err := CreateDataStore(cnf, nil)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
index 672f5f6..cfff418 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -21,9 +21,11 @@ package main
 
 import (
        "encoding/json"
+       "math"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "sync"
+       "time"
 )
 
 //
@@ -36,6 +38,8 @@ import (
 // them so that we can adjust the sampling rate there.
 //
 
+const LATENCY_CIRC_BUF_SIZE = 4096
+
 type ServerSpanMetrics struct {
        // The total number of spans written to HTraced.
        Written uint64
@@ -131,12 +135,8 @@ type MetricsSink struct {
        // The maximum number of metrics totals we will maintain.
        maxMtx int
 
-       // The number of spans which each client has self-reported that it has
-       // dropped.
-       clientDroppedMap map[string]uint64
-
-       // Lock protecting clientDropped
-       clientDroppedLock sync.Mutex
+       // Metrics about WriteSpans requests
+       wsm WriteSpanMetrics
 }
 
 func NewMetricsSink(cnf *conf.Config) *MetricsSink {
@@ -147,7 +147,10 @@ func NewMetricsSink(cnf *conf.Config) *MetricsSink {
                exited:           make(chan interface{}),
                lg:               common.NewLogger("metrics", cnf),
                maxMtx:           
cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
-               clientDroppedMap: make(map[string]uint64),
+               wsm: WriteSpanMetrics {
+                       clientDroppedMap: make(map[string]uint64),
+                       latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
+               },
        }
        go mcl.run()
        return &mcl
@@ -187,21 +190,16 @@ func (msink *MetricsSink) run() {
 
 func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
        msink.lg.Debug("MetricsSink: accessing global metrics.\n")
-       msink.clientDroppedLock.Lock()
-       defer func() {
-               msink.clientDroppedLock.Unlock()
-               close(accessReq.done)
-       }()
+       defer close(accessReq.done)
        for addr, smtx := range msink.smtxMap {
                accessReq.mtxMap[addr] = &common.SpanMetrics{
                        Written:       smtx.Written,
                        ServerDropped: smtx.ServerDropped,
-                       ClientDropped: msink.clientDroppedMap[addr],
                }
        }
 }
 
-func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap {
+func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap {
        accessReq := &AccessReq{
                mtxMap: make(common.SpanMetricsMap),
                done:   make(chan interface{}),
@@ -220,15 +218,123 @@ func (msink *MetricsSink) Shutdown() {
        <-msink.exited
 }
 
-func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped 
uint64) {
-       msink.clientDroppedLock.Lock()
-       defer msink.clientDroppedLock.Unlock()
-       msink.clientDroppedMap[client] = clientDropped
-       if len(msink.clientDroppedMap) >= msink.maxMtx {
+// WriteSpanMetrics accumulates statistics about WriteSpans requests.  All
+// fields other than the lock itself must only be touched while holding lock.
+type WriteSpanMetrics struct {
+       // Lock protecting WriteSpanMetrics
+       lock sync.Mutex
+
+       // The number of spans which each client has self-reported that it has
+       // dropped.  Keyed by the client string passed to update; holds the
+       // value from that client's most recent request.
+       clientDroppedMap map[string]uint64
+
+       // The total number of new span writes we've gotten since startup.
+       ingestedSpans uint64
+
+       // The total number of spans all clients have dropped since startup.
+       clientDroppedSpans uint64
+
+       // The last few writeSpan latencies, as passed to update.
+       latencyCircBuf *CircBufU32
+}
+
+// WriteSpanMetricsData is the snapshot of WriteSpanMetrics which GetData
+// returns.  The map is a copy, so the snapshot does not alias the live map.
+type WriteSpanMetricsData struct {
+       // Copy of the per-client self-reported dropped span counts.
+       clientDroppedMap map[string]uint64
+       // Total spans ingested at the time of the snapshot.
+       ingestedSpans uint64
+       // Total client-dropped spans at the time of the snapshot.
+       clientDroppedSpans uint64
+       // Maximum of the latencies held in the circular buffer.
+       latencyMax uint32
+       // Average of the latencies held in the circular buffer.
+       latencyAverage uint32
+}
+
+// Update records the outcome of one WriteSpans request.
+//
+// client is the address of the requesting client, clientDropped is the number
+// of dropped spans which the client reported, clientWritten is the number of
+// spans in the request, and wsLatency is how long the request took to handle.
+//
+// The latency is stored in whole milliseconds, clamped to the range of a
+// uint32.  A negative duration is recorded as 0: the two time.Now() calls
+// which produce it may straddle a wall-clock adjustment, and converting a
+// negative value to uint32 would otherwise record a huge bogus latency.
+func (msink *MetricsSink) Update(client string, clientDropped uint64, clientWritten int,
+               wsLatency time.Duration) {
+       wsLatencyMs := wsLatency.Nanoseconds() / int64(time.Millisecond)
+       var wsLatency32 uint32
+       switch {
+       case wsLatencyMs < 0:
+               wsLatency32 = 0
+       case wsLatencyMs > math.MaxUint32:
+               wsLatency32 = math.MaxUint32
+       default:
+               wsLatency32 = uint32(wsLatencyMs)
+       }
+       msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, wsLatency32)
+}
+
+// update folds one WriteSpans request into the metrics while holding the lock.
+//
+// The client's self-reported dropped count replaces any previous value for
+// that client.  If the per-client map has reached maxMtx entries, one other
+// entry is evicted to bound memory use.  The totals and the latency buffer are
+// always updated, whether or not an eviction took place.
+func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped uint64,
+               clientWritten int, wsLatency uint32) {
+       wsm.lock.Lock()
+       defer wsm.lock.Unlock()
+       wsm.clientDroppedMap[client] = clientDropped
+       if len(wsm.clientDroppedMap) >= maxMtx {
-               // Delete a random entry
-               for k := range msink.clientDroppedMap {
-                       delete(msink.clientDroppedMap, k)
-                       return
+               // Delete an arbitrary entry (map iteration order is unspecified),
+               // but never the one we just wrote.  Use break rather than return
+               // so that the totals below are still updated.
+               for k := range wsm.clientDroppedMap {
+                       if k == client {
+                               continue
+                       }
+                       delete(wsm.clientDroppedMap, k)
+                       break
                }
        }
+       wsm.ingestedSpans += uint64(clientWritten)
+       wsm.clientDroppedSpans += clientDropped
+       wsm.latencyCircBuf.Append(wsLatency)
+}
+
+// GetData returns a snapshot of the WriteSpans metrics, taken while holding
+// the lock.  The per-client map in the result is a copy, so callers may read
+// or modify it without affecting the live metrics.
+func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData {
+       wsm.lock.Lock()
+       defer wsm.lock.Unlock()
+       data := &WriteSpanMetricsData {
+               clientDroppedMap: make(map[string]uint64),
+               ingestedSpans: wsm.ingestedSpans,
+               clientDroppedSpans: wsm.clientDroppedSpans,
+               latencyMax: wsm.latencyCircBuf.Max(),
+               latencyAverage: wsm.latencyCircBuf.Average(),
+       }
+       for client, dropped := range wsm.clientDroppedMap {
+               data.clientDroppedMap[client] = dropped
+       }
+       return data
+}
+
+// CircBufU32 is a fixed-capacity circular buffer of uint32s which supports
+// appending, and taking the maximum and the average of the values it holds.
+// Once the buffer is full, each Append overwrites the oldest value.
+//
+// It does no locking of its own.
+type CircBufU32 struct {
+       // The next slot to fill
+       slot int
+
+       // The number of slots which are in use.  This number only ever
+       // increases until the buffer is full.
+       slotsUsed int
+
+       // The buffer
+       buf []uint32
+}
+
+// NewCircBufU32 creates an empty circular buffer which can hold size values.
+func NewCircBufU32(size int) *CircBufU32 {
+       return &CircBufU32 {
+               buf: make([]uint32, size),
+       }
+}
+
+// Max returns the largest value currently held, or 0 if the buffer is empty.
+func (cbuf *CircBufU32) Max() uint32 {
+       var max uint32
+       for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+               if cbuf.buf[bufIdx] > max {
+                       max = cbuf.buf[bufIdx]
+               }
+       }
+       return max
+}
+
+// Average returns the mean of the values currently held, rounded down.  We
+// arbitrarily define that an empty buffer has an average of 0.
+func (cbuf *CircBufU32) Average() uint32 {
+       // Guard explicitly against dividing by zero rather than relying on a
+       // sentinel value in slotsUsed.
+       if cbuf.slotsUsed <= 0 {
+               return 0
+       }
+       var total uint64
+       for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+               total += uint64(cbuf.buf[bufIdx])
+       }
+       return uint32(total / uint64(cbuf.slotsUsed))
+}
+
+// Append adds a value, overwriting the oldest one if the buffer is full.
+// Appending to a buffer created with size 0 does nothing.
+func (cbuf *CircBufU32) Append(val uint32) {
+       if len(cbuf.buf) == 0 {
+               return
+       }
+       cbuf.buf[cbuf.slot] = val
+       cbuf.slot++
+       if cbuf.slotsUsed < cbuf.slot {
+               cbuf.slotsUsed = cbuf.slot
+       }
+       if cbuf.slot >= len(cbuf.buf) {
+               cbuf.slot = 0
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index c90d1da..48c20f0 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -20,6 +20,7 @@
 package main
 
 import (
+       htrace "org/apache/htrace/client"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "reflect"
@@ -81,7 +82,7 @@ func compareTotals(a, b common.SpanMetricsMap) bool {
 func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
        for {
                time.Sleep(1 * time.Millisecond)
-               totals := msink.AccessTotals()
+               totals := msink.AccessServerTotals()
                if compareTotals(totals, expectedTotals) {
                        return
                }
@@ -98,7 +99,7 @@ func TestMetricsSinkMessages(t *testing.T) {
                t.Fatalf("failed to create conf: %s", err.Error())
        }
        msink := NewMetricsSink(cnf)
-       totals := msink.AccessTotals()
+       totals := msink.AccessServerTotals()
        if len(totals) != 0 {
                t.Fatalf("Expected no data in the MetricsSink to start with.")
        }
@@ -178,10 +179,93 @@ func TestMetricsSinkMessagesEviction(t *testing.T) {
                },
        })
        for {
-               totals := msink.AccessTotals()
+               totals := msink.AccessServerTotals()
                if len(totals) == 2 {
                        break
                }
        }
        msink.Shutdown()
 }
+
+// TestIngestedSpansMetricsRest runs the ingested spans metrics test with HRPC
+// disabled on the client.
+func TestIngestedSpansMetricsRest(t *testing.T) {
+       testIngestedSpansMetricsImpl(t, false)
+}
+
+// TestIngestedSpansMetricsPacked runs the ingested spans metrics test with
+// HRPC left enabled on the client.
+func TestIngestedSpansMetricsPacked(t *testing.T) {
+       testIngestedSpansMetricsImpl(t, true)
+}
+
+// testIngestedSpansMetricsImpl starts a MiniHTraced, writes a batch of random
+// spans to it, and then polls the server stats until IngestedSpans equals the
+// number of spans written.  When usePacked is false, HRPC is disabled on the
+// client before writing.
+//
+// The polling is bounded by a deadline so that a regression produces a clear
+// failure message instead of hanging until the global test timeout.
+func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics",
+               DataDirs: make([]string, 2),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf())
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       defer hcl.Close()
+       if !usePacked {
+               hcl.DisableHrpc()
+       }
+
+       const numTestSpans = 12
+       allSpans := createRandomTestSpans(numTestSpans)
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans,
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans failed: %s\n", err.Error())
+       }
+       deadline := time.Now().Add(30 * time.Second)
+       for {
+               var stats *common.ServerStats
+               stats, err = hcl.GetServerStats()
+               if err != nil {
+                       t.Fatalf("GetServerStats failed: %s\n", err.Error())
+               }
+               if stats.IngestedSpans == numTestSpans {
+                       break
+               }
+               if time.Now().After(deadline) {
+                       t.Fatalf("timed out waiting for IngestedSpans to reach %d; "+
+                               "last value was %d\n", numTestSpans, stats.IngestedSpans)
+               }
+               time.Sleep(1 * time.Millisecond)
+       }
+}
+
+// TestCircBuf32 checks the average and maximum of a three-slot CircBufU32
+// while it is empty, while it fills up, and after it wraps around and starts
+// overwriting its oldest values.
+func TestCircBuf32(t *testing.T) {
+       cbuf := NewCircBufU32(3)
+       // We arbitrarily define that empty circular buffers have an average of 0.
+       if cbuf.Average() != 0 {
+               t.Fatalf("expected empty CircBufU32 to have an average of 0.\n")
+       }
+       if cbuf.Max() != 0 {
+               t.Fatalf("expected empty CircBufU32 to have a max of 0.\n")
+       }
+       cbuf.Append(2)
+       if cbuf.Average() != 2 {
+               t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n")
+       }
+       cbuf.Append(10)
+       if cbuf.Average() != 6 {
+               t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n")
+       }
+       cbuf.Append(12)
+       if cbuf.Average() != 8 {
+               t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n")
+       }
+       cbuf.Append(14)
+       // The 14 overwrites the original 2 element.
+       if cbuf.Average() != 12 {
+               t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
+       }
+       cbuf.Append(1)
+       // The 1 overwrites the original 10 element.
+       if cbuf.Average() != 9 {
+               t.Fatalf("expected three-element CircBufU32 to have an average of 9.\n")
+       }
+       if cbuf.Max() != 14 {
+               t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n")
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index 80df676..a50799a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -165,6 +165,13 @@ func (ht *MiniHTraced) ClientConf() *conf.Config {
                conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
 }
 
+// RestOnlyClientConf returns a Config object that clients can use to connect
+// to this MiniHTraced by HTTP only (no HRPC).  It is a clone of the server's
+// configuration with the web address set to the REST server's listening
+// address and the HRPC address set to the empty string.
+func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
+       return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+               conf.HTRACE_HRPC_ADDRESS, "")
+}
+
 func (ht *MiniHTraced) Close() {
        ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
        ht.Rsv.Close()

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index a41e1c7..9b78d15 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -34,6 +34,7 @@ import (
        "path/filepath"
        "strconv"
        "strings"
+       "time"
 )
 
 // Set the response headers.
@@ -198,7 +199,15 @@ type writeSpansHandler struct {
 }
 
 func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
+       startTime := time.Now()
        setResponseHeaders(w.Header())
+       client, _, serr := net.SplitHostPort(req.RemoteAddr)
+       if serr != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Failed to split host and port for %s: 
%s\n",
+                               req.RemoteAddr, serr.Error()))
+               return
+       }
        var dec *json.Decoder
        if hand.lg.TraceEnabled() {
                b, err := ioutil.ReadAll(req.Body)
@@ -234,12 +243,14 @@ func (hand *writeSpansHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reques
                        hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", 
spanIdProblem))
                } else {
                        hand.store.WriteSpan(&IncomingSpan{
-                               Addr: req.RemoteAddr,
+                               Addr: client,
                                Span: span,
                        })
                }
        }
-       hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped)
+       endTime := time.Now()
+       hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans),
+                       endTime.Sub(startTime))
 }
 
 type queryHandler struct {
@@ -291,6 +302,7 @@ func (hand *logErrorHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Request)
 }
 
 type RestServer struct {
+       http.Server
        listener net.Listener
        lg       *common.Logger
 }
@@ -337,14 +349,16 @@ func CreateRestServer(cnf *conf.Config, store *dataStore,
                }
        }
 
-       rsv.lg.Infof(`Serving static files from "%s"\n`, webdir)
+       rsv.lg.Infof(`Serving static files from "%s"` + "\n", webdir)
        
r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET")
 
        // Log an error message for unknown non-GET requests.
        r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
 
        rsv.listener = listener
-       go http.Serve(rsv.listener, r)
+       rsv.Handler = r
+       rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO)
+       go rsv.Serve(rsv.listener)
        rsv.lg.Infof("Started REST server on %s\n", 
rsv.listener.Addr().String())
        return rsv, nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go 
b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 04dc269..88071c7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -212,13 +212,19 @@ func printServerStats(hcl *htrace.Client) int {
        fmt.Fprintf(w, "Server Time\t%s\n",
                common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
        fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+       fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
+       fmt.Fprintf(w, "Spans dropped by clients\t%d\n", 
stats.ClientDroppedSpans)
+       dur := time.Millisecond * 
time.Duration(stats.AverageWriteSpansLatencyMs)
+       fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
+       dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
+       fmt.Fprintf(w, "Maximum WriteSpan Latency\t%s\n", dur.String())
        fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
        w.Flush()
        fmt.Println("")
        for i := range stats.Dirs {
                dir := stats.Dirs[i]
                fmt.Printf("==== %s ===\n", dir.Path)
-               fmt.Printf("Approximate number of spans: %d\n", 
dir.ApproxNumSpans)
+               fmt.Printf("Approximate number of bytes: %d\n", 
dir.ApproximateBytes)
                stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
                fmt.Printf("%s\n", stats)
        }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/app/server_info_view.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js 
b/htrace-webapp/src/main/webapp/app/server_info_view.js
index f3473d3..4255995 100644
--- a/htrace-webapp/src/main/webapp/app/server_info_view.js
+++ b/htrace-webapp/src/main/webapp/app/server_info_view.js
@@ -22,6 +22,7 @@ var htrace = htrace || {};
 htrace.ServerInfoView = Backbone.View.extend({
   events: {
     "click .serverConfigurationButton": "showServerConfigurationModal",
+    "click .storageDirectoryStatsButton": "showStorageDirectoryStatsModal",
   },
 
   render: function() {
@@ -110,5 +111,18 @@ htrace.ServerInfoView = Backbone.View.extend({
             {title: "HTraced Server Configuration", body: out}));
       }
     })
+  },
+
+  // Show a modal dialog describing each storage directory: its path, its
+  // approximate size in bytes, and its leveldb statistics.
+  showStorageDirectoryStatsModal: function() {
+    // "Dirs" has no default in the model, so it is undefined until the server
+    // stats have been fetched.  Treat that as an empty list.
+    var dirs = this.model.stats.get("Dirs") || [];
+    var out = "";
+    for (var dirIdx = 0; dirIdx < dirs.length; dirIdx++) {
+      var dir = dirs[dirIdx];
+      // The path and the leveldb stats are server-supplied text which gets
+      // inserted into the modal body as markup, so escape them.
+      out += "<h3>" + _.escape(dir.Path) + "</h3>";
+      out += "Approximate size in bytes: " + dir.ApproximateBytes + "<br/>";
+      out += "<pre>" + _.escape(dir.LevelDbStats) + "</pre><br/><p/>";
+    }
+    htrace.showModal(_.template($("#modal-table-template").html())(
+          {title: "HTraced Storage Directory Statistics", body: out}));
   }
 });

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/app/server_stats.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js 
b/htrace-webapp/src/main/webapp/app/server_stats.js
index e4289ef..783041c 100644
--- a/htrace-webapp/src/main/webapp/app/server_stats.js
+++ b/htrace-webapp/src/main/webapp/app/server_stats.js
@@ -20,7 +20,10 @@
 // htraced server statistics.  See rest.go.
 htrace.ServerStats = Backbone.Model.extend({
   defaults: {
-    "ReapedSpans": "(unknown)",
+    "LastStartMs": "0",
+    "CurMs": "0",
+    "IngestedSpans": "(unknown)",
+    "ReapedSpans": "(unknown)"
   },
 
   url: function() {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/index.html 
b/htrace-webapp/src/main/webapp/index.html
index 2cebefe..a59282a 100644
--- a/htrace-webapp/src/main/webapp/index.html
+++ b/htrace-webapp/src/main/webapp/index.html
@@ -79,6 +79,22 @@
               <td>Spans Reaped</td>
               <td><%= model.stats.get("ReapedSpans") %></td>
             </tr>
+            <tr>
+              <td>Spans Ingested</td>
+              <td><%= model.stats.get("IngestedSpans") %></td>
+            </tr>
+            <tr>
+              <td>Client Dropped Spans</td>
+              <td><%= model.stats.get("ClientDroppedSpans") %></td>
+            </tr>
+            <tr>
+              <td>Maximum WriteSpans Latency (ms)</td>
+              <td><%= model.stats.get("MaxWriteSpansLatencyMs") %></td>
+            </tr>
+            <tr>
+              <td>Average WriteSpans Latency (ms)</td>
+              <td><%= model.stats.get("AverageWriteSpansLatencyMs") %></td>
+            </tr>
-            </tr>
+            <tr>
               <td>Datastore Start Time</td>
               <td><%= htrace.dateToString(model.stats.get("LastStartMs")) 
%></td>
@@ -93,6 +109,9 @@
             <%= view.getServerStatsTableHtml() %>
           </div>
           <button type="button" class="btn btn-info 
serverConfigurationButton">Server Configuration</button>
+          <button type="button" class="btn btn-success 
storageDirectoryStatsButton">Storage Directory Stats</button>
+          <br/>
+          <p/>
         </div>
         <div class="col-md-1">
         </div>

Reply via email to