Repository: incubator-htrace Updated Branches: refs/heads/master 5f871b619 -> 0fd383546
HTRACE-115. The htraced datastore should use uint64 for span ids rather than int64 (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/0fd38354 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/0fd38354 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/0fd38354 Branch: refs/heads/master Commit: 0fd383546cfa3e586288d973babcb09882193ec8 Parents: 5f871b6 Author: Colin P. Mccabe <[email protected]> Authored: Wed Feb 25 10:28:24 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Thu Feb 26 11:38:51 2015 -0800 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/common/query.go | 4 +- .../src/go/src/org/apache/htrace/common/span.go | 6 +- .../src/org/apache/htrace/htraced/datastore.go | 99 +++++++++++--------- .../go/src/org/apache/htrace/htraced/rest.go | 20 ++-- 4 files changed, 72 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/common/query.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go b/htrace-core/src/go/src/org/apache/htrace/common/query.go index 6e7d6c7..a32909e 100644 --- a/htrace-core/src/go/src/org/apache/htrace/common/query.go +++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go @@ -60,7 +60,7 @@ func (op Op) IsDescending() bool { func (op Op) IsValid() bool { ops := ValidOps() - for i := range(ops) { + for i := range ops { if ops[i] == op { return true } @@ -85,7 +85,7 @@ const ( func (field Field) IsValid() bool { fields := ValidFields() - for i := range(fields) { + for i := range fields { if fields[i] == field { return true } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/common/span.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go b/htrace-core/src/go/src/org/apache/htrace/common/span.go index 9b7af22..c273ad9 100644 --- a/htrace-core/src/go/src/org/apache/htrace/common/span.go +++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go @@ -43,14 +43,14 @@ type TimelineAnnotation struct { Msg string `json:"m"` } -type SpanId int64 +type SpanId uint64 func (id SpanId) String() string { return fmt.Sprintf("%016x", uint64(id)) } -func (id SpanId) Val() int64 { - return int64(id) +func (id SpanId) Val() uint64 { + return uint64(id) } func (id SpanId) MarshalJSON() ([]byte, error) { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go index a4233f4..f7c8ece 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go @@ -46,9 +46,6 @@ import ( // losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package // for serialization. We assume that there will be many more writes than reads. // -// TODO: implement redundancy (storing data on more than 1 drive) -// TODO: implement re-loading old span data -// // Schema // m -> dataStoreMetadata // s[8-byte-big-endian-sid] -> SpanData @@ -57,6 +54,14 @@ import ( // d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {} // p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {} // +// Note that span IDs are unsigned 64-bit numbers. +// Begin times, end times, and durations are signed 64-bit numbers. +// In order to get LevelDB to properly compare the signed 64-bit quantities, +// we flip the highest bit. This way, we can get leveldb to view negative +// quantities as less than non-negative ones. This also means that we can do +// all queries using unsigned 64-bit math, rather than having to special-case +// the signed fields. +// const DATA_STORE_VERSION = 1 @@ -85,7 +90,7 @@ func (stats *Statistics) Copy() *Statistics { } // Translate a span id into a leveldb key. -func makeKey(tag byte, sid int64) []byte { +func makeKey(tag byte, sid uint64) []byte { id := uint64(sid) return []byte{ tag, @@ -100,7 +105,7 @@ func makeKey(tag byte, sid int64) []byte { } } -func keyToInt(key []byte) int64 { +func keyToInt(key []byte) uint64 { var id uint64 id = (uint64(key[0]) << 56) | (uint64(key[1]) << 48) | @@ -110,12 +115,10 @@ func keyToInt(key []byte) int64 { (uint64(key[5]) << 16) | (uint64(key[6]) << 8) | (uint64(key[7]) << 0) - return int64(id) + return id } -func makeSecondaryKey(tag byte, first int64, second int64) []byte { - fir := uint64(first) - sec := uint64(second) +func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte { return []byte{ tag, byte(0xff & (fir >> 56)), @@ -191,6 +194,15 @@ func (shd *shard) processIncoming() { } } +// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the +// highest bit, so that negative input values map to unsigned numbers which are +// less than non-negative input values. +func s2u64(val int64) uint64 { + ret := uint64(val) + ret ^= 0x8000000000000000 + return ret +} + func (shd *shard) writeSpan(span *common.Span) error { batch := levigo.NewWriteBatch() defer batch.Close() @@ -211,11 +223,11 @@ func (shd *shard) writeSpan(span *common.Span) error { } // Add to the other secondary indices. - batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin, + batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin), span.Id.Val()), EMPTY_BYTE_BUF) - batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End, + batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End), span.Id.Val()), EMPTY_BYTE_BUF) - batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(), + batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, s2u64(span.Duration()), span.Id.Val()), EMPTY_BYTE_BUF) err = shd.ldb.Write(shd.store.writeOpts, batch) @@ -229,8 +241,9 @@ func (shd *shard) writeSpan(span *common.Span) error { return nil } -func (shd *shard) FindChildren(sid int64, childIds []common.SpanId, lim int32) ([]common.SpanId, int32, error) { - searchKey := makeKey('p', sid) +func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId, + lim int32) ([]common.SpanId, int32, error) { + searchKey := makeKey('p', sid.Val()) iter := shd.ldb.NewIterator(shd.store.readOpts) defer iter.Close() iter.Seek(searchKey) @@ -402,21 +415,21 @@ func (store *dataStore) Close() { } // Get the index of the shard which stores the given spanId. -func (store *dataStore) getShardIndex(spanId int64) int { - return int(uint64(spanId) % uint64(len(store.shards))) +func (store *dataStore) getShardIndex(sid common.SpanId) int { + return int(sid.Val() % uint64(len(store.shards))) } func (store *dataStore) WriteSpan(span *common.Span) { - store.shards[store.getShardIndex(span.Id.Val())].incoming <- span + store.shards[store.getShardIndex(span.Id)].incoming <- span } -func (store *dataStore) FindSpan(sid int64) *common.Span { +func (store *dataStore) FindSpan(sid common.SpanId) *common.Span { return store.shards[store.getShardIndex(sid)].FindSpan(sid) } -func (shd *shard) FindSpan(sid int64) *common.Span { +func (shd *shard) FindSpan(sid common.SpanId) *common.Span { lg := shd.store.lg - buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid)) + buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val())) if err != nil { if strings.Index(err.Error(), "NotFound:") != -1 { return nil @@ -435,7 +448,7 @@ func (shd *shard) FindSpan(sid int64) *common.Span { return span } -func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) { +func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) { r := bytes.NewBuffer(buf) decoder := gob.NewDecoder(r) data := common.SpanData{} @@ -452,7 +465,7 @@ func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) { } // Find the children of a given span id. -func (store *dataStore) FindChildren(sid int64, lim int32) []common.SpanId { +func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId { childIds := make([]common.SpanId, 0) var err error @@ -482,8 +495,8 @@ func (store *dataStore) FindChildren(sid int64, lim int32) []common.SpanId { type predicateData struct { *common.Predicate - intKey int64 - strKey string + uintKey uint64 + strKey string } func loadPredicateData(pred *common.Predicate) (*predicateData, error) { @@ -499,20 +512,20 @@ func loadPredicateData(pred *common.Predicate) (*predicateData, error) { return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s", pred.Val, err.Error())) } - p.intKey = id.Val() + p.uintKey = id.Val() break case common.DESCRIPTION: // Any string is valid for a description. p.strKey = pred.Val break case common.BEGIN_TIME, common.END_TIME, common.DURATION: - // Base-10 numeric fields. + // Parse a base-10 signed numeric field. v, err := strconv.ParseInt(pred.Val, 10, 64) if err != nil { return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s", pred.Field, pred.Val, err.Error())) } - p.intKey = v + p.uintKey = s2u64(v) break default: return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field)) @@ -563,18 +576,18 @@ func (pred *predicateData) fieldIsNumeric() bool { } // Get the values that this predicate cares about for a given span. -func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, string) { +func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, string) { switch pred.Field { case common.SPAN_ID: return span.Id.Val(), "" case common.DESCRIPTION: return 0, span.Description case common.BEGIN_TIME: - return span.Begin, "" + return s2u64(span.Begin), "" case common.END_TIME: - return span.End, "" + return s2u64(span.End), "" case common.DURATION: - return span.Duration(), "" + return s2u64(span.Duration()), "" default: panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field)) } @@ -614,13 +627,13 @@ func (pred *predicateData) satisfiedBy(span *common.Span) bool { if pred.fieldIsNumeric() { switch pred.Op { case common.EQUALS: - return intVal == pred.intKey + return intVal == pred.uintKey case common.LESS_THAN_OR_EQUALS: - return intVal <= pred.intKey + return intVal <= pred.uintKey case common.GREATER_THAN_OR_EQUALS: - return intVal >= pred.intKey + return intVal >= pred.uintKey case common.GREATER_THAN: - return intVal > pred.intKey + return intVal > pred.uintKey default: panic(fmt.Sprintf("unknown Op type %s should have been caught "+ "during normalization", pred.Op)) @@ -666,7 +679,7 @@ func (pred *predicateData) createSource(store *dataStore) (*source, error) { shd := store.shards[shardIdx] src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) } - searchKey := makeKey(src.keyPrefix, pred.intKey) + searchKey := makeKey(src.keyPrefix, pred.uintKey) for i := range src.iters { src.iters[i].Seek(searchKey) } @@ -729,23 +742,23 @@ func (src *source) populateNextFromShard(shardIdx int) { break // Can't read past end of indexed section } var span *common.Span - var sid int64 + var sid common.SpanId if src.keyPrefix == SPAN_ID_INDEX_PREFIX { // The span id maps to the span itself. - sid = keyToInt(key[1:]) + sid = common.SpanId(keyToInt(key[1:])) span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value()) if err != nil { - lg.Debugf("Internal error decoding span %016x in shard %d: %s\n", - sid, shardIdx, err.Error()) + lg.Debugf("Internal error decoding span %s in shard %d: %s\n", + sid.String(), shardIdx, err.Error()) break } } else { // With a secondary index, we have to look up the span by id. - sid = keyToInt(key[9:]) + sid = common.SpanId(keyToInt(key[9:])) span = src.store.shards[shardIdx].FindSpan(sid) if span == nil { - lg.Debugf("Internal error rehydrating span %016x in shard %d\n", - sid, shardIdx) + lg.Debugf("Internal error rehydrating span %s in shard %d\n", + sid.String(), shardIdx) break } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0fd38354/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go index 495aed0..dc9a061 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go @@ -64,6 +64,7 @@ func (hand *serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques fmt.Sprintf("error marshalling ServerInfo: %s\n", err.Error())) return } + hand.lg.Debugf("Returned serverInfo %s\n", string(buf)) w.Write(buf) } @@ -72,15 +73,16 @@ type dataStoreHandler struct { store *dataStore } -func (hand *dataStoreHandler) parse64(w http.ResponseWriter, str string) (int64, bool) { +func (hand *dataStoreHandler) parseSid(w http.ResponseWriter, + str string) (common.SpanId, bool) { val, err := strconv.ParseUint(str, 16, 64) if err != nil { writeError(hand.lg, w, http.StatusBadRequest, fmt.Sprintf("Failed to parse span ID %s: %s", str, err.Error())) w.Write([]byte("Error parsing : " + err.Error())) - return -1, false + return 0, false } - return int64(val), true + return common.SpanId(val), true } func (hand *dataStoreHandler) getReqField32(fieldName string, w http.ResponseWriter, @@ -108,15 +110,15 @@ func (hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) req.ParseForm() vars := mux.Vars(req) stringSid := vars["id"] - sid, ok := hand.parse64(w, stringSid) + sid, ok := hand.parseSid(w, stringSid) if !ok { return } - hand.lg.Debugf("findSidHandler(sid=%s)\n", common.SpanId(sid)) + hand.lg.Debugf("findSidHandler(sid=%s)\n", sid.String()) span := hand.store.FindSpan(sid) if span == nil { - writeError(hand.lg, w, http.StatusNoContent, fmt.Sprintf("No such span as %s\n", - common.SpanId(sid))) + writeError(hand.lg, w, http.StatusNoContent, + fmt.Sprintf("No such span as %s\n", sid.String())) return } w.Write(span.ToJson()) @@ -131,7 +133,7 @@ func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ req.ParseForm() vars := mux.Vars(req) stringSid := vars["id"] - sid, ok := hand.parse64(w, stringSid) + sid, ok := hand.parseSid(w, stringSid) if !ok { return } @@ -140,7 +142,7 @@ func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ if !ok { return } - hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", common.SpanId(sid), lim) + hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", sid.String(), lim) children := hand.store.FindChildren(sid, lim) jbytes, err := json.Marshal(children) if err != nil {
