Repository: incubator-htrace
Updated Branches:
  refs/heads/master fe19368a3 -> 699c8cf80


HTRACE-298. htraced: improve datastore serialization and metrics (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/699c8cf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/699c8cf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/699c8cf8

Branch: refs/heads/master
Commit: 699c8cf80058913a89988552ef2d357690309b6f
Parents: fe19368
Author: Colin P. Mccabe <[email protected]>
Authored: Sat Nov 21 13:12:20 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Sat Nov 21 13:26:04 2015 -0800

----------------------------------------------------------------------
 htrace-htraced/go/Godeps/Godeps.json            |   2 +-
 .../go/src/org/apache/htrace/common/rpc.go      |  31 +-
 .../src/org/apache/htrace/conf/config_keys.go   |   8 +-
 .../org/apache/htrace/htraced/client_test.go    |  98 ++++++
 .../src/org/apache/htrace/htraced/datastore.go  | 254 +++++++++++----
 .../org/apache/htrace/htraced/datastore_test.go |  61 ++--
 .../go/src/org/apache/htrace/htraced/hrpc.go    |  23 +-
 .../go/src/org/apache/htrace/htraced/metrics.go | 319 ++++++-------------
 .../org/apache/htrace/htraced/metrics_test.go   | 156 ++-------
 .../org/apache/htrace/htraced/reaper_test.go    |  11 +-
 .../go/src/org/apache/htrace/htraced/rest.go    |  23 +-
 .../go/src/org/apache/htrace/htracedTool/cmd.go |   9 +-
 .../src/main/webapp/app/server_info_view.js     |   4 +-
 .../src/main/webapp/app/server_stats.js         |   7 +-
 htrace-webapp/src/main/webapp/index.html        |  12 +-
 15 files changed, 491 insertions(+), 527 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/Godeps/Godeps.json
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/Godeps/Godeps.json 
b/htrace-htraced/go/Godeps/Godeps.json
index 47aa90e..7c737fe 100644
--- a/htrace-htraced/go/Godeps/Godeps.json
+++ b/htrace-htraced/go/Godeps/Godeps.json
@@ -24,7 +24,7 @@
         },
         {
             "ImportPath": "github.com/ugorji/go/codec",
-            "Rev": "08bbe4aa39b9f189f4e294b5c8408b5fa5787bb2"
+            "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58"
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go 
b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 2627c26..6375688 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,7 +38,7 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
 
 // A request to write spans to htraced.
 type WriteSpansReq struct {
-       Addr          string // This gets filled in by the RPC layer.
+       Addr          string `json:",omitempty"` // This gets filled in by the 
RPC layer.
        DefaultTrid   string `json:",omitempty"`
        Spans         []*Span
        ClientDropped uint64 `json:",omitempty"`
@@ -98,10 +98,19 @@ type SpanMetrics struct {
        // The total number of spans dropped by the server.
        ServerDropped uint64
 
-       // The total number of spans dropped by the client.  Note that this 
number
-       // is tracked on the client itself and doesn't get updated if the client
-       // can't contact the server.
-       ClientDropped uint64
+       // The total number of spans dropped by the client.
+       //
+       // This number is just an estimate and may be incorrect for many 
reasons.
+       // If the client can't contact the server at all, then obviously the 
server
+       // will never increment ClientDroppedEstimate... even though spans are being
+       // dropped.  The client may also tell the server about some new spans it
+       // has dropped, but then for some reason fail to get the acknowledgement
+       // from the server.  In that case, the client would re-send its client
+       // dropped estimate and it would be double-counted by the server.
+       //
+       // The intention here is to provide a rough estimate of how overloaded
+       // htraced clients are, not to provide strongly consistent numbers.
+       ClientDroppedEstimate uint64
 }
 
 // A map from network address strings to SpanMetrics structures.
@@ -130,9 +139,15 @@ type ServerStats struct {
        // those that did.
        IngestedSpans uint64
 
-       // The total number of spans which have been dropped by clients since 
the server started,
-       // as reported by WriteSpans requests.
-       ClientDroppedSpans uint64
+       // The total number of spans which have been written to leveldb since 
the server started.
+       WrittenSpans uint64
+
+       // The total number of spans dropped by the server since the server 
started.
+       ServerDroppedSpans uint64
+
+       // An estimate of the total number of spans dropped by clients since 
the server started.
+       // See SpanMetrics#ClientDroppedEstimate
+       ClientDroppedEstimate uint64
 
        // The maximum latency of a writeSpans request, in milliseconds.
        MaxWriteSpansLatencyMs uint32

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go 
b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 511833c..573ce21 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,9 +68,9 @@ const HTRACE_LOG_PATH = "log.path"
 // The log level to use for the logs in htrace.
 const HTRACE_LOG_LEVEL = "log.level"
 
-// The period between metrics heartbeats.  This is the approximate interval at 
which we will
-// update global metrics.
-const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+// The period between datastore heartbeats.  This is the approximate interval 
at which we will
+// prune expired spans.
+const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms"
 
 // The maximum number of addresses for which we will maintain metrics.
 const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
@@ -106,7 +106,7 @@ var DEFAULTS = map[string]string{
        HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
        HTRACE_LOG_PATH:                    "",
        HTRACE_LOG_LEVEL:                   "INFO",
-       HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
+       HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
        HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
        HTRACE_SPAN_EXPIRY_MS:              "0",
        HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  fmt.Sprintf("%d", 90*1000),

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index fae871c..36e8369 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -21,6 +21,8 @@ package main
 
 import (
        "fmt"
+       "github.com/ugorji/go/codec"
+       "math"
        "math/rand"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
@@ -391,3 +393,99 @@ func TestHrpcIoTimeout(t *testing.T) {
        close(finishClient)
        wg.Wait()
 }
+
+func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
+       htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
+               Cnf: map[string]string{
+                       conf.HTRACE_LOG_LEVEL: "INFO",
+               },
+               WrittenSpans: common.NewSemaphore(int64(1-N)),
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       rnd := rand.New(rand.NewSource(1))
+       allSpans := make([]*common.Span, N)
+       for n := 0; n < N; n++ {
+               allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
+       }
+       // Determine how many calls to WriteSpans we should make.  Each 
writeSpans
+       // message should be small enough so that it doesn't exceed the max RPC
+       // body length limit.  TODO: a production-quality golang client would do
+       // this internally rather than needing us to do it here in the unit 
test.
+       bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
+       reqs := make([]*common.WriteSpansReq, 0, 4)
+       curReq := -1
+       curReqLen := bodyLen
+       var curReqSpans uint32
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       var mbuf [8192]byte
+       buf := mbuf[:0]
+       enc := codec.NewEncoderBytes(&buf, mh)
+       for n := 0; n < N; n++ {
+               span := allSpans[n]
+               if (curReqSpans >= maxSpansPerRpc) ||
+                          (curReqLen >= bodyLen) {
+                       reqs = append(reqs, &common.WriteSpansReq{})
+                       curReqLen = 0
+                       curReq++
+                       curReqSpans = 0
+               }
+               buf = mbuf[:0]
+               enc.ResetBytes(&buf)
+               err := enc.Encode(span)
+               if err != nil {
+                       panic(fmt.Sprintf("Error encoding span %s: %s\n",
+                               span.String(), err.Error()))
+               }
+               bufLen := len(buf)
+               if bufLen > (bodyLen / 5) {
+                       panic(fmt.Sprintf("Span too long at %d bytes\n", 
bufLen))
+               }
+               curReqLen += bufLen
+               reqs[curReq].Spans = append(reqs[curReq].Spans, span)
+               curReqSpans++
+       }
+       ht.Store.lg.Infof("num spans: %d.  num WriteSpansReq calls: %d\n", N, 
len(reqs))
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               panic(fmt.Sprintf("failed to create client: %s", err.Error()))
+       }
+       defer hcl.Close()
+
+       // Reset the timer to avoid including the time required to create new
+       // random spans in the benchmark total.
+       if b != nil {
+               b.ResetTimer()
+       }
+
+       // Write many random spans.
+       for reqIdx := range(reqs) {
+               go func() {
+                       err = hcl.WriteSpans(reqs[reqIdx])
+                       if err != nil {
+                               panic(fmt.Sprintf("failed to send WriteSpans 
request %d: %s",
+                                       reqIdx, err.Error()))
+                       }
+               }()
+       }
+       // Wait for all the spans to be written.
+       ht.Store.WrittenSpans.Wait()
+}
+
+// This is a test of how quickly we can create new spans via WriteSpans RPCs.
+// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
+// Unlike that benchmark, it sends the spans via RPC.
+// Suggested flags for running this:
+// -tags unsafe -cpu 16 -benchtime=1m
+func BenchmarkWriteSpans(b *testing.B) {
+       doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
+}
+
+func TestWriteSpansRpcs(t *testing.T) {
+       doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 8cd1526..9310e6e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -26,6 +26,7 @@ import (
        "errors"
        "fmt"
        "github.com/jmhodges/levigo"
+       "github.com/ugorji/go/codec"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "os"
@@ -66,7 +67,7 @@ import (
 //
 
 const UNKNOWN_LAYOUT_VERSION = 0
-const CURRENT_LAYOUT_VERSION = 2
+const CURRENT_LAYOUT_VERSION = 3
 
 var EMPTY_BYTE_BUF []byte = []byte{}
 
@@ -89,6 +90,9 @@ type IncomingSpan struct {
 
        // The span.
        *common.Span
+
+       // Serialized span data
+       SpanDataBytes []byte
 }
 
 // A single directory containing a levelDB instance.
@@ -103,19 +107,13 @@ type shard struct {
        path string
 
        // Incoming requests to write Spans.
-       incoming chan *IncomingSpan
+       incoming chan []*IncomingSpan
 
        // A channel for incoming heartbeats
        heartbeats chan interface{}
 
        // Tracks whether the shard goroutine has exited.
        exited sync.WaitGroup
-
-       // Per-address metrics
-       mtxMap ServerSpanMetricsMap
-
-       // The maximum number of metrics to allow in our map
-       maxMtx int
 }
 
 // Process incoming spans for a shard.
@@ -127,24 +125,33 @@ func (shd *shard) processIncoming() {
        }()
        for {
                select {
-               case span := <-shd.incoming:
-                       if span == nil {
+               case spans := <-shd.incoming:
+                       if spans == nil {
                                return
                        }
-                       err := shd.writeSpan(span)
-                       if err != nil {
-                               lg.Errorf("Shard processor for %s got fatal 
error %s.\n", shd.path, err.Error())
-                       } else if lg.TraceEnabled() {
-                               lg.Tracef("Shard processor for %s wrote span 
%s.\n", shd.path, span.ToJson())
+                       totalWritten := 0
+                       totalDropped := 0
+                       for spanIdx := range(spans) {
+                               err := shd.writeSpan(spans[spanIdx])
+                               if err != nil {
+                                       lg.Errorf("Shard processor for %s got 
fatal error %s.\n",
+                                               shd.path, err.Error())
+                                       totalDropped++
+                               } else {
+                                       if lg.TraceEnabled() {
+                                               lg.Tracef("Shard processor for 
%s wrote span %s.\n",
+                                                       shd.path, 
spans[spanIdx].ToJson())
+                                       }
+                                       totalWritten++
+                               }
+                       }
+                       shd.store.msink.UpdatePersisted(spans[0].Addr, 
totalWritten, totalDropped)
+                       if shd.store.WrittenSpans != nil {
+                               lg.Debugf("Shard %s incrementing WrittenSpans 
by %d\n", shd.path, len(spans))
+                               shd.store.WrittenSpans.Posts(int64(len(spans)))
                        }
                case <-shd.heartbeats:
                        lg.Tracef("Shard processor for %s handling 
heartbeat.\n", shd.path)
-                       mtxMap := make(ServerSpanMetricsMap)
-                       for addr, mtx := range shd.mtxMap {
-                               mtxMap[addr] = mtx.Clone()
-                               mtx.Clear()
-                       }
-                       shd.store.msink.UpdateMetrics(mtxMap)
                        shd.pruneExpired()
                }
        }
@@ -246,21 +253,10 @@ func u64toSlice(val uint64) []byte {
 func (shd *shard) writeSpan(ispan *IncomingSpan) error {
        batch := levigo.NewWriteBatch()
        defer batch.Close()
-
-       // Add SpanData to batch.
-       spanDataBuf := new(bytes.Buffer)
-       spanDataEnc := gob.NewEncoder(spanDataBuf)
        span := ispan.Span
-       err := spanDataEnc.Encode(span.SpanData)
-       if err != nil {
-               shd.store.lg.Errorf("Error encoding span %s: %s\n",
-                       span.String(), err.Error())
-               shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, 
shd.store.lg)
-               return err
-       }
        primaryKey :=
                append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
-       batch.Put(primaryKey, spanDataBuf.Bytes())
+       batch.Put(primaryKey, ispan.SpanDataBytes)
 
        // Add this to the parent index.
        for parentIdx := range span.Parents {
@@ -280,17 +276,12 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
                u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
        batch.Put(durationKey, EMPTY_BYTE_BUF)
 
-       err = shd.ldb.Write(shd.store.writeOpts, batch)
+       err := shd.ldb.Write(shd.store.writeOpts, batch)
        if err != nil {
                shd.store.lg.Errorf("Error writing span %s to leveldb at %s: 
%s\n",
                        span.String(), shd.path, err.Error())
-               shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, 
shd.store.lg)
                return err
        }
-       shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
-       if shd.store.WrittenSpans != nil {
-               shd.store.WrittenSpans.Post()
-       }
        return nil
 }
 
@@ -510,13 +501,11 @@ func CreateDataStore(cnf *conf.Config, writtenSpans 
*common.Semaphore) (*dataSto
        for idx := range store.shards {
                shd := store.shards[idx]
                shd.heartbeats = make(chan interface{}, 1)
-               shd.mtxMap = make(ServerSpanMetricsMap)
-               shd.maxMtx = store.msink.maxMtx
                shd.exited.Add(1)
                go shd.processIncoming()
        }
        store.hb = NewHeartbeater("DatastoreHeartbeater",
-               cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg)
+               cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), lg)
        for shdIdx := range store.shards {
                shd := store.shards[shdIdx]
                store.hb.AddHeartbeatTarget(&HeartbeatTarget{
@@ -606,7 +595,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path 
string,
        }
        spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
        shd = &shard{store: store, ldb: ldb, path: path,
-               incoming: make(chan *IncomingSpan, spanBufferSize)}
+               incoming: make(chan []*IncomingSpan, spanBufferSize)}
        return shd, nil
 }
 
@@ -654,10 +643,6 @@ func (store *dataStore) Close() {
                store.rpr.Shutdown()
                store.rpr = nil
        }
-       if store.msink != nil {
-               store.msink.Shutdown()
-               store.msink = nil
-       }
        if store.readOpts != nil {
                store.readOpts.Close()
                store.readOpts = nil
@@ -677,8 +662,158 @@ func (store *dataStore) getShardIndex(sid common.SpanId) 
int {
        return int(sid.Hash32() % uint32(len(store.shards)))
 }
 
-func (store *dataStore) WriteSpan(span *IncomingSpan) {
-       store.shards[store.getShardIndex(span.Id)].incoming <- span
+const WRITESPANS_BATCH_SIZE = 128
+
+// SpanIngestor is a class used internally to ingest spans from an RPC
+// endpoint.  It groups spans destined for a particular shard into small
+// batches, so that we can reduce the number of objects that need to be sent
+// over the shard's "incoming" channel.  Since sending objects over a channel
+// requires goroutine synchronization, this improves performance.
+//
+// SpanIngestor also allows us to reuse the same encoder object for many spans,
+// rather than creating a new encoder per span.  This avoids re-doing the
+// encoder setup for each span, and also generates less garbage.
+type SpanIngestor struct {
+       // The logger to use.
+       lg              *common.Logger
+
+       // The dataStore we are ingesting spans into.
+       store           *dataStore
+
+       // The remote address these spans are coming from.
+       addr            string
+
+       // Default TracerId
+       defaultTrid     string
+
+       // The msgpack handle to use to serialize the spans.
+       mh              codec.MsgpackHandle
+
+       // The msgpack encoder to use to serialize the spans.
+       // Caching this avoids generating a lot of garbage and burning CPUs 
+       // creating new encoder objects for each span.
+       enc             *codec.Encoder
+
+       // The buffer which codec.Encoder is currently serializing to. 
+       // We have to create a new buffer for each span because once we hand it 
off to the shard, the
+       // shard manages the buffer lifecycle.
+       spanDataBytes   []byte
+
+       // An array mapping shard index to span batch.
+       batches         []*SpanIngestorBatch
+
+       // The total number of spans ingested.  Includes dropped spans. 
+       totalIngested   int
+
+       // The total number of spans the ingestor dropped because of a 
server-side error.
+       serverDropped   int
+}
+
+// A batch of spans destined for a particular shard.
+type SpanIngestorBatch struct {
+       incoming        []*IncomingSpan
+}
+
+func (store *dataStore) NewSpanIngestor(lg *common.Logger,
+               addr string, defaultTrid string) *SpanIngestor {
+       ing := &SpanIngestor {
+               lg: lg,
+               store: store,
+               addr: addr,
+               defaultTrid: defaultTrid,
+               spanDataBytes: make([]byte, 0, 1024),
+               batches: make([]*SpanIngestorBatch, len(store.shards)),
+       }
+       ing.mh.WriteExt = true
+       ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh)
+       for batchIdx := range(ing.batches) {
+               ing.batches[batchIdx] = &SpanIngestorBatch {
+                       incoming: make([]*IncomingSpan, 0, 
WRITESPANS_BATCH_SIZE),
+               }
+       }
+       return ing
+}
+
+func (ing *SpanIngestor) IngestSpan(span *common.Span) {
+       ing.totalIngested++
+       // Make sure the span ID is valid.
+       spanIdProblem := span.Id.FindProblem()
+       if spanIdProblem != "" {
+               // Can't print the invalid span ID because String() might fail.
+               ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem)
+               ing.serverDropped++
+               return
+       }
+
+       // Set the default tracer id, if needed.
+       if span.TracerId == "" {
+               span.TracerId = ing.defaultTrid
+       }
+
+       // Encode the span data.  Doing the encoding here is better than doing 
it
+       // in the shard goroutine, because we can achieve more parallelism.
+       // There is one shard goroutine per shard, but potentially many more
+       // ingestors per shard.
+       err := ing.enc.Encode(span.SpanData)
+       if err != nil {
+               ing.lg.Warnf("Failed to encode span ID %s: %s\n",
+                       span.Id.String(), err.Error())
+               ing.serverDropped++
+               return
+       }
+       spanDataBytes := ing.spanDataBytes
+       ing.spanDataBytes = make([]byte, 0, 1024)
+       ing.enc.ResetBytes(&ing.spanDataBytes)
+
+       // Determine which shard this span should go to.
+       shardIdx := ing.store.getShardIndex(span.Id)
+       batch := ing.batches[shardIdx]
+       incomingLen := len(batch.incoming)
+       if ing.lg.TraceEnabled() {
+               ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, 
" +
+                       "incomingLen=%d, cap(batch.incoming)=%d\n",
+                       span.Id.String(), shardIdx, incomingLen, 
cap(batch.incoming))
+       }
+       if incomingLen + 1 == cap(batch.incoming) {
+               if ing.lg.TraceEnabled() {
+                       ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d 
spans for " +
+                               "shard %d\n", len(batch.incoming), shardIdx)
+               }
+               ing.store.WriteSpans(shardIdx, batch.incoming)
+               batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE)
+               incomingLen = 0
+       } else {
+               batch.incoming = batch.incoming[0:incomingLen+1]
+       }
+       batch.incoming[incomingLen] = &IncomingSpan {
+               Addr: ing.addr,
+               Span: span,
+               SpanDataBytes: spanDataBytes,
+       }
+}
+
+func (ing *SpanIngestor) Close(clientDropped int, startTime time.Time) {
+       for shardIdx := range(ing.batches) {
+               batch := ing.batches[shardIdx]
+               if len(batch.incoming) > 0 {
+                       if ing.lg.TraceEnabled() {
+                               ing.lg.Tracef("SpanIngestor#Close: flushing %d 
span(s) for " +
+                                       "shard %d\n", len(batch.incoming), 
shardIdx)
+                       }
+                       ing.store.WriteSpans(shardIdx, batch.incoming)
+               }
+               batch.incoming = nil
+       }
+       ing.lg.Debugf("Closed span ingestor for %s.  Ingested %d span(s); 
dropped " +
+               "%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped)
+
+       endTime := time.Now()
+       ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested,
+               ing.serverDropped, clientDropped, endTime.Sub(startTime))
+}
+
+func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) {
+       store.shards[shardIdx].incoming <- ispans
 }
 
 func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
@@ -709,14 +844,14 @@ func (shd *shard) FindSpan(sid common.SpanId) 
*common.Span {
 
 func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, 
error) {
        r := bytes.NewBuffer(buf)
-       decoder := gob.NewDecoder(r)
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       decoder := codec.NewDecoder(r, mh)
        data := common.SpanData{}
        err := decoder.Decode(&data)
        if err != nil {
                return nil, err
        }
-       // Gob encoding translates empty slices to nil.  Reverse this so that 
we're always dealing with
-       // non-nil slices.
        if data.Parents == nil {
                data.Parents = []common.SpanId{}
        }
@@ -1259,19 +1394,6 @@ func (store *dataStore) ServerStats() 
*common.ServerStats {
        serverStats.LastStartMs = store.startMs
        serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
        serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
-       wsData := store.msink.wsm.GetData()
-       serverStats.HostSpanMetrics = store.msink.AccessServerTotals()
-       for k, v := range wsData.clientDroppedMap {
-               smtx := serverStats.HostSpanMetrics[k]
-               if smtx == nil {
-                       smtx = &common.SpanMetrics {}
-                       serverStats.HostSpanMetrics[k] = smtx
-               }
-               smtx.ClientDropped = v
-       }
-       serverStats.IngestedSpans = wsData.ingestedSpans
-       serverStats.ClientDroppedSpans = wsData.clientDroppedSpans
-       serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax
-       serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage
+       store.msink.PopulateServerStats(&serverStats)
        return &serverStats
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index d9f4a0a..e6d1df7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -73,12 +73,11 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{
 }
 
 func createSpans(spans []common.Span, store *dataStore) {
+       ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "")
        for idx := range spans {
-               store.WriteSpan(&IncomingSpan{
-                       Addr: "127.0.0.1",
-                       Span: &spans[idx],
-               })
+               ing.IngestSpan(&spans[idx])
        }
+       ing.Close(0, time.Now())
        store.WrittenSpans.Waits(int64(len(spans)))
 }
 
@@ -87,7 +86,7 @@ func TestDatastoreWriteAndRead(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                WrittenSpans: common.NewSemaphore(0),
        }
@@ -151,7 +150,7 @@ func TestSimpleQuery(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                WrittenSpans: common.NewSemaphore(0),
        }
@@ -161,11 +160,8 @@ func TestSimpleQuery(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1": &common.SpanMetrics{
-                       Written: uint64(len(SIMPLE_TEST_SPANS)),
-               },
-       })
+
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
 
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
@@ -183,7 +179,7 @@ func TestQueries2(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                WrittenSpans: common.NewSemaphore(0),
        }
@@ -193,11 +189,7 @@ func TestQueries2(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1": &common.SpanMetrics{
-                       Written: uint64(len(SIMPLE_TEST_SPANS)),
-               },
-       })
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -241,7 +233,7 @@ func TestQueries3(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                WrittenSpans: common.NewSemaphore(0),
        }
@@ -251,11 +243,7 @@ func TestQueries3(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1": &common.SpanMetrics{
-                       Written: uint64(len(SIMPLE_TEST_SPANS)),
-               },
-       })
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -299,7 +287,7 @@ func TestQueries4(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                WrittenSpans: common.NewSemaphore(0),
        }
@@ -345,7 +333,7 @@ func TestQueries4(t *testing.T) {
 func BenchmarkDatastoreWrites(b *testing.B) {
        htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                        conf.HTRACE_LOG_LEVEL: "INFO",
                },
                WrittenSpans: common.NewSemaphore(0),
@@ -372,25 +360,20 @@ func BenchmarkDatastoreWrites(b *testing.B) {
        b.ResetTimer()
 
        // Write many random spans.
+       ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
        for n := 0; n < b.N; n++ {
-               ht.Store.WriteSpan(&IncomingSpan{
-                       Addr: "127.0.0.1",
-                       Span: allSpans[n],
-               })
+               ing.IngestSpan(allSpans[n])
        }
+       ing.Close(0, time.Now())
        // Wait for all the spans to be written.
        ht.Store.WrittenSpans.Waits(int64(b.N))
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1": &common.SpanMetrics{
-                       Written: uint64(b.N), // should be less than?
-               },
-       })
+       assertNumWrittenEquals(b, ht.Store.msink, b.N)
 }
 
 func TestReloadDataStore(t *testing.T) {
        htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                DataDirs: make([]string, 2),
                KeepDataDirsOnClose: true,
@@ -494,7 +477,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
        t.Parallel()
        htraceBld := &MiniHTracedBuilder{Name: 
"TestQueriesWithContinuationTokens1",
                Cnf: map[string]string{
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
                },
                WrittenSpans: common.NewSemaphore(0),
        }
@@ -504,11 +487,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1": &common.SpanMetrics{
-                       Written: uint64(len(SIMPLE_TEST_SPANS)),
-               },
-       })
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
        // Adding a prev value to this query excludes the first result that we
        // would normally get.
        testQuery(t, ht, &common.Query{

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index a0f2e81..a649420 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -266,26 +266,11 @@ func (hand *HrpcHandler) WriteSpans(req 
*common.WriteSpansReq,
                return errors.New(fmt.Sprintf("Failed to split host and port " +
                        "for %s: %s\n", req.Addr, err.Error()))
        }
-       for i := range req.Spans {
-               span := req.Spans[i]
-               spanIdProblem := span.Id.FindProblem()
-               if spanIdProblem != "" {
-                       return errors.New(fmt.Sprintf("Invalid span ID: %s", 
spanIdProblem))
-               }
-               if span.TracerId == "" {
-                       span.TracerId = req.DefaultTrid
-               }
-               if hand.lg.TraceEnabled() {
-                       hand.lg.Tracef("writing span %d: %s\n", i, 
span.ToJson())
-               }
-               hand.store.WriteSpan(&IncomingSpan{
-                       Addr: client,
-                       Span: span,
-               })
+       ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+       for spanIdx := range req.Spans {
+               ing.IngestSpan(req.Spans[spanIdx])
        }
-       endTime := time.Now()
-       hand.store.msink.Update(client, req.ClientDropped, len(req.Spans),
-                       endTime.Sub(startTime))
+       ing.Close(int(req.ClientDropped), startTime)
        return nil
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
index cfff418..5ce3339 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -20,7 +20,6 @@
 package main
 
 import (
-       "encoding/json"
        "math"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
@@ -40,252 +39,114 @@ import (
 
 const LATENCY_CIRC_BUF_SIZE = 4096
 
-type ServerSpanMetrics struct {
-       // The total number of spans written to HTraced.
-       Written uint64
-
-       // The total number of spans dropped by the server.
-       ServerDropped uint64
-}
-
-func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
-       return &ServerSpanMetrics{
-               Written:       spm.Written,
-               ServerDropped: spm.ServerDropped,
-       }
-}
-
-func (spm *ServerSpanMetrics) String() string {
-       jbytes, err := json.Marshal(*spm)
-       if err != nil {
-               panic(err)
-       }
-       return string(jbytes)
-}
-
-func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
-       spm.Written += ospm.Written
-       spm.ServerDropped += ospm.ServerDropped
-}
-
-func (spm *ServerSpanMetrics) Clear() {
-       spm.Written = 0
-       spm.ServerDropped = 0
-}
-
-// A map from network address strings to ServerSpanMetrics structures.
-type ServerSpanMetricsMap map[string]*ServerSpanMetrics
-
-func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
-       lg *common.Logger) {
-       mtx := smtxMap[addr]
-       if mtx == nil {
-               mtx = &ServerSpanMetrics{}
-               smtxMap[addr] = mtx
-       }
-       mtx.ServerDropped++
-       smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
-       lg *common.Logger) {
-       mtx := smtxMap[addr]
-       if mtx == nil {
-               mtx = &ServerSpanMetrics{}
-               smtxMap[addr] = mtx
-       }
-       mtx.Written++
-       smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
-       if len(smtxMap) >= maxMtx {
-               // Delete a random entry
-               for k := range smtxMap {
-                       lg.Warnf("Evicting metrics entry for addr %s "+
-                               "because there are more than %d addrs.\n", k, 
maxMtx)
-                       delete(smtxMap, k)
-                       return
-               }
-       }
-}
-
-type AccessReq struct {
-       mtxMap common.SpanMetricsMap
-       done   chan interface{}
-}
-
 type MetricsSink struct {
-       // The total span metrics.
-       smtxMap ServerSpanMetricsMap
-
-       // A channel of incoming shard metrics.
-       // When this is shut down, the MetricsSink will exit.
-       updateReqs chan ServerSpanMetricsMap
-
-       // A channel of incoming requests for shard metrics.
-       accessReqs chan *AccessReq
-
-       // This will be closed when the MetricsSink has exited.
-       exited chan interface{}
-
-       // The logger used by this MetricsSink.
+       // The metrics sink logger.
        lg *common.Logger
 
-       // The maximum number of metrics totals we will maintain.
+       // The maximum number of entries we should allow in the HostSpanMetrics 
map.
        maxMtx int
 
-       // Metrics about WriteSpans requests
-       wsm WriteSpanMetrics
-}
+       // The total number of spans ingested by the server (counting dropped 
spans)
+       IngestedSpans uint64
 
-func NewMetricsSink(cnf *conf.Config) *MetricsSink {
-       mcl := MetricsSink{
-               smtxMap:          make(ServerSpanMetricsMap),
-               updateReqs:       make(chan ServerSpanMetricsMap, 128),
-               accessReqs:       make(chan *AccessReq),
-               exited:           make(chan interface{}),
-               lg:               common.NewLogger("metrics", cnf),
-               maxMtx:           
cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
-               wsm: WriteSpanMetrics {
-                       clientDroppedMap: make(map[string]uint64),
-                       latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
-               },
-       }
-       go mcl.run()
-       return &mcl
-}
-
-func (msink *MetricsSink) run() {
-       lg := msink.lg
-       defer func() {
-               lg.Info("MetricsSink: stopping service goroutine.\n")
-               close(msink.exited)
-       }()
-       lg.Tracef("MetricsSink: starting.\n")
-       for {
-               select {
-               case updateReq, open := <-msink.updateReqs:
-                       if !open {
-                               lg.Trace("MetricsSink: shutting down 
cleanly.\n")
-                               return
-                       }
-                       for addr, umtx := range updateReq {
-                               smtx := msink.smtxMap[addr]
-                               if smtx == nil {
-                                       smtx = &ServerSpanMetrics{}
-                                       msink.smtxMap[addr] = smtx
-                               }
-                               smtx.Add(umtx)
-                               if lg.TraceEnabled() {
-                                       lg.Tracef("MetricsSink: updated %s to 
%s\n", addr, smtx.String())
-                               }
-                       }
-                       msink.smtxMap.Prune(msink.maxMtx, lg)
-               case accessReq := <-msink.accessReqs:
-                       msink.handleAccessReq(accessReq)
-               }
-       }
-}
-
-func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
-       msink.lg.Debug("MetricsSink: accessing global metrics.\n")
-       defer close(accessReq.done)
-       for addr, smtx := range msink.smtxMap {
-               accessReq.mtxMap[addr] = &common.SpanMetrics{
-                       Written:       smtx.Written,
-                       ServerDropped: smtx.ServerDropped,
-               }
-       }
-}
-
-func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap {
-       accessReq := &AccessReq{
-               mtxMap: make(common.SpanMetricsMap),
-               done:   make(chan interface{}),
-       }
-       msink.accessReqs <- accessReq
-       <-accessReq.done
-       return accessReq.mtxMap
-}
-
-func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
-       msink.updateReqs <- mtxMap
-}
-
-func (msink *MetricsSink) Shutdown() {
-       close(msink.updateReqs)
-       <-msink.exited
-}
+       // The total number of spans written to leveldb since the server 
started.
+       WrittenSpans uint64
 
-type WriteSpanMetrics struct {
-       // Lock protecting WriteSpanMetrics
-       lock sync.Mutex
-
-       // The number of spans which each client has self-reported that it has
-       // dropped.
-       clientDroppedMap map[string]uint64
+       // The total number of spans dropped by the server.
+       ServerDropped uint64
 
-       // The total number of new span writes we've gotten since startup.
-       ingestedSpans uint64
+       // The total number of spans dropped by the client (self-reported).
+       ClientDroppedEstimate uint64
 
-       // The total number of spans all clients have dropped since startup.
-       clientDroppedSpans uint64
+       // Per-host Span Metrics
+       HostSpanMetrics common.SpanMetricsMap
 
        // The last few writeSpan latencies
-       latencyCircBuf *CircBufU32
-}
+       wsLatencyCircBuf *CircBufU32
 
-type WriteSpanMetricsData struct {
-       clientDroppedMap map[string]uint64
-       ingestedSpans uint64
-       clientDroppedSpans uint64
-       latencyMax uint32
-       latencyAverage uint32
+       // Lock protecting all metrics
+       lock sync.Mutex
 }
 
-func (msink *MetricsSink) Update(client string, clientDropped uint64, 
clientWritten int,
-               wsLatency time.Duration) {
-       wsLatencyNs := wsLatency.Nanoseconds() / 1000000
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+       return &MetricsSink {
+               lg:               common.NewLogger("metrics", cnf),
+               maxMtx:           
cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+               HostSpanMetrics: make(common.SpanMetricsMap),
+               wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
+       }
+}
+
+// Update the total number of spans which were ingested, as well as other
+// metrics that get updated during span ingest. 
+func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
+               serverDropped int, clientDroppedEstimate int, wsLatency 
time.Duration) {
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       msink.IngestedSpans += uint64(totalIngested)
+       msink.ServerDropped += uint64(serverDropped)
+       msink.ClientDroppedEstimate += uint64(clientDroppedEstimate)
+       msink.updateSpanMetrics(addr, 0, serverDropped, clientDroppedEstimate)
+       wsLatencyMs := wsLatency.Nanoseconds() / 1000000
        var wsLatency32 uint32
-       if wsLatencyNs > math.MaxUint32 {
+       if wsLatencyMs > math.MaxUint32 {
                wsLatency32 = math.MaxUint32
        } else {
-               wsLatency32 = uint32(wsLatencyNs)
-       }
-       msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, 
wsLatency32)
-}
-
-func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped 
uint64,
-               clientWritten int, wsLatency uint32) {
-       wsm.lock.Lock()
-       defer wsm.lock.Unlock()
-       wsm.clientDroppedMap[client] = clientDropped
-       if len(wsm.clientDroppedMap) >= maxMtx {
-               // Delete a random entry
-               for k := range wsm.clientDroppedMap {
-                       delete(wsm.clientDroppedMap, k)
-                       return
+               wsLatency32 = uint32(wsLatencyMs)
+       }
+       msink.wsLatencyCircBuf.Append(wsLatency32)
+}
+
+// Update the per-host span metrics.  Must be called with the lock held.
+func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
+               serverDropped int, clientDroppedEstimate int) {
+       mtx, found := msink.HostSpanMetrics[addr]
+       if !found {
+               // Ensure that the per-host span metrics map doesn't grow too 
large.
+               if len(msink.HostSpanMetrics) >= msink.maxMtx {
+                       // Delete a random entry
+                       for k := range msink.HostSpanMetrics {
+                               msink.lg.Warnf("Evicting metrics entry for addr 
%s "+
+                                       "because there are more than %d 
addrs.\n", k, msink.maxMtx)
+                               delete(msink.HostSpanMetrics, k)
+                               break
+                       }
+               }
+               mtx = &common.SpanMetrics { }
+               msink.HostSpanMetrics[addr] = mtx
+       }
+       mtx.Written += uint64(numWritten)
+       mtx.ServerDropped += uint64(serverDropped)
+       mtx.ClientDroppedEstimate += uint64(clientDroppedEstimate)
+}
+
+// Update the total number of spans which were persisted to disk.
+func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
+               serverDropped int) {
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       msink.WrittenSpans += uint64(totalWritten)
+       msink.ServerDropped += uint64(serverDropped)
+       msink.updateSpanMetrics(addr, totalWritten, serverDropped, 0)
+}
+
+// Read the server stats.
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       stats.IngestedSpans = msink.IngestedSpans
+       stats.WrittenSpans = msink.WrittenSpans
+       stats.ServerDroppedSpans = msink.ServerDropped
+       stats.ClientDroppedEstimate = msink.ClientDroppedEstimate
+       stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
+       stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
+       stats.HostSpanMetrics = make(common.SpanMetricsMap)
+       for k, v := range(msink.HostSpanMetrics) {
+               stats.HostSpanMetrics[k] = &common.SpanMetrics {
+                       Written: v.Written,
+                       ServerDropped: v.ServerDropped,
+                       ClientDroppedEstimate: v.ClientDroppedEstimate,
                }
-       }
-       wsm.ingestedSpans += uint64(clientWritten)
-       wsm.clientDroppedSpans += uint64(clientDropped)
-       wsm.latencyCircBuf.Append(wsLatency)
-}
-
-func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData {
-       wsm.lock.Lock()
-       defer wsm.lock.Unlock()
-       clientDroppedMap := make(map[string]uint64)
-       for k, v := range wsm.clientDroppedMap {
-               clientDroppedMap[k] = v
-       }
-       return &WriteSpanMetricsData {
-               clientDroppedMap: clientDroppedMap,
-               ingestedSpans: wsm.ingestedSpans,
-               clientDroppedSpans: wsm.clientDroppedSpans,
-               latencyMax: wsm.latencyCircBuf.Max(),
-               latencyAverage: wsm.latencyCircBuf.Average(),
        }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index 5243d9e..e1dba1f 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -20,6 +20,7 @@
 package main
 
 import (
+       "fmt"
        htrace "org/apache/htrace/client"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
@@ -28,43 +29,6 @@ import (
        "time"
 )
 
-func TestMetricsSinkStartupShutdown(t *testing.T) {
-       cnfBld := conf.Builder{
-               Values:   conf.TEST_VALUES(),
-               Defaults: conf.DEFAULTS,
-       }
-       cnf, err := cnfBld.Build()
-       if err != nil {
-               t.Fatalf("failed to create conf: %s", err.Error())
-       }
-       msink := NewMetricsSink(cnf)
-       msink.Shutdown()
-}
-
-func TestAddSpanMetrics(t *testing.T) {
-       a := &ServerSpanMetrics{
-               Written:       100,
-               ServerDropped: 200,
-       }
-       b := &ServerSpanMetrics{
-               Written:       500,
-               ServerDropped: 100,
-       }
-       a.Add(b)
-       if a.Written != 600 {
-               t.Fatalf("SpanMetrics#Add failed to update #Written")
-       }
-       if a.ServerDropped != 300 {
-               t.Fatalf("SpanMetrics#Add failed to update #Dropped")
-       }
-       if b.Written != 500 {
-               t.Fatalf("SpanMetrics#Add updated b#Written")
-       }
-       if b.ServerDropped != 100 {
-               t.Fatalf("SpanMetrics#Add updated b#Dropped")
-       }
-}
-
 func compareTotals(a, b common.SpanMetricsMap) bool {
        for k, v := range a {
                if !reflect.DeepEqual(v, b[k]) {
@@ -79,112 +43,52 @@ func compareTotals(a, b common.SpanMetricsMap) bool {
        return true
 }
 
-func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
-       for {
-               time.Sleep(1 * time.Millisecond)
-               totals := msink.AccessServerTotals()
-               if compareTotals(totals, expectedTotals) {
-                       return
-               }
-       }
+type Fatalfer interface {
+       Fatalf(format string, args ...interface{})
 }
 
-func TestMetricsSinkMessages(t *testing.T) {
-       cnfBld := conf.Builder{
-               Values:   conf.TEST_VALUES(),
-               Defaults: conf.DEFAULTS,
-       }
-       cnf, err := cnfBld.Build()
-       if err != nil {
-               t.Fatalf("failed to create conf: %s", err.Error())
-       }
-       msink := NewMetricsSink(cnf)
-       totals := msink.AccessServerTotals()
-       if len(totals) != 0 {
-               t.Fatalf("Expected no data in the MetricsSink to start with.")
+func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
+               expectedNumWritten int) {
+       var sstats common.ServerStats
+       msink.PopulateServerStats(&sstats)
+       if sstats.WrittenSpans != uint64(expectedNumWritten) {
+               t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
+                       sstats.WrittenSpans, len(SIMPLE_TEST_SPANS))
+       }
+       if sstats.HostSpanMetrics["127.0.0.1"] == nil {
+               t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] 
found.")
+       }
+       if sstats.HostSpanMetrics["127.0.0.1"].Written !=
+                       uint64(expectedNumWritten) {
+               t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but " 
+
+                       "expected %d\n", 
sstats.HostSpanMetrics["127.0.0.1"].Written,
+                       len(SIMPLE_TEST_SPANS))
        }
-       msink.UpdateMetrics(ServerSpanMetricsMap{
-               "192.168.0.100": &ServerSpanMetrics{
-                       Written:       20,
-                       ServerDropped: 10,
-               },
-       })
-       waitForMetrics(msink, common.SpanMetricsMap{
-               "192.168.0.100": &common.SpanMetrics{
-                       Written:       20,
-                       ServerDropped: 10,
-               },
-       })
-       msink.UpdateMetrics(ServerSpanMetricsMap{
-               "192.168.0.100": &ServerSpanMetrics{
-                       Written:       200,
-                       ServerDropped: 100,
-               },
-       })
-       msink.UpdateMetrics(ServerSpanMetricsMap{
-               "192.168.0.100": &ServerSpanMetrics{
-                       Written:       1000,
-                       ServerDropped: 1000,
-               },
-       })
-       waitForMetrics(msink, common.SpanMetricsMap{
-               "192.168.0.100": &common.SpanMetrics{
-                       Written:       1220,
-                       ServerDropped: 1110,
-               },
-       })
-       msink.UpdateMetrics(ServerSpanMetricsMap{
-               "192.168.0.200": &ServerSpanMetrics{
-                       Written:       200,
-                       ServerDropped: 100,
-               },
-       })
-       waitForMetrics(msink, common.SpanMetricsMap{
-               "192.168.0.100": &common.SpanMetrics{
-                       Written:       1220,
-                       ServerDropped: 1110,
-               },
-               "192.168.0.200": &common.SpanMetrics{
-                       Written:       200,
-                       ServerDropped: 100,
-               },
-       })
-       msink.Shutdown()
 }
 
-func TestMetricsSinkMessagesEviction(t *testing.T) {
+func TestMetricsSinkPerHostEviction(t *testing.T) {
        cnfBld := conf.Builder{
                Values:   conf.TEST_VALUES(),
                Defaults: conf.DEFAULTS,
        }
        cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
-       cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
        cnf, err := cnfBld.Build()
        if err != nil {
                t.Fatalf("failed to create conf: %s", err.Error())
        }
        msink := NewMetricsSink(cnf)
-       msink.UpdateMetrics(ServerSpanMetricsMap{
-               "192.168.0.100": &ServerSpanMetrics{
-                       Written:       20,
-                       ServerDropped: 10,
-               },
-               "192.168.0.101": &ServerSpanMetrics{
-                       Written:       20,
-                       ServerDropped: 10,
-               },
-               "192.168.0.102": &ServerSpanMetrics{
-                       Written:       20,
-                       ServerDropped: 10,
-               },
-       })
-       for {
-               totals := msink.AccessServerTotals()
-               if len(totals) == 2 {
-                       break
+       msink.UpdatePersisted("192.168.0.100", 20, 10)
+       msink.UpdatePersisted("192.168.0.101", 20, 10)
+       msink.UpdatePersisted("192.168.0.102", 20, 10)
+       msink.lock.Lock()
+       defer msink.lock.Unlock()
+       if len(msink.HostSpanMetrics) != 2 {
+               for k, v := range(msink.HostSpanMetrics) {
+                       fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v)
                }
+               t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got 
%d\n",
+                       len(msink.HostSpanMetrics))
        }
-       msink.Shutdown()
 }
 
 func TestIngestedSpansMetricsRest(t *testing.T) {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
index dcc916a..0140dbb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -43,7 +43,7 @@ func TestReapingOldSpans(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_SPAN_EXPIRY_MS:              
fmt.Sprintf("%d", 60*60*1000),
                        conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  "1",
-                       conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+                       conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
                },
                WrittenSpans: common.NewSemaphore(0),
                DataDirs:     make([]string, 2),
@@ -52,12 +52,11 @@ func TestReapingOldSpans(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to create mini htraced cluster: %s\n", 
err.Error())
        }
-       for i := range testSpans {
-               ht.Store.WriteSpan(&IncomingSpan{
-                       Addr: "127.0.0.1",
-                       Span: testSpans[i],
-               })
+       ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+       for spanIdx := range testSpans {
+               ing.IngestSpan(testSpans[spanIdx])
        }
+       ing.Close(0, time.Now())
        // Wait the spans to be created
        ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
        // Set a reaper date that will remove all the spans except final one.

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 432375d..1b90bd4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -252,27 +252,12 @@ func (hand *writeSpansHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reques
        }
        hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultTrid = 
%s\n",
                len(msg.Spans), msg.DefaultTrid)
+
+       ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
        for spanIdx := range msg.Spans {
-               if hand.lg.DebugEnabled() {
-                       hand.lg.Debugf("writing span %s\n", 
msg.Spans[spanIdx].ToJson())
-               }
-               span := msg.Spans[spanIdx]
-               if span.TracerId == "" {
-                       span.TracerId = msg.DefaultTrid
-               }
-               spanIdProblem := span.Id.FindProblem()
-               if spanIdProblem != "" {
-                       hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", 
spanIdProblem))
-               } else {
-                       hand.store.WriteSpan(&IncomingSpan{
-                               Addr: client,
-                               Span: span,
-                       })
-               }
+               ing.IngestSpan(msg.Spans[spanIdx])
        }
-       endTime := time.Now()
-       hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans),
-                       endTime.Sub(startTime))
+       ing.Close(int(msg.ClientDropped), startTime)
 }
 
 type queryHandler struct {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go 
b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 7b5e433..c81bbb7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -216,7 +216,10 @@ func printServerStats(hcl *htrace.Client) int {
                common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
        fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
        fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
-       fmt.Fprintf(w, "Spans dropped by clients\t%d\n", 
stats.ClientDroppedSpans)
+       fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans)
+       fmt.Fprintf(w, "Spans dropped by server\t%d\n", 
stats.ServerDroppedSpans)
+       fmt.Fprintf(w, "Estimated spans dropped by clients\t%d\n",
+               stats.ClientDroppedEstimate)
        dur := time.Millisecond * 
time.Duration(stats.AverageWriteSpansLatencyMs)
        fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
        dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
@@ -244,8 +247,8 @@ func printServerStats(hcl *htrace.Client) int {
        sort.Sort(keys)
        for k := range keys {
                mtx := mtxMap[keys[k]]
-               fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient 
dropped: %d\n",
-                       keys[k], mtx.Written, mtx.ServerDropped, 
mtx.ClientDropped)
+               fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient 
dropped estimate: %d\n",
+                       keys[k], mtx.Written, mtx.ServerDropped, 
mtx.ClientDroppedEstimate)
        }
        w.Flush()
        return EXIT_SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_info_view.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js 
