Repository: incubator-htrace Updated Branches: refs/heads/master 302e5b42c -> 8a4c9e577
HTRACE-160. htraced: support continuing a query from where the client left it off by sending a previous span (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/8a4c9e57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/8a4c9e57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/8a4c9e57 Branch: refs/heads/master Commit: 8a4c9e57775d4f07ce2068f2d78928ec1d34ed9d Parents: 302e5b4 Author: Colin P. Mccabe <[email protected]> Authored: Mon Apr 27 17:09:22 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Mon May 4 14:00:46 2015 -0700 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/common/query.go | 9 ++ .../src/go/src/org/apache/htrace/common/span.go | 4 + .../src/org/apache/htrace/htraced/datastore.go | 104 +++++++++++++++---- .../org/apache/htrace/htraced/datastore_test.go | 69 ++++++++++++ 4 files changed, 168 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/common/query.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go index a32909e..8c9128f 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go @@ -103,9 +103,18 @@ type Predicate struct { Val string `val:"val"` } +func (pred *Predicate) String() string { + buf, err := json.Marshal(pred) + if err != nil { + panic(err) + } + return string(buf) +} + type Query struct { Predicates []Predicate `json:"pred"` Lim int `json:"lim"` + Prev *Span `json:"prev"` } func (query *Query) String() string { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/common/span.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go index c273ad9..b276844 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go @@ -130,6 +130,10 @@ func (span *Span) ToJson() []byte { return jbytes } +func (span *Span) String() string { + return string(span.ToJson()) +} + // Compute the span duration. We ignore overflow since we never deal with negative times. func (span *Span) Duration() int64 { return span.End - span.Begin http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go index a26779c..97af3fb 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go @@ -90,19 +90,18 @@ func (stats *Statistics) Copy() *Statistics { } } -// Translate a span id into a leveldb key. -func makeKey(tag byte, sid uint64) []byte { - id := uint64(sid) +// Translate an 8-byte value into a leveldb key. +func makeKey(tag byte, val uint64) []byte { return []byte{ tag, - byte(0xff & (id >> 56)), - byte(0xff & (id >> 48)), - byte(0xff & (id >> 40)), - byte(0xff & (id >> 32)), - byte(0xff & (id >> 24)), - byte(0xff & (id >> 16)), - byte(0xff & (id >> 8)), - byte(0xff & (id >> 0)), + byte(0xff & (val >> 56)), + byte(0xff & (val >> 48)), + byte(0xff & (val >> 40)), + byte(0xff & (val >> 32)), + byte(0xff & (val >> 24)), + byte(0xff & (val >> 16)), + byte(0xff & (val >> 8)), + byte(0xff & (val >> 0)), } } @@ -704,7 +703,7 @@ func (pred *predicateData) satisfiedBy(span *common.Span) bool { } } -func (pred *predicateData) createSource(store *dataStore) (*source, error) { +func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) { var ret *source src := source{store: store, pred: pred, @@ -726,7 +725,76 @@ func (pred *predicateData) createSource(store *dataStore) (*source, error) { shd := store.shards[shardIdx] src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) } - searchKey := makeKey(src.keyPrefix, pred.uintKey) + var searchKey []byte + lg := store.lg + if prev != nil { + // If prev != nil, this query RPC is the continuation of a previous + // one. The final result returned the last time is 'prev'. + // + // To avoid returning the same results multiple times, we adjust the + // predicate here. If the predicate is on the span id field, we + // simply manipulate the span ID we're looking for. + // + // If the predicate is on a secondary index, we also use span ID, but + // in a slightly different way. Since the secondary indices are + // organized as [type-code][8b-secondary-key][8b-span-id], elements + // with the same secondary index field are ordered by span ID. So we + // create a 17-byte key incorporating the span ID from 'prev.' + var startId common.SpanId + switch (pred.Op) { + case common.EQUALS: + if pred.Field == common.SPAN_ID { + // This is an annoying corner case. There can only be one + // result each time we do an EQUALS search for a span id. + // Span id is the primary key for all our spans. + // But for some reason someone is asking for another result. + // We modify the query to search for the illegal 0 span ID, + // which will never be present. + lg.Debugf("Attempted to use a continuation token with an EQUALS " + + "SPAN_ID query. %s. Setting search id = 0", + pred.Predicate.String()) + startId = 0 + } else { + // When doing an EQUALS search on a secondary index, the + // results are sorted by span id. + startId = prev.Id + 1 + } + case common.LESS_THAN_OR_EQUALS: + // Subtract one from the previous span id. Since the previous + // start ID will never be 0 (0 is an illegal span id), we'll never + // wrap around when doing this. + startId = prev.Id - 1 + case common.GREATER_THAN_OR_EQUALS: + // We can't add one to the span id, since the previous span ID + // might be the maximum value. So just switch over to using + // GREATER_THAN. + pred.Op = common.GREATER_THAN + startId = prev.Id + case common.GREATER_THAN: + // This one is easy. + startId = prev.Id + default: + str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String()) + lg.Error(str + "\n") + panic(str) + } + if pred.Field == common.SPAN_ID { + pred.uintKey = uint64(startId) + searchKey = makeKey(src.keyPrefix, uint64(startId)) + } else { + // Start where the previous query left off. This means adjusting + // our uintKey. + pred.uintKey, _ = pred.extractRelevantSpanData(prev) + searchKey = makeSecondaryKey(src.keyPrefix, pred.uintKey, uint64(startId)) + } + if lg.TraceEnabled() { + lg.Tracef("Handling continuation token %s for %s. startId=%d, " + + "pred.uintKey=%d\n", prev, pred.Predicate.String(), startId, + pred.uintKey) + } + } else { + searchKey = makeKey(src.keyPrefix, pred.uintKey) + } for i := range src.iters { src.iters[i].Seek(searchKey) } @@ -815,7 +883,7 @@ func (src *source) populateNextFromShard(shardIdx int) { iter.Next() } if src.pred.satisfiedBy(span) { - lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx) + lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx) src.nexts[shardIdx] = span // Found valid entry return } else { @@ -861,14 +929,14 @@ func (src *source) Close() { src.iters = nil } -func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) { +func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) { // Read spans from the first predicate that is indexed. p := *preds for i := range p { pred := p[i] if pred.getIndexPrefix() != INVALID_INDEX_PREFIX { *preds = append(p[0:i], p[i+1:]...) - return pred.createSource(store) + return pred.createSource(store, span) } } // If there are no predicates that are indexed, read rows in order of span id. @@ -880,7 +948,7 @@ func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) { if err != nil { return nil, err } - return spanIdPredData.createSource(store) + return spanIdPredData.createSource(store, span) } func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) { @@ -896,7 +964,7 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) } // Get a source of rows. var src *source - src, err = store.obtainSource(&preds) + src, err = store.obtainSource(&preds, query.Prev) if err != nil { return nil, err } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go index 79a7c4f..8a14f30 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go @@ -443,3 +443,72 @@ func TestReloadDataStore(t *testing.T) { "incorrect version. But it failed with error %s\n", err.Error()) } } + +func TestQueriesWithContinuationTokens1(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + // Adding a prev value to this query excludes the first result that we + // would normally get. + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.BEGIN_TIME, + Val: "120", + }, + }, + Lim: 5, + Prev: &SIMPLE_TEST_SPANS[0], + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + + // There is only one result from an EQUALS query on SPAN_ID. + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.SPAN_ID, + Val: "1", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[0], + }, []common.Span{}) + + // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the + // span we pass as a continuation token. (Primary index edition). + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "2", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[1], + }, []common.Span{SIMPLE_TEST_SPANS[0]}) + + // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the + // span we pass as a continuation token. (Secondary index edition). + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.DURATION, + Val: "0", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[1], + }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]}) +}
