Repository: incubator-htrace
Updated Branches:
  refs/heads/master 97faf708b -> c28fc60dc


HTRACE-280. htraced: add metrics about total spans added and dropped per address (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c28fc60d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c28fc60d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c28fc60d

Branch: refs/heads/master
Commit: c28fc60dcfd83f61afcb7c2685ada1f2fafe7bf0
Parents: 97faf70
Author: Colin P. Mccabe <[email protected]>
Authored: Thu Oct 22 19:38:38 2015 -0700
Committer: Colin P. Mccabe <[email protected]>
Committed: Thu Oct 22 19:39:09 2015 -0700

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/common/process.go  |   2 +-
 .../org/apache/htrace/common/process_test.go    |   8 +-
 .../go/src/org/apache/htrace/common/rpc.go      |  58 +++--
 .../src/org/apache/htrace/conf/config_keys.go   |  18 ++
 .../go/src/org/apache/htrace/htrace/cmd.go      |  29 ++-
 .../org/apache/htrace/htraced/client_test.go    |   4 +-
 .../src/org/apache/htrace/htraced/datastore.go  | 121 +++++++---
 .../org/apache/htrace/htraced/datastore_test.go |  97 +++++---
 .../org/apache/htrace/htraced/heartbeater.go    | 117 ++++++++++
 .../apache/htrace/htraced/heartbeater_test.go   | 100 ++++++++
 .../go/src/org/apache/htrace/htraced/hrpc.go    |  18 +-
 .../go/src/org/apache/htrace/htraced/metrics.go | 234 +++++++++++++++++++
 .../org/apache/htrace/htraced/metrics_test.go   | 187 +++++++++++++++
 .../org/apache/htrace/htraced/mini_htraced.go   |  10 +-
 .../go/src/org/apache/htrace/htraced/rest.go    |   8 +-
 15 files changed, 906 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/process.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process.go b/htrace-htraced/go/src/org/apache/htrace/common/process.go
index 419d6fe..aad6ca1 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/process.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/process.go
@@ -69,7 +69,7 @@ func InstallSignalHandlers(cnf *conf.Config) {
        sigQuitChan := make(chan os.Signal, 1)
        signal.Notify(sigQuitChan, syscall.SIGQUIT)
        go func() {
-               bufSize := 1<<20
+               bufSize := 1 << 20
                buf := make([]byte, bufSize)
                for {
                        <-sigQuitChan

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
index 7609133..d3f5a56 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
@@ -42,7 +42,7 @@ func TestSignals(t *testing.T) {
                os.Exit(0)
        }
        helper := exec.Command(os.Args[0], "-test.run=TestSignals", "--")
-       helper.Env = []string { HTRACED_TEST_HELPER_PROCESS + "=1" }
+       helper.Env = []string{HTRACED_TEST_HELPER_PROCESS + "=1"}
        stdoutPipe, err := helper.StdoutPipe()
        if err != nil {
                panic(fmt.Sprintf("Failed to open pipe to process stdout: %s",
@@ -77,7 +77,7 @@ func TestSignals(t *testing.T) {
                }
                t.Logf("Saw 'Terminating on signal: SIGINT'.  " +
                        "Helper goroutine exiting.\n")
-               done<-nil
+               done <- nil
        }()
        scanner := bufio.NewScanner(stderrPipe)
        for scanner.Scan() {
@@ -97,9 +97,9 @@ func TestSignals(t *testing.T) {
 
 // Run the helper process which TestSignals spawns.
 func runHelperProcess() {
-       cnfMap := map[string]string {
+       cnfMap := map[string]string{
                conf.HTRACE_LOG_LEVEL: "TRACE",
-               conf.HTRACE_LOG_PATH: "", // log to stdout
+               conf.HTRACE_LOG_PATH:  "", // log to stdout
        }
        cnfBld := conf.Builder{Values: cnfMap, Defaults: conf.DEFAULTS}
        cnf, err := cnfBld.Build()

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 9c7bfad..34ed15e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,8 +38,10 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
 
 // A request to write spans to htraced.
 type WriteSpansReq struct {
-       DefaultTrid string `json:",omitempty"`
-       Spans       []*Span
+       Addr          string // This gets filled in by the RPC layer.
+       DefaultTrid   string `json:",omitempty"`
+       Spans         []*Span
+       ClientDropped uint64 `json:",omitempty"`
 }
 
 // Info returned by /server/info
@@ -55,22 +57,6 @@ type ServerInfo struct {
 type WriteSpansResp struct {
 }
 
-// Info returned by /server/stats
-type ServerStats struct {
-       Shards []ShardStats
-}
-
-type ShardStats struct {
-       Path string
-
-       // The approximate number of spans present in this shard.  This may be an
-       // underestimate.
-       ApproxNumSpans uint64
-
-       // leveldb.stats information
-       LevelDbStats string
-}
-
 // The header which is sent over the wire for HRPC
 type HrpcRequestHeader struct {
        Magic    uint32
@@ -104,3 +90,39 @@ func HrpcMethodNameToId(name string) uint32 {
                return METHOD_ID_NONE
        }
 }
+
+type SpanMetrics struct {
+       // The total number of spans written to HTraced.
+       Written uint64
+
+       // The total number of spans dropped by the server.
+       ServerDropped uint64
+
+       // The total number of spans dropped by the client.  Note that this number
+       // is tracked on the client itself and doesn't get updated if the client
+       // can't contact the server.
+       ClientDropped uint64
+}
+
+// A map from network address strings to SpanMetrics structures.
+type SpanMetricsMap map[string]*SpanMetrics
+
+// Info returned by /server/stats
+type ServerStats struct {
+       // Statistics for each shard (directory)
+       Dirs []StorageDirectoryStats
+
+       // Per-host Span Metrics
+       HostSpanMetrics SpanMetricsMap
+}
+
+type StorageDirectoryStats struct {
+       Path string
+
+       // The approximate number of spans present in this shard.  This may be an
+       // underestimate.
+       ApproxNumSpans uint64
+
+       // leveldb.stats information
+       LevelDbStats string
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index ccb09e0..487762b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,6 +68,13 @@ const HTRACE_LOG_PATH = "log.path"
 // The log level to use for the logs in htrace.
 const HTRACE_LOG_LEVEL = "log.level"
 
+// The period between metrics heartbeats.  This is the approximate interval at which we will
+// update global metrics.
+const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+
+// The maximum number of addresses for which we will maintain metrics.
+const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
+
 // A host:port pair to send information to on startup.  This is used in unit
 // tests to determine the (random) port of the htraced process that has been
 // started.
@@ -83,4 +90,15 @@ var DEFAULTS = map[string]string{
        HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
        HTRACE_LOG_PATH:                    "",
        HTRACE_LOG_LEVEL:                   "INFO",
+       HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
+       HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
+}
+
+// Values to be used when creating test configurations
+func TEST_VALUES() map[string]string {
+       return map[string]string{
+               HTRACE_HRPC_ADDRESS: ":0",    // use a random port for the HRPC server
+               HTRACE_LOG_LEVEL:    "TRACE", // show all log messages in tests
+               HTRACE_WEB_ADDRESS:  ":0",    // use a random port for the REST server
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index f6972bb..749acdf 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -31,6 +31,7 @@ import (
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "os"
+       "sort"
        "strings"
        "time"
 )
@@ -196,15 +197,29 @@ func printServerStats(hcl *htrace.Client) int {
                fmt.Println(err.Error())
                return EXIT_FAILURE
        }
-       fmt.Printf("HTraced server stats:\n")
-       fmt.Printf("%d leveldb shards.\n", len(stats.Shards))
-       for i := range stats.Shards {
-               shard := stats.Shards[i]
-               fmt.Printf("==== %s ===\n", shard.Path)
-               fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans)
-               stats := strings.Replace(shard.LevelDbStats, "\\n", "\n", -1)
+       fmt.Printf("HTRACED SERVER STATS:\n")
+       fmt.Printf("%d leveldb directories.\n", len(stats.Dirs))
+       for i := range stats.Dirs {
+               dir := stats.Dirs[i]
+               fmt.Printf("==== %s ===\n", dir.Path)
+               fmt.Printf("Approximate number of spans: %d\n", dir.ApproxNumSpans)
+               stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
                fmt.Printf("%s\n", stats)
        }
+       fmt.Printf("HOST SPAN METRICS:\n")
+       mtxMap := stats.HostSpanMetrics
+       keys := make(sort.StringSlice, len(mtxMap))
+       i := 0
+       for k, _ := range mtxMap {
+               keys[i] = k
+               i++
+       }
+       sort.Sort(keys)
+       for k := range keys {
+               mtx := mtxMap[keys[k]]
+               fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n",
+                       keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
+       }
        return EXIT_SUCCESS
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 540e688..ca0a425 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -210,7 +210,7 @@ const EXAMPLE_CONF_VALUE = "foo.bar.baz"
 
 func TestClientGetServerConf(t *testing.T) {
        htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
-               Cnf: map[string]string {
+               Cnf: map[string]string{
                        EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
                },
                DataDirs: make([]string, 2)}
@@ -230,6 +230,6 @@ func TestClientGetServerConf(t *testing.T) {
        }
        if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
                t.Fatalf("unexpected value for %s: %s",
-                               EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
+                       EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 9fb9920..780b6d2 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -31,7 +31,6 @@ import (
        "os"
        "strconv"
        "strings"
-       "sync/atomic"
 )
 
 //
@@ -77,19 +76,12 @@ const DURATION_INDEX_PREFIX = 'd'
 const PARENT_ID_INDEX_PREFIX = 'p'
 const INVALID_INDEX_PREFIX = 0
 
-type Statistics struct {
-       NumSpansWritten uint64
-}
-
-func (stats *Statistics) IncrementWrittenSpans() {
-       atomic.AddUint64(&stats.NumSpansWritten, 1)
-}
+type IncomingSpan struct {
+       // The address that the span was sent from.
+       Addr string
 
-// Make a copy of the statistics structure, using atomic operations.
-func (stats *Statistics) Copy() *Statistics {
-       return &Statistics{
-               NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
-       }
+       // The span.
+       *common.Span
 }
 
 // A single directory containing a levelDB instance.
@@ -104,27 +96,48 @@ type shard struct {
        path string
 
        // Incoming requests to write Spans.
-       incoming chan *common.Span
+       incoming chan *IncomingSpan
+
+       // A channel for incoming heartbeats
+       heartbeats chan interface{}
 
        // The channel we will send a bool to when we exit.
        exited chan bool
+
+       // Per-address metrics
+       mtxMap ServerSpanMetricsMap
+
+       // The maximum number of metrics to allow in our map
+       maxMtx int
 }
 
 // Process incoming spans for a shard.
 func (shd *shard) processIncoming() {
        lg := shd.store.lg
+       defer func() {
+               lg.Infof("Shard processor for %s exiting.\n", shd.path)
+               shd.exited <- true
+       }()
        for {
-               span := <-shd.incoming
-               if span == nil {
-                       lg.Infof("Shard processor for %s exiting.\n", shd.path)
-                       shd.exited <- true
-                       return
-               }
-               err := shd.writeSpan(span)
-               if err != nil {
-                       lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
-               } else {
-                       lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+               select {
+               case span := <-shd.incoming:
+                       if span == nil {
+                               return
+                       }
+                       err := shd.writeSpan(span)
+                       if err != nil {
+                               lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
+                       } else {
+                               lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+                       }
+               case <-shd.heartbeats:
+                       lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
+                       mtxMap := make(ServerSpanMetricsMap)
+                       for addr, mtx := range shd.mtxMap {
+                               mtxMap[addr] = mtx.Clone()
+                               mtx.Clear()
+                       }
+                       shd.store.msink.UpdateMetrics(mtxMap)
                }
        }
 }
@@ -150,15 +163,19 @@ func u64toSlice(val uint64) []byte {
                byte(0xff & (val >> 0))}
 }
 
-func (shd *shard) writeSpan(span *common.Span) error {
+func (shd *shard) writeSpan(ispan *IncomingSpan) error {
        batch := levigo.NewWriteBatch()
        defer batch.Close()
 
        // Add SpanData to batch.
        spanDataBuf := new(bytes.Buffer)
        spanDataEnc := gob.NewEncoder(spanDataBuf)
+       span := ispan.Span
        err := spanDataEnc.Encode(span.SpanData)
        if err != nil {
+               shd.store.lg.Errorf("Error encoding span %s: %s\n",
+                       span.String(), err.Error())
+               shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
                return err
        }
        primaryKey :=
@@ -185,9 +202,12 @@ func (shd *shard) writeSpan(span *common.Span) error {
 
        err = shd.ldb.Write(shd.store.writeOpts, batch)
        if err != nil {
+               shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
+                       span.String(), shd.path, err.Error())
+               shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
                return err
        }
-       shd.store.stats.IncrementWrittenSpans()
+       shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
        if shd.store.WrittenSpans != nil {
                shd.store.WrittenSpans <- span
        }
@@ -238,9 +258,6 @@ type dataStore struct {
        // The shards which manage our LevelDB instances.
        shards []*shard
 
-       // I/O statistics for all shards.
-       stats Statistics
-
        // The read options to use for LevelDB.
        readOpts *levigo.ReadOptions
 
@@ -250,6 +267,12 @@ type dataStore struct {
        // If non-null, a channel we will send spans to once we finish writing them.  This is only used
        // for testing.
        WrittenSpans chan *common.Span
+
+       // The metrics sink.
+       msink *MetricsSink
+
+       // The heartbeater which periodically asks shards to update the MetricsSink.
+       hb *Heartbeater
 }
 
 func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
@@ -286,11 +309,24 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto
                }
                store.shards = append(store.shards, shd)
        }
+       store.msink = NewMetricsSink(cnf)
        for idx := range store.shards {
                shd := store.shards[idx]
                shd.exited = make(chan bool, 1)
+               shd.heartbeats = make(chan interface{}, 1)
+               shd.mtxMap = make(ServerSpanMetricsMap)
+               shd.maxMtx = store.msink.maxMtx
                go shd.processIncoming()
        }
+       store.hb = NewHeartbeater("DatastoreHeartbeater",
+               cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg)
+       for shdIdx := range store.shards {
+               shd := store.shards[shdIdx]
+               store.hb.AddHeartbeatTarget(&HeartbeatTarget{
+                       name:       fmt.Sprintf("shard(%s)", shd.path),
+                       targetChan: shd.heartbeats,
+               })
+       }
        return store, nil
 }
 
@@ -372,7 +408,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path string,
        }
        spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
        shd = &shard{store: store, ldb: ldb, path: path,
-               incoming: make(chan *common.Span, spanBufferSize)}
+               incoming: make(chan *IncomingSpan, spanBufferSize)}
        return shd, nil
 }
 
@@ -406,16 +442,24 @@ func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
        return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
 }
 
-func (store *dataStore) GetStatistics() *Statistics {
-       return store.stats.Copy()
+func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap {
+       return store.msink.AccessTotals()
 }
 
 // Close the DataStore.
 func (store *dataStore) Close() {
+       if store.hb != nil {
+               store.hb.Shutdown()
+               store.hb = nil
+       }
        for idx := range store.shards {
                store.shards[idx].Close()
                store.shards[idx] = nil
        }
+       if store.msink != nil {
+               store.msink.Shutdown()
+               store.msink = nil
+       }
        if store.readOpts != nil {
                store.readOpts.Close()
                store.readOpts = nil
@@ -435,7 +479,7 @@ func (store *dataStore) getShardIndex(sid common.SpanId) int {
        return int(sid.Hash32() % uint32(len(store.shards)))
 }
 
-func (store *dataStore) WriteSpan(span *common.Span) {
+func (store *dataStore) WriteSpan(span *IncomingSpan) {
        store.shards[store.getShardIndex(span.Id)].incoming <- span
 }
 
@@ -954,11 +998,11 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error)
 
 func (store *dataStore) ServerStats() *common.ServerStats {
        serverStats := common.ServerStats{
-               Shards: make([]common.ShardStats, len(store.shards)),
+               Dirs: make([]common.StorageDirectoryStats, len(store.shards)),
        }
        for shardIdx := range store.shards {
                shard := store.shards[shardIdx]
-               serverStats.Shards[shardIdx].Path = shard.path
+               serverStats.Dirs[shardIdx].Path = shard.path
                r := levigo.Range{
                        Start: append([]byte{SPAN_ID_INDEX_PREFIX},
                                common.INVALID_SPAN_ID.Val()...),
@@ -966,11 +1010,12 @@ func (store *dataStore) ServerStats() *common.ServerStats {
                                common.INVALID_SPAN_ID.Val()...),
                }
                vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
-               serverStats.Shards[shardIdx].ApproxNumSpans = vals[0]
-               serverStats.Shards[shardIdx].LevelDbStats =
+               serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0]
+               serverStats.Dirs[shardIdx].LevelDbStats =
                        shard.ldb.PropertyValue("leveldb.stats")
                store.lg.Infof("shard.ldb.PropertyValue(leveldb.stats)=%s\n",
                        shard.ldb.PropertyValue("leveldb.stats"))
        }
+       serverStats.HostSpanMetrics = store.msink.AccessTotals()
        return &serverStats
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 0caa509..50d2891 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -73,10 +73,13 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{
 
 func createSpans(spans []common.Span, store *dataStore) {
        for idx := range spans {
-               store.WriteSpan(&spans[idx])
+               store.WriteSpan(&IncomingSpan{
+                       Addr: "127.0.0.1:1234",
+                       Span: &spans[idx],
+               })
        }
        // Wait the spans to be created
-       for i := 0; i < 3; i++ {
+       for i := 0; i < len(spans); i++ {
                <-store.WrittenSpans
        }
 }
@@ -85,6 +88,9 @@ func createSpans(spans []common.Span, store *dataStore) {
 func TestDatastoreWriteAndRead(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, 100)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -92,9 +98,13 @@ func TestDatastoreWriteAndRead(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-               t.Fatal()
-       }
+
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(len(SIMPLE_TEST_SPANS)),
+               },
+       })
+
        span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
        if span == nil {
                t.Fatal()
@@ -147,6 +157,9 @@ func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
 func TestSimpleQuery(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, 100)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -154,9 +167,12 @@ func TestSimpleQuery(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-               t.Fatal()
-       }
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(len(SIMPLE_TEST_SPANS)),
+               },
+       })
+
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -172,6 +188,9 @@ func TestSimpleQuery(t *testing.T) {
 func TestQueries2(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, 100)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -179,9 +198,11 @@ func TestQueries2(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-               t.Fatal()
-       }
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(len(SIMPLE_TEST_SPANS)),
+               },
+       })
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -224,6 +245,9 @@ func TestQueries2(t *testing.T) {
 func TestQueries3(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, 100)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -231,9 +255,11 @@ func TestQueries3(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-               t.Fatal()
-       }
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(len(SIMPLE_TEST_SPANS)),
+               },
+       })
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -276,6 +302,9 @@ func TestQueries3(t *testing.T) {
 func TestQueries4(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, 100)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -283,9 +312,11 @@ func TestQueries4(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-               t.Fatal()
-       }
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(len(SIMPLE_TEST_SPANS)),
+               },
+       })
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -320,6 +351,9 @@ func TestQueries4(t *testing.T) {
 
 func BenchmarkDatastoreWrites(b *testing.B) {
        htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, b.N)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -331,22 +365,28 @@ func BenchmarkDatastoreWrites(b *testing.B) {
        // Write many random spans.
        for n := 0; n < b.N; n++ {
                span := test.NewRandomSpan(rnd, allSpans[0:n])
-               ht.Store.WriteSpan(span)
+               ht.Store.WriteSpan(&IncomingSpan{
+                       Addr: "127.0.0.1:1234",
+                       Span: span,
+               })
                allSpans[n] = span
        }
        // Wait for all the spans to be written.
        for n := 0; n < b.N; n++ {
                <-ht.Store.WrittenSpans
        }
-       spansWritten := ht.Store.GetStatistics().NumSpansWritten
-       if spansWritten < uint64(b.N) {
-               b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d",
-                       b.N, spansWritten)
-       }
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(b.N), // should be less than?
+               },
+       })
 }
 
 func TestReloadDataStore(t *testing.T) {
        htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -444,6 +484,9 @@ func TestReloadDataStore(t *testing.T) {
 func TestQueriesWithContinuationTokens1(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
+               Cnf: map[string]string{
+                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+               },
                WrittenSpans: make(chan *common.Span, 100)}
        ht, err := htraceBld.Build()
        if err != nil {
@@ -451,9 +494,11 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-               t.Fatal()
-       }
+       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+               "127.0.0.1:1234": &common.SpanMetrics{
+                       Written: uint64(len(SIMPLE_TEST_SPANS)),
+               },
+       })
        // Adding a prev value to this query excludes the first result that we
        // would normally get.
        testQuery(t, ht, &common.Query{

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
new file mode 100644
index 0000000..140b50d
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "org/apache/htrace/common"
+       "time"
+)
+
+type Heartbeater struct {
+       // The name of this heartbeater
+       name string
+
+       // How long to sleep between heartbeats, in milliseconds.
+       periodMs int64
+
+       // The logger to use.
+       lg *common.Logger
+
+       // The channels to send the heartbeat on.
+       targets []HeartbeatTarget
+
+       // Incoming requests to the heartbeater.  When this is closed, the
+       // heartbeater will exit.
+       req chan *HeartbeatTarget
+}
+
+type HeartbeatTarget struct {
+       // The name of the heartbeat target.
+       name string
+
+       // The channel for the heartbeat target.
+       targetChan chan interface{}
+}
+
+func (tgt *HeartbeatTarget) String() string {
+       return tgt.name
+}
+
+func NewHeartbeater(name string, periodMs int64, lg *common.Logger) 
*Heartbeater {
+       hb := &Heartbeater{
+               name:     name,
+               periodMs: periodMs,
+               lg:       lg,
+               targets:  make([]HeartbeatTarget, 0, 4),
+               req:      make(chan *HeartbeatTarget),
+       }
+       go hb.run()
+       return hb
+}
+
+func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) {
+       hb.req <- tgt
+}
+
+func (hb *Heartbeater) Shutdown() {
+       close(hb.req)
+}
+
+func (hb *Heartbeater) String() string {
+       return hb.name
+}
+
+// run is the heartbeater's service loop.  It waits for one period while servicing requests that
+// arrive on hb.req, then offers a heartbeat to every registered target without blocking.  It
+// returns once hb.req has been closed.
+func (hb *Heartbeater) run() {
+       period := time.Duration(hb.periodMs) * time.Millisecond
+       if period <= 0 {
+               // A non-positive period would skip the wait loop below, so hb.req would never be
+               // read: AddHeartbeatTarget would block forever and closing hb.req would never
+               // stop this goroutine.  Fall back to the shortest supported period instead.
+               period = time.Millisecond
+       }
+       for {
+               periodEnd := time.Now().Add(period)
+               for {
+                       timeToWait := periodEnd.Sub(time.Now())
+                       if timeToWait <= 0 {
+                               break
+                       } else if timeToWait > period {
+                               // Smooth over jitter or clock changes
+                               timeToWait = period
+                               periodEnd = time.Now().Add(period)
+                       }
+                       select {
+                       case tgt, open := <-hb.req:
+                               if !open {
+                                       hb.lg.Debugf("%s: exiting.\n", hb.String())
+                                       return
+                               }
+                               hb.targets = append(hb.targets, *tgt)
+                               hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String())
+                       case <-time.After(timeToWait):
+                       }
+               }
+               for targetIdx := range hb.targets {
+                       select {
+                       case hb.targets[targetIdx].targetChan <- nil:
+                       default:
+                               // We failed to send a heartbeat because the other goroutine was busy and
+                               // hasn't cleared the previous one from its channel.  This could indicate a
+                               // stuck goroutine.
+                               //
+                               // String has a pointer receiver, so call it explicitly: a HeartbeatTarget
+                               // value is not a fmt.Stringer and %s would format the raw struct.
+                               hb.lg.Infof("%s: could not send heartbeat to %s.\n",
+                                       hb.String(), hb.targets[targetIdx].String())
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git 
a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..cbde7fc
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "testing"
+       "time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       lg := common.NewLogger("heartbeater", cnf)
+       hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+       if hb.String() != "ExampleHeartbeater" {
+               t.Fatalf("hb.String() returned %s instead of %s\n", 
hb.String(), "ExampleHeartbeater")
+       }
+       hb.Shutdown()
+}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       lg := common.NewLogger("heartbeater", cnf)
+       // The minimum amount of time which the heartbeater test should take
+       MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * 
HEARTBEATER_PERIOD)
+       duration := MINIMUM_TEST_DURATION
+       for duration <= MINIMUM_TEST_DURATION {
+               start := time.Now()
+               testHeartbeaterSendsHeartbeatsImpl(t, lg)
+               end := time.Now()
+               duration = end.Sub(start)
+               lg.Debugf("Measured duration: %v; minimum expected duration: 
%v\n",
+                       duration, MINIMUM_TEST_DURATION)
+       }
+}
+
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+       hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+       if hb.String() != "ExampleHeartbeater" {
+               t.Fatalf("hb.String() returned %s instead of %s\n", 
hb.String(), "ExampleHeartbeater")
+       }
+       testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+       gotAllHeartbeats := make(chan bool)
+       hb.AddHeartbeatTarget(&HeartbeatTarget{
+               name:       "ExampleHeartbeatTarget",
+               targetChan: testChan,
+       })
+       go func() {
+               for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+                       <-testChan
+               }
+               gotAllHeartbeats <- true
+               for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+                       _, open := <-testChan
+                       if !open {
+                               return
+                       }
+               }
+       }()
+       <-gotAllHeartbeats
+       hb.Shutdown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 354d064..49587bb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,6 +32,7 @@ import (
        "net/rpc"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
+       "reflect"
 )
 
 // Handles HRPC calls
@@ -109,9 +110,10 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req 
*rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+       remoteAddr := cdc.conn.RemoteAddr()
        if cdc.lg.TraceEnabled() {
                cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
-                       cdc.length, cdc.conn.RemoteAddr())
+                       cdc.length, remoteAddr)
        }
        mh := new(codec.MsgpackHandle)
        mh.WriteExt = true
@@ -119,11 +121,16 @@ func (cdc *HrpcServerCodec) ReadRequestBody(body 
interface{}) error {
        err := dec.Decode(body)
        if err != nil {
                return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read 
request "+
-                       "body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+                       "body from %s: %s", remoteAddr, err.Error()))
        }
        if cdc.lg.TraceEnabled() {
                cdc.lg.Tracef("Read body from %s: %s\n",
-                       cdc.conn.RemoteAddr(), asJson(&body))
+                       remoteAddr, asJson(&body))
+       }
+       val := reflect.ValueOf(body)
+       addr := val.Elem().FieldByName("Addr")
+       if addr.IsValid() {
+               addr.SetString(remoteAddr.String())
        }
        return nil
 }
