Repository: incubator-htrace Updated Branches: refs/heads/master 2fc552c12 -> be749f1c5
HTRACE-282. htraced: reap spans which are older than a configurable interval (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/be749f1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/be749f1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/be749f1c Branch: refs/heads/master Commit: be749f1c5ac4cb890be4f36c9d239270c82d099d Parents: 2fc552c Author: Colin P. Mccabe <[email protected]> Authored: Fri Oct 30 09:53:24 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Fri Oct 30 09:53:24 2015 -0700 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/common/rpc.go | 10 + .../go/src/org/apache/htrace/common/time.go | 34 +++ .../src/org/apache/htrace/common/time_test.go | 38 +++ .../src/org/apache/htrace/conf/config_keys.go | 8 + .../go/src/org/apache/htrace/htrace/cmd.go | 21 +- .../src/org/apache/htrace/htraced/datastore.go | 260 +++++++++++++++++-- .../org/apache/htrace/htraced/heartbeater.go | 8 +- .../org/apache/htrace/htraced/reaper_test.go | 86 ++++++ 8 files changed, 440 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go index 34ed15e..5e57f08 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go @@ -114,6 +114,16 @@ type ServerStats struct { // Per-host Span Metrics HostSpanMetrics SpanMetricsMap + + // The time (in UTC milliseconds since the epoch) when the + // datastore was last started. 
+ LastStartMs int64 + + // The current time (in UTC milliseconds since the epoch) on the server. + CurMs int64 + + // The total number of spans which have been reaped. + ReapedSpans uint64 } type StorageDirectoryStats struct { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/common/time.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time.go b/htrace-htraced/go/src/org/apache/htrace/common/time.go new file mode 100644 index 0000000..8b4b6b8 --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/common/time.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
const (
	// millisPerSecond is the number of milliseconds in one second.
	millisPerSecond = 1000

	// nanosPerMilli is the number of nanoseconds in one millisecond.
	nanosPerMilli = 1000000
)

// TimeToUnixMs converts t into the number of milliseconds which have elapsed
// since the Unix epoch.  Any sub-millisecond component of t is discarded.
func TimeToUnixMs(t time.Time) int64 {
	return t.UnixNano() / nanosPerMilli
}

// UnixMsToTime converts a count of milliseconds since the Unix epoch into a
// time.Time.  It is the inverse of TimeToUnixMs for every millisecond value,
// including ones which are not a whole number of seconds and ones which lie
// before the epoch.
func UnixMsToTime(u int64) time.Time {
	secs := u / millisPerSecond
	// The remainder is in milliseconds and must be scaled to nanoseconds
	// before being handed to time.Unix.  For negative inputs the remainder is
	// negative; time.Unix accepts nsec outside [0, 999999999] and normalizes.
	nanos := (u % millisPerSecond) * nanosPerMilli
	return time.Unix(secs, nanos)
}
+ */ + +package common + +import ( + "testing" +) + +func testRoundTrip(t *testing.T, u int64) { + tme := UnixMsToTime(u) + u2 := TimeToUnixMs(tme) + if u2 != u { + t.Fatalf("Error taking %d on a round trip: came back as "+ + "%d instead.\n", u, u2) + } +} + +func TestTimeConversions(t *testing.T) { + testRoundTrip(t, 0) + testRoundTrip(t, 1445540632000) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go index 487762b..ed809f9 100644 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go +++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go @@ -75,6 +75,12 @@ const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms" // The maximum number of addresses for which we will maintain metrics. const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries" +// The number of milliseconds we should keep spans before discarding them. +const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms" + +// The period between updates to the span reaper +const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms" + // A host:port pair to send information to on startup. This is used in unit // tests to determine the (random) port of the htraced process that has been // started. 
@@ -92,6 +98,8 @@ var DEFAULTS = map[string]string{ HTRACE_LOG_LEVEL: "INFO", HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000), HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000", + HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 3*24*60*60*1000), + HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000), } // Values to be used when creating test configurations http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go index 749acdf..e7286ff 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go @@ -33,6 +33,7 @@ import ( "os" "sort" "strings" + "text/tabwriter" "time" ) @@ -197,8 +198,17 @@ func printServerStats(hcl *htrace.Client) int { fmt.Println(err.Error()) return EXIT_FAILURE } - fmt.Printf("HTRACED SERVER STATS:\n") - fmt.Printf("%d leveldb directories.\n", len(stats.Dirs)) + w := new(tabwriter.Writer) + w.Init(os.Stdout, 0, 8, 0, '\t', 0) + fmt.Fprintf(w, "HTRACED SERVER STATS\n") + fmt.Fprintf(w, "Datastore Start\t%s\n", + common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339)) + fmt.Fprintf(w, "Server Time\t%s\n", + common.UnixMsToTime(stats.CurMs).Format(time.RFC3339)) + fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans) + fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs)) + w.Flush() + fmt.Println("") for i := range stats.Dirs { dir := stats.Dirs[i] fmt.Printf("==== %s ===\n", dir.Path) @@ -206,7 +216,9 @@ func printServerStats(hcl *htrace.Client) int { stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1) fmt.Printf("%s\n", stats) } - fmt.Printf("HOST SPAN METRICS:\n") + w = new(tabwriter.Writer) + w.Init(os.Stdout, 0, 8, 0, '\t', 0) + fmt.Fprintf(w, "HOST SPAN METRICS\n") mtxMap := 
stats.HostSpanMetrics keys := make(sort.StringSlice, len(mtxMap)) i := 0 @@ -217,9 +229,10 @@ func printServerStats(hcl *htrace.Client) int { sort.Sort(keys) for k := range keys { mtx := mtxMap[keys[k]] - fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n", + fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped: %d\n", keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped) } + w.Flush() return EXIT_SUCCESS } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index 780b6d2..5d5559a 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -31,6 +31,9 @@ import ( "os" "strconv" "strings" + "sync" + "sync/atomic" + "time" ) // @@ -138,10 +141,83 @@ func (shd *shard) processIncoming() { mtx.Clear() } shd.store.msink.UpdateMetrics(mtxMap) + shd.pruneExpired() } } } +func (shd *shard) pruneExpired() { + lg := shd.store.rpr.lg + src, err := CreateReaperSource(shd) + if err != nil { + lg.Errorf("Error creating reaper source for shd(%s): %s\n", + shd.path, err.Error()) + return + } + var totalReaped uint64 + defer func() { + src.Close() + if totalReaped > 0 { + atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped) + } + }() + urdate := s2u64(shd.store.rpr.GetReaperDate()) + for { + span := src.next() + if span == nil { + lg.Debugf("After reaping %d span(s), no more found in shard %s "+ + "to reap.\n", totalReaped, shd.path) + return + } + begin := s2u64(span.Begin) + if begin >= urdate { + lg.Debugf("After reaping %d span(s), the remaining spans in "+ + "shard %s are new enough to be kept\n", + totalReaped, shd.path) + return + } + err = shd.DeleteSpan(span) + if err 
!= nil { + lg.Errorf("Error deleting span %s from shd(%s): %s\n", + span.String(), shd.path, err.Error()) + return + } + if lg.TraceEnabled() { + lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path) + } + totalReaped++ + } +} + +// Delete a span from the shard. Note that leveldb may retain the data until +// compaction(s) remove it. +func (shd *shard) DeleteSpan(span *common.Span) error { + batch := levigo.NewWriteBatch() + defer batch.Close() + primaryKey := + append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...) + batch.Delete(primaryKey) + for parentIdx := range span.Parents { + key := append(append([]byte{PARENT_ID_INDEX_PREFIX}, + span.Parents[parentIdx].Val()...), span.Id.Val()...) + batch.Delete(key) + } + beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX}, + u64toSlice(s2u64(span.Begin))...), span.Id.Val()...) + batch.Delete(beginTimeKey) + endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX}, + u64toSlice(s2u64(span.End))...), span.Id.Val()...) + batch.Delete(endTimeKey) + durationKey := append(append([]byte{DURATION_INDEX_PREFIX}, + u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...) + batch.Delete(durationKey) + err := shd.ldb.Write(shd.store.writeOpts, batch) + if err != nil { + return err + } + return nil +} + // Convert a signed 64-bit number into an unsigned 64-bit number. We flip the // highest bit, so that negative input values map to unsigned numbers which are // less than non-negative input values. @@ -251,6 +327,107 @@ func (shd *shard) Close() { lg.Infof("Closed %s...\n", shd.path) } +type Reaper struct { + // The logger used by the reaper + lg *common.Logger + + // The number of milliseconds to keep spans around, in milliseconds. + spanExpiryMs int64 + + // The oldest date for which we'll keep spans. + reaperDate int64 + + // A channel used to send heartbeats to the reaper + heartbeats chan interface{} + + // A channel used to block until the reaper goroutine has exited. 
+ exited chan interface{} + + // The lock protecting reaper data. + lock sync.Mutex + + // The reaper heartbeater + hb *Heartbeater + + // The total number of spans which have been reaped. + ReapedSpans uint64 +} + +func NewReaper(cnf *conf.Config) *Reaper { + rpr := &Reaper{ + lg: common.NewLogger("reaper", cnf), + spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS), + heartbeats: make(chan interface{}, 1), + exited: make(chan interface{}), + } + rpr.hb = NewHeartbeater("ReaperHeartbeater", + cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg) + go rpr.run() + rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{ + name: "reaper", + targetChan: rpr.heartbeats, + }) + return rpr +} + +func (rpr *Reaper) run() { + defer func() { + rpr.lg.Info("Exiting Reaper goroutine.\n") + rpr.exited <- nil + }() + + for { + _, isOpen := <-rpr.heartbeats + if !isOpen { + return + } + rpr.handleHeartbeat() + } +} + +func (rpr *Reaper) handleHeartbeat() { + // TODO: check dataStore fullness + now := common.TimeToUnixMs(time.Now().UTC()) + d, updated := func() (int64, bool) { + rpr.lock.Lock() + defer rpr.lock.Unlock() + newReaperDate := now - rpr.spanExpiryMs + if newReaperDate > rpr.reaperDate { + rpr.reaperDate = newReaperDate + return rpr.reaperDate, true + } else { + return rpr.reaperDate, false + } + }() + if rpr.lg.DebugEnabled() { + if updated { + rpr.lg.Debugf("Updating UTC reaper date to %s.\n", + common.UnixMsToTime(d).Format(time.RFC3339)) + } else { + rpr.lg.Debugf("Not updating previous reaperDate of %s.\n", + common.UnixMsToTime(d).Format(time.RFC3339)) + } + } +} + +func (rpr *Reaper) GetReaperDate() int64 { + rpr.lock.Lock() + defer rpr.lock.Unlock() + return rpr.reaperDate +} + +func (rpr *Reaper) SetReaperDate(rdate int64) { + rpr.lock.Lock() + defer rpr.lock.Unlock() + rpr.reaperDate = rdate +} + +func (rpr *Reaper) Shutdown() { + rpr.hb.Shutdown() + close(rpr.heartbeats) + <-rpr.exited +} + // The Data Store. 
type dataStore struct { lg *common.Logger @@ -273,6 +450,12 @@ type dataStore struct { // The heartbeater which periodically asks shards to update the MetricsSink. hb *Heartbeater + + // The reaper for this datastore + rpr *Reaper + + // When this datastore was started (in UTC milliseconds since the epoch) + startMs int64 } func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) { @@ -327,6 +510,8 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto targetChan: shd.heartbeats, }) } + store.rpr = NewReaper(cnf) + store.startMs = common.TimeToUnixMs(time.Now().UTC()) return store, nil } @@ -456,6 +641,10 @@ func (store *dataStore) Close() { store.shards[idx].Close() store.shards[idx] = nil } + if store.rpr != nil { + store.rpr.Shutdown() + store.rpr = nil + } if store.msink != nil { store.msink.Shutdown() store.msink = nil @@ -502,8 +691,8 @@ func (shd *shard) FindSpan(sid common.SpanId) *common.Span { var span *common.Span span, err = shd.decodeSpan(sid, buf) if err != nil { - lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n", - shd.path, sid.String(), err.Error()) + lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n", + shd.path, sid.String(), err.Error(), hex.EncodeToString(buf)) return nil } return span @@ -704,6 +893,7 @@ func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*s var ret *source src := source{store: store, pred: pred, + shards: make([]*shard, len(store.shards)), iters: make([]*levigo.Iterator, 0, len(store.shards)), nexts: make([]*common.Span, len(store.shards)), numRead: make([]int, len(store.shards)), @@ -720,6 +910,7 @@ func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*s }() for shardIdx := range store.shards { shd := store.shards[shardIdx] + src.shards[shardIdx] = shd src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) } var searchKey []byte @@ -804,12 +995,41 @@ func (pred *predicateData) 
createSource(store *dataStore, prev *common.Span) (*s type source struct { store *dataStore pred *predicateData + shards []*shard iters []*levigo.Iterator nexts []*common.Span numRead []int keyPrefix byte } +func CreateReaperSource(shd *shard) (*source, error) { + store := shd.store + p := &common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: common.INVALID_SPAN_ID.String(), + } + pred, err := loadPredicateData(p) + if err != nil { + return nil, err + } + src := &source{ + store: store, + pred: pred, + shards: []*shard{shd}, + iters: make([]*levigo.Iterator, 1), + nexts: make([]*common.Span, 1), + numRead: make([]int, 1), + keyPrefix: pred.getIndexPrefix(), + } + iter := shd.ldb.NewIterator(store.readOpts) + src.iters[0] = iter + searchKey := append(append([]byte{src.keyPrefix}, pred.key...), + pred.key...) + iter.Seek(searchKey) + return src, nil +} + // Return true if this operation may require skipping the first result we get back from leveldb. func mayRequireOneSkip(op common.Op) bool { switch op { @@ -834,24 +1054,25 @@ func (src *source) populateNextFromShard(shardIdx int) { lg := src.store.lg var err error iter := src.iters[shardIdx] + shdPath := src.shards[shardIdx].path if iter == nil { - lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx) + lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath) return // There are no more entries in this shard. } if src.nexts[shardIdx] != nil { - lg.Debugf("No need to populate shard %d\n", shardIdx) + lg.Debugf("No need to populate shard %s\n", shdPath) return // We already have a valid entry for this shard. 
} for { if !iter.Valid() { - lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx) + lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath) break // Can't read past end of DB } src.numRead[shardIdx]++ key := iter.Key() if !bytes.HasPrefix(key, []byte{src.keyPrefix}) { - lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n", - shardIdx, string(src.keyPrefix)) + lg.Debugf("Can't populate: Iterator for shard %s does not have prefix %s\n", + shdPath, string(src.keyPrefix)) break // Can't read past end of indexed section } var span *common.Span @@ -859,19 +1080,19 @@ func (src *source) populateNextFromShard(shardIdx int) { if src.keyPrefix == SPAN_ID_INDEX_PREFIX { // The span id maps to the span itself. sid = common.SpanId(key[1:17]) - span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value()) + span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value()) if err != nil { - lg.Debugf("Internal error decoding span %s in shard %d: %s\n", - sid.String(), shardIdx, err.Error()) + lg.Debugf("Internal error decoding span %s in shard %s: %s\n", + sid.String(), shdPath, err.Error()) break } } else { // With a secondary index, we have to look up the span by id. 
sid = common.SpanId(key[9:25]) - span = src.store.shards[shardIdx].FindSpan(sid) + span = src.shards[shardIdx].FindSpan(sid) if span == nil { - lg.Debugf("Internal error rehydrating span %s in shard %d\n", - sid.String(), shardIdx) + lg.Debugf("Internal error rehydrating span %s in shard %s\n", + sid.String(), shdPath) break } } @@ -881,12 +1102,12 @@ func (src *source) populateNextFromShard(shardIdx int) { iter.Next() } if src.pred.satisfiedBy(span) { - lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx) + lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath) src.nexts[shardIdx] = span // Found valid entry return } else { - lg.Debugf("Span %s from shard %d does not satisfy the predicate.\n", - sid.String(), shardIdx) + lg.Debugf("Span %s from shard %s does not satisfy the predicate.\n", + sid.String(), shdPath) if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) { continue } @@ -894,13 +1115,13 @@ func (src *source) populateNextFromShard(shardIdx int) { break } } - lg.Debugf("Closing iterator for shard %d.\n", shardIdx) + lg.Debugf("Closing iterator for shard %s.\n", shdPath) iter.Close() src.iters[shardIdx] = nil } func (src *source) next() *common.Span { - for shardIdx := range src.iters { + for shardIdx := range src.shards { src.populateNextFromShard(shardIdx) } var best *common.Span @@ -1017,5 +1238,8 @@ func (store *dataStore) ServerStats() *common.ServerStats { shard.ldb.PropertyValue("leveldb.stats")) } serverStats.HostSpanMetrics = store.msink.AccessTotals() + serverStats.LastStartMs = store.startMs + serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC()) + serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans) return &serverStats } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go ---------------------------------------------------------------------- diff --git 
a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go index ea4b053..49a21ee 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go @@ -21,8 +21,8 @@ package main import ( "org/apache/htrace/common" - "time" "sync" + "time" ) type Heartbeater struct { @@ -84,6 +84,10 @@ func (hb *Heartbeater) String() string { } func (hb *Heartbeater) run() { + defer func() { + hb.lg.Debugf("%s: exiting.\n", hb.String()) + hb.wg.Done() + }() period := time.Duration(hb.periodMs) * time.Millisecond for { periodEnd := time.Now().Add(period) @@ -99,8 +103,6 @@ func (hb *Heartbeater) run() { select { case tgt, open := <-hb.req: if !open { - defer hb.wg.Done() - hb.lg.Debugf("%s: exiting.\n", hb.String()) return } hb.targets = append(hb.targets, *tgt) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go new file mode 100644 index 0000000..7aef1d1 --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "fmt" + "math/rand" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "org/apache/htrace/test" + "testing" + "time" +) + +func TestReapingOldSpans(t *testing.T) { + const NUM_TEST_SPANS = 20 + testSpans := make([]*common.Span, NUM_TEST_SPANS) + rnd := rand.New(rand.NewSource(2)) + now := common.TimeToUnixMs(time.Now().UTC()) + for i := range testSpans { + testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i]) + testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i) + testSpans[i].Description = fmt.Sprintf("Span%02d", i) + } + htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans", + Cnf: map[string]string{ + conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000), + conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", + }, + WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS), + DataDirs: make([]string, 2), + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error()) + } + for i := range testSpans { + ht.Store.WriteSpan(&IncomingSpan{ + Addr: "127.0.0.1:1234", + Span: testSpans[i], + }) + } + // Wait the spans to be created + for i := 0; i < len(testSpans); i++ { + <-ht.Store.WrittenSpans + } + // Set a reaper date that will remove all the spans except final one. 
+ ht.Store.rpr.SetReaperDate(now) + + common.WaitFor(5*time.Minute, time.Millisecond, func() bool { + for i := 0; i < NUM_TEST_SPANS-1; i++ { + span := ht.Store.FindSpan(testSpans[i].Id) + if span != nil { + ht.Store.lg.Debugf("Waiting for %s to be removed...\n", + testSpans[i].Description) + return false + } + } + span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id) + if span == nil { + ht.Store.lg.Debugf("Did not expect %s to be removed\n", + testSpans[NUM_TEST_SPANS-1].Description) + return false + } + return true + }) + defer ht.Close() +}
