Repository: incubator-htrace
Updated Branches:
  refs/heads/master 2fc552c12 -> be749f1c5


HTRACE-282. htraced: reap spans which are older than a configurable interval 
(cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/be749f1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/be749f1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/be749f1c

Branch: refs/heads/master
Commit: be749f1c5ac4cb890be4f36c9d239270c82d099d
Parents: 2fc552c
Author: Colin P. Mccabe <[email protected]>
Authored: Fri Oct 30 09:53:24 2015 -0700
Committer: Colin P. Mccabe <[email protected]>
Committed: Fri Oct 30 09:53:24 2015 -0700

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/common/rpc.go      |  10 +
 .../go/src/org/apache/htrace/common/time.go     |  34 +++
 .../src/org/apache/htrace/common/time_test.go   |  38 +++
 .../src/org/apache/htrace/conf/config_keys.go   |   8 +
 .../go/src/org/apache/htrace/htrace/cmd.go      |  21 +-
 .../src/org/apache/htrace/htraced/datastore.go  | 260 +++++++++++++++++--
 .../org/apache/htrace/htraced/heartbeater.go    |   8 +-
 .../org/apache/htrace/htraced/reaper_test.go    |  86 ++++++
 8 files changed, 440 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go 
b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 34ed15e..5e57f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -114,6 +114,16 @@ type ServerStats struct {
 
        // Per-host Span Metrics
        HostSpanMetrics SpanMetricsMap
+
+       // The time (in UTC milliseconds since the epoch) when the
+       // datastore was last started.
+       LastStartMs int64
+
+       // The current time (in UTC milliseconds since the epoch) on the server.
+       CurMs int64
+
+       // The total number of spans which have been reaped.
+       ReapedSpans uint64
 }
 
 type StorageDirectoryStats struct {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/common/time.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time.go 
b/htrace-htraced/go/src/org/apache/htrace/common/time.go
new file mode 100644
index 0000000..8b4b6b8
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/time.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+       "time"
+)
+
+// TimeToUnixMs converts t to a count of milliseconds since the Unix epoch,
+// truncating any sub-millisecond remainder toward zero.
+func TimeToUnixMs(t time.Time) int64 {
+       nanos := t.UnixNano()
+       return nanos / int64(time.Millisecond)
+}
+
+// UnixMsToTime converts u, a count of milliseconds since the Unix epoch, into
+// a time.Time.  For any u, TimeToUnixMs(UnixMsToTime(u)) == u.
+func UnixMsToTime(u int64) time.Time {
+       secs := u / 1000
+       // The remainder is in milliseconds, but time.Unix takes nanoseconds,
+       // so it must be scaled up.  A negative remainder (u < 0) is fine:
+       // time.Unix normalizes nsec values outside [0, 1e9).
+       nanos := (u - (secs * 1000)) * int64(time.Millisecond)
+       return time.Unix(secs, nanos)
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go 
b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
new file mode 100644
index 0000000..11e2733
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+       "testing"
+)
+
+// testRoundTrip fails the test unless converting u to a time.Time and back
+// to milliseconds yields u again.
+func testRoundTrip(t *testing.T, u int64) {
+       if back := TimeToUnixMs(UnixMsToTime(u)); back != u {
+               t.Fatalf("Error taking %d on a round trip: came back as "+
+                       "%d instead.\n", u, back)
+       }
+}
+
+// TestTimeConversions checks that millisecond timestamps survive a round trip
+// through UnixMsToTime and TimeToUnixMs.
+func TestTimeConversions(t *testing.T) {
+       testRoundTrip(t, 0)
+       testRoundTrip(t, 1445540632000)
+       // Values with a non-zero sub-second component exercise the
+       // millisecond-to-nanosecond scaling in UnixMsToTime; whole-second
+       // values alone cannot detect a mistake there.
+       testRoundTrip(t, 1)
+       testRoundTrip(t, 999)
+       testRoundTrip(t, 1445540632123)
+       // Timestamps before the epoch.
+       testRoundTrip(t, -1)
+       testRoundTrip(t, -1500)
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go 
b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 487762b..ed809f9 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -75,6 +75,12 @@ const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = 
"metrics.heartbeat.period.ms"
 // The maximum number of addresses for which we will maintain metrics.
 const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
 
+const (
+       // HTRACE_SPAN_EXPIRY_MS is the number of milliseconds we should keep
+       // spans before discarding them.  Spans whose begin time is older than
+       // this become eligible for reaping.
+       HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms"
+
+       // HTRACE_REAPER_HEARTBEAT_PERIOD_MS is the period, in milliseconds,
+       // between heartbeats sent to the span reaper.
+       HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
+)
+
 // A host:port pair to send information to on startup.  This is used in unit
 // tests to determine the (random) port of the htraced process that has been
 // started.
@@ -92,6 +98,8 @@ var DEFAULTS = map[string]string{
        HTRACE_LOG_LEVEL:                   "INFO",
        HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
        HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
+       HTRACE_SPAN_EXPIRY_MS:              fmt.Sprintf("%d", 3*24*60*60*1000),
+       HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  fmt.Sprintf("%d", 90*1000),
 }
 
 // Values to be used when creating test configurations

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go 
b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index 749acdf..e7286ff 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -33,6 +33,7 @@ import (
        "os"
        "sort"
        "strings"
+       "text/tabwriter"
        "time"
 )
 
@@ -197,8 +198,17 @@ func printServerStats(hcl *htrace.Client) int {
                fmt.Println(err.Error())
                return EXIT_FAILURE
        }
-       fmt.Printf("HTRACED SERVER STATS:\n")
-       fmt.Printf("%d leveldb directories.\n", len(stats.Dirs))
+       w := new(tabwriter.Writer)
+       w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+       fmt.Fprintf(w, "HTRACED SERVER STATS\n")
+       fmt.Fprintf(w, "Datastore Start\t%s\n",
+               common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+       fmt.Fprintf(w, "Server Time\t%s\n",
+               common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+       fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+       fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+       w.Flush()
+       fmt.Println("")
        for i := range stats.Dirs {
                dir := stats.Dirs[i]
                fmt.Printf("==== %s ===\n", dir.Path)
@@ -206,7 +216,9 @@ func printServerStats(hcl *htrace.Client) int {
                stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
                fmt.Printf("%s\n", stats)
        }
-       fmt.Printf("HOST SPAN METRICS:\n")
+       w = new(tabwriter.Writer)
+       w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+       fmt.Fprintf(w, "HOST SPAN METRICS\n")
        mtxMap := stats.HostSpanMetrics
        keys := make(sort.StringSlice, len(mtxMap))
        i := 0
@@ -217,9 +229,10 @@ func printServerStats(hcl *htrace.Client) int {
        sort.Sort(keys)
        for k := range keys {
                mtx := mtxMap[keys[k]]
-               fmt.Printf("%s: written: %d, server dropped %d, client dropped 
%d\n",
+               fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient 
dropped: %d\n",
                        keys[k], mtx.Written, mtx.ServerDropped, 
mtx.ClientDropped)
        }
+       w.Flush()
        return EXIT_SUCCESS
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 780b6d2..5d5559a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -31,6 +31,9 @@ import (
        "os"
        "strconv"
        "strings"
+       "sync"
+       "sync/atomic"
+       "time"
 )
 
 //
@@ -138,10 +141,83 @@ func (shd *shard) processIncoming() {
                                mtx.Clear()
                        }
                        shd.store.msink.UpdateMetrics(mtxMap)
+                       shd.pruneExpired()
                }
        }
 }
 
+// pruneExpired deletes spans from this shard whose begin time is earlier than
+// the reaper's current reaper date (Reaper.GetReaperDate), and adds the number
+// deleted to the reaper's ReapedSpans counter.  It is called from
+// processIncoming right after the shard's metrics are pushed to the
+// MetricsSink.  Errors are logged and end the current pass early; each call
+// builds a fresh reaper source, so the next call rescans from the start.
+func (shd *shard) pruneExpired() {
+       lg := shd.store.rpr.lg
+       src, err := CreateReaperSource(shd)
+       if err != nil {
+               lg.Errorf("Error creating reaper source for shd(%s): %s\n",
+                       shd.path, err.Error())
+               return
+       }
+       // Publish the reaped count once, when the pass ends, rather than once
+       // per deleted span.  The counter is shared, so it is updated atomically.
+       var totalReaped uint64
+       defer func() {
+               src.Close()
+               if totalReaped > 0 {
+                       atomic.AddUint64(&shd.store.rpr.ReapedSpans, 
totalReaped)
+               }
+       }()
+       // Compare begin times in the order-preserving unsigned encoding produced
+       // by s2u64, so negative timestamps sort before non-negative ones.
+       urdate := s2u64(shd.store.rpr.GetReaperDate())
+       // NOTE(review): returning on the first span that is new enough assumes
+       // src yields spans in ascending begin-time order (it is built from a
+       // BEGIN_TIME predicate) — confirm.
+       for {
+               span := src.next()
+               if span == nil {
+                       lg.Debugf("After reaping %d span(s), no more found in 
shard %s "+
+                               "to reap.\n", totalReaped, shd.path)
+                       return
+               }
+               begin := s2u64(span.Begin)
+               if begin >= urdate {
+                       lg.Debugf("After reaping %d span(s), the remaining 
spans in "+
+                               "shard %s are new enough to be kept\n",
+                               totalReaped, shd.path)
+                       return
+               }
+               err = shd.DeleteSpan(span)
+               if err != nil {
+                       lg.Errorf("Error deleting span %s from shd(%s): %s\n",
+                               span.String(), shd.path, err.Error())
+                       return
+               }
+               // Guarded so span.String() is only computed when tracing.
+               if lg.TraceEnabled() {
+                       lg.Tracef("Reaped span %s from shard %s\n", 
span.String(), shd.path)
+               }
+               totalReaped++
+       }
+}
+
+// DeleteSpan removes a span from the shard.  This covers its primary (span id)
+// entry and its parent-id, begin-time, end-time and duration index entries,
+// all written in a single leveldb write batch.  Note that leveldb may retain
+// the data until compaction(s) remove it.
+func (shd *shard) DeleteSpan(span *common.Span) error {
+       sid := span.Id.Val()
+       // indexKey concatenates an index prefix byte with the given key parts
+       // into a freshly allocated slice.
+       indexKey := func(prefix byte, parts ...[]byte) []byte {
+               key := []byte{prefix}
+               for _, part := range parts {
+                       key = append(key, part...)
+               }
+               return key
+       }
+       batch := levigo.NewWriteBatch()
+       defer batch.Close()
+       batch.Delete(indexKey(SPAN_ID_INDEX_PREFIX, sid))
+       for _, parent := range span.Parents {
+               batch.Delete(indexKey(PARENT_ID_INDEX_PREFIX, parent.Val(), sid))
+       }
+       batch.Delete(indexKey(BEGIN_TIME_INDEX_PREFIX,
+               u64toSlice(s2u64(span.Begin)), sid))
+       batch.Delete(indexKey(END_TIME_INDEX_PREFIX,
+               u64toSlice(s2u64(span.End)), sid))
+       batch.Delete(indexKey(DURATION_INDEX_PREFIX,
+               u64toSlice(s2u64(span.Duration())), sid))
+       return shd.ldb.Write(shd.store.writeOpts, batch)
+}
+
 // Convert a signed 64-bit number into an unsigned 64-bit number.  We flip the
 // highest bit, so that negative input values map to unsigned numbers which are
 // less than non-negative input values.
@@ -251,6 +327,107 @@ func (shd *shard) Close() {
        lg.Infof("Closed %s...\n", shd.path)
 }
 
+// Reaper tracks the reaper date: the cutoff, in UTC milliseconds since the
+// epoch, before which spans are considered expired.  A dedicated heartbeater
+// periodically wakes the reaper goroutine, which advances the date to
+// now - spanExpiryMs.  The shards delete expired spans themselves
+// (shard.pruneExpired).
+type Reaper struct {
+       // The total number of spans which have been reaped.
+       //
+       // This field is accessed with sync/atomic and must remain the first
+       // field in the struct.  On 32-bit platforms, 64-bit atomic operations
+       // require 64-bit alignment, and only the first word of an allocated
+       // struct is guaranteed to be 64-bit aligned.
+       ReapedSpans uint64
+
+       // The logger used by the reaper
+       lg *common.Logger
+
+       // How long to keep spans around, in milliseconds.
+       spanExpiryMs int64
+
+       // The oldest date for which we'll keep spans, in UTC milliseconds since
+       // the epoch.  Protected by lock.
+       reaperDate int64
+
+       // A channel used to send heartbeats to the reaper
+       heartbeats chan interface{}
+
+       // A channel used to block until the reaper goroutine has exited.
+       exited chan interface{}
+
+       // The lock protecting reaper data.
+       lock sync.Mutex
+
+       // The reaper heartbeater
+       hb *Heartbeater
+}
+
+// NewReaper creates a Reaper configured from cnf and starts its goroutine.
+// It also registers the reaper's heartbeats channel as the target of a
+// dedicated heartbeater that ticks every HTRACE_REAPER_HEARTBEAT_PERIOD_MS
+// milliseconds.
+func NewReaper(cnf *conf.Config) *Reaper {
+       logger := common.NewLogger("reaper", cnf)
+       rpr := &Reaper{
+               lg:           logger,
+               spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
+               heartbeats:   make(chan interface{}, 1),
+               exited:       make(chan interface{}),
+       }
+       periodMs := cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS)
+       rpr.hb = NewHeartbeater("ReaperHeartbeater", periodMs, logger)
+       go rpr.run()
+       target := &HeartbeatTarget{
+               name:       "reaper",
+               targetChan: rpr.heartbeats,
+       }
+       rpr.hb.AddHeartbeatTarget(target)
+       return rpr
+}
+
+// run is the body of the reaper goroutine.  It handles one heartbeat per
+// value received on rpr.heartbeats.  Once that channel is closed it returns,
+// signalling rpr.exited on the way out.
+func (rpr *Reaper) run() {
+       defer func() {
+               rpr.lg.Info("Exiting Reaper goroutine.\n")
+               rpr.exited <- nil
+       }()
+
+       // Ranging over the channel ends the loop as soon as it is closed.
+       for range rpr.heartbeats {
+               rpr.handleHeartbeat()
+       }
+}
+
+// handleHeartbeat recomputes the reaper date as now - spanExpiryMs (UTC
+// milliseconds).  It stores the new date only if it is later than the current
+// one, so the date never moves backwards; in particular, a date set through
+// SetReaperDate is kept until the computed date passes it.  Deleting spans is
+// left to the shards (shard.pruneExpired).
+func (rpr *Reaper) handleHeartbeat() {
+       // TODO: check dataStore fullness
+       now := common.TimeToUnixMs(time.Now().UTC())
+       // Read and update the date under the lock; logging happens after the
+       // lock has been released.
+       d, updated := func() (int64, bool) {
+               rpr.lock.Lock()
+               defer rpr.lock.Unlock()
+               newReaperDate := now - rpr.spanExpiryMs
+               if newReaperDate > rpr.reaperDate {
+                       rpr.reaperDate = newReaperDate
+                       return rpr.reaperDate, true
+               } else {
+                       return rpr.reaperDate, false
+               }
+       }()
+       // Guarded so the timestamps are only formatted when debug logging is on.
+       if rpr.lg.DebugEnabled() {
+               if updated {
+                       rpr.lg.Debugf("Updating UTC reaper date to %s.\n",
+                               common.UnixMsToTime(d).Format(time.RFC3339))
+               } else {
+                       rpr.lg.Debugf("Not updating previous reaperDate of 
%s.\n",
+                               common.UnixMsToTime(d).Format(time.RFC3339))
+               }
+       }
+}
+
+// GetReaperDate returns the current reaper date, in UTC milliseconds since the
+// epoch.  Spans whose begin time is earlier than this are eligible for reaping.
+func (rpr *Reaper) GetReaperDate() int64 {
+       rpr.lock.Lock()
+       date := rpr.reaperDate
+       rpr.lock.Unlock()
+       return date
+}
+
+// SetReaperDate overrides the reaper date (UTC milliseconds since the epoch).
+// A later heartbeat replaces it again once now - spanExpiryMs exceeds rdate.
+func (rpr *Reaper) SetReaperDate(rdate int64) {
+       rpr.lock.Lock()
+       rpr.reaperDate = rdate
+       rpr.lock.Unlock()
+}
+
+// Shutdown stops the reaper and blocks until its goroutine has exited.
+func (rpr *Reaper) Shutdown() {
+       // Stop the heartbeater first.  rpr.heartbeats is registered with it as
+       // a target channel, so closing the channel while the heartbeater could
+       // still deliver to it would presumably risk a send on a closed channel.
+       rpr.hb.Shutdown()
+       // Closing heartbeats makes run() return.
+       close(rpr.heartbeats)
+       // run() sends on exited from its deferred function as it returns.
+       <-rpr.exited
+}
+
 // The Data Store.
 type dataStore struct {
        lg *common.Logger
@@ -273,6 +450,12 @@ type dataStore struct {
 
        // The heartbeater which periodically asks shards to update the 
MetricsSink.
        hb *Heartbeater
+
+       // The reaper for this datastore
+       rpr *Reaper
+
+       // When this datastore was started (in UTC milliseconds since the epoch)
+       startMs int64
 }
 
 func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) 
(*dataStore, error) {
@@ -327,6 +510,8 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan 
*common.Span) (*dataSto
                        targetChan: shd.heartbeats,
                })
        }
