Repository: incubator-htrace
Updated Branches:
  refs/heads/master 5f871b619 -> 0fd383546


HTRACE-115. The htraced datastore should use uint64 for span ids rather than 
int64 (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/0fd38354
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/0fd38354
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/0fd38354

Branch: refs/heads/master
Commit: 0fd383546cfa3e586288d973babcb09882193ec8
Parents: 5f871b6
Author: Colin P. Mccabe <[email protected]>
Authored: Wed Feb 25 10:28:24 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Thu Feb 26 11:38:51 2015 -0800

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/common/query.go    |  4 +-
 .../src/go/src/org/apache/htrace/common/span.go |  6 +-
 .../src/org/apache/htrace/htraced/datastore.go  | 99 +++++++++++---------
 .../go/src/org/apache/htrace/htraced/rest.go    | 20 ++--
 4 files changed, 72 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/common/query.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go 
b/htrace-core/src/go/src/org/apache/htrace/common/query.go
index 6e7d6c7..a32909e 100644
--- a/htrace-core/src/go/src/org/apache/htrace/common/query.go
+++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go
@@ -60,7 +60,7 @@ func (op Op) IsDescending() bool {
 
 func (op Op) IsValid() bool {
        ops := ValidOps()
-       for i := range(ops) {
+       for i := range ops {
                if ops[i] == op {
                        return true
                }
@@ -85,7 +85,7 @@ const (
 
 func (field Field) IsValid() bool {
        fields := ValidFields()
-       for i := range(fields) {
+       for i := range fields {
                if fields[i] == field {
                        return true
                }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/common/span.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go 
b/htrace-core/src/go/src/org/apache/htrace/common/span.go
index 9b7af22..c273ad9 100644
--- a/htrace-core/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go
@@ -43,14 +43,14 @@ type TimelineAnnotation struct {
        Msg  string `json:"m"`
 }
 
-type SpanId int64
+type SpanId uint64
 
 func (id SpanId) String() string {
        return fmt.Sprintf("%016x", uint64(id))
 }
 
-func (id SpanId) Val() int64 {
-       return int64(id)
+func (id SpanId) Val() uint64 {
+       return uint64(id)
 }
 
 func (id SpanId) MarshalJSON() ([]byte, error) {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
index a4233f4..f7c8ece 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -46,9 +46,6 @@ import (
 // losing a little bit of trace data if htraced goes down is not critical.  We 
use the "gob" package
 // for serialization.  We assume that there will be many more writes than 
reads.
 //
-// TODO: implement redundancy (storing data on more than 1 drive)
-// TODO: implement re-loading old span data
-//
 // Schema
 // m -> dataStoreMetadata
 // s[8-byte-big-endian-sid] -> SpanData
@@ -57,6 +54,14 @@ import (
 // d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
 // p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
 //
+// Note that span IDs are unsigned 64-bit numbers.
+// Begin times, end times, and durations are signed 64-bit numbers.
+// In order to get LevelDB to properly compare the signed 64-bit quantities,
+// we flip the highest bit.  This way, we can get leveldb to view negative
+// quantities as less than non-negative ones.  This also means that we can do
+// all queries using unsigned 64-bit math, rather than having to special-case
+// the signed fields.
+//
 
 const DATA_STORE_VERSION = 1
 
@@ -85,7 +90,7 @@ func (stats *Statistics) Copy() *Statistics {
 }
 
 // Translate a span id into a leveldb key.
-func makeKey(tag byte, sid int64) []byte {
+func makeKey(tag byte, sid uint64) []byte {
        id := uint64(sid)
        return []byte{
                tag,
@@ -100,7 +105,7 @@ func makeKey(tag byte, sid int64) []byte {
        }
 }
 
-func keyToInt(key []byte) int64 {
+func keyToInt(key []byte) uint64 {
        var id uint64
        id = (uint64(key[0]) << 56) |
                (uint64(key[1]) << 48) |
@@ -110,12 +115,10 @@ func keyToInt(key []byte) int64 {
                (uint64(key[5]) << 16) |
                (uint64(key[6]) << 8) |
                (uint64(key[7]) << 0)
-       return int64(id)
+       return id
 }
 
-func makeSecondaryKey(tag byte, first int64, second int64) []byte {
-       fir := uint64(first)
-       sec := uint64(second)
+func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte {
        return []byte{
                tag,
                byte(0xff & (fir >> 56)),
@@ -191,6 +194,15 @@ func (shd *shard) processIncoming() {
        }
 }
 
+// Convert a signed 64-bit number into an unsigned 64-bit number.  We flip the
+// highest bit, so that negative input values map to unsigned numbers which are
+// less than non-negative input values.
+func s2u64(val int64) uint64 {
+       ret := uint64(val)
+       ret ^= 0x8000000000000000
+       return ret
+}
+
 func (shd *shard) writeSpan(span *common.Span) error {
        batch := levigo.NewWriteBatch()
        defer batch.Close()
@@ -211,11 +223,11 @@ func (shd *shard) writeSpan(span *common.Span) error {
        }
 
        // Add to the other secondary indices.
-       batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin,
+       batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin),
                span.Id.Val()), EMPTY_BYTE_BUF)
-       batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End,
+       batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End),
                span.Id.Val()), EMPTY_BYTE_BUF)
-       batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(),
+       batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, 
s2u64(span.Duration()),
                span.Id.Val()), EMPTY_BYTE_BUF)
 
        err = shd.ldb.Write(shd.store.writeOpts, batch)
@@ -229,8 +241,9 @@ func (shd *shard) writeSpan(span *common.Span) error {
        return nil
 }
 
-func (shd *shard) FindChildren(sid int64, childIds []common.SpanId, lim int32) 
([]common.SpanId, int32, error) {
-       searchKey := makeKey('p', sid)
+func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
+       lim int32) ([]common.SpanId, int32, error) {
+       searchKey := makeKey('p', sid.Val())
        iter := shd.ldb.NewIterator(shd.store.readOpts)
        defer iter.Close()
        iter.Seek(searchKey)
@@ -402,21 +415,21 @@ func (store *dataStore) Close() {
 }
 
 // Get the index of the shard which stores the given spanId.
-func (store *dataStore) getShardIndex(spanId int64) int {
-       return int(uint64(spanId) % uint64(len(store.shards)))
+func (store *dataStore) getShardIndex(sid common.SpanId) int {
+       return int(sid.Val() % uint64(len(store.shards)))
 }
 
 func (store *dataStore) WriteSpan(span *common.Span) {
-       store.shards[store.getShardIndex(span.Id.Val())].incoming <- span
+       store.shards[store.getShardIndex(span.Id)].incoming <- span
 }
 
-func (store *dataStore) FindSpan(sid int64) *common.Span {
+func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
        return store.shards[store.getShardIndex(sid)].FindSpan(sid)
 }
 
-func (shd *shard) FindSpan(sid int64) *common.Span {
+func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
        lg := shd.store.lg
-       buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid))
+       buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val()))
        if err != nil {
                if strings.Index(err.Error(), "NotFound:") != -1 {
                        return nil
@@ -435,7 +448,7 @@ func (shd *shard) FindSpan(sid int64) *common.Span {
        return span
 }
 
-func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) {
+func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, 
error) {
        r := bytes.NewBuffer(buf)
        decoder := gob.NewDecoder(r)
        data := common.SpanData{}
@@ -452,7 +465,7 @@ func (shd *shard) decodeSpan(sid int64, buf []byte) 
(*common.Span, error) {
 }
 
 // Find the children of a given span id.
-func (store *dataStore) FindChildren(sid int64, lim int32) []common.SpanId {
+func (store *dataStore) FindChildren(sid common.SpanId, lim int32) 
[]common.SpanId {
        childIds := make([]common.SpanId, 0)
        var err error
 
@@ -482,8 +495,8 @@ func (store *dataStore) FindChildren(sid int64, lim int32) 
[]common.SpanId {
 
 type predicateData struct {
        *common.Predicate
-       intKey int64
-       strKey string
+       uintKey uint64
+       strKey  string
 }
 
 func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
@@ -499,20 +512,20 @@ func loadPredicateData(pred *common.Predicate) 
(*predicateData, error) {
                        return nil, errors.New(fmt.Sprintf("Unable to parse 
span id '%s': %s",
                                pred.Val, err.Error()))
                }
-               p.intKey = id.Val()
+               p.uintKey = id.Val()
                break
        case common.DESCRIPTION:
                // Any string is valid for a description.
                p.strKey = pred.Val
                break
        case common.BEGIN_TIME, common.END_TIME, common.DURATION:
-               // Base-10 numeric fields.
+               // Parse a base-10 signed numeric field.
                v, err := strconv.ParseInt(pred.Val, 10, 64)
                if err != nil {
                        return nil, errors.New(fmt.Sprintf("Unable to parse %s 
'%s': %s",
                                pred.Field, pred.Val, err.Error()))
                }
-               p.intKey = v
+               p.uintKey = s2u64(v)
                break
        default:
                return nil, errors.New(fmt.Sprintf("Unknown field %s", 
pred.Field))
@@ -563,18 +576,18 @@ func (pred *predicateData) fieldIsNumeric() bool {
 }
 
 // Get the values that this predicate cares about for a given span.
-func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, 
string) {
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, 
string) {
        switch pred.Field {
        case common.SPAN_ID:
                return span.Id.Val(), ""
        case common.DESCRIPTION:
                return 0, span.Description
        case common.BEGIN_TIME:
-               return span.Begin, ""
+               return s2u64(span.Begin), ""
        case common.END_TIME:
-               return span.End, ""
+               return s2u64(span.End), ""
        case common.DURATION:
-               return span.Duration(), ""
+               return s2u64(span.Duration()), ""
        default:
                panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", 
pred.Field))
        }
@@ -614,13 +627,13 @@ func (pred *predicateData) satisfiedBy(span *common.Span) 
bool {
        if pred.fieldIsNumeric() {
                switch pred.Op {
                case common.EQUALS:
-                       return intVal == pred.intKey
+                       return intVal == pred.uintKey
                case common.LESS_THAN_OR_EQUALS:
-                       return intVal <= pred.intKey
+                       return intVal <= pred.uintKey
                case common.GREATER_THAN_OR_EQUALS:
-                       return intVal >= pred.intKey
+                       return intVal >= pred.uintKey
                case common.GREATER_THAN:
-                       return intVal > pred.intKey
+                       return intVal > pred.uintKey
                default:
                        panic(fmt.Sprintf("unknown Op type %s should have been 
caught "+
                                "during normalization", pred.Op))
@@ -666,7 +679,7 @@ func (pred *predicateData) createSource(store *dataStore) 
(*source, error) {
                shd := store.shards[shardIdx]
                src.iters = append(src.iters, 
shd.ldb.NewIterator(store.readOpts))
        }
-       searchKey := makeKey(src.keyPrefix, pred.intKey)
+       searchKey := makeKey(src.keyPrefix, pred.uintKey)
        for i := range src.iters {
                src.iters[i].Seek(searchKey)
        }
@@ -729,23 +742,23 @@ func (src *source) populateNextFromShard(shardIdx int) {
                        break // Can't read past end of indexed section
                }
                var span *common.Span
-               var sid int64
+               var sid common.SpanId
                if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
                        // The span id maps to the span itself.
-                       sid = keyToInt(key[1:])
+                       sid = common.SpanId(keyToInt(key[1:]))
                        span, err = src.store.shards[shardIdx].decodeSpan(sid, 
iter.Value())
                        if err != nil {
-                               lg.Debugf("Internal error decoding span %016x 
in shard %d: %s\n",
-                                       sid, shardIdx, err.Error())
+                               lg.Debugf("Internal error decoding span %s in 
shard %d: %s\n",
+                                       sid.String(), shardIdx, err.Error())
                                break
                        }
                } else {
                        // With a secondary index, we have to look up the span 
by id.
-                       sid = keyToInt(key[9:])
+                       sid = common.SpanId(keyToInt(key[9:]))
                        span = src.store.shards[shardIdx].FindSpan(sid)
                        if span == nil {
-                               lg.Debugf("Internal error rehydrating span 
%016x in shard %d\n",
-                                       sid, shardIdx)
+                               lg.Debugf("Internal error rehydrating span %s 
in shard %d\n",
+                                       sid.String(), shardIdx)
                                break
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index 495aed0..dc9a061 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -64,6 +64,7 @@ func (hand *serverInfoHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reques
                        fmt.Sprintf("error marshalling ServerInfo: %s\n", 
err.Error()))
                return
        }
+       hand.lg.Debugf("Returned serverInfo %s\n", string(buf))
        w.Write(buf)
 }
 
@@ -72,15 +73,16 @@ type dataStoreHandler struct {
        store *dataStore
 }
 
-func (hand *dataStoreHandler) parse64(w http.ResponseWriter, str string) 
(int64, bool) {
+func (hand *dataStoreHandler) parseSid(w http.ResponseWriter,
+       str string) (common.SpanId, bool) {
        val, err := strconv.ParseUint(str, 16, 64)
        if err != nil {
                writeError(hand.lg, w, http.StatusBadRequest,
                        fmt.Sprintf("Failed to parse span ID %s: %s", str, 
err.Error()))
                w.Write([]byte("Error parsing : " + err.Error()))
-               return -1, false
+               return 0, false
        }
-       return int64(val), true
+       return common.SpanId(val), true
 }
 
 func (hand *dataStoreHandler) getReqField32(fieldName string, w 
http.ResponseWriter,
@@ -108,15 +110,15 @@ func (hand *findSidHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Request)
        req.ParseForm()
        vars := mux.Vars(req)
        stringSid := vars["id"]
-       sid, ok := hand.parse64(w, stringSid)
+       sid, ok := hand.parseSid(w, stringSid)
        if !ok {
                return
        }
-       hand.lg.Debugf("findSidHandler(sid=%s)\n", common.SpanId(sid))
+       hand.lg.Debugf("findSidHandler(sid=%s)\n", sid.String())
        span := hand.store.FindSpan(sid)
        if span == nil {
-               writeError(hand.lg, w, http.StatusNoContent, fmt.Sprintf("No 
such span as %s\n",
-                       common.SpanId(sid)))
+               writeError(hand.lg, w, http.StatusNoContent,
+                       fmt.Sprintf("No such span as %s\n", sid.String()))
                return
        }
        w.Write(span.ToJson())
@@ -131,7 +133,7 @@ func (hand *findChildrenHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Requ
        req.ParseForm()
        vars := mux.Vars(req)
        stringSid := vars["id"]
-       sid, ok := hand.parse64(w, stringSid)
+       sid, ok := hand.parseSid(w, stringSid)
        if !ok {
                return
        }
@@ -140,7 +142,7 @@ func (hand *findChildrenHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Requ
        if !ok {
                return
        }
-       hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", 
common.SpanId(sid), lim)
+       hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", sid.String(), 
lim)
        children := hand.store.FindChildren(sid, lim)
        jbytes, err := json.Marshal(children)
        if err != nil {

Reply via email to