Repository: incubator-htrace
Updated Branches:
  refs/heads/master 302e5b42c -> 8a4c9e577


HTRACE-160. htraced: support continuing a query from where the client left it 
off by sending a previous span (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/8a4c9e57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/8a4c9e57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/8a4c9e57

Branch: refs/heads/master
Commit: 8a4c9e57775d4f07ce2068f2d78928ec1d34ed9d
Parents: 302e5b4
Author: Colin P. Mccabe <[email protected]>
Authored: Mon Apr 27 17:09:22 2015 -0700
Committer: Colin P. Mccabe <[email protected]>
Committed: Mon May 4 14:00:46 2015 -0700

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/common/query.go    |   9 ++
 .../src/go/src/org/apache/htrace/common/span.go |   4 +
 .../src/org/apache/htrace/htraced/datastore.go  | 104 +++++++++++++++----
 .../org/apache/htrace/htraced/datastore_test.go |  69 ++++++++++++
 4 files changed, 168 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go 
b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
index a32909e..8c9128f 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
@@ -103,9 +103,18 @@ type Predicate struct {
        Val   string `val:"val"`
 }
 
+func (pred *Predicate) String() string {
+       buf, err := json.Marshal(pred)
+       if err != nil {
+               panic(err)
+       }
+       return string(buf)
+}
+
 type Query struct {
        Predicates []Predicate `json:"pred"`
        Lim        int         `json:"lim"`
+       Prev       *Span       `json:"prev"`
 }
 
 func (query *Query) String() string {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go 
b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
index c273ad9..b276844 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
@@ -130,6 +130,10 @@ func (span *Span) ToJson() []byte {
        return jbytes
 }
 
+func (span *Span) String() string {
+       return string(span.ToJson())
+}
+
 // Compute the span duration.  We ignore overflow since we never deal with 
negative times.
 func (span *Span) Duration() int64 {
        return span.End - span.Begin

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
index a26779c..97af3fb 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -90,19 +90,18 @@ func (stats *Statistics) Copy() *Statistics {
        }
 }
 
-// Translate a span id into a leveldb key.
-func makeKey(tag byte, sid uint64) []byte {
-       id := uint64(sid)
+// Translate an 8-byte value into a leveldb key.
+func makeKey(tag byte, val uint64) []byte {
        return []byte{
                tag,
-               byte(0xff & (id >> 56)),
-               byte(0xff & (id >> 48)),
-               byte(0xff & (id >> 40)),
-               byte(0xff & (id >> 32)),
-               byte(0xff & (id >> 24)),
-               byte(0xff & (id >> 16)),
-               byte(0xff & (id >> 8)),
-               byte(0xff & (id >> 0)),
+               byte(0xff & (val >> 56)),
+               byte(0xff & (val >> 48)),
+               byte(0xff & (val >> 40)),
+               byte(0xff & (val >> 32)),
+               byte(0xff & (val >> 24)),
+               byte(0xff & (val >> 16)),
+               byte(0xff & (val >> 8)),
+               byte(0xff & (val >> 0)),
        }
 }
 
@@ -704,7 +703,7 @@ func (pred *predicateData) satisfiedBy(span *common.Span) 
bool {
        }
 }
 
-func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+func (pred *predicateData) createSource(store *dataStore, prev *common.Span) 
(*source, error) {
        var ret *source
        src := source{store: store,
                pred:      pred,
@@ -726,7 +725,76 @@ func (pred *predicateData) createSource(store *dataStore) 
(*source, error) {
                shd := store.shards[shardIdx]
                src.iters = append(src.iters, 
shd.ldb.NewIterator(store.readOpts))
        }
-       searchKey := makeKey(src.keyPrefix, pred.uintKey)
+       var searchKey []byte
+       lg := store.lg
+       if prev != nil {
+               // If prev != nil, this query RPC is the continuation of a 
previous
+               // one.  The final result returned the last time is 'prev'.
+               //
+               // To avoid returning the same results multiple times, we 
adjust the
+               // predicate here.  If the predicate is on the span id field, we
+               // simply manipulate the span ID we're looking for.
+               //
+               // If the predicate is on a secondary index, we also use span 
ID, but
+               // in a slightly different way.  Since the secondary indices are
+               // organized as [type-code][8b-secondary-key][8b-span-id], 
elements
+               // with the same secondary index field are ordered by span ID.  
So we
+               // create a 17-byte key incorporating the span ID from 'prev.'
+               var startId common.SpanId
+               switch (pred.Op) {
+               case common.EQUALS:
+                       if pred.Field == common.SPAN_ID {
+                               // This is an annoying corner case.  There can 
only be one
+                               // result each time we do an EQUALS search for 
a span id.
+                               // Span id is the primary key for all our spans.
+                               // But for some reason someone is asking for 
another result.
+                               // We modify the query to search for the 
illegal 0 span ID,
+                               // which will never be present.
+                               lg.Debugf("Attempted to use a continuation 
token with an EQUALS " +
+                                       "SPAN_ID query. %s.  Setting search id 
= 0",
+                                       pred.Predicate.String())
+                               startId = 0
+                       } else {
+                               // When doing an EQUALS search on a secondary 
index, the
+                               // results are sorted by span id.
+                               startId = prev.Id + 1
+                       }
+               case common.LESS_THAN_OR_EQUALS:
+                       // Subtract one from the previous span id.  Since the previous
+                       // span ID will never be 0 (0 is an illegal span id), we'll never
+                       // wrap around when doing this.
+                       startId = prev.Id - 1
+               case common.GREATER_THAN_OR_EQUALS:
+                       // We can't add one to the span id, since the previous 
span ID
+                       // might be the maximum value.  So just switch over to 
using
+                       // GREATER_THAN.
+                       pred.Op = common.GREATER_THAN
+                       startId = prev.Id
+               case common.GREATER_THAN:
+                       // This one is easy.
+                       startId = prev.Id
+               default:
+                       str := fmt.Sprintf("Can't use a %v predicate as a 
source.", pred.Predicate.String())
+                       lg.Error(str + "\n")
+                       panic(str)
+               }
+               if pred.Field == common.SPAN_ID {
+                       pred.uintKey = uint64(startId)
+                       searchKey = makeKey(src.keyPrefix, uint64(startId))
+               } else {
+                       // Start where the previous query left off.  This means 
adjusting
+                       // our uintKey.
+                       pred.uintKey, _ = pred.extractRelevantSpanData(prev)
+                       searchKey = makeSecondaryKey(src.keyPrefix, 
pred.uintKey, uint64(startId))
+               }
+               if lg.TraceEnabled() {
+                       lg.Tracef("Handling continuation token %s for %s.  
startId=%d, " +
+                               "pred.uintKey=%d\n", prev, 
pred.Predicate.String(), startId,
+                               pred.uintKey)
+               }
+       } else {
+               searchKey = makeKey(src.keyPrefix, pred.uintKey)
+       }
        for i := range src.iters {
                src.iters[i].Seek(searchKey)
        }
@@ -815,7 +883,7 @@ func (src *source) populateNextFromShard(shardIdx int) {
                        iter.Next()
                }
                if src.pred.satisfiedBy(span) {
-                       lg.Debugf("Populated valid span %016x from shard 
%d.\n", sid, shardIdx)
+                       lg.Debugf("Populated valid span %v from shard %d.\n", 
sid, shardIdx)
                        src.nexts[shardIdx] = span // Found valid entry
                        return
                } else {
@@ -861,14 +929,14 @@ func (src *source) Close() {
        src.iters = nil
 }
 
-func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) 
{
+func (store *dataStore) obtainSource(preds *[]*predicateData, span 
*common.Span) (*source, error) {
        // Read spans from the first predicate that is indexed.
        p := *preds
        for i := range p {
                pred := p[i]
                if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
                        *preds = append(p[0:i], p[i+1:]...)
-                       return pred.createSource(store)
+                       return pred.createSource(store, span)
                }
        }
        // If there are no predicates that are indexed, read rows in order of 
span id.
@@ -880,7 +948,7 @@ func (store *dataStore) obtainSource(preds 
*[]*predicateData) (*source, error) {
        if err != nil {
                return nil, err
        }
-       return spanIdPredData.createSource(store)
+       return spanIdPredData.createSource(store, span)
 }
 
 func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, 
error) {
@@ -896,7 +964,7 @@ func (store *dataStore) HandleQuery(query *common.Query) 
([]*common.Span, error)
        }
        // Get a source of rows.
        var src *source
-       src, err = store.obtainSource(&preds)
+       src, err = store.obtainSource(&preds, query.Prev)
        if err != nil {
                return nil, err
        }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8a4c9e57/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git 
a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
index 79a7c4f..8a14f30 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -443,3 +443,72 @@ func TestReloadDataStore(t *testing.T) {
                        "incorrect version.  But it failed with error %s\n", 
err.Error())
        }
 }
+
+func TestQueriesWithContinuationTokens1(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: 
"TestQueriesWithContinuationTokens1",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       // Adding a prev value to this query excludes the first result that we
+       // would normally get.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.BEGIN_TIME,
+                               Val:   "120",
+                       },
+               },
+               Lim: 5,
+               Prev: &SIMPLE_TEST_SPANS[0],
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+       // There is only one result from an EQUALS query on SPAN_ID.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "1",
+                       },
+               },
+               Lim: 100,
+               Prev: &SIMPLE_TEST_SPANS[0],
+       }, []common.Span{})
+
+       // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the
+       // span we pass as a continuation token. (Primary index edition).
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "2",
+                       },
+               },
+               Lim: 100,
+               Prev: &SIMPLE_TEST_SPANS[1],
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+       // When doing a GREATER_THAN search, we still don't get back the
+       // span we pass as a continuation token. (Secondary index edition).
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN,
+                               Field: common.DURATION,
+                               Val:   "0",
+                       },
+               },
+               Lim: 100,
+               Prev: &SIMPLE_TEST_SPANS[1],
+       }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
+}

Reply via email to