+       store.rpr = NewReaper(cnf)
+       store.startMs = common.TimeToUnixMs(time.Now().UTC())
        return store, nil
 }
 
@@ -456,6 +641,10 @@ func (store *dataStore) Close() {
                store.shards[idx].Close()
                store.shards[idx] = nil
        }
+       if store.rpr != nil {
+               store.rpr.Shutdown()
+               store.rpr = nil
+       }
        if store.msink != nil {
                store.msink.Shutdown()
                store.msink = nil
@@ -502,8 +691,8 @@ func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
        var span *common.Span
        span, err = shd.decodeSpan(sid, buf)
        if err != nil {
-               lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n",
-                       shd.path, sid.String(), err.Error())
+               lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding 
[%s]\n",
+                       shd.path, sid.String(), err.Error(), 
hex.EncodeToString(buf))
                return nil
        }
        return span
@@ -704,6 +893,7 @@ func (pred *predicateData) createSource(store *dataStore, 
prev *common.Span) (*s
        var ret *source
        src := source{store: store,
                pred:      pred,
+               shards:    make([]*shard, len(store.shards)),
                iters:     make([]*levigo.Iterator, 0, len(store.shards)),
                nexts:     make([]*common.Span, len(store.shards)),
                numRead:   make([]int, len(store.shards)),
@@ -720,6 +910,7 @@ func (pred *predicateData) createSource(store *dataStore, 
prev *common.Span) (*s
        }()
        for shardIdx := range store.shards {
                shd := store.shards[shardIdx]
+               src.shards[shardIdx] = shd
                src.iters = append(src.iters, 
shd.ldb.NewIterator(store.readOpts))
        }
        var searchKey []byte
@@ -804,12 +995,41 @@ func (pred *predicateData) createSource(store *dataStore, 
prev *common.Span) (*s
 type source struct {
+       // The datastore whose shards are read.
        store     *dataStore
+       // The predicate spans must satisfy to be returned.
        pred      *predicateData
+       // The shard read by each iterator, indexed in parallel with iters.
+       shards    []*shard
+       // One leveldb iterator per shard; set to nil once a shard is exhausted.
        iters     []*levigo.Iterator
+       // The next matching span buffered from each shard, or nil.
        nexts     []*common.Span
+       // How many index entries have been read from each shard so far.
        numRead   []int
+       // The index prefix byte that keys must carry to be considered.
        keyPrefix byte
 }
 
+// CreateReaperSource builds a source over the single shard shd.  Its predicate
+// is BEGIN_TIME >= common.INVALID_SPAN_ID.String(), and its iterator is
+// positioned at the start of that index range.  pruneExpired uses it to walk
+// the shard's spans; callers should Close the returned source.
+func CreateReaperSource(shd *shard) (*source, error) {
+       pred, err := loadPredicateData(&common.Predicate{
+               Op:    common.GREATER_THAN_OR_EQUALS,
+               Field: common.BEGIN_TIME,
+               Val:   common.INVALID_SPAN_ID.String(),
+       })
+       if err != nil {
+               return nil, err
+       }
+       src := &source{
+               store:     shd.store,
+               pred:      pred,
+               shards:    []*shard{shd},
+               iters:     make([]*levigo.Iterator, 1),
+               nexts:     make([]*common.Span, 1),
+               numRead:   make([]int, 1),
+               keyPrefix: pred.getIndexPrefix(),
+       }
+       src.iters[0] = shd.ldb.NewIterator(shd.store.readOpts)
+       // NOTE(review): the seek key is prefix + pred.key + pred.key.  Confirm
+       // that the second copy, which sits where an index key carries the span
+       // id, is intended.  Also confirm that a span-id string is the intended
+       // BEGIN_TIME value for the predicate above.
+       seekKey := make([]byte, 0, 1+2*len(pred.key))
+       seekKey = append(seekKey, src.keyPrefix)
+       seekKey = append(seekKey, pred.key...)
+       seekKey = append(seekKey, pred.key...)
+       src.iters[0].Seek(seekKey)
+       return src, nil
+}
+
 // Return true if this operation may require skipping the first result we get 
back from leveldb.
 func mayRequireOneSkip(op common.Op) bool {
        switch op {
@@ -834,24 +1054,25 @@ func (src *source) populateNextFromShard(shardIdx int) {
        lg := src.store.lg
        var err error
        iter := src.iters[shardIdx]
+       shdPath := src.shards[shardIdx].path
        if iter == nil {
-               lg.Debugf("Can't populate: No more entries in shard %d\n", 
shardIdx)
+               lg.Debugf("Can't populate: No more entries in shard %s\n", 
shdPath)
                return // There are no more entries in this shard.
        }
        if src.nexts[shardIdx] != nil {
-               lg.Debugf("No need to populate shard %d\n", shardIdx)
+               lg.Debugf("No need to populate shard %s\n", shdPath)
                return // We already have a valid entry for this shard.
        }
        for {
                if !iter.Valid() {
-                       lg.Debugf("Can't populate: Iterator for shard %d is no 
longer valid.\n", shardIdx)
+                       lg.Debugf("Can't populate: Iterator for shard %s is no 
longer valid.\n", shdPath)
                        break // Can't read past end of DB
                }
                src.numRead[shardIdx]++
                key := iter.Key()
                if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
-                       lg.Debugf("Can't populate: Iterator for shard %d does 
not have prefix %s\n",
-                               shardIdx, string(src.keyPrefix))
+                       lg.Debugf("Can't populate: Iterator for shard %s does 
not have prefix %s\n",
+                               shdPath, string(src.keyPrefix))
                        break // Can't read past end of indexed section
                }
                var span *common.Span
@@ -859,19 +1080,19 @@ func (src *source) populateNextFromShard(shardIdx int) {
                if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
                        // The span id maps to the span itself.
                        sid = common.SpanId(key[1:17])
-                       span, err = src.store.shards[shardIdx].decodeSpan(sid, 
iter.Value())
+                       span, err = src.shards[shardIdx].decodeSpan(sid, 
iter.Value())
                        if err != nil {
-                               lg.Debugf("Internal error decoding span %s in 
shard %d: %s\n",
-                                       sid.String(), shardIdx, err.Error())
+                               lg.Debugf("Internal error decoding span %s in 
shard %s: %s\n",
+                                       sid.String(), shdPath, err.Error())
                                break
                        }
                } else {
                        // With a secondary index, we have to look up the span 
by id.
                        sid = common.SpanId(key[9:25])
-                       span = src.store.shards[shardIdx].FindSpan(sid)
+                       span = src.shards[shardIdx].FindSpan(sid)
                        if span == nil {
-                               lg.Debugf("Internal error rehydrating span %s 
in shard %d\n",
-                                       sid.String(), shardIdx)
+                               lg.Debugf("Internal error rehydrating span %s 
in shard %s\n",
+                                       sid.String(), shdPath)
                                break
                        }
                }
@@ -881,12 +1102,12 @@ func (src *source) populateNextFromShard(shardIdx int) {
                        iter.Next()
                }
                if src.pred.satisfiedBy(span) {
-                       lg.Debugf("Populated valid span %v from shard %d.\n", 
sid, shardIdx)
+                       lg.Debugf("Populated valid span %v from shard %s.\n", 
sid, shdPath)
                        src.nexts[shardIdx] = span // Found valid entry
                        return
                } else {
-                       lg.Debugf("Span %s from shard %d does not satisfy the 
predicate.\n",
-                               sid.String(), shardIdx)
+                       lg.Debugf("Span %s from shard %s does not satisfy the 
predicate.\n",
+                               sid.String(), shdPath)
                        if src.numRead[shardIdx] <= 1 && 
mayRequireOneSkip(src.pred.Op) {
                                continue
                        }
@@ -894,13 +1115,13 @@ func (src *source) populateNextFromShard(shardIdx int) {
                        break
                }
        }
-       lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+       lg.Debugf("Closing iterator for shard %s.\n", shdPath)
        iter.Close()
        src.iters[shardIdx] = nil
 }
 
 func (src *source) next() *common.Span {
-       for shardIdx := range src.iters {
+       for shardIdx := range src.shards {
                src.populateNextFromShard(shardIdx)
        }
        var best *common.Span
@@ -1017,5 +1238,8 @@ func (store *dataStore) ServerStats() *common.ServerStats 
{
                        shard.ldb.PropertyValue("leveldb.stats"))
        }
        serverStats.HostSpanMetrics = store.msink.AccessTotals()
+       serverStats.LastStartMs = store.startMs
+       serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
+       serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
        return &serverStats
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
index ea4b053..49a21ee 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
@@ -21,8 +21,8 @@ package main
 
 import (
        "org/apache/htrace/common"
-       "time"
        "sync"
+       "time"
 )
 
 type Heartbeater struct {
@@ -84,6 +84,10 @@ func (hb *Heartbeater) String() string {
 }
 
 func (hb *Heartbeater) run() {
+       defer func() {
+               hb.lg.Debugf("%s: exiting.\n", hb.String())
+               hb.wg.Done()
+       }()
        period := time.Duration(hb.periodMs) * time.Millisecond
        for {
                periodEnd := time.Now().Add(period)
@@ -99,8 +103,6 @@ func (hb *Heartbeater) run() {
                        select {
                        case tgt, open := <-hb.req:
                                if !open {
-                                       defer hb.wg.Done()
-                                       hb.lg.Debugf("%s: exiting.\n", 
hb.String())
                                        return
                                }
                                hb.targets = append(hb.targets, *tgt)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/be749f1c/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
new file mode 100644
index 0000000..7aef1d1
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "math/rand"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "org/apache/htrace/test"
+       "testing"
+       "time"
+)
+
+// TestReapingOldSpans writes NUM_TEST_SPANS spans with consecutive begin times
+// ending at "now".  It then moves the reaper date to now and waits until every
+// span except the newest has been deleted from the datastore.
+func TestReapingOldSpans(t *testing.T) {
+       const NUM_TEST_SPANS = 20
+       testSpans := make([]*common.Span, NUM_TEST_SPANS)
+       rnd := rand.New(rand.NewSource(2))
+       now := common.TimeToUnixMs(time.Now().UTC())
+       // Span i begins (NUM_TEST_SPANS-1-i) ms before now, so only the last
+       // span has a begin time >= now.
+       for i := range testSpans {
+               testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
+               testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
+               testSpans[i].Description = fmt.Sprintf("Span%02d", i)
+       }
+       // A one-hour expiry keeps the reaper's own heartbeats (which set the
+       // date to now - 1h) from reaping any test span.  The 1 ms heartbeat
+       // periods make the reaper, and the shards' pruneExpired calls that run
+       // alongside metrics updates, fire frequently.
+       htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
+               Cnf: map[string]string{
+                       conf.HTRACE_SPAN_EXPIRY_MS:              
fmt.Sprintf("%d", 60*60*1000),
+                       conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  "1",
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
+               WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS),
+               // presumably two data directories, i.e. two shards — confirm
+               DataDirs:     make([]string, 2),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create mini htraced cluster: %s\n", 
err.Error())
+       }
+       for i := range testSpans {
+               ht.Store.WriteSpan(&IncomingSpan{
+                       Addr: "127.0.0.1:1234",
+                       Span: testSpans[i],
+               })
+       }
+       // Wait for all of the spans to be written.
+       for i := 0; i < len(testSpans); i++ {
+               <-ht.Store.WrittenSpans
+       }
+       // Set a reaper date that will remove all the spans except the final one.
+       // Later heartbeats leave it alone, because now - 1h is earlier than now.
+       ht.Store.rpr.SetReaperDate(now)
+
+       // Poll until the older spans are gone and the newest one is still present.
+       // NOTE(review): WaitFor's timeout behaviour is not visible here.  Because
+       // ht.Close is only deferred after this call, consider deferring it
+       // immediately after Build.
+       common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
+               for i := 0; i < NUM_TEST_SPANS-1; i++ {
+                       span := ht.Store.FindSpan(testSpans[i].Id)
+                       if span != nil {
+                               ht.Store.lg.Debugf("Waiting for %s to be 
removed...\n",
+                                       testSpans[i].Description)
+                               return false
+                       }
+               }
+               span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
+               if span == nil {
+                       ht.Store.lg.Debugf("Did not expect %s to be removed\n",
+                               testSpans[NUM_TEST_SPANS-1].Description)
+                       return false
+               }
+               return true
+       })
+       defer ht.Close()
+}

Reply via email to