b/htrace-webapp/src/main/webapp/app/server_info_view.js
index 62775e8..efb7545 100644
--- a/htrace-webapp/src/main/webapp/app/server_info_view.js
+++ b/htrace-webapp/src/main/webapp/app/server_info_view.js
@@ -50,7 +50,7 @@ htrace.ServerInfoView = Backbone.View.extend({
             '<th>Remote</th>' +
             '<th>Written</th>' +
             '<th>ServerDropped</th>' +
-            '<th>ClientDropped</th>' +
+            '<th>ClientDroppedEstimate</th>' +
           '</tr>' +
         '</thead>';
     var remotes = [];
@@ -69,7 +69,7 @@ htrace.ServerInfoView = Backbone.View.extend({
         "<td>" + remote + "</td>" +
         "<td>" + smtx.Written + "</td>" +
         "<td>" + smtx.ServerDropped + "</td>" +
-        "<td>" + smtx.ClientDropped + "</td>" +
+        "<td>" + smtx.ClientDroppedEstimate + "</td>" +
         "</tr>";
     }
     out = out + '</table>';

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_stats.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js b/htrace-webapp/src/main/webapp/app/server_stats.js
index 783041c..4cfea92 100644
--- a/htrace-webapp/src/main/webapp/app/server_stats.js
+++ b/htrace-webapp/src/main/webapp/app/server_stats.js
@@ -22,8 +22,13 @@ htrace.ServerStats = Backbone.Model.extend({
   defaults: {
     "LastStartMs": "0",
     "CurMs": "0",
+    "ReapedSpans": "(unknown)",
     "IngestedSpans": "(unknown)",
-    "ReapedSpans": "(unknown)"
+    "WrittenSpans": "(unknown)",
+    "ServerDroppedSpans": "(unknown)",
+    "ClientDroppedSpans": "(unknown)",
+    "MaxWriteSpansLatencyMs": "(unknown)",
+    "AverageWriteSpansLatencyMs": "(unknown)"
   },
 
   url: function() {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/index.html b/htrace-webapp/src/main/webapp/index.html
index 16fd2f9..1e20ec0 100644
--- a/htrace-webapp/src/main/webapp/index.html
+++ b/htrace-webapp/src/main/webapp/index.html
@@ -84,8 +84,16 @@
               <td><%= model.stats.get("IngestedSpans") %></td>
             </tr>
             <tr>
-              <td>Client Dropped Spans</td>
-              <td><%= model.stats.get("ClientDroppedSpans") %></td>
+              <td>Spans Written</td>
+              <td><%= model.stats.get("WrittenSpans") %></td>
+            </tr>
+            <tr>
+              <td>Server Dropped Spans</td>
+              <td><%= model.stats.get("ServerDroppedSpans") %></td>
+            </tr>
+            <tr>
+              <td>Estimated Client Dropped Spans</td>
+              <td><%= model.stats.get("ClientDroppedEstimate") %></td>
             </tr>
             <tr>
               <td>Maximum WriteSpans Latency (ms)</td>

Reply via email to