Repository: incubator-htrace Updated Branches: refs/heads/master 97faf708b -> c28fc60dc
HTRACE-280. htraced: add metrics about total spans added and dropped per address (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c28fc60d Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c28fc60d Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c28fc60d Branch: refs/heads/master Commit: c28fc60dcfd83f61afcb7c2685ada1f2fafe7bf0 Parents: 97faf70 Author: Colin P. Mccabe <[email protected]> Authored: Thu Oct 22 19:38:38 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Thu Oct 22 19:39:09 2015 -0700 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/common/process.go | 2 +- .../org/apache/htrace/common/process_test.go | 8 +- .../go/src/org/apache/htrace/common/rpc.go | 58 +++-- .../src/org/apache/htrace/conf/config_keys.go | 18 ++ .../go/src/org/apache/htrace/htrace/cmd.go | 29 ++- .../org/apache/htrace/htraced/client_test.go | 4 +- .../src/org/apache/htrace/htraced/datastore.go | 121 +++++++--- .../org/apache/htrace/htraced/datastore_test.go | 97 +++++--- .../org/apache/htrace/htraced/heartbeater.go | 117 ++++++++++ .../apache/htrace/htraced/heartbeater_test.go | 100 ++++++++ .../go/src/org/apache/htrace/htraced/hrpc.go | 18 +- .../go/src/org/apache/htrace/htraced/metrics.go | 234 +++++++++++++++++++ .../org/apache/htrace/htraced/metrics_test.go | 187 +++++++++++++++ .../org/apache/htrace/htraced/mini_htraced.go | 10 +- .../go/src/org/apache/htrace/htraced/rest.go | 8 +- 15 files changed, 906 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/process.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process.go 
b/htrace-htraced/go/src/org/apache/htrace/common/process.go index 419d6fe..aad6ca1 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/process.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/process.go @@ -69,7 +69,7 @@ func InstallSignalHandlers(cnf *conf.Config) { sigQuitChan := make(chan os.Signal, 1) signal.Notify(sigQuitChan, syscall.SIGQUIT) go func() { - bufSize := 1<<20 + bufSize := 1 << 20 buf := make([]byte, bufSize) for { <-sigQuitChan http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/process_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go index 7609133..d3f5a56 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go @@ -42,7 +42,7 @@ func TestSignals(t *testing.T) { os.Exit(0) } helper := exec.Command(os.Args[0], "-test.run=TestSignals", "--") - helper.Env = []string { HTRACED_TEST_HELPER_PROCESS + "=1" } + helper.Env = []string{HTRACED_TEST_HELPER_PROCESS + "=1"} stdoutPipe, err := helper.StdoutPipe() if err != nil { panic(fmt.Sprintf("Failed to open pipe to process stdout: %s", @@ -77,7 +77,7 @@ func TestSignals(t *testing.T) { } t.Logf("Saw 'Terminating on signal: SIGINT'. " + "Helper goroutine exiting.\n") - done<-nil + done <- nil }() scanner := bufio.NewScanner(stderrPipe) for scanner.Scan() { @@ -97,9 +97,9 @@ func TestSignals(t *testing.T) { // Run the helper process which TestSignals spawns. 
func runHelperProcess() { - cnfMap := map[string]string { + cnfMap := map[string]string{ conf.HTRACE_LOG_LEVEL: "TRACE", - conf.HTRACE_LOG_PATH: "", // log to stdout + conf.HTRACE_LOG_PATH: "", // log to stdout } cnfBld := conf.Builder{Values: cnfMap, Defaults: conf.DEFAULTS} cnf, err := cnfBld.Build() http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go index 9c7bfad..34ed15e 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go @@ -38,8 +38,10 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024 // A request to write spans to htraced. type WriteSpansReq struct { - DefaultTrid string `json:",omitempty"` - Spans []*Span + Addr string // This gets filled in by the RPC layer. + DefaultTrid string `json:",omitempty"` + Spans []*Span + ClientDropped uint64 `json:",omitempty"` } // Info returned by /server/info @@ -55,22 +57,6 @@ type ServerInfo struct { type WriteSpansResp struct { } -// Info returned by /server/stats -type ServerStats struct { - Shards []ShardStats -} - -type ShardStats struct { - Path string - - // The approximate number of spans present in this shard. This may be an - // underestimate. - ApproxNumSpans uint64 - - // leveldb.stats information - LevelDbStats string -} - // The header which is sent over the wire for HRPC type HrpcRequestHeader struct { Magic uint32 @@ -104,3 +90,39 @@ func HrpcMethodNameToId(name string) uint32 { return METHOD_ID_NONE } } + +type SpanMetrics struct { + // The total number of spans written to HTraced. + Written uint64 + + // The total number of spans dropped by the server. + ServerDropped uint64 + + // The total number of spans dropped by the client. 
Note that this number + // is tracked on the client itself and doesn't get updated if the client + // can't contact the server. + ClientDropped uint64 +} + +// A map from network address strings to SpanMetrics structures. +type SpanMetricsMap map[string]*SpanMetrics + +// Info returned by /server/stats +type ServerStats struct { + // Statistics for each shard (directory) + Dirs []StorageDirectoryStats + + // Per-host Span Metrics + HostSpanMetrics SpanMetricsMap +} + +type StorageDirectoryStats struct { + Path string + + // The approximate number of spans present in this shard. This may be an + // underestimate. + ApproxNumSpans uint64 + + // leveldb.stats information + LevelDbStats string +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go index ccb09e0..487762b 100644 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go +++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go @@ -68,6 +68,13 @@ const HTRACE_LOG_PATH = "log.path" // The log level to use for the logs in htrace. const HTRACE_LOG_LEVEL = "log.level" +// The period between metrics heartbeats. This is the approximate interval at which we will +// update global metrics. +const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms" + +// The maximum number of addresses for which we will maintain metrics. +const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries" + // A host:port pair to send information to on startup. This is used in unit // tests to determine the (random) port of the htraced process that has been // started. 
@@ -83,4 +90,15 @@ var DEFAULTS = map[string]string{ HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100", HTRACE_LOG_PATH: "", HTRACE_LOG_LEVEL: "INFO", + HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000), + HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000", +} + +// Values to be used when creating test configurations +func TEST_VALUES() map[string]string { + return map[string]string{ + HTRACE_HRPC_ADDRESS: ":0", // use a random port for the HRPC server + HTRACE_LOG_LEVEL: "TRACE", // show all log messages in tests + HTRACE_WEB_ADDRESS: ":0", // use a random port for the REST server + } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go index f6972bb..749acdf 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go @@ -31,6 +31,7 @@ import ( "org/apache/htrace/common" "org/apache/htrace/conf" "os" + "sort" "strings" "time" ) @@ -196,15 +197,29 @@ func printServerStats(hcl *htrace.Client) int { fmt.Println(err.Error()) return EXIT_FAILURE } - fmt.Printf("HTraced server stats:\n") - fmt.Printf("%d leveldb shards.\n", len(stats.Shards)) - for i := range stats.Shards { - shard := stats.Shards[i] - fmt.Printf("==== %s ===\n", shard.Path) - fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans) - stats := strings.Replace(shard.LevelDbStats, "\\n", "\n", -1) + fmt.Printf("HTRACED SERVER STATS:\n") + fmt.Printf("%d leveldb directories.\n", len(stats.Dirs)) + for i := range stats.Dirs { + dir := stats.Dirs[i] + fmt.Printf("==== %s ===\n", dir.Path) + fmt.Printf("Approximate number of spans: %d\n", dir.ApproxNumSpans) + stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1) fmt.Printf("%s\n", stats) } + fmt.Printf("HOST SPAN 
METRICS:\n") + mtxMap := stats.HostSpanMetrics + keys := make(sort.StringSlice, len(mtxMap)) + i := 0 + for k, _ := range mtxMap { + keys[i] = k + i++ + } + sort.Sort(keys) + for k := range keys { + mtx := mtxMap[keys[k]] + fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n", + keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped) + } return EXIT_SUCCESS } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go index 540e688..ca0a425 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go @@ -210,7 +210,7 @@ const EXAMPLE_CONF_VALUE = "foo.bar.baz" func TestClientGetServerConf(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf", - Cnf: map[string]string { + Cnf: map[string]string{ EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE, }, DataDirs: make([]string, 2)} @@ -230,6 +230,6 @@ func TestClientGetServerConf(t *testing.T) { } if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE { t.Fatalf("unexpected value for %s: %s", - EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE) + EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE) } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index 9fb9920..780b6d2 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -31,7 +31,6 @@ import ( "os" "strconv" "strings" - 
"sync/atomic" ) // @@ -77,19 +76,12 @@ const DURATION_INDEX_PREFIX = 'd' const PARENT_ID_INDEX_PREFIX = 'p' const INVALID_INDEX_PREFIX = 0 -type Statistics struct { - NumSpansWritten uint64 -} - -func (stats *Statistics) IncrementWrittenSpans() { - atomic.AddUint64(&stats.NumSpansWritten, 1) -} +type IncomingSpan struct { + // The address that the span was sent from. + Addr string -// Make a copy of the statistics structure, using atomic operations. -func (stats *Statistics) Copy() *Statistics { - return &Statistics{ - NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten), - } + // The span. + *common.Span } // A single directory containing a levelDB instance. @@ -104,27 +96,48 @@ type shard struct { path string // Incoming requests to write Spans. - incoming chan *common.Span + incoming chan *IncomingSpan + + // A channel for incoming heartbeats + heartbeats chan interface{} // The channel we will send a bool to when we exit. exited chan bool + + // Per-address metrics + mtxMap ServerSpanMetricsMap + + // The maximum number of metrics to allow in our map + maxMtx int } // Process incoming spans for a shard. 
func (shd *shard) processIncoming() { lg := shd.store.lg + defer func() { + lg.Infof("Shard processor for %s exiting.\n", shd.path) + shd.exited <- true + }() for { - span := <-shd.incoming - if span == nil { - lg.Infof("Shard processor for %s exiting.\n", shd.path) - shd.exited <- true - return - } - err := shd.writeSpan(span) - if err != nil { - lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error()) - } else { - lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson()) + select { + case span := <-shd.incoming: + if span == nil { + return + } + err := shd.writeSpan(span) + if err != nil { + lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error()) + } else { + lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson()) + } + case <-shd.heartbeats: + lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path) + mtxMap := make(ServerSpanMetricsMap) + for addr, mtx := range shd.mtxMap { + mtxMap[addr] = mtx.Clone() + mtx.Clear() + } + shd.store.msink.UpdateMetrics(mtxMap) } } } @@ -150,15 +163,19 @@ func u64toSlice(val uint64) []byte { byte(0xff & (val >> 0))} } -func (shd *shard) writeSpan(span *common.Span) error { +func (shd *shard) writeSpan(ispan *IncomingSpan) error { batch := levigo.NewWriteBatch() defer batch.Close() // Add SpanData to batch. 
spanDataBuf := new(bytes.Buffer) spanDataEnc := gob.NewEncoder(spanDataBuf) + span := ispan.Span err := spanDataEnc.Encode(span.SpanData) if err != nil { + shd.store.lg.Errorf("Error encoding span %s: %s\n", + span.String(), err.Error()) + shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg) return err } primaryKey := @@ -185,9 +202,12 @@ func (shd *shard) writeSpan(span *common.Span) error { err = shd.ldb.Write(shd.store.writeOpts, batch) if err != nil { + shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n", + span.String(), shd.path, err.Error()) + shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg) return err } - shd.store.stats.IncrementWrittenSpans() + shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg) if shd.store.WrittenSpans != nil { shd.store.WrittenSpans <- span } @@ -238,9 +258,6 @@ type dataStore struct { // The shards which manage our LevelDB instances. shards []*shard - // I/O statistics for all shards. - stats Statistics - // The read options to use for LevelDB. readOpts *levigo.ReadOptions @@ -250,6 +267,12 @@ type dataStore struct { // If non-null, a channel we will send spans to once we finish writing them. This is only used // for testing. WrittenSpans chan *common.Span + + // The metrics sink. + msink *MetricsSink + + // The heartbeater which periodically asks shards to update the MetricsSink. 
+ hb *Heartbeater } func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) { @@ -286,11 +309,24 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto } store.shards = append(store.shards, shd) } + store.msink = NewMetricsSink(cnf) for idx := range store.shards { shd := store.shards[idx] shd.exited = make(chan bool, 1) + shd.heartbeats = make(chan interface{}, 1) + shd.mtxMap = make(ServerSpanMetricsMap) + shd.maxMtx = store.msink.maxMtx go shd.processIncoming() } + store.hb = NewHeartbeater("DatastoreHeartbeater", + cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg) + for shdIdx := range store.shards { + shd := store.shards[shdIdx] + store.hb.AddHeartbeatTarget(&HeartbeatTarget{ + name: fmt.Sprintf("shard(%s)", shd.path), + targetChan: shd.heartbeats, + }) + } return store, nil } @@ -372,7 +408,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path string, } spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) shd = &shard{store: store, ldb: ldb, path: path, - incoming: make(chan *common.Span, spanBufferSize)} + incoming: make(chan *IncomingSpan, spanBufferSize)} return shd, nil } @@ -406,16 +442,24 @@ func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error { return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes()) } -func (store *dataStore) GetStatistics() *Statistics { - return store.stats.Copy() +func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap { + return store.msink.AccessTotals() } // Close the DataStore. 
func (store *dataStore) Close() { + if store.hb != nil { + store.hb.Shutdown() + store.hb = nil + } for idx := range store.shards { store.shards[idx].Close() store.shards[idx] = nil } + if store.msink != nil { + store.msink.Shutdown() + store.msink = nil + } if store.readOpts != nil { store.readOpts.Close() store.readOpts = nil @@ -435,7 +479,7 @@ func (store *dataStore) getShardIndex(sid common.SpanId) int { return int(sid.Hash32() % uint32(len(store.shards))) } -func (store *dataStore) WriteSpan(span *common.Span) { +func (store *dataStore) WriteSpan(span *IncomingSpan) { store.shards[store.getShardIndex(span.Id)].incoming <- span } @@ -954,11 +998,11 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) func (store *dataStore) ServerStats() *common.ServerStats { serverStats := common.ServerStats{ - Shards: make([]common.ShardStats, len(store.shards)), + Dirs: make([]common.StorageDirectoryStats, len(store.shards)), } for shardIdx := range store.shards { shard := store.shards[shardIdx] - serverStats.Shards[shardIdx].Path = shard.path + serverStats.Dirs[shardIdx].Path = shard.path r := levigo.Range{ Start: append([]byte{SPAN_ID_INDEX_PREFIX}, common.INVALID_SPAN_ID.Val()...), @@ -966,11 +1010,12 @@ func (store *dataStore) ServerStats() *common.ServerStats { common.INVALID_SPAN_ID.Val()...), } vals := shard.ldb.GetApproximateSizes([]levigo.Range{r}) - serverStats.Shards[shardIdx].ApproxNumSpans = vals[0] - serverStats.Shards[shardIdx].LevelDbStats = + serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0] + serverStats.Dirs[shardIdx].LevelDbStats = shard.ldb.PropertyValue("leveldb.stats") store.lg.Infof("shard.ldb.PropertyValue(leveldb.stats)=%s\n", shard.ldb.PropertyValue("leveldb.stats")) } + serverStats.HostSpanMetrics = store.msink.AccessTotals() return &serverStats } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index 0caa509..50d2891 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -73,10 +73,13 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{ func createSpans(spans []common.Span, store *dataStore) { for idx := range spans { - store.WriteSpan(&spans[idx]) + store.WriteSpan(&IncomingSpan{ + Addr: "127.0.0.1:1234", + Span: &spans[idx], + }) } // Wait the spans to be created - for i := 0; i < 3; i++ { + for i := 0; i < len(spans); i++ { <-store.WrittenSpans } } @@ -85,6 +88,9 @@ func createSpans(spans []common.Span, store *dataStore) { func TestDatastoreWriteAndRead(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, 100)} ht, err := htraceBld.Build() if err != nil { @@ -92,9 +98,13 @@ func TestDatastoreWriteAndRead(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } + + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(len(SIMPLE_TEST_SPANS)), + }, + }) + span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001")) if span == nil { t.Fatal() @@ -147,6 +157,9 @@ func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, func TestSimpleQuery(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, 100)} ht, err := htraceBld.Build() if err != nil 
{ @@ -154,9 +167,12 @@ func TestSimpleQuery(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(len(SIMPLE_TEST_SPANS)), + }, + }) + testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -172,6 +188,9 @@ func TestSimpleQuery(t *testing.T) { func TestQueries2(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, 100)} ht, err := htraceBld.Build() if err != nil { @@ -179,9 +198,11 @@ func TestQueries2(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(len(SIMPLE_TEST_SPANS)), + }, + }) testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -224,6 +245,9 @@ func TestQueries2(t *testing.T) { func TestQueries3(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, 100)} ht, err := htraceBld.Build() if err != nil { @@ -231,9 +255,11 @@ func TestQueries3(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(len(SIMPLE_TEST_SPANS)), + }, + }) testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -276,6 
+302,9 @@ func TestQueries3(t *testing.T) { func TestQueries4(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueries4", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, 100)} ht, err := htraceBld.Build() if err != nil { @@ -283,9 +312,11 @@ func TestQueries4(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(len(SIMPLE_TEST_SPANS)), + }, + }) testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -320,6 +351,9 @@ func TestQueries4(t *testing.T) { func BenchmarkDatastoreWrites(b *testing.B) { htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, b.N)} ht, err := htraceBld.Build() if err != nil { @@ -331,22 +365,28 @@ func BenchmarkDatastoreWrites(b *testing.B) { // Write many random spans. for n := 0; n < b.N; n++ { span := test.NewRandomSpan(rnd, allSpans[0:n]) - ht.Store.WriteSpan(span) + ht.Store.WriteSpan(&IncomingSpan{ + Addr: "127.0.0.1:1234", + Span: span, + }) allSpans[n] = span } // Wait for all the spans to be written. for n := 0; n < b.N; n++ { <-ht.Store.WrittenSpans } - spansWritten := ht.Store.GetStatistics().NumSpansWritten - if spansWritten < uint64(b.N) { - b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d", - b.N, spansWritten) - } + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(b.N), // should be less than? 
+ }, + }) } func TestReloadDataStore(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, DataDirs: make([]string, 2), KeepDataDirsOnClose: true} ht, err := htraceBld.Build() if err != nil { @@ -444,6 +484,9 @@ func TestReloadDataStore(t *testing.T) { func TestQueriesWithContinuationTokens1(t *testing.T) { t.Parallel() htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", + Cnf: map[string]string{ + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, WrittenSpans: make(chan *common.Span, 100)} ht, err := htraceBld.Build() if err != nil { @@ -451,9 +494,11 @@ func TestQueriesWithContinuationTokens1(t *testing.T) { } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { - t.Fatal() - } + waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ + "127.0.0.1:1234": &common.SpanMetrics{ + Written: uint64(len(SIMPLE_TEST_SPANS)), + }, + }) // Adding a prev value to this query excludes the first result that we // would normally get. testQuery(t, ht, &common.Query{ http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go new file mode 100644 index 0000000..140b50d --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "org/apache/htrace/common" + "time" +) + +type Heartbeater struct { + // The name of this heartbeater + name string + + // How long to sleep between heartbeats, in milliseconds. + periodMs int64 + + // The logger to use. + lg *common.Logger + + // The channels to send the heartbeat on. + targets []HeartbeatTarget + + // Incoming requests to the heartbeater. When this is closed, the + // heartbeater will exit. + req chan *HeartbeatTarget +} + +type HeartbeatTarget struct { + // The name of the heartbeat target. + name string + + // The channel for the heartbeat target. 
+ targetChan chan interface{} +} + +func (tgt *HeartbeatTarget) String() string { + return tgt.name +} + +func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater { + hb := &Heartbeater{ + name: name, + periodMs: periodMs, + lg: lg, + targets: make([]HeartbeatTarget, 0, 4), + req: make(chan *HeartbeatTarget), + } + go hb.run() + return hb +} + +func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) { + hb.req <- tgt +} + +func (hb *Heartbeater) Shutdown() { + close(hb.req) +} + +func (hb *Heartbeater) String() string { + return hb.name +} + +func (hb *Heartbeater) run() { + period := time.Duration(hb.periodMs) * time.Millisecond + for { + periodEnd := time.Now().Add(period) + for { + timeToWait := periodEnd.Sub(time.Now()) + if timeToWait <= 0 { + break + } else if timeToWait > period { + // Smooth over jitter or clock changes + timeToWait = period + periodEnd = time.Now().Add(period) + } + select { + case tgt, open := <-hb.req: + if !open { + hb.lg.Debugf("%s: exiting.\n", hb.String()) + return + } + hb.targets = append(hb.targets, *tgt) + hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String()) + case <-time.After(timeToWait): + } + } + for targetIdx := range hb.targets { + select { + case hb.targets[targetIdx].targetChan <- nil: + default: + // We failed to send a heartbeat because the other goroutine was busy and + // hasn't cleared the previous one from its channel. This could indicate a + // stuck goroutine. 
+ hb.lg.Infof("%s: could not send heartbeat to %s.\n", + hb.String(), hb.targets[targetIdx]) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go new file mode 100644 index 0000000..cbde7fc --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package main
+
+import (
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"testing"
+	"time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+	if hb.String() != "ExampleHeartbeater" {
+		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+	}
+	hb.Shutdown()
+}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	// The minimum amount of time which the heartbeater test should take
+	MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD)
+	duration := MINIMUM_TEST_DURATION
+	for duration <= MINIMUM_TEST_DURATION {
+		start := time.Now()
+		testHeartbeaterSendsHeartbeatsImpl(t, lg)
+		end := time.Now()
+		duration = end.Sub(start)
+		lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
+			duration, MINIMUM_TEST_DURATION)
+	}
+}
+
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+	hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+	if hb.String() != "ExampleHeartbeater" {
+		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+	}
+	testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+	gotAllHeartbeats := make(chan bool)
+	hb.AddHeartbeatTarget(&HeartbeatTarget{
+		name:       "ExampleHeartbeatTarget",
+		targetChan: testChan,
+	})
+	go func() {
+		for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+			<-testChan
+		}
+		gotAllHeartbeats <- true
+		for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+			_, open := <-testChan
+			if !open {
+				return
+			}
+		}
+	}()
+	<-gotAllHeartbeats
+	hb.Shutdown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 354d064..49587bb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,6 +32,7 @@ import (
 	"net/rpc"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
+	"reflect"
 )
 
 // Handles HRPC calls