@@ -203,7 +210,10 @@ func (hand *HrpcHandler) WriteSpans(req 
*common.WriteSpansReq,
                if hand.lg.TraceEnabled() {
                        hand.lg.Tracef("writing span %d: %s\n", i, 
span.ToJson())
                }
-               hand.store.WriteSpan(span)
+               hand.store.WriteSpan(&IncomingSpan{
+                       Addr: req.Addr,
+                       Span: span,
+               })
        }
        return nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
new file mode 100644
index 0000000..672f5f6
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "encoding/json"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "sync"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manage htraced.  In particular, we
+// need to know what rate we are receiving spans at, and the main places spans came from.  If spans
+// were dropped because of a high sampling rate, we need to know which part of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+type ServerSpanMetrics struct {
+       // The total number of spans written to HTraced.
+       Written uint64
+
+       // The total number of spans dropped by the server.
+       ServerDropped uint64
+}
+
+func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
+       return &ServerSpanMetrics{
+               Written:       spm.Written,
+               ServerDropped: spm.ServerDropped,
+       }
+}
+
+func (spm *ServerSpanMetrics) String() string {
+       jbytes, err := json.Marshal(*spm)
+       if err != nil {
+               panic(err)
+       }
+       return string(jbytes)
+}
+
+func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
+       spm.Written += ospm.Written
+       spm.ServerDropped += ospm.ServerDropped
+}
+
+func (spm *ServerSpanMetrics) Clear() {
+       spm.Written = 0
+       spm.ServerDropped = 0
+}
+
+// A map from network address strings to ServerSpanMetrics structures.
+type ServerSpanMetricsMap map[string]*ServerSpanMetrics
+
+func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
+       lg *common.Logger) {
+       mtx := smtxMap[addr]
+       if mtx == nil {
+               mtx = &ServerSpanMetrics{}
+               smtxMap[addr] = mtx
+       }
+       mtx.ServerDropped++
+       smtxMap.Prune(maxMtx, lg)
+}
+
+func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
+       lg *common.Logger) {
+       mtx := smtxMap[addr]
+       if mtx == nil {
+               mtx = &ServerSpanMetrics{}
+               smtxMap[addr] = mtx
+       }
+       mtx.Written++
+       smtxMap.Prune(maxMtx, lg)
+}
+
+// Prune evicts entries until the map holds at most maxMtx addresses, logging a warning for each
+// eviction.  Victims are whichever keys map iteration yields first, so the choice is arbitrary.
+// Nothing is evicted while the map is within the limit.
+func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
+       // Deleting entries while ranging over a map is permitted, and a single pass always
+       // terminates, even for a non-positive maxMtx.
+       for k := range smtxMap {
+               if len(smtxMap) <= maxMtx {
+                       return
+               }
+               lg.Warnf("Evicting metrics entry for addr %s "+
+                       "because there are more than %d addrs.\n", k, maxMtx)
+               delete(smtxMap, k)
+       }
+}
+
+type AccessReq struct {
+       mtxMap common.SpanMetricsMap
+       done   chan interface{}
+}
+
+// MetricsSink accumulates per-address span metrics.  NewMetricsSink starts a service goroutine
+// (run) which merges updates arriving on updateReqs into smtxMap and answers the requests
+// arriving on accessReqs.
+type MetricsSink struct {
+       // The accumulated span metrics, keyed by address.
+       smtxMap ServerSpanMetricsMap
+
+       // A channel of incoming metrics updates to merge into smtxMap.
+       // When this is closed, the MetricsSink will exit.
+       updateReqs chan ServerSpanMetricsMap
+
+       // A channel of incoming requests for a copy of the accumulated totals.
+       accessReqs chan *AccessReq
+
+       // This will be closed when the MetricsSink has exited.
+       exited chan interface{}
+
+       // The logger used by this MetricsSink.
+       lg *common.Logger
+
+       // The maximum number of metrics totals we will maintain.
+       maxMtx int
+
+       // The number of spans which each client has self-reported that it has
+       // dropped, keyed by client address.
+       clientDroppedMap map[string]uint64
+
+       // Lock protecting clientDroppedMap.
+       clientDroppedLock sync.Mutex
+}
+
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+       mcl := MetricsSink{
+               smtxMap:          make(ServerSpanMetricsMap),
+               updateReqs:       make(chan ServerSpanMetricsMap, 128),
+               accessReqs:       make(chan *AccessReq),
+               exited:           make(chan interface{}),
+               lg:               common.NewLogger("metrics", cnf),
+               maxMtx:           
cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+               clientDroppedMap: make(map[string]uint64),
+       }
+       go mcl.run()
+       return &mcl
+}
+
+func (msink *MetricsSink) run() {
+       lg := msink.lg
+       defer func() {
+               lg.Info("MetricsSink: stopping service goroutine.\n")
+               close(msink.exited)
+       }()
+       lg.Tracef("MetricsSink: starting.\n")
+       for {
+               select {
+               case updateReq, open := <-msink.updateReqs:
+                       if !open {
+                               lg.Trace("MetricsSink: shutting down 
cleanly.\n")
+                               return
+                       }
+                       for addr, umtx := range updateReq {
+                               smtx := msink.smtxMap[addr]
+                               if smtx == nil {
+                                       smtx = &ServerSpanMetrics{}
+                                       msink.smtxMap[addr] = smtx
+                               }
+                               smtx.Add(umtx)
+                               if lg.TraceEnabled() {
+                                       lg.Tracef("MetricsSink: updated %s to 
%s\n", addr, smtx.String())
+                               }
+                       }
+                       msink.smtxMap.Prune(msink.maxMtx, lg)
+               case accessReq := <-msink.accessReqs:
+                       msink.handleAccessReq(accessReq)
+               }
+       }
+}
+
+func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
+       msink.lg.Debug("MetricsSink: accessing global metrics.\n")
+       msink.clientDroppedLock.Lock()
+       defer func() {
+               msink.clientDroppedLock.Unlock()
+               close(accessReq.done)
+       }()
+       for addr, smtx := range msink.smtxMap {
+               accessReq.mtxMap[addr] = &common.SpanMetrics{
+                       Written:       smtx.Written,
+                       ServerDropped: smtx.ServerDropped,
+                       ClientDropped: msink.clientDroppedMap[addr],
+               }
+       }
+}
+
+func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap {
+       accessReq := &AccessReq{
+               mtxMap: make(common.SpanMetricsMap),
+               done:   make(chan interface{}),
+       }
+       msink.accessReqs <- accessReq
+       <-accessReq.done
+       return accessReq.mtxMap
+}
+
+func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
+       msink.updateReqs <- mtxMap
+}
+
+func (msink *MetricsSink) Shutdown() {
+       close(msink.updateReqs)
+       <-msink.exited
+}
+
+// UpdateClientDropped records the latest number of spans which the given client has
+// self-reported as dropped, replacing any previous value for that client.  If the map then
+// holds more than maxMtx clients, other clients' entries are evicted in arbitrary order until
+// it is back within the limit.
+func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped uint64) {
+       msink.clientDroppedLock.Lock()
+       defer msink.clientDroppedLock.Unlock()
+       msink.clientDroppedMap[client] = clientDropped
+       for k := range msink.clientDroppedMap {
+               if len(msink.clientDroppedMap) <= msink.maxMtx {
+                       return
+               }
+               // Never evict the entry we just stored; that would silently lose this update.
+               if k != client {
+                       delete(msink.clientDroppedMap, k)
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..c90d1da
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "reflect"
+       "testing"
+       "time"
+)
+
+func TestMetricsSinkStartupShutdown(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       msink := NewMetricsSink(cnf)
+       msink.Shutdown()
+}
+
+func TestAddSpanMetrics(t *testing.T) {
+       a := &ServerSpanMetrics{
+               Written:       100,
+               ServerDropped: 200,
+       }
+       b := &ServerSpanMetrics{
+               Written:       500,
+               ServerDropped: 100,
+       }
+       a.Add(b)
+       if a.Written != 600 {
+               t.Fatalf("SpanMetrics#Add failed to update #Written")
+       }
+       if a.ServerDropped != 300 {
+               t.Fatalf("SpanMetrics#Add failed to update #Dropped")
+       }
+       if b.Written != 500 {
+               t.Fatalf("SpanMetrics#Add updated b#Written")
+       }
+       if b.ServerDropped != 100 {
+               t.Fatalf("SpanMetrics#Add updated b#Dropped")
+       }
+}
+
+func compareTotals(a, b common.SpanMetricsMap) bool {
+       for k, v := range a {
+               if !reflect.DeepEqual(v, b[k]) {
+                       return false
+               }
+       }
+       for k, v := range b {
+               if !reflect.DeepEqual(v, a[k]) {
+                       return false
+               }
+       }
+       return true
+}
+
+func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
+       for {
+               time.Sleep(1 * time.Millisecond)
+               totals := msink.AccessTotals()
+               if compareTotals(totals, expectedTotals) {
+                       return
+               }
+       }
+}
+
+func TestMetricsSinkMessages(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       msink := NewMetricsSink(cnf)
+       totals := msink.AccessTotals()
+       if len(totals) != 0 {
+               t.Fatalf("Expected no data in the MetricsSink to start with.")
+       }
+       msink.UpdateMetrics(ServerSpanMetricsMap{
+               "192.168.0.100": &ServerSpanMetrics{
+                       Written:       20,
+                       ServerDropped: 10,
+               },
+       })
+       waitForMetrics(msink, common.SpanMetricsMap{
+               "192.168.0.100": &common.SpanMetrics{
+                       Written:       20,
+                       ServerDropped: 10,
+               },
+       })
+       msink.UpdateMetrics(ServerSpanMetricsMap{
+               "192.168.0.100": &ServerSpanMetrics{
+                       Written:       200,
+                       ServerDropped: 100,
+               },
+       })
+       msink.UpdateMetrics(ServerSpanMetricsMap{
+               "192.168.0.100": &ServerSpanMetrics{
+                       Written:       1000,
+                       ServerDropped: 1000,
+               },
+       })
+       waitForMetrics(msink, common.SpanMetricsMap{
+               "192.168.0.100": &common.SpanMetrics{
+                       Written:       1220,
+                       ServerDropped: 1110,
+               },
+       })
+       msink.UpdateMetrics(ServerSpanMetricsMap{
+               "192.168.0.200": &ServerSpanMetrics{
+                       Written:       200,
+                       ServerDropped: 100,
+               },
+       })
+       waitForMetrics(msink, common.SpanMetricsMap{
+               "192.168.0.100": &common.SpanMetrics{
+                       Written:       1220,
+                       ServerDropped: 1110,
+               },
+               "192.168.0.200": &common.SpanMetrics{
+                       Written:       200,
+                       ServerDropped: 100,
+               },
+       })
+       msink.Shutdown()
+}
+
+func TestMetricsSinkMessagesEviction(t *testing.T) {
+       cnfBld := conf.Builder{
+               Values:   conf.TEST_VALUES(),
+               Defaults: conf.DEFAULTS,
+       }
+       cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+       cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
+       cnf, err := cnfBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create conf: %s", err.Error())
+       }
+       msink := NewMetricsSink(cnf)
+       msink.UpdateMetrics(ServerSpanMetricsMap{
+               "192.168.0.100": &ServerSpanMetrics{
+                       Written:       20,
+                       ServerDropped: 10,
+               },
+               "192.168.0.101": &ServerSpanMetrics{
+                       Written:       20,
+                       ServerDropped: 10,
+               },
+               "192.168.0.102": &ServerSpanMetrics{
+                       Written:       20,
+                       ServerDropped: 10,
+               },
+       })
+       for {
+               totals := msink.AccessTotals()
+               if len(totals) == 2 {
+                       break
+               }
+       }
+       msink.Shutdown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a54f2cb..c2300c4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -90,11 +90,15 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, 
error) {
                        }
                }
        }
+       // Copy the default test configuration values.
+       for k, v := range conf.TEST_VALUES() {
+               _, hasVal := bld.Cnf[k]
+               if !hasVal {
+                       bld.Cnf[k] = v
+               }
+       }
        bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
                strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
-       bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0"  // use a random port for the 
REST server
-       bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the 
HRPC server
-       bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
        cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
        cnf, err := cnfBld.Build()
        if err != nil {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 16c3a75..eca3f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -90,7 +90,7 @@ func (hand *serverStatsHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reque
 
 type serverConfHandler struct {
        cnf *conf.Config
-       lg *common.Logger
+       lg  *common.Logger
 }
 
 func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req 
*http.Request) {
@@ -233,9 +233,13 @@ func (hand *writeSpansHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reques
                if spanIdProblem != "" {
                        hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", 
spanIdProblem))
                } else {
-                       hand.store.WriteSpan(span)
+                       hand.store.WriteSpan(&IncomingSpan{
+                               Addr: req.RemoteAddr,
+                               Span: span,
+                       })
                }
        }
+       hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped)
 }
 
 type queryHandler struct {

Reply via email to