Repository: incubator-htrace Updated Branches: refs/heads/master ef46897ff -> 021e49144
HTRACE-294. htraced: fix some metrics issues (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/021e4914 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/021e4914 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/021e4914 Branch: refs/heads/master Commit: 021e491446716a57eb7f37f9328fe72d103b9823 Parents: ef46897 Author: Colin P. Mccabe <[email protected]> Authored: Thu Nov 12 12:39:36 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Thu Nov 12 12:39:36 2015 -0800 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/client/client.go | 11 +- .../go/src/org/apache/htrace/common/log.go | 32 ++++ .../go/src/org/apache/htrace/common/rpc.go | 20 ++- .../src/org/apache/htrace/htraced/datastore.go | 29 ++-- .../go/src/org/apache/htrace/htraced/hrpc.go | 14 +- .../go/src/org/apache/htrace/htraced/htraced.go | 7 +- .../go/src/org/apache/htrace/htraced/metrics.go | 148 ++++++++++++++++--- .../org/apache/htrace/htraced/metrics_test.go | 90 ++++++++++- .../org/apache/htrace/htraced/mini_htraced.go | 7 + .../go/src/org/apache/htrace/htraced/rest.go | 22 ++- .../go/src/org/apache/htrace/htracedTool/cmd.go | 8 +- .../src/main/webapp/app/server_info_view.js | 14 ++ .../src/main/webapp/app/server_stats.js | 5 +- htrace-webapp/src/main/webapp/index.html | 19 +++ 14 files changed, 373 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go index fb46e62..28b9e29 100644 --- a/htrace-htraced/go/src/org/apache/htrace/client/client.go +++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go @@ -47,9 +47,11 @@ type Client struct { // HRPC address of the htraced server. hrpcAddr string +} - // The HRPC client, or null if it is not enabled. - hcr *hClient +// Disable HRPC +func (hcl *Client) DisableHrpc() { + hcl.hrpcAddr = "" } // Get the htraced server version information. @@ -243,9 +245,6 @@ func (hcl *Client) DumpAll(lim int, out chan *common.Span) error { } func (hcl *Client) Close() { - if hcl.hcr != nil { - hcl.hcr.Close() - } hcl.restAddr = "" - hcl.hcr = nil + hcl.hrpcAddr = "" } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/common/log.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go b/htrace-htraced/go/src/org/apache/htrace/common/log.go index 2e3e267..4066094 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/log.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go @@ -22,6 +22,7 @@ package common import ( "errors" "fmt" + "log" "org/apache/htrace/conf" "os" "path/filepath" @@ -294,3 +295,34 @@ func (lg *Logger) Close() { lg.sink.Unref() lg.sink = nil } + +// Wraps an htrace logger in a golang standard logger. +// +// This is a bit messy because of the difference in interfaces between the +// golang standard logger and the htrace logger. The golang standard logger +// doesn't support log levels directly, so you must choose up front what htrace +// log level all messages should be treated as. Golang standard loggers expect +// to be able to write to an io.Writer, but make no guarantees about whether +// they will break messages into multiple Write() calls (although this does +// not seem to be a major problem in practice.) +// +// Despite these limitations, it's still useful to have this method to be able +// to log things that come out of the go HTTP server and other standard library +// systems. +type WrappedLogger struct { + lg *Logger + level Level +} + +func (lg *Logger) Wrap(prefix string, level Level) *log.Logger { + wlg := &WrappedLogger { + lg: lg, + level: level, + } + return log.New(wlg, prefix, 0) +} + +func (wlg *WrappedLogger) Write(p []byte) (int, error) { + wlg.lg.Write(wlg.level, string(p)) + return len(p), nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go index f071e37..74008bc 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go @@ -124,14 +124,28 @@ type ServerStats struct { // The total number of spans which have been reaped. ReapedSpans uint64 + + // The total number of spans which have been ingested since the server started, by WriteSpans + // requests. This number counts spans that didn't get written to persistent storage as well as + // those that did. + IngestedSpans uint64 + + // The total number of spans which have been dropped by clients since the server started, + // as reported by WriteSpans requests. + ClientDroppedSpans uint64 + + // The maximum latency of a writeSpans request, in milliseconds. + MaxWriteSpansLatencyMs uint32 + + // The average latency of a writeSpans request, in milliseconds. + AverageWriteSpansLatencyMs uint32 } type StorageDirectoryStats struct { Path string - // The approximate number of spans present in this shard. This may be an - // underestimate. - ApproxNumSpans uint64 + // The approximate number of bytes on disk present in this shard. + ApproximateBytes uint64 // leveldb.stats information LevelDbStats string http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index d0296c3..c676088 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -643,10 +643,6 @@ func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error { return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes()) } -func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap { - return store.msink.AccessTotals() -} - // Close the DataStore. func (store *dataStore) Close() { if store.hb != nil { @@ -1241,21 +1237,32 @@ func (store *dataStore) ServerStats() *common.ServerStats { shard := store.shards[shardIdx] serverStats.Dirs[shardIdx].Path = shard.path r := levigo.Range{ - Start: append([]byte{SPAN_ID_INDEX_PREFIX}, - common.INVALID_SPAN_ID.Val()...), - Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1}, - common.INVALID_SPAN_ID.Val()...), + Start: []byte{0}, + Limit: []byte{0xff}, } vals := shard.ldb.GetApproximateSizes([]levigo.Range{r}) - serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0] + serverStats.Dirs[shardIdx].ApproximateBytes = vals[0] serverStats.Dirs[shardIdx].LevelDbStats = shard.ldb.PropertyValue("leveldb.stats") - store.lg.Infof("levedb.stats for %s: %s\n", + store.msink.lg.Debugf("levedb.stats for %s: %s\n", shard.path, shard.ldb.PropertyValue("leveldb.stats")) } - serverStats.HostSpanMetrics = store.msink.AccessTotals() serverStats.LastStartMs = store.startMs serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC()) serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans) + wsData := store.msink.wsm.GetData() + serverStats.HostSpanMetrics = store.msink.AccessServerTotals() + for k, v := range wsData.clientDroppedMap { + smtx := serverStats.HostSpanMetrics[k] + if smtx == nil { + smtx = &common.SpanMetrics {} + serverStats.HostSpanMetrics[k] = smtx + } + smtx.ClientDropped = v + } + serverStats.IngestedSpans = wsData.ingestedSpans + serverStats.ClientDroppedSpans = wsData.clientDroppedSpans + serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax + serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage return &serverStats } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go index 49587bb..0d72602 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go @@ -33,6 +33,7 @@ import ( "org/apache/htrace/common" "org/apache/htrace/conf" "reflect" + "time" ) // Handles HRPC calls @@ -195,9 +196,15 @@ func (cdc *HrpcServerCodec) Close() error { } func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, - resp *common.WriteSpansResp) (err error) { + resp *common.WriteSpansResp) (err error) { + startTime := time.Now() hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+ "defaultTrid = %s\n", len(req.Spans), req.DefaultTrid) + client, _, err := net.SplitHostPort(req.Addr) + if err != nil { + return errors.New(fmt.Sprintf("Failed to split host and port " + + "for %s: %s\n", req.Addr, err.Error())) + } for i := range req.Spans { span := req.Spans[i] spanIdProblem := span.Id.FindProblem() @@ -211,10 +218,13 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson()) } hand.store.WriteSpan(&IncomingSpan{ - Addr: req.Addr, + Addr: client, Span: span, }) } + endTime := time.Now() + hand.store.msink.Update(client, req.ClientDropped, len(req.Spans), + endTime.Sub(startTime)) return nil } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go index b482aa3..97b72ca 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go @@ -23,10 +23,12 @@ import ( "bufio" "encoding/json" "fmt" + "github.com/jmhodges/levigo" "net" "org/apache/htrace/common" "org/apache/htrace/conf" "os" + "runtime" "strings" "time" ) @@ -84,12 +86,15 @@ func main() { // configuration. lg := common.NewLogger("main", cnf) defer lg.Close() - lg.Infof("*** Starting htraced ***\n") + lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION) scanner := bufio.NewScanner(cnfLog) for scanner.Scan() { lg.Infof(scanner.Text() + "\n") } common.InstallSignalHandlers(cnf) + lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0)) + lg.Infof("leveldb version=%d.%d\n", + levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion()) // Initialize the datastore. store, err := CreateDataStore(cnf, nil) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go index 672f5f6..cfff418 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go @@ -21,9 +21,11 @@ package main import ( "encoding/json" + "math" "org/apache/htrace/common" "org/apache/htrace/conf" "sync" + "time" ) // @@ -36,6 +38,8 @@ import ( // them so that we can adjust the sampling rate there. // +const LATENCY_CIRC_BUF_SIZE = 4096 + type ServerSpanMetrics struct { // The total number of spans written to HTraced. Written uint64 @@ -131,12 +135,8 @@ type MetricsSink struct { // The maximum number of metrics totals we will maintain. maxMtx int - // The number of spans which each client has self-reported that it has - // dropped. - clientDroppedMap map[string]uint64 - - // Lock protecting clientDropped - clientDroppedLock sync.Mutex + // Metrics about WriteSpans requests + wsm WriteSpanMetrics } func NewMetricsSink(cnf *conf.Config) *MetricsSink { @@ -147,7 +147,10 @@ func NewMetricsSink(cnf *conf.Config) *MetricsSink { exited: make(chan interface{}), lg: common.NewLogger("metrics", cnf), maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES), - clientDroppedMap: make(map[string]uint64), + wsm: WriteSpanMetrics { + clientDroppedMap: make(map[string]uint64), + latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE), + }, } go mcl.run() return &mcl @@ -187,21 +190,16 @@ func (msink *MetricsSink) run() { func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) { msink.lg.Debug("MetricsSink: accessing global metrics.\n") - msink.clientDroppedLock.Lock() - defer func() { - msink.clientDroppedLock.Unlock() - close(accessReq.done) - }() + defer close(accessReq.done) for addr, smtx := range msink.smtxMap { accessReq.mtxMap[addr] = &common.SpanMetrics{ Written: smtx.Written, ServerDropped: smtx.ServerDropped, - ClientDropped: msink.clientDroppedMap[addr], } } } -func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap { +func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap { accessReq := &AccessReq{ mtxMap: make(common.SpanMetricsMap), done: make(chan interface{}), @@ -220,15 +218,123 @@ func (msink *MetricsSink) Shutdown() { <-msink.exited } -func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped uint64) { - msink.clientDroppedLock.Lock() - defer msink.clientDroppedLock.Unlock() - msink.clientDroppedMap[client] = clientDropped - if len(msink.clientDroppedMap) >= msink.maxMtx { +type WriteSpanMetrics struct { + // Lock protecting WriteSpanMetrics + lock sync.Mutex + + // The number of spans which each client has self-reported that it has + // dropped. + clientDroppedMap map[string]uint64 + + // The total number of new span writes we've gotten since startup. + ingestedSpans uint64 + + // The total number of spans all clients have dropped since startup. + clientDroppedSpans uint64 + + // The last few writeSpan latencies + latencyCircBuf *CircBufU32 +} + +type WriteSpanMetricsData struct { + clientDroppedMap map[string]uint64 + ingestedSpans uint64 + clientDroppedSpans uint64 + latencyMax uint32 + latencyAverage uint32 +} + +func (msink *MetricsSink) Update(client string, clientDropped uint64, clientWritten int, + wsLatency time.Duration) { + wsLatencyNs := wsLatency.Nanoseconds() / 1000000 + var wsLatency32 uint32 + if wsLatencyNs > math.MaxUint32 { + wsLatency32 = math.MaxUint32 + } else { + wsLatency32 = uint32(wsLatencyNs) + } + msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, wsLatency32) +} + +func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped uint64, + clientWritten int, wsLatency uint32) { + wsm.lock.Lock() + defer wsm.lock.Unlock() + wsm.clientDroppedMap[client] = clientDropped + if len(wsm.clientDroppedMap) >= maxMtx { // Delete a random entry - for k := range msink.clientDroppedMap { - delete(msink.clientDroppedMap, k) + for k := range wsm.clientDroppedMap { + delete(wsm.clientDroppedMap, k) return } } + wsm.ingestedSpans += uint64(clientWritten) + wsm.clientDroppedSpans += uint64(clientDropped) + wsm.latencyCircBuf.Append(wsLatency) +} + +func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData { + wsm.lock.Lock() + defer wsm.lock.Unlock() + clientDroppedMap := make(map[string]uint64) + for k, v := range wsm.clientDroppedMap { + clientDroppedMap[k] = v + } + return &WriteSpanMetricsData { + clientDroppedMap: clientDroppedMap, + ingestedSpans: wsm.ingestedSpans, + clientDroppedSpans: wsm.clientDroppedSpans, + latencyMax: wsm.latencyCircBuf.Max(), + latencyAverage: wsm.latencyCircBuf.Average(), + } +} + +// A circular buffer of uint32s which supports appending and taking the +// average, and some other things. +type CircBufU32 struct { + // The next slot to fill + slot int + + // The number of slots which are in use. This number only ever + // increases until the buffer is full. + slotsUsed int + + // The buffer + buf []uint32 +} + +func NewCircBufU32(size int) *CircBufU32 { + return &CircBufU32 { + slotsUsed: -1, + buf: make([]uint32, size), + } +} + +func (cbuf *CircBufU32) Max() uint32 { + var max uint32 + for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ { + if cbuf.buf[bufIdx] > max { + max = cbuf.buf[bufIdx] + } + } + return max +} + +func (cbuf *CircBufU32) Average() uint32 { + var total uint64 + for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ { + total += uint64(cbuf.buf[bufIdx]) + } + return uint32(total / uint64(cbuf.slotsUsed)) +} + +func (cbuf *CircBufU32) Append(val uint32) { + cbuf.buf[cbuf.slot] = val + cbuf.slot++ + if cbuf.slotsUsed < cbuf.slot { + cbuf.slotsUsed = cbuf.slot + } + if cbuf.slot >= len(cbuf.buf) { + cbuf.slot = 0 + } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go index c90d1da..48c20f0 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go @@ -20,6 +20,7 @@ package main import ( + htrace "org/apache/htrace/client" "org/apache/htrace/common" "org/apache/htrace/conf" "reflect" @@ -81,7 +82,7 @@ func compareTotals(a, b common.SpanMetricsMap) bool { func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) { for { time.Sleep(1 * time.Millisecond) - totals := msink.AccessTotals() + totals := msink.AccessServerTotals() if compareTotals(totals, expectedTotals) { return } @@ -98,7 +99,7 @@ func TestMetricsSinkMessages(t *testing.T) { t.Fatalf("failed to create conf: %s", err.Error()) } msink := NewMetricsSink(cnf) - totals := msink.AccessTotals() + totals := msink.AccessServerTotals() if len(totals) != 0 { t.Fatalf("Expected no data in the MetricsSink to start with.") } @@ -178,10 +179,93 @@ func TestMetricsSinkMessagesEviction(t *testing.T) { }, }) for { - totals := msink.AccessTotals() + totals := msink.AccessServerTotals() if len(totals) == 2 { break } } msink.Shutdown() } + +func TestIngestedSpansMetricsRest(t *testing.T) { + testIngestedSpansMetricsImpl(t, false) +} + +func TestIngestedSpansMetricsPacked(t *testing.T) { + testIngestedSpansMetricsImpl(t, true) +} + +func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) { + htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics", + DataDirs: make([]string, 2), + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf()) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + if !usePacked { + hcl.DisableHrpc() + } + + NUM_TEST_SPANS := 12 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans, + }) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + for { + var stats *common.ServerStats + stats, err = hcl.GetServerStats() + if err != nil { + t.Fatalf("GetServerStats failed: %s\n", err.Error()) + } + if stats.IngestedSpans == uint64(NUM_TEST_SPANS) { + break + } + time.Sleep(1 * time.Millisecond) + } +} + +func TestCircBuf32(t *testing.T) { + cbuf := NewCircBufU32(3) + // We arbitrarily define that empty circular buffers have an average of 0. + if cbuf.Average() != 0 { + t.Fatalf("expected empty CircBufU32 to have an average of 0.\n") + } + if cbuf.Max() != 0 { + t.Fatalf("expected empty CircBufU32 to have a max of 0.\n") + } + cbuf.Append(2) + if cbuf.Average() != 2 { + t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n") + } + cbuf.Append(10) + if cbuf.Average() != 6 { + t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n") + } + cbuf.Append(12) + if cbuf.Average() != 8 { + t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n") + } + cbuf.Append(14) + // The 14 overwrites the original 2 element. + if cbuf.Average() != 12 { + t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n") + } + cbuf.Append(1) + // The 1 overwrites the original 10 element. + if cbuf.Average() != 9 { + t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n") + } + if cbuf.Max() != 14 { + t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n") + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go index 80df676..a50799a 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -165,6 +165,13 @@ func (ht *MiniHTraced) ClientConf() *conf.Config { conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String()) } +// Return a Config object that clients can use to connect to this MiniHTraceD +// by HTTP only (no HRPC). +func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config { + return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(), + conf.HTRACE_HRPC_ADDRESS, "") +} + func (ht *MiniHTraced) Close() { ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name) ht.Rsv.Close() http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go index a41e1c7..9b78d15 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go @@ -34,6 +34,7 @@ import ( "path/filepath" "strconv" "strings" + "time" ) // Set the response headers. @@ -198,7 +199,15 @@ type writeSpansHandler struct { } func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + startTime := time.Now() setResponseHeaders(w.Header()) + client, _, serr := net.SplitHostPort(req.RemoteAddr) + if serr != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Failed to split host and port for %s: %s\n", + req.RemoteAddr, serr.Error())) + return + } var dec *json.Decoder if hand.lg.TraceEnabled() { b, err := ioutil.ReadAll(req.Body) @@ -234,12 +243,14 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem)) } else { hand.store.WriteSpan(&IncomingSpan{ - Addr: req.RemoteAddr, + Addr: client, Span: span, }) } } - hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped) + endTime := time.Now() + hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans), + endTime.Sub(startTime)) } type queryHandler struct { @@ -291,6 +302,7 @@ func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) } type RestServer struct { + http.Server listener net.Listener lg *common.Logger } @@ -337,14 +349,16 @@ func CreateRestServer(cnf *conf.Config, store *dataStore, } } - rsv.lg.Infof(`Serving static files from "%s"\n`, webdir) + rsv.lg.Infof(`Serving static files from "%s"` + "\n", webdir) r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET") // Log an error message for unknown non-GET requests. r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg}) rsv.listener = listener - go http.Serve(rsv.listener, r) + rsv.Handler = r + rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO) + go rsv.Serve(rsv.listener) rsv.lg.Infof("Started REST server on %s\n", rsv.listener.Addr().String()) return rsv, nil } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go index 04dc269..88071c7 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go @@ -212,13 +212,19 @@ func printServerStats(hcl *htrace.Client) int { fmt.Fprintf(w, "Server Time\t%s\n", common.UnixMsToTime(stats.CurMs).Format(time.RFC3339)) fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans) + fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans) + fmt.Fprintf(w, "Spans dropped by clients\t%d\n", stats.ClientDroppedSpans) + dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs) + fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String()) + dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs) + fmt.Fprintf(w, "Maximum WriteSpan Latency\t%s\n", dur.String()) fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs)) w.Flush() fmt.Println("") for i := range stats.Dirs { dir := stats.Dirs[i] fmt.Printf("==== %s ===\n", dir.Path) - fmt.Printf("Approximate number of spans: %d\n", dir.ApproxNumSpans) + fmt.Printf("Approximate number of bytes: %d\n", dir.ApproximateBytes) stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1) fmt.Printf("%s\n", stats) } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/app/server_info_view.js ---------------------------------------------------------------------- diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js b/htrace-webapp/src/main/webapp/app/server_info_view.js index f3473d3..4255995 100644 --- a/htrace-webapp/src/main/webapp/app/server_info_view.js +++ b/htrace-webapp/src/main/webapp/app/server_info_view.js @@ -22,6 +22,7 @@ var htrace = htrace || {}; htrace.ServerInfoView = Backbone.View.extend({ events: { "click .serverConfigurationButton": "showServerConfigurationModal", + "click .storageDirectoryStatsButton": "showStorageDirectoryStatsModal", }, render: function() { @@ -110,5 +111,18 @@ htrace.ServerInfoView = Backbone.View.extend({ {title: "HTraced Server Configuration", body: out})); } }) + }, + + showStorageDirectoryStatsModal: function() { + var dirs = this.model.stats.get("Dirs"); + var out = ""; + for (var dirIdx = 0; dirIdx < dirs.length; dirIdx++) { + var dir = dirs[dirIdx]; + out += "<h3>" + dir.Path + "</h3>"; + out += "Approximate size in bytes: " + dir.ApproximateBytes + "<br/>"; + out += "<pre>" + dir.LevelDbStats + "</pre></pre><br/><p/>"; + } + htrace.showModal(_.template($("#modal-table-template").html())( + {title: "HTraced Storage Directory Statistics", body: out})); } }); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/app/server_stats.js ---------------------------------------------------------------------- diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js b/htrace-webapp/src/main/webapp/app/server_stats.js index e4289ef..783041c 100644 --- a/htrace-webapp/src/main/webapp/app/server_stats.js +++ b/htrace-webapp/src/main/webapp/app/server_stats.js @@ -20,7 +20,10 @@ // htraced server statistics. See rest.go. htrace.ServerStats = Backbone.Model.extend({ defaults: { - "ReapedSpans": "(unknown)", + "LastStartMs": "0", + "CurMs": "0", + "IngestedSpans": "(unknown)", + "ReapedSpans": "(unknown)" }, url: function() { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/index.html ---------------------------------------------------------------------- diff --git a/htrace-webapp/src/main/webapp/index.html b/htrace-webapp/src/main/webapp/index.html index 2cebefe..a59282a 100644 --- a/htrace-webapp/src/main/webapp/index.html +++ b/htrace-webapp/src/main/webapp/index.html @@ -79,6 +79,22 @@ <td>Spans Reaped</td> <td><%= model.stats.get("ReapedSpans") %></td> </tr> + <tr> + <td>Spans Ingested</td> + <td><%= model.stats.get("IngestedSpans") %></td> + </tr> + <tr> + <td>Client Dropped Spans</td> + <td><%= model.stats.get("ClientDroppedSpans") %></td> + </tr> + <tr> + <td>Maximum WriteSpans Latency (ms)</td> + <td><%= model.stats.get("MaxWriteSpansLatencyMs") %></td> + </tr> + <tr> + <td>Average WriteSpans Latency (ms)</td> + <td><%= model.stats.get("AverageWriteSpansLatencyMs") %></td> + </tr> </tr> <td>Datastore Start Time</td> <td><%= htrace.dateToString(model.stats.get("LastStartMs")) %></td> @@ -93,6 +109,9 @@ <%= view.getServerStatsTableHtml() %> </div> <button type="button" class="btn btn-info serverConfigurationButton">Server Configuration</button> + <button type="button" class="btn btn-success storageDirectoryStatsButton">Storage Directory Stats</button> + <br/> + <p/> </div> <div class="col-md-1"> </div>