@@ -109,9 +110,10 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+	remoteAddr := cdc.conn.RemoteAddr()
 	if cdc.lg.TraceEnabled() {
 		cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
-			cdc.length, cdc.conn.RemoteAddr())
+			cdc.length, remoteAddr)
 	}
 	mh := new(codec.MsgpackHandle)
 	mh.WriteExt = true
@@ -119,11 +121,20 @@ func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
 	err := dec.Decode(body)
 	if err != nil {
 		return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+
-			"body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+			"body from %s: %s", remoteAddr, err.Error()))
 	}
 	if cdc.lg.TraceEnabled() {
 		cdc.lg.Tracef("Read body from %s: %s\n",
-			cdc.conn.RemoteAddr(), asJson(&body))
+			remoteAddr, asJson(&body))
+	}
+	// Stamp the caller's address into the request's string Addr field, if
+	// it has one.  net/rpc may pass a nil body (to discard a request), and
+	// not every request type is a struct, so check before reflecting.
+	if val := reflect.Indirect(reflect.ValueOf(body)); val.Kind() == reflect.Struct {
+		addr := val.FieldByName("Addr")
+		if addr.Kind() == reflect.String && addr.CanSet() {
+			addr.SetString(remoteAddr.String())
+		}
 	}
 	return nil
 }
