Repository: incubator-htrace Updated Branches: refs/heads/master fe19368a3 -> 699c8cf80
HTRACE-298. htraced: improve datastore serialization and metrics (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/699c8cf8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/699c8cf8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/699c8cf8 Branch: refs/heads/master Commit: 699c8cf80058913a89988552ef2d357690309b6f Parents: fe19368 Author: Colin P. Mccabe <[email protected]> Authored: Sat Nov 21 13:12:20 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Sat Nov 21 13:26:04 2015 -0800 ---------------------------------------------------------------------- htrace-htraced/go/Godeps/Godeps.json | 2 +- .../go/src/org/apache/htrace/common/rpc.go | 31 +- .../src/org/apache/htrace/conf/config_keys.go | 8 +- .../org/apache/htrace/htraced/client_test.go | 98 ++++++ .../src/org/apache/htrace/htraced/datastore.go | 254 +++++++++++---- .../org/apache/htrace/htraced/datastore_test.go | 61 ++-- .../go/src/org/apache/htrace/htraced/hrpc.go | 23 +- .../go/src/org/apache/htrace/htraced/metrics.go | 319 ++++++------------- .../org/apache/htrace/htraced/metrics_test.go | 156 ++------- .../org/apache/htrace/htraced/reaper_test.go | 11 +- .../go/src/org/apache/htrace/htraced/rest.go | 23 +- .../go/src/org/apache/htrace/htracedTool/cmd.go | 9 +- .../src/main/webapp/app/server_info_view.js | 4 +- .../src/main/webapp/app/server_stats.js | 7 +- htrace-webapp/src/main/webapp/index.html | 12 +- 15 files changed, 491 insertions(+), 527 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/Godeps/Godeps.json ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/Godeps/Godeps.json b/htrace-htraced/go/Godeps/Godeps.json index 47aa90e..7c737fe 100644 --- a/htrace-htraced/go/Godeps/Godeps.json +++ b/htrace-htraced/go/Godeps/Godeps.json @@ -24,7 +24,7 @@ }, { "ImportPath": "github.com/ugorji/go/codec", - "Rev": "08bbe4aa39b9f189f4e294b5c8408b5fa5787bb2" + "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58" } ] } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go index 2627c26..6375688 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go @@ -38,7 +38,7 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024 // A request to write spans to htraced. type WriteSpansReq struct { - Addr string // This gets filled in by the RPC layer. + Addr string `json:",omitempty"` // This gets filled in by the RPC layer. DefaultTrid string `json:",omitempty"` Spans []*Span ClientDropped uint64 `json:",omitempty"` @@ -98,10 +98,19 @@ type SpanMetrics struct { // The total number of spans dropped by the server. ServerDropped uint64 - // The total number of spans dropped by the client. Note that this number - // is tracked on the client itself and doesn't get updated if the client - // can't contact the server. - ClientDropped uint64 + // The total number of spans dropped by the client. + // + // This number is just an estimate and may be incorrect for many reasons. 
+ // If the client can't contact the server at all, then obviously the server
+ // will never increment ClientDroppedEstimate... even though spans are being
+ // dropped. The client may also tell the server about some new spans it
+ // has dropped, but then for some reason fail to get the acknowledgement
+ // from the server. In that case, the client would re-send its client
+ // dropped estimate and it would be double-counted by the server.
+ //
+ // The intention here is to provide a rough estimate of how overloaded
+ // htraced clients are, not to provide strongly consistent numbers.
+ ClientDroppedEstimate uint64
 }
 
 // A map from network address strings to SpanMetrics structures.
@@ -130,9 +139,15 @@ type ServerStats struct {
 // those that did.
 IngestedSpans uint64
 
- // The total number of spans which have been dropped by clients since the server started,
- // as reported by WriteSpans requests.
- ClientDroppedSpans uint64
+ // The total number of spans which have been written to leveldb since the server started.
+ WrittenSpans uint64
+
+ // The total number of spans dropped by the server since the server started.
+ ServerDroppedSpans uint64
+
+ // An estimate of the total number of spans dropped by clients since the server started.
+ // See SpanMetrics#ClientDroppedEstimate.
+ ClientDroppedEstimate uint64
 
 // The maximum latency of a writeSpans request, in milliseconds.
 MaxWriteSpansLatencyMs uint32
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 511833c..573ce21 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,9 +68,9 @@ const HTRACE_LOG_PATH = "log.path"
 // The log level to use for the logs in htrace.
 const HTRACE_LOG_LEVEL = "log.level"
 
-// The period between metrics heartbeats. This is the approximate interval at which we will
-// update global metrics.
-const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+// The period between datastore heartbeats. This is the approximate interval at which we will
+// prune expired spans.
+const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms"
 
 // The maximum number of addresses for which we will maintain metrics.
const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries" @@ -106,7 +106,7 @@ var DEFAULTS = map[string]string{ HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100", HTRACE_LOG_PATH: "", HTRACE_LOG_LEVEL: "INFO", - HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000), + HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000), HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000", HTRACE_SPAN_EXPIRY_MS: "0", HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000), http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go index fae871c..36e8369 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go @@ -21,6 +21,8 @@ package main import ( "fmt" + "github.com/ugorji/go/codec" + "math" "math/rand" "org/apache/htrace/common" "org/apache/htrace/conf" @@ -391,3 +393,99 @@ func TestHrpcIoTimeout(t *testing.T) { close(finishClient) wg.Wait() } + +func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { + htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans", + Cnf: map[string]string{ + conf.HTRACE_LOG_LEVEL: "INFO", + }, + WrittenSpans: common.NewSemaphore(int64(1-N)), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + rnd := rand.New(rand.NewSource(1)) + allSpans := make([]*common.Span, N) + for n := 0; n < N; n++ { + allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n]) + } + // Determine how many calls to WriteSpans we should make. Each writeSpans + // message should be small enough so that it doesn't exceed the max RPC + // body length limit. TODO: a production-quality golang client would do + // this internally rather than needing us to do it here in the unit test. + bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5 + reqs := make([]*common.WriteSpansReq, 0, 4) + curReq := -1 + curReqLen := bodyLen + var curReqSpans uint32 + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + var mbuf [8192]byte + buf := mbuf[:0] + enc := codec.NewEncoderBytes(&buf, mh) + for n := 0; n < N; n++ { + span := allSpans[n] + if (curReqSpans >= maxSpansPerRpc) || + (curReqLen >= bodyLen) { + reqs = append(reqs, &common.WriteSpansReq{}) + curReqLen = 0 + curReq++ + curReqSpans = 0 + } + buf = mbuf[:0] + enc.ResetBytes(&buf) + err := enc.Encode(span) + if err != nil { + panic(fmt.Sprintf("Error encoding span %s: %s\n", + span.String(), err.Error())) + } + bufLen := len(buf) + if bufLen > (bodyLen / 5) { + panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen)) + } + curReqLen += bufLen + reqs[curReq].Spans = append(reqs[curReq].Spans, span) + curReqSpans++ + } + ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs)) + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + panic(fmt.Sprintf("failed to create client: %s", err.Error())) + } + defer hcl.Close() + + // Reset the timer to avoid including the time required to create new + // random spans in the benchmark total. + if b != nil { + b.ResetTimer() + } + + // Write many random spans. 
+ for reqIdx := range(reqs) {
+ go func(reqIdx int) {
+ err := hcl.WriteSpans(reqs[reqIdx])
+ if err != nil {
+ panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
+ reqIdx, err.Error()))
+ }
+ }(reqIdx)
+ }
+ // Wait for all the spans to be written.
+ ht.Store.WrittenSpans.Wait()
+}
+
+// This is a test of how quickly we can create new spans via WriteSpans RPCs.
+// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
+// Unlike that benchmark, it sends the spans via RPC.
+// Suggested flags for running this:
+// -tags unsafe -cpu 16 -benchtime=1m
+func BenchmarkWriteSpans(b *testing.B) {
+ doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
+}
+
+func TestWriteSpansRpcs(t *testing.T) {
+ doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 8cd1526..9310e6e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -26,6 +26,7 @@ import (
 "errors"
 "fmt"
 "github.com/jmhodges/levigo"
+ "github.com/ugorji/go/codec"
 "org/apache/htrace/common"
 "org/apache/htrace/conf"
 "os"
@@ -66,7 +67,7 @@ import (
 //
 const UNKNOWN_LAYOUT_VERSION = 0
-const CURRENT_LAYOUT_VERSION = 2
+const CURRENT_LAYOUT_VERSION = 3
 
 var EMPTY_BYTE_BUF []byte = []byte{}
 
@@ -89,6 +90,9 @@ type IncomingSpan struct {
 // The span.
 *common.Span
+
+ // Serialized span data
+ SpanDataBytes []byte
 }
 
 // A single directory containing a levelDB instance.
@@ -103,19 +107,13 @@ type shard struct {
 path string
 
 // Incoming requests to write Spans.
- incoming chan *IncomingSpan
+ incoming chan []*IncomingSpan
 
 // A channel for incoming heartbeats
 heartbeats chan interface{}
 
 // Tracks whether the shard goroutine has exited.
 exited sync.WaitGroup
-
- // Per-address metrics
- mtxMap ServerSpanMetricsMap
-
- // The maximum number of metrics to allow in our map
- maxMtx int
 }
 
 // Process incoming spans for a shard.
@@ -127,24 +125,33 @@ func (shd *shard) processIncoming() { }() for { select { - case span := <-shd.incoming: - if span == nil { + case spans := <-shd.incoming: + if spans == nil { return } - err := shd.writeSpan(span) - if err != nil { - lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error()) - } else if lg.TraceEnabled() { - lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson()) + totalWritten := 0 + totalDropped := 0 + for spanIdx := range(spans) { + err := shd.writeSpan(spans[spanIdx]) + if err != nil { + lg.Errorf("Shard processor for %s got fatal error %s.\n", + shd.path, err.Error()) + totalDropped++ + } else { + if lg.TraceEnabled() { + lg.Tracef("Shard processor for %s wrote span %s.\n", + shd.path, spans[spanIdx].ToJson()) + } + totalWritten++ + } + } + shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped) + if shd.store.WrittenSpans != nil { + lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans)) + shd.store.WrittenSpans.Posts(int64(len(spans))) } case <-shd.heartbeats: lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path) - mtxMap := make(ServerSpanMetricsMap) - for addr, mtx := range shd.mtxMap { - mtxMap[addr] = mtx.Clone() - mtx.Clear() - } - shd.store.msink.UpdateMetrics(mtxMap) shd.pruneExpired() } } @@ -246,21 +253,10 @@ func u64toSlice(val uint64) []byte { func (shd *shard) writeSpan(ispan *IncomingSpan) error { batch := levigo.NewWriteBatch() defer batch.Close() - - // Add SpanData to batch. - spanDataBuf := new(bytes.Buffer) - spanDataEnc := gob.NewEncoder(spanDataBuf) span := ispan.Span - err := spanDataEnc.Encode(span.SpanData) - if err != nil { - shd.store.lg.Errorf("Error encoding span %s: %s\n", - span.String(), err.Error()) - shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg) - return err - } primaryKey := append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...) - batch.Put(primaryKey, spanDataBuf.Bytes()) + batch.Put(primaryKey, ispan.SpanDataBytes) // Add this to the parent index. for parentIdx := range span.Parents { @@ -280,17 +276,12 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error { u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...) 
batch.Put(durationKey, EMPTY_BYTE_BUF) - err = shd.ldb.Write(shd.store.writeOpts, batch) + err := shd.ldb.Write(shd.store.writeOpts, batch) if err != nil { shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n", span.String(), shd.path, err.Error()) - shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg) return err } - shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg) - if shd.store.WrittenSpans != nil { - shd.store.WrittenSpans.Post() - } return nil } @@ -510,13 +501,11 @@ func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataSto for idx := range store.shards { shd := store.shards[idx] shd.heartbeats = make(chan interface{}, 1) - shd.mtxMap = make(ServerSpanMetricsMap) - shd.maxMtx = store.msink.maxMtx shd.exited.Add(1) go shd.processIncoming() } store.hb = NewHeartbeater("DatastoreHeartbeater", - cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg) + cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), lg) for shdIdx := range store.shards { shd := store.shards[shdIdx] store.hb.AddHeartbeatTarget(&HeartbeatTarget{ @@ -606,7 +595,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path string, } spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) shd = &shard{store: store, ldb: ldb, path: path, - incoming: make(chan *IncomingSpan, spanBufferSize)} + incoming: make(chan []*IncomingSpan, spanBufferSize)} return shd, nil } @@ -654,10 +643,6 @@ func (store *dataStore) Close() { store.rpr.Shutdown() store.rpr = nil } - if store.msink != nil { - store.msink.Shutdown() - store.msink = nil - } if store.readOpts != nil { store.readOpts.Close() store.readOpts = nil @@ -677,8 +662,158 @@ func (store *dataStore) getShardIndex(sid common.SpanId) int { return int(sid.Hash32() % uint32(len(store.shards))) } -func (store *dataStore) WriteSpan(span *IncomingSpan) { - store.shards[store.getShardIndex(span.Id)].incoming <- span +const WRITESPANS_BATCH_SIZE = 128 + +// SpanIngestor is a class used internally to ingest spans from an RPC +// endpoint. It groups spans destined for a particular shard into small +// batches, so that we can reduce the number of objects that need to be sent +// over the shard's "incoming" channel. Since sending objects over a channel +// requires goroutine synchronization, this improves performance. +// +// SpanIngestor also allows us to reuse the same encoder object for many spans, +// rather than creating a new encoder per span. This avoids re-doing the +// encoder setup for each span, and also generates less garbage. +type SpanIngestor struct { + // The logger to use. + lg *common.Logger + + // The dataStore we are ingesting spans into. + store *dataStore + + // The remote address these spans are coming from. + addr string + + // Default TracerId + defaultTrid string + + // The msgpack handle to use to serialize the spans. + mh codec.MsgpackHandle + + // The msgpack encoder to use to serialize the spans. + // Caching this avoids generating a lot of garbage and burning CPUs + // creating new encoder objects for each span. + enc *codec.Encoder + + // The buffer which codec.Encoder is currently serializing to. + // We have to create a new buffer for each span because once we hand it off to the shard, the + // shard manages the buffer lifecycle. + spanDataBytes []byte + + // An array mapping shard index to span batch. + batches []*SpanIngestorBatch + + // The total number of spans ingested. Includes dropped spans. 
+ totalIngested int + + // The total number of spans the ingestor dropped because of a server-side error. + serverDropped int +} + +// A batch of spans destined for a particular shard. +type SpanIngestorBatch struct { + incoming []*IncomingSpan +} + +func (store *dataStore) NewSpanIngestor(lg *common.Logger, + addr string, defaultTrid string) *SpanIngestor { + ing := &SpanIngestor { + lg: lg, + store: store, + addr: addr, + defaultTrid: defaultTrid, + spanDataBytes: make([]byte, 0, 1024), + batches: make([]*SpanIngestorBatch, len(store.shards)), + } + ing.mh.WriteExt = true + ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh) + for batchIdx := range(ing.batches) { + ing.batches[batchIdx] = &SpanIngestorBatch { + incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE), + } + } + return ing +} + +func (ing *SpanIngestor) IngestSpan(span *common.Span) { + ing.totalIngested++ + // Make sure the span ID is valid. + spanIdProblem := span.Id.FindProblem() + if spanIdProblem != "" { + // Can't print the invalid span ID because String() might fail. + ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem) + ing.serverDropped++ + return + } + + // Set the default tracer id, if needed. + if span.TracerId == "" { + span.TracerId = ing.defaultTrid + } + + // Encode the span data. Doing the encoding here is better than doing it + // in the shard goroutine, because we can achieve more parallelism. + // There is one shard goroutine per shard, but potentially many more + // ingestors per shard. + err := ing.enc.Encode(span.SpanData) + if err != nil { + ing.lg.Warnf("Failed to encode span ID %s: %s\n", + span.Id.String(), err.Error()) + ing.serverDropped++ + return + } + spanDataBytes := ing.spanDataBytes + ing.spanDataBytes = make([]byte, 0, 1024) + ing.enc.ResetBytes(&ing.spanDataBytes) + + // Determine which shard this span should go to. + shardIdx := ing.store.getShardIndex(span.Id) + batch := ing.batches[shardIdx] + incomingLen := len(batch.incoming) + if ing.lg.TraceEnabled() { + ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, " + + "incomingLen=%d, cap(batch.incoming)=%d\n", + span.Id.String(), shardIdx, incomingLen, cap(batch.incoming)) + } + if incomingLen + 1 == cap(batch.incoming) { + if ing.lg.TraceEnabled() { + ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for " + + "shard %d\n", len(batch.incoming), shardIdx) + } + ing.store.WriteSpans(shardIdx, batch.incoming) + batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE) + incomingLen = 0 + } else { + batch.incoming = batch.incoming[0:incomingLen+1] + } + batch.incoming[incomingLen] = &IncomingSpan { + Addr: ing.addr, + Span: span, + SpanDataBytes: spanDataBytes, + } +} + +func (ing *SpanIngestor) Close(clientDropped int, startTime time.Time) { + for shardIdx := range(ing.batches) { + batch := ing.batches[shardIdx] + if len(batch.incoming) > 0 { + if ing.lg.TraceEnabled() { + ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for " + + "shard %d\n", len(batch.incoming), shardIdx) + } + ing.store.WriteSpans(shardIdx, batch.incoming) + } + batch.incoming = nil + } + ing.lg.Debugf("Closed span ingestor for %s. 
Ingested %d span(s); dropped " + + "%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped) + + endTime := time.Now() + ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested, + ing.serverDropped, clientDropped, endTime.Sub(startTime)) +} + +func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) { + store.shards[shardIdx].incoming <- ispans } func (store *dataStore) FindSpan(sid common.SpanId) *common.Span { @@ -709,14 +844,14 @@ func (shd *shard) FindSpan(sid common.SpanId) *common.Span { func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) { r := bytes.NewBuffer(buf) - decoder := gob.NewDecoder(r) + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + decoder := codec.NewDecoder(r, mh) data := common.SpanData{} err := decoder.Decode(&data) if err != nil { return nil, err } - // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with - // non-nil slices. if data.Parents == nil { data.Parents = []common.SpanId{} } @@ -1259,19 +1394,6 @@ func (store *dataStore) ServerStats() *common.ServerStats { serverStats.LastStartMs = store.startMs serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC()) serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans) - wsData := store.msink.wsm.GetData() - serverStats.HostSpanMetrics = store.msink.AccessServerTotals() - for k, v := range wsData.clientDroppedMap { - smtx := serverStats.HostSpanMetrics[k] - if smtx == nil { - smtx = &common.SpanMetrics {} - serverStats.HostSpanMetrics[k] = smtx - } - smtx.ClientDropped = v - } - serverStats.IngestedSpans = wsData.ingestedSpans - serverStats.ClientDroppedSpans = wsData.clientDroppedSpans - serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax - serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage + store.msink.PopulateServerStats(&serverStats) return &serverStats } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index d9f4a0a..e6d1df7 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -73,12 +73,11 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{ } func createSpans(spans []common.Span, store *dataStore) { + ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "") for idx := range spans { - store.WriteSpan(&IncomingSpan{ - Addr: "127.0.0.1", - Span: &spans[idx], - }) + ing.IngestSpan(&spans[idx]) } + ing.Close(0, time.Now()) store.WrittenSpans.Waits(int64(len(spans))) } @@ -87,7 +86,7 @@ func TestDatastoreWriteAndRead(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, WrittenSpans: common.NewSemaphore(0), } @@ -151,7 +150,7 @@ func TestSimpleQuery(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, WrittenSpans: common.NewSemaphore(0), } @@ -161,11 +160,8 @@ func TestSimpleQuery(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - 
waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1": &common.SpanMetrics{ - Written: uint64(len(SIMPLE_TEST_SPANS)), - }, - }) + + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ @@ -183,7 +179,7 @@ func TestQueries2(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, WrittenSpans: common.NewSemaphore(0), } @@ -193,11 +189,7 @@ func TestQueries2(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1": &common.SpanMetrics{ - Written: uint64(len(SIMPLE_TEST_SPANS)), - }, - }) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -241,7 +233,7 @@ func TestQueries3(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, WrittenSpans: common.NewSemaphore(0), } @@ -251,11 +243,7 @@ func TestQueries3(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1": &common.SpanMetrics{ - Written: uint64(len(SIMPLE_TEST_SPANS)), - }, - }) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -299,7 +287,7 @@ func TestQueries4(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueries4", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, WrittenSpans: common.NewSemaphore(0), } @@ -345,7 +333,7 @@ func TestQueries4(t *testing.T) { func BenchmarkDatastoreWrites(b *testing.B) { htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", conf.HTRACE_LOG_LEVEL: "INFO", }, WrittenSpans: common.NewSemaphore(0), @@ -372,25 +360,20 @@ func BenchmarkDatastoreWrites(b *testing.B) { b.ResetTimer() // Write many random spans. + ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "") for n := 0; n < b.N; n++ { - ht.Store.WriteSpan(&IncomingSpan{ - Addr: "127.0.0.1", - Span: allSpans[n], - }) + ing.IngestSpan(allSpans[n]) } + ing.Close(0, time.Now()) // Wait for all the spans to be written. ht.Store.WrittenSpans.Waits(int64(b.N)) - waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1": &common.SpanMetrics{ - Written: uint64(b.N), // should be less than? 
- }, - }) + assertNumWrittenEquals(b, ht.Store.msink, b.N) } func TestReloadDataStore(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, DataDirs: make([]string, 2), KeepDataDirsOnClose: true, @@ -494,7 +477,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", Cnf: map[string]string{ - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", }, WrittenSpans: common.NewSemaphore(0), } @@ -504,11 +487,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1": &common.SpanMetrics{ - Written: uint64(len(SIMPLE_TEST_SPANS)), - }, - }) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) // Adding a prev value to this query excludes the first result that we // would normally get. testQuery(t, ht, &common.Query{ http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go index a0f2e81..a649420 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go @@ -266,26 +266,11 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, return errors.New(fmt.Sprintf("Failed to split host and port " + "for %s: %s\n", req.Addr, err.Error())) } - for i := range req.Spans { - span := req.Spans[i] - spanIdProblem := span.Id.FindProblem() - if spanIdProblem != "" { - return errors.New(fmt.Sprintf("Invalid span ID: %s", spanIdProblem)) - } - if span.TracerId == "" { - span.TracerId = req.DefaultTrid - } - if hand.lg.TraceEnabled() { - hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson()) - } - hand.store.WriteSpan(&IncomingSpan{ - Addr: client, - Span: span, - }) + ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid) + for spanIdx := range req.Spans { + ing.IngestSpan(req.Spans[spanIdx]) } - endTime := time.Now() - hand.store.msink.Update(client, req.ClientDropped, len(req.Spans), - endTime.Sub(startTime)) + ing.Close(int(req.ClientDropped), startTime) return nil } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go index cfff418..5ce3339 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go @@ -20,7 +20,6 @@ package main import ( - "encoding/json" "math" "org/apache/htrace/common" "org/apache/htrace/conf" @@ -40,252 +39,114 @@ import ( const LATENCY_CIRC_BUF_SIZE = 4096 -type ServerSpanMetrics struct { - // The total number of spans written to HTraced. - Written uint64 - - // The total number of spans dropped by the server. 
- ServerDropped uint64
-}
-
-func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
- return &ServerSpanMetrics{
- Written: spm.Written,
- ServerDropped: spm.ServerDropped,
- }
-}
-
-func (spm *ServerSpanMetrics) String() string {
- jbytes, err := json.Marshal(*spm)
- if err != nil {
- panic(err)
- }
- return string(jbytes)
-}
-
-func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
- spm.Written += ospm.Written
- spm.ServerDropped += ospm.ServerDropped
-}
-
-func (spm *ServerSpanMetrics) Clear() {
- spm.Written = 0
- spm.ServerDropped = 0
-}
-
-// A map from network address strings to ServerSpanMetrics structures.
-type ServerSpanMetricsMap map[string]*ServerSpanMetrics
-
-func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
- lg *common.Logger) {
- mtx := smtxMap[addr]
- if mtx == nil {
- mtx = &ServerSpanMetrics{}
- smtxMap[addr] = mtx
- }
- mtx.ServerDropped++
- smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
- lg *common.Logger) {
- mtx := smtxMap[addr]
- if mtx == nil {
- mtx = &ServerSpanMetrics{}
- smtxMap[addr] = mtx
- }
- mtx.Written++
- smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
- if len(smtxMap) >= maxMtx {
- // Delete a random entry
- for k := range smtxMap {
- lg.Warnf("Evicting metrics entry for addr %s "+
- "because there are more than %d addrs.\n", k, maxMtx)
- delete(smtxMap, k)
- return
- }
- }
-}
-
-type AccessReq struct {
- mtxMap common.SpanMetricsMap
- done chan interface{}
-}
-
 type MetricsSink struct {
- // The total span metrics.
- smtxMap ServerSpanMetricsMap
-
- // A channel of incoming shard metrics.
- // When this is shut down, the MetricsSink will exit.
- updateReqs chan ServerSpanMetricsMap
-
- // A channel of incoming requests for shard metrics.
- accessReqs chan *AccessReq
-
- // This will be closed when the MetricsSink has exited.
- exited chan interface{}
-
- // The logger used by this MetricsSink.
+ // The metrics sink logger.
 lg *common.Logger
 
- // The maximum number of metrics totals we will maintain.
+ // The maximum number of entries we should allow in the HostSpanMetrics map.
maxMtx int - // Metrics about WriteSpans requests - wsm WriteSpanMetrics -} + // The total number of spans ingested by the server (counting dropped spans) + IngestedSpans uint64 -func NewMetricsSink(cnf *conf.Config) *MetricsSink { - mcl := MetricsSink{ - smtxMap: make(ServerSpanMetricsMap), - updateReqs: make(chan ServerSpanMetricsMap, 128), - accessReqs: make(chan *AccessReq), - exited: make(chan interface{}), - lg: common.NewLogger("metrics", cnf), - maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES), - wsm: WriteSpanMetrics { - clientDroppedMap: make(map[string]uint64), - latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE), - }, - } - go mcl.run() - return &mcl -} - -func (msink *MetricsSink) run() { - lg := msink.lg - defer func() { - lg.Info("MetricsSink: stopping service goroutine.\n") - close(msink.exited) - }() - lg.Tracef("MetricsSink: starting.\n") - for { - select { - case updateReq, open := <-msink.updateReqs: - if !open { - lg.Trace("MetricsSink: shutting down cleanly.\n") - return - } - for addr, umtx := range updateReq { - smtx := msink.smtxMap[addr] - if smtx == nil { - smtx = &ServerSpanMetrics{} - msink.smtxMap[addr] = smtx - } - smtx.Add(umtx) - if lg.TraceEnabled() { - lg.Tracef("MetricsSink: updated %s to %s\n", addr, smtx.String()) - } - } - msink.smtxMap.Prune(msink.maxMtx, lg) - case accessReq := <-msink.accessReqs: - msink.handleAccessReq(accessReq) - } - } -} - -func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) { - msink.lg.Debug("MetricsSink: accessing global metrics.\n") - defer close(accessReq.done) - for addr, smtx := range msink.smtxMap { - accessReq.mtxMap[addr] = &common.SpanMetrics{ - Written: smtx.Written, - ServerDropped: smtx.ServerDropped, - } - } -} - -func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap { - accessReq := &AccessReq{ - mtxMap: make(common.SpanMetricsMap), - done: make(chan interface{}), - } - msink.accessReqs <- accessReq - <-accessReq.done - return accessReq.mtxMap -} - -func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) { - msink.updateReqs <- mtxMap -} - -func (msink *MetricsSink) Shutdown() { - close(msink.updateReqs) - <-msink.exited -} + // The total number of spans written to leveldb since the server started. + WrittenSpans uint64 -type WriteSpanMetrics struct { - // Lock protecting WriteSpanMetrics - lock sync.Mutex - - // The number of spans which each client has self-reported that it has - // dropped. - clientDroppedMap map[string]uint64 + // The total number of spans dropped by the server. + ServerDropped uint64 - // The total number of new span writes we've gotten since startup. - ingestedSpans uint64 + // The total number of spans dropped by the client (self-reported). + ClientDroppedEstimate uint64 - // The total number of spans all clients have dropped since startup. 
- clientDroppedSpans uint64 + // Per-host Span Metrics + HostSpanMetrics common.SpanMetricsMap // The last few writeSpan latencies - latencyCircBuf *CircBufU32 -} + wsLatencyCircBuf *CircBufU32 -type WriteSpanMetricsData struct { - clientDroppedMap map[string]uint64 - ingestedSpans uint64 - clientDroppedSpans uint64 - latencyMax uint32 - latencyAverage uint32 + // Lock protecting all metrics + lock sync.Mutex } -func (msink *MetricsSink) Update(client string, clientDropped uint64, clientWritten int, - wsLatency time.Duration) { - wsLatencyNs := wsLatency.Nanoseconds() / 1000000 +func NewMetricsSink(cnf *conf.Config) *MetricsSink { + return &MetricsSink { + lg: common.NewLogger("metrics", cnf), + maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES), + HostSpanMetrics: make(common.SpanMetricsMap), + wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE), + } +} + +// Update the total number of spans which were ingested, as well as other +// metrics that get updated during span ingest. +func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int, + serverDropped int, clientDroppedEstimate int, wsLatency time.Duration) { + msink.lock.Lock() + defer msink.lock.Unlock() + msink.IngestedSpans += uint64(totalIngested) + msink.ServerDropped += uint64(serverDropped) + msink.ClientDroppedEstimate += uint64(clientDroppedEstimate) + msink.updateSpanMetrics(addr, 0, serverDropped, clientDroppedEstimate) + wsLatencyMs := wsLatency.Nanoseconds() / 1000000 var wsLatency32 uint32 - if wsLatencyNs > math.MaxUint32 { + if wsLatencyMs > math.MaxUint32 { wsLatency32 = math.MaxUint32 } else { - wsLatency32 = uint32(wsLatencyNs) - } - msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, wsLatency32) -} - -func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped uint64, - clientWritten int, wsLatency uint32) { - wsm.lock.Lock() - defer wsm.lock.Unlock() - wsm.clientDroppedMap[client] = clientDropped - if len(wsm.clientDroppedMap) >= maxMtx { - // Delete a random entry - for k := range wsm.clientDroppedMap { - delete(wsm.clientDroppedMap, k) - return + wsLatency32 = uint32(wsLatencyMs) + } + msink.wsLatencyCircBuf.Append(wsLatency32) +} + +// Update the per-host span metrics. Must be called with the lock held. +func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int, + serverDropped int, clientDroppedEstimate int) { + mtx, found := msink.HostSpanMetrics[addr] + if !found { + // Ensure that the per-host span metrics map doesn't grow too large. + if len(msink.HostSpanMetrics) >= msink.maxMtx { + // Delete a random entry + for k := range msink.HostSpanMetrics { + msink.lg.Warnf("Evicting metrics entry for addr %s "+ + "because there are more than %d addrs.\n", k, msink.maxMtx) + delete(msink.HostSpanMetrics, k) + break + } + } + mtx = &common.SpanMetrics { } + msink.HostSpanMetrics[addr] = mtx + } + mtx.Written += uint64(numWritten) + mtx.ServerDropped += uint64(serverDropped) + mtx.ClientDroppedEstimate += uint64(clientDroppedEstimate) +} + +// Update the total number of spans which were persisted to disk. +func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int, + serverDropped int) { + msink.lock.Lock() + defer msink.lock.Unlock() + msink.WrittenSpans += uint64(totalWritten) + msink.ServerDropped += uint64(serverDropped) + msink.updateSpanMetrics(addr, totalWritten, serverDropped, 0) +} + +// Read the server stats. 
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) { + msink.lock.Lock() + defer msink.lock.Unlock() + stats.IngestedSpans = msink.IngestedSpans + stats.WrittenSpans = msink.WrittenSpans + stats.ServerDroppedSpans = msink.ServerDropped + stats.ClientDroppedEstimate = msink.ClientDroppedEstimate + stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max() + stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average() + stats.HostSpanMetrics = make(common.SpanMetricsMap) + for k, v := range(msink.HostSpanMetrics) { + stats.HostSpanMetrics[k] = &common.SpanMetrics { + Written: v.Written, + ServerDropped: v.ServerDropped, + ClientDroppedEstimate: v.ClientDroppedEstimate, } - } - wsm.ingestedSpans += uint64(clientWritten) - wsm.clientDroppedSpans += uint64(clientDropped) - wsm.latencyCircBuf.Append(wsLatency) -} - -func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData { - wsm.lock.Lock() - defer wsm.lock.Unlock() - clientDroppedMap := make(map[string]uint64) - for k, v := range wsm.clientDroppedMap { - clientDroppedMap[k] = v - } - return &WriteSpanMetricsData { - clientDroppedMap: clientDroppedMap, - ingestedSpans: wsm.ingestedSpans, - clientDroppedSpans: wsm.clientDroppedSpans, - latencyMax: wsm.latencyCircBuf.Max(), - latencyAverage: wsm.latencyCircBuf.Average(), } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go index 5243d9e..e1dba1f 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go @@ -20,6 +20,7 @@ package main import ( + "fmt" htrace "org/apache/htrace/client" "org/apache/htrace/common" "org/apache/htrace/conf" @@ -28,43 +29,6 @@ import ( "time" ) -func TestMetricsSinkStartupShutdown(t *testing.T) { - cnfBld := conf.Builder{ - Values: conf.TEST_VALUES(), - Defaults: conf.DEFAULTS, - } - cnf, err := cnfBld.Build() - if err != nil { - t.Fatalf("failed to create conf: %s", err.Error()) - } - msink := NewMetricsSink(cnf) - msink.Shutdown() -} - -func TestAddSpanMetrics(t *testing.T) { - a := &ServerSpanMetrics{ - Written: 100, - ServerDropped: 200, - } - b := &ServerSpanMetrics{ - Written: 500, - ServerDropped: 100, - } - a.Add(b) - if a.Written != 600 { - t.Fatalf("SpanMetrics#Add failed to update #Written") - } - if a.ServerDropped != 300 { - t.Fatalf("SpanMetrics#Add failed to update #Dropped") - } - if b.Written != 500 { - t.Fatalf("SpanMetrics#Add updated b#Written") - } - if b.ServerDropped != 100 { - t.Fatalf("SpanMetrics#Add updated b#Dropped") - } -} - func compareTotals(a, b common.SpanMetricsMap) bool { for k, v := range a { if !reflect.DeepEqual(v, b[k]) { @@ -79,112 +43,52 @@ func compareTotals(a, b common.SpanMetricsMap) bool { return true } -func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) { - for { - time.Sleep(1 * time.Millisecond) - totals := msink.AccessServerTotals() - if compareTotals(totals, expectedTotals) { - return - } - } +type Fatalfer interface { + Fatalf(format string, args ...interface{}) } -func TestMetricsSinkMessages(t *testing.T) { - cnfBld := conf.Builder{ - Values: conf.TEST_VALUES(), - Defaults: conf.DEFAULTS, - } - cnf, err := cnfBld.Build() - if err != nil { - t.Fatalf("failed to create conf: 
%s", err.Error()) - } - msink := NewMetricsSink(cnf) - totals := msink.AccessServerTotals() - if len(totals) != 0 { - t.Fatalf("Expected no data in the MetricsSink to start with.") +func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink, + expectedNumWritten int) { + var sstats common.ServerStats + msink.PopulateServerStats(&sstats) + if sstats.WrittenSpans != uint64(expectedNumWritten) { + t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n", + sstats.WrittenSpans, len(SIMPLE_TEST_SPANS)) + } + if sstats.HostSpanMetrics["127.0.0.1"] == nil { + t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.") + } + if sstats.HostSpanMetrics["127.0.0.1"].Written != + uint64(expectedNumWritten) { + t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but " + + "expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written, + len(SIMPLE_TEST_SPANS)) } - msink.UpdateMetrics(ServerSpanMetricsMap{ - "192.168.0.100": &ServerSpanMetrics{ - Written: 20, - ServerDropped: 10, - }, - }) - waitForMetrics(msink, common.SpanMetricsMap{ - "192.168.0.100": &common.SpanMetrics{ - Written: 20, - ServerDropped: 10, - }, - }) - msink.UpdateMetrics(ServerSpanMetricsMap{ - "192.168.0.100": &ServerSpanMetrics{ - Written: 200, - ServerDropped: 100, - }, - }) - msink.UpdateMetrics(ServerSpanMetricsMap{ - "192.168.0.100": &ServerSpanMetrics{ - Written: 1000, - ServerDropped: 1000, - }, - }) - waitForMetrics(msink, common.SpanMetricsMap{ - "192.168.0.100": &common.SpanMetrics{ - Written: 1220, - ServerDropped: 1110, - }, - }) - msink.UpdateMetrics(ServerSpanMetricsMap{ - "192.168.0.200": &ServerSpanMetrics{ - Written: 200, - ServerDropped: 100, - }, - }) - waitForMetrics(msink, common.SpanMetricsMap{ - "192.168.0.100": &common.SpanMetrics{ - Written: 1220, - ServerDropped: 1110, - }, - "192.168.0.200": &common.SpanMetrics{ - Written: 200, - ServerDropped: 100, - }, - }) - msink.Shutdown() } -func TestMetricsSinkMessagesEviction(t *testing.T) { +func TestMetricsSinkPerHostEviction(t *testing.T) { cnfBld := conf.Builder{ Values: conf.TEST_VALUES(), Defaults: conf.DEFAULTS, } cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2" - cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1" cnf, err := cnfBld.Build() if err != nil { t.Fatalf("failed to create conf: %s", err.Error()) } msink := NewMetricsSink(cnf) - msink.UpdateMetrics(ServerSpanMetricsMap{ - "192.168.0.100": &ServerSpanMetrics{ - Written: 20, - ServerDropped: 10, - }, - "192.168.0.101": &ServerSpanMetrics{ - Written: 20, - ServerDropped: 10, - }, - "192.168.0.102": &ServerSpanMetrics{ - Written: 20, - ServerDropped: 10, - }, - }) - for { - totals := msink.AccessServerTotals() - if len(totals) == 2 { - break + msink.UpdatePersisted("192.168.0.100", 20, 10) + msink.UpdatePersisted("192.168.0.101", 20, 10) + msink.UpdatePersisted("192.168.0.102", 20, 10) + msink.lock.Lock() + defer msink.lock.Unlock() + if len(msink.HostSpanMetrics) != 2 { + for k, v := range(msink.HostSpanMetrics) { + fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v) } + t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n", + len(msink.HostSpanMetrics)) } - msink.Shutdown() } func TestIngestedSpansMetricsRest(t *testing.T) { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go 
index dcc916a..0140dbb 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go @@ -43,7 +43,7 @@ func TestReapingOldSpans(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000), conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1", - conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1", }, WrittenSpans: common.NewSemaphore(0), DataDirs: make([]string, 2), @@ -52,12 +52,11 @@ func TestReapingOldSpans(t *testing.T) { if err != nil { t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error()) } - for i := range testSpans { - ht.Store.WriteSpan(&IncomingSpan{ - Addr: "127.0.0.1", - Span: testSpans[i], - }) + ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "") + for spanIdx := range testSpans { + ing.IngestSpan(testSpans[spanIdx]) } + ing.Close(0, time.Now()) // Wait the spans to be created ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS) // Set a reaper date that will remove all the spans except final one. http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go index 432375d..1b90bd4 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go @@ -252,27 +252,12 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultTrid = %s\n", len(msg.Spans), msg.DefaultTrid) + + ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid) for spanIdx := range msg.Spans { - if hand.lg.DebugEnabled() { - hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson()) - } - span := msg.Spans[spanIdx] - if span.TracerId == "" { - span.TracerId = msg.DefaultTrid - } - spanIdProblem := span.Id.FindProblem() - if spanIdProblem != "" { - hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem)) - } else { - hand.store.WriteSpan(&IncomingSpan{ - Addr: client, - Span: span, - }) - } + ing.IngestSpan(msg.Spans[spanIdx]) } - endTime := time.Now() - hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans), - endTime.Sub(startTime)) + ing.Close(int(msg.ClientDropped), startTime) } type queryHandler struct { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go index 7b5e433..c81bbb7 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go @@ -216,7 +216,10 @@ func printServerStats(hcl *htrace.Client) int { common.UnixMsToTime(stats.CurMs).Format(time.RFC3339)) fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans) fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans) - fmt.Fprintf(w, "Spans dropped by clients\t%d\n", stats.ClientDroppedSpans) + fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans) + fmt.Fprintf(w, "Spans dropped by server\t%d\n", stats.ServerDroppedSpans) + fmt.Fprintf(w, "Estimated spans dropped by clients\t%d\n", + 
stats.ClientDroppedEstimate)
 dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
 fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
 dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
@@ -244,8 +247,8 @@ func printServerStats(hcl *htrace.Client) int {
 sort.Sort(keys)
 for k := range keys {
 mtx := mtxMap[keys[k]]
- fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped: %d\n",
- keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
+ fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped estimate: %d\n",
+ keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDroppedEstimate)
 }
 w.Flush()
 return EXIT_SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_info_view.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js b/htrace-webapp/src/main/webapp/app/server_info_view.js
index 62775e8..efb7545 100644
--- a/htrace-webapp/src/main/webapp/app/server_info_view.js
+++ b/htrace-webapp/src/main/webapp/app/server_info_view.js
@@ -50,7 +50,7 @@ htrace.ServerInfoView = Backbone.View.extend({
 '<th>Remote</th>' +
 '<th>Written</th>' +
 '<th>ServerDropped</th>' +
- '<th>ClientDropped</th>' +
+ '<th>ClientDroppedEstimate</th>' +
 '</tr>' +
 '</thead>';
 var remotes = [];
@@ -69,7 +69,7 @@ htrace.ServerInfoView = Backbone.View.extend({
 "<td>" + remote + "</td>" +
 "<td>" + smtx.Written + "</td>" +
 "<td>" + smtx.ServerDropped + "</td>" +
- "<td>" + smtx.ClientDropped + "</td>" +
+ "<td>" + smtx.ClientDroppedEstimate + "</td>" +
 "</tr>";
 }
 out = out + '</table>';
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_stats.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js b/htrace-webapp/src/main/webapp/app/server_stats.js
index 783041c..4cfea92 100644
--- a/htrace-webapp/src/main/webapp/app/server_stats.js
+++ b/htrace-webapp/src/main/webapp/app/server_stats.js
@@ -22,8 +22,13 @@ htrace.ServerStats = Backbone.Model.extend({
 defaults: {
 "LastStartMs": "0",
 "CurMs": "0",
+ "ReapedSpans": "(unknown)",
 "IngestedSpans": "(unknown)",
- "ReapedSpans": "(unknown)"
+ "WrittenSpans": "(unknown)",
+ "ServerDroppedSpans": "(unknown)",
+ "ClientDroppedEstimate": "(unknown)",
+ "MaxWriteSpansLatencyMs": "(unknown)",
+ "AverageWriteSpansLatencyMs": "(unknown)"
 },
 
 url: function() {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/index.html b/htrace-webapp/src/main/webapp/index.html
index 16fd2f9..1e20ec0 100644
--- a/htrace-webapp/src/main/webapp/index.html
+++ b/htrace-webapp/src/main/webapp/index.html
@@ -84,8 +84,16 @@
 <td><%= model.stats.get("IngestedSpans") %></td>
 </tr>
 <tr>
- <td>Client Dropped Spans</td>
- <td><%= model.stats.get("ClientDroppedSpans") %></td>
+ <td>Spans Written</td>
+ <td><%= model.stats.get("WrittenSpans") %></td>
+ </tr>
+ <tr>
+ <td>Server Dropped Spans</td>
+ <td><%= model.stats.get("ServerDroppedSpans") %></td>
+ </tr>
+ <tr>
+ <td>Estimated Client Dropped Spans</td>
+ <td><%= model.stats.get("ClientDroppedEstimate") %></td>
 </tr>
 <tr>
 <td>Maximum WriteSpans Latency (ms)</td>
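----------------------------------------------------------------------
Note on the serialization change: datastore.go now persists common.SpanData
with github.com/ugorji/go/codec msgpack instead of encoding/gob. Stripped of
the htraced plumbing, the encode/decode round trip it relies on looks like the
standalone sketch below (SpanData here is a simplified stand-in, not the real
common.SpanData):

package main

import (
	"bytes"
	"fmt"

	"github.com/ugorji/go/codec"
)

// Illustrative stand-in for common.SpanData.
type SpanData struct {
	Begin    int64
	End      int64
	Desc     string
	TracerId string
}

func main() {
	mh := new(codec.MsgpackHandle)
	mh.WriteExt = true

	// Encode into a reusable buffer, as SpanIngestor does: one encoder,
	// reset onto a fresh byte slice for each span.
	buf := make([]byte, 0, 1024)
	enc := codec.NewEncoderBytes(&buf, mh)
	in := SpanData{Begin: 1, End: 2, Desc: "get", TracerId: "ts1"}
	if err := enc.Encode(in); err != nil {
		panic(err)
	}

	// Decode, as shard.decodeSpan does.
	var out SpanData
	dec := codec.NewDecoder(bytes.NewBuffer(buf), mh)
	if err := dec.Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("round trip: %+v\n", out)
}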
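The SpanIngestor comment in datastore.go explains why spans are grouped into
WRITESPANS_BATCH_SIZE batches: every channel send requires goroutine
synchronization, so sending one slice per 128 spans is much cheaper than one
send per span. A minimal standalone sketch of that pattern (generic names,
not the htraced types):

package main

import (
	"fmt"
	"sync"
)

const batchSize = 128

func main() {
	incoming := make(chan []int, 4)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		total := 0
		for batch := range incoming {
			// One channel receive per batch, not per item.
			total += len(batch)
		}
		fmt.Println("consumed:", total)
	}()

	batch := make([]int, 0, batchSize)
	for i := 0; i < 1000; i++ {
		batch = append(batch, i)
		if len(batch) == batchSize {
			incoming <- batch
			// Hand off ownership; never reuse a slice after sending it.
			batch = make([]int, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		incoming <- batch // flush the partial batch, as SpanIngestor.Close does
	}
	close(incoming)
	wg.Wait()
}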
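Similarly, MetricsSink.updateSpanMetrics keeps the per-host map bounded by
evicting an arbitrary entry once metrics.max.addr.entries is reached, leaning
on Go's randomized map iteration order to pick the victim. The same idea in
isolation (illustrative names):

package main

import "fmt"

const maxEntries = 2

func bumpWritten(metrics map[string]uint64, addr string) {
	if _, found := metrics[addr]; !found && len(metrics) >= maxEntries {
		// Delete a "random" entry: map iteration order is randomized in Go,
		// so the first key we see is effectively arbitrary.
		for k := range metrics {
			fmt.Printf("evicting %s\n", k)
			delete(metrics, k)
			break
		}
	}
	metrics[addr]++
}

func main() {
	m := map[string]uint64{}
	for _, a := range []string{"192.168.0.100", "192.168.0.101", "192.168.0.102"} {
		bumpWritten(m, a)
	}
	fmt.Println(len(m), m) // always 2 entries
}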