@@ -203,7 +214,10 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
 		if hand.lg.TraceEnabled() {
 			hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
 		}
-		hand.store.WriteSpan(span)
+		hand.store.WriteSpan(&IncomingSpan{
+			Addr: req.Addr,
+			Span: span,
+		})
 	}
 	return nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
new file mode 100644
index 0000000..672f5f6
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"encoding/json"
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"sync"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manage htraced.  In particular, we
+// need to know what rate we are receiving spans at, and the main places spans come from.  If spans
+// were dropped because of a high sampling rate, we need to know which part of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+// ServerSpanMetrics holds the span counters which the server keeps for one address.
+type ServerSpanMetrics struct {
+	// The total number of spans written to HTraced.
+	Written uint64
+
+	// The total number of spans dropped by the server.
+	ServerDropped uint64
+}
+
+// Clone returns an independent copy of spm.
+func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
+	return &ServerSpanMetrics{
+		Written:       spm.Written,
+		ServerDropped: spm.ServerDropped,
+	}
+}
+
+// String returns spm encoded as JSON.
+func (spm *ServerSpanMetrics) String() string {
+	jbytes, err := json.Marshal(*spm)
+	if err != nil {
+		panic(err)
+	}
+	return string(jbytes)
+}
+
+// Add adds the counters in ospm to those in spm.  ospm is not modified.
+func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
+	spm.Written += ospm.Written
+	spm.ServerDropped += ospm.ServerDropped
+}
+
+// Clear resets all of the counters in spm to zero.
+func (spm *ServerSpanMetrics) Clear() {
+	spm.Written = 0
+	spm.ServerDropped = 0
+}
+
+// A map from network address strings to ServerSpanMetrics structures.
+type ServerSpanMetricsMap map[string]*ServerSpanMetrics
+
+// IncrementDropped increments the ServerDropped counter for addr, creating the
+// entry if necessary, and then prunes the map to at most maxMtx entries.
+func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
+	lg *common.Logger) {
+	mtx := smtxMap[addr]
+	if mtx == nil {
+		mtx = &ServerSpanMetrics{}
+		smtxMap[addr] = mtx
+	}
+	mtx.ServerDropped++
+	smtxMap.Prune(maxMtx, lg)
+}
+
+// IncrementWritten increments the Written counter for addr, creating the
+// entry if necessary, and then prunes the map to at most maxMtx entries.
+func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
+	lg *common.Logger) {
+	mtx := smtxMap[addr]
+	if mtx == nil {
+		mtx = &ServerSpanMetrics{}
+		smtxMap[addr] = mtx
+	}
+	mtx.Written++
+	smtxMap.Prune(maxMtx, lg)
+}
+
+// Prune evicts entries, logging a warning for each one, until the map holds at
+// most maxMtx entries.
+func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
+	// Go map iteration order is unspecified, so each pass evicts an arbitrary
+	// entry.  Checking len > 0 keeps a non-positive maxMtx from spinning forever.
+	for len(smtxMap) > maxMtx && len(smtxMap) > 0 {
+		for k := range smtxMap {
+			lg.Warnf("Evicting metrics entry for addr %s "+
+				"because there are more than %d addrs.\n", k, maxMtx)
+			delete(smtxMap, k)
+			break
+		}
+	}
+}
+
+// AccessReq asks the MetricsSink service goroutine to copy the current totals
+// into mtxMap.  done is closed once mtxMap has been filled in.
+type AccessReq struct {
+	mtxMap common.SpanMetricsMap
+	done   chan interface{}
+}
+
+// MetricsSink aggregates span metrics per address.  The server-side totals are
+// only touched by the service goroutine started in NewMetricsSink; the
+// client-reported drop counts are guarded by clientDroppedLock.
+type MetricsSink struct {
+	// The total span metrics.
+	smtxMap ServerSpanMetricsMap
+
+	// A channel of incoming shard metrics.
+	// When this is shut down, the MetricsSink will exit.
+	updateReqs chan ServerSpanMetricsMap
+
+	// A channel of incoming requests for shard metrics.
+	accessReqs chan *AccessReq
+
+	// This will be closed when the MetricsSink has exited.
+	exited chan interface{}
+
+	// The logger used by this MetricsSink.
+	lg *common.Logger
+
+	// The maximum number of metrics totals we will maintain.
+	maxMtx int
+
+	// The number of spans which each client has self-reported that it has
+	// dropped.
+	clientDroppedMap map[string]uint64
+
+	// Lock protecting clientDroppedMap
+	clientDroppedLock sync.Mutex
+}
+
+// NewMetricsSink creates a MetricsSink and starts its service goroutine.
+// Call Shutdown to stop the goroutine.
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+	mcl := MetricsSink{
+		smtxMap:          make(ServerSpanMetricsMap),
+		updateReqs:       make(chan ServerSpanMetricsMap, 128),
+		accessReqs:       make(chan *AccessReq),
+		exited:           make(chan interface{}),
+		lg:               common.NewLogger("metrics", cnf),
+		maxMtx:           cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+		clientDroppedMap: make(map[string]uint64),
+	}
+	go mcl.run()
+	return &mcl
+}
+
+// run is the service goroutine.  It merges updates into the totals and answers
+// access requests until updateReqs is closed, then closes exited.
+func (msink *MetricsSink) run() {
+	lg := msink.lg
+	defer func() {
+		lg.Info("MetricsSink: stopping service goroutine.\n")
+		close(msink.exited)
+	}()
+	lg.Tracef("MetricsSink: starting.\n")
+	for {
+		select {
+		case updateReq, open := <-msink.updateReqs:
+			if !open {
+				lg.Trace("MetricsSink: shutting down cleanly.\n")
+				return
+			}
+			for addr, umtx := range updateReq {
+				smtx := msink.smtxMap[addr]
+				if smtx == nil {
+					smtx = &ServerSpanMetrics{}
+					msink.smtxMap[addr] = smtx
+				}
+				smtx.Add(umtx)
+				if lg.TraceEnabled() {
+					lg.Tracef("MetricsSink: updated %s to %s\n", addr, smtx.String())
+				}
+			}
+			// One update may add several new addresses; Prune evicts as many
+			// entries as needed to get back under the limit.
+			msink.smtxMap.Prune(msink.maxMtx, lg)
+		case accessReq := <-msink.accessReqs:
+			msink.handleAccessReq(accessReq)
+		}
+	}
+}
+
+// handleAccessReq copies the totals, together with each address's
+// client-reported drop count, into accessReq.mtxMap and then closes
+// accessReq.done.  Only addresses with server-side totals are reported.
+func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
+	msink.lg.Debug("MetricsSink: accessing global metrics.\n")
+	msink.clientDroppedLock.Lock()
+	defer func() {
+		msink.clientDroppedLock.Unlock()
+		close(accessReq.done)
+	}()
+	for addr, smtx := range msink.smtxMap {
+		accessReq.mtxMap[addr] = &common.SpanMetrics{
+			Written:       smtx.Written,
+			ServerDropped: smtx.ServerDropped,
+			ClientDropped: msink.clientDroppedMap[addr],
+		}
+	}
+}
+
+// AccessTotals returns a snapshot of the per-address metrics.  It must not be
+// called after Shutdown, because no goroutine would be left to answer it.
+func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap {
+	accessReq := &AccessReq{
+		mtxMap: make(common.SpanMetricsMap),
+		done:   make(chan interface{}),
+	}
+	msink.accessReqs <- accessReq
+	<-accessReq.done
+	return accessReq.mtxMap
+}
+
+// UpdateMetrics queues mtxMap to be added to the totals.  mtxMap is read
+// asynchronously by the service goroutine, so do not modify it afterwards.
+func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
+	msink.updateReqs <- mtxMap
+}
+
+// Shutdown stops the service goroutine and waits for it to exit.
+func (msink *MetricsSink) Shutdown() {
+	close(msink.updateReqs)
+	<-msink.exited
+}
+
+// UpdateClientDropped records the number of spans which client reports having
+// dropped, replacing any earlier value.  Arbitrary entries are evicted so that
+// at most maxMtx clients are tracked.
+func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped uint64) {
+	msink.clientDroppedLock.Lock()
+	defer msink.clientDroppedLock.Unlock()
+	msink.clientDroppedMap[client] = clientDropped
+	for len(msink.clientDroppedMap) > msink.maxMtx && len(msink.clientDroppedMap) > 0 {
+		// Delete an arbitrary entry.
+		for k := range msink.clientDroppedMap {
+			delete(msink.clientDroppedMap, k)
+			break
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..c90d1da
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestMetricsSinkStartupShutdown(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	msink.Shutdown()
+}
+
+func TestAddSpanMetrics(t *testing.T) {
+	a := &ServerSpanMetrics{
+		Written:       100,
+		ServerDropped: 200,
+	}
+	b := &ServerSpanMetrics{
+		Written:       500,
+		ServerDropped: 100,
+	}
+	a.Add(b)
+	if a.Written != 600 {
+		t.Fatalf("SpanMetrics#Add failed to update #Written")
+	}
+	if a.ServerDropped != 300 {
+		t.Fatalf("SpanMetrics#Add failed to update #Dropped")
+	}
+	if b.Written != 500 {
+		t.Fatalf("SpanMetrics#Add updated b#Written")
+	}
+	if b.ServerDropped != 100 {
+		t.Fatalf("SpanMetrics#Add updated b#Dropped")
+	}
+}
+
+func TestServerSpanMetricsMapPrune(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("metrics", cnf)
+	smtxMap := make(ServerSpanMetricsMap)
+	for _, addr := range []string{"192.168.0.100", "192.168.0.101",
+		"192.168.0.102", "192.168.0.103"} {
+		smtxMap[addr] = &ServerSpanMetrics{}
+	}
+	// Pruning must evict as many entries as needed, not just one.
+	smtxMap.Prune(2, lg)
+	if len(smtxMap) != 2 {
+		t.Fatalf("expected Prune to leave 2 entries, but %d remain", len(smtxMap))
+	}
+	// A map which is exactly at the limit must be left alone.
+	smtxMap.Prune(2, lg)
+	if len(smtxMap) != 2 {
+		t.Fatalf("expected Prune to keep 2 entries, but %d remain", len(smtxMap))
+	}
+}
+
+func compareTotals(a, b common.SpanMetricsMap) bool {
+	for k, v := range a {
+		if !reflect.DeepEqual(v, b[k]) {
+			return false
+		}
+	}
+	for k, v := range b {
+		if !reflect.DeepEqual(v, a[k]) {
+			return false
+		}
+	}
+	return true
+}
+
+func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
+	for {
+		time.Sleep(1 * time.Millisecond)
+		totals := msink.AccessTotals()
+		if compareTotals(totals, expectedTotals) {
+			return
+		}
+	}
+}
+
+func TestMetricsSinkMessages(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	totals := msink.AccessTotals()
+	if len(totals) != 0 {
+		t.Fatalf("Expected no data in the MetricsSink to start with.")
+	}
+	msink.UpdateMetrics(ServerSpanMetricsMap{
+		"192.168.0.100": &ServerSpanMetrics{
+			Written:       20,
+			ServerDropped: 10,
+		},
+	})
+	waitForMetrics(msink, common.SpanMetricsMap{
+		"192.168.0.100": &common.SpanMetrics{
+			Written:       20,
+			ServerDropped: 10,
+		},
+	})
+	msink.UpdateMetrics(ServerSpanMetricsMap{
+		"192.168.0.100": &ServerSpanMetrics{
+			Written:       200,
+			ServerDropped: 100,
+		},
+	})
+	msink.UpdateMetrics(ServerSpanMetricsMap{
+		"192.168.0.100": &ServerSpanMetrics{
+			Written:       1000,
+			ServerDropped: 1000,
+		},
+	})
+	waitForMetrics(msink, common.SpanMetricsMap{
+		"192.168.0.100": &common.SpanMetrics{
+			Written:       1220,
+			ServerDropped: 1110,
+		},
+	})
+	msink.UpdateMetrics(ServerSpanMetricsMap{
+		"192.168.0.200": &ServerSpanMetrics{
+			Written:       200,
+			ServerDropped: 100,
+		},
+	})
+	waitForMetrics(msink, common.SpanMetricsMap{
+		"192.168.0.100": &common.SpanMetrics{
+			Written:       1220,
+			ServerDropped: 1110,
+		},
+		"192.168.0.200": &common.SpanMetrics{
+			Written:       200,
+			ServerDropped: 100,
+		},
+	})
+	msink.Shutdown()
+}
+
+func TestMetricsSinkMessagesEviction(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+	cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	msink.UpdateMetrics(ServerSpanMetricsMap{
+		"192.168.0.100": &ServerSpanMetrics{
+			Written:       20,
+			ServerDropped: 10,
+		},
+		"192.168.0.101": &ServerSpanMetrics{
+			Written:       20,
+			ServerDropped: 10,
+		},
+		"192.168.0.102": &ServerSpanMetrics{
+			Written:       20,
+			ServerDropped: 10,
+		},
+	})
+	for {
+		totals := msink.AccessTotals()
+		if len(totals) == 2 {
+			break
+		}
+	}
+	msink.Shutdown()
+}
+
+func TestMetricsSinkClientDroppedEviction(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	// Nothing else touches clientDroppedMap during this test, so it is safe
+	// to inspect it directly.
+	msink.UpdateClientDropped("192.168.0.100", 1)
+	msink.UpdateClientDropped("192.168.0.101", 2)
+	if len(msink.clientDroppedMap) != 2 {
+		t.Fatalf("expected 2 client entries at the limit, but found %d",
+			len(msink.clientDroppedMap))
+	}
+	msink.UpdateClientDropped("192.168.0.102", 3)
+	if len(msink.clientDroppedMap) != 2 {
+		t.Fatalf("expected eviction down to 2 client entries, but found %d",
+			len(msink.clientDroppedMap))
+	}
+	msink.Shutdown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a54f2cb..c2300c4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -90,11 +90,15 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
 			}
 		}
 	}
+	// Copy the default test configuration values.
+	for k, v := range conf.TEST_VALUES() {
+		_, hasVal := bld.Cnf[k]
+		if !hasVal {
+			bld.Cnf[k] = v
+		}
+	}
 	bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
 		strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
-	bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0"  // use a random port for the REST server
-	bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the HRPC server
-	bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
 	cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
 	cnf, err := cnfBld.Build()
 	if err != nil {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 16c3a75..eca3f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -90,7 +90,7 @@ func (hand *serverStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
 
 type serverConfHandler struct {
 	cnf *conf.Config
-	lg *common.Logger
+	lg  *common.Logger
 }
 
 func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -233,9 +233,13 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
 		if spanIdProblem != "" {
 			hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
 		} else {
-			hand.store.WriteSpan(span)
+			hand.store.WriteSpan(&IncomingSpan{
+				Addr: req.RemoteAddr,
+				Span: span,
+			})
 		}
 	}
+	hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped)
 }
 
 type queryHandler struct {
