Repository: incubator-htrace
Updated Branches:
  refs/heads/master 44397a051 -> 2210541c4


HTRACE-328. htraced continues scanning in some cases even when no more results 
are possible (Colin Patrick McCabe via iwasakims)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/2210541c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/2210541c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/2210541c

Branch: refs/heads/master
Commit: 2210541c411d7920cf36d7da5f5ba706833a8af6
Parents: 44397a0
Author: Masatake Iwasaki <[email protected]>
Authored: Thu Dec 17 23:48:38 2015 +0900
Committer: Masatake Iwasaki <[email protected]>
Committed: Thu Dec 17 23:48:38 2015 +0900

----------------------------------------------------------------------
 .../src/org/apache/htrace/htraced/datastore.go  | 68 +++++++++++++++-----
 .../org/apache/htrace/htraced/datastore_test.go | 44 ++++++++++++-
 .../go/src/org/apache/htrace/htraced/rest.go    |  2 +-
 3 files changed, 93 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/2210541c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 596b652..82fb7b5 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -44,11 +44,11 @@ import (
 //
 // The main emphasis in the HTraceD data store is on quickly and efficiently 
storing trace span data
 // coming from many daemons.  Durability is not as big a concern as in some 
data stores, since
-// losing a little bit of trace data if htraced goes down is not critical.  We 
use the "gob" package
+// losing a little bit of trace data if htraced goes down is not critical.  We 
use msgpack
 // for serialization.  We assume that there will be many more writes than 
reads.
 //
 // Schema
-// m -> dataStoreVersion
+// w -> ShardInfo
 // s[8-byte-big-endian-sid] -> SpanData
 // b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
 // e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
@@ -890,6 +890,19 @@ const (
        SATISFIED = iota
 )
 
+// String returns the name of the satisfiedByReturn constant that r holds
+// ("NOT_SATISFIED", "NOT_YET_SATISFIED" or "SATISFIED").  Any other value
+// is rendered as "(unknown)".
+func (r satisfiedByReturn) String() string {
+       switch r {
+       case NOT_SATISFIED:
+               return "NOT_SATISFIED"
+       case NOT_YET_SATISFIED:
+               return "NOT_YET_SATISFIED"
+       case SATISFIED:
+               return "SATISFIED"
+       default:
+               return "(unknown)"
+       }
+}
+
 // Determine whether the predicate is satisfied by the given span.
 func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn {
        val := pred.extractRelevantSpanData(span)
@@ -910,7 +923,7 @@ func (pred *predicateData) satisfiedBy(span *common.Span) 
satisfiedByReturn {
                if bytes.Compare(val, pred.key) <= 0 {
                        return SATISFIED
                } else {
-                       return NOT_SATISFIED
+                       return NOT_YET_SATISFIED
                }
        case common.GREATER_THAN_OR_EQUALS:
                if bytes.Compare(val, pred.key) >= 0 {
@@ -920,9 +933,7 @@ func (pred *predicateData) satisfiedBy(span *common.Span) 
satisfiedByReturn {
                }
        case common.GREATER_THAN:
                cmp := bytes.Compare(val, pred.key)
-               if cmp < 0 {
-                       return NOT_SATISFIED
-               } else if cmp == 0 {
+               if cmp <= 0 {
                        return NOT_YET_SATISFIED
                } else {
                        return SATISFIED
@@ -1143,17 +1154,16 @@ func (src *source) populateNextFromShard(shardIdx int) {
                        iter.Next()
                }
                ret = src.pred.satisfiedBy(span)
-               switch ret {
-               case NOT_SATISFIED:
-                       break // This and subsequent entries don't satisfy 
predicate
-               case SATISFIED:
+               if ret == SATISFIED {
                        if lg.DebugEnabled() {
                                lg.Debugf("Populated valid span %v from shard 
%s.\n", sid, shdPath)
                        }
                        src.nexts[shardIdx] = span // Found valid entry
                        return
-               case NOT_YET_SATISFIED:
-                       continue // try again
+               }
+               if ret == NOT_SATISFIED {
+                       // This and subsequent entries don't satisfy predicate
+                       break
                }
        }
        lg.Debugf("Closing iterator for shard %s.\n", shdPath)
@@ -1209,6 +1219,18 @@ func (src *source) Close() {
        src.iters = nil
 }
 
+// getStats builds a one-line summary of this source for debug logging: the
+// predicate's string form, followed by the numRead counter and path of
+// each shard in src.shards.
+func (src *source) getStats() string {
+       stats := fmt.Sprintf("Source stats: pred = %s", src.pred.String())
+       first := true
+       for shardIdx := range src.shards {
+               // The first shard entry is introduced with ". "; every
+               // later entry is separated from the previous one by ", ".
+               sep := ", "
+               if first {
+                       sep = ". "
+                       first = false
+               }
+               stats += fmt.Sprintf("%sRead %d spans from %s", sep,
+                       src.numRead[shardIdx], src.shards[shardIdx].path)
+       }
+       return stats
+}
+
 func (store *dataStore) obtainSource(preds *[]*predicateData, span 
*common.Span) (*source, error) {
        // Read spans from the first predicate that is indexed.
        p := *preds
@@ -1231,7 +1253,7 @@ func (store *dataStore) obtainSource(preds 
*[]*predicateData, span *common.Span)
        return spanIdPredData.createSource(store, span)
 }
 
-func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, 
error) {
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, 
error, []int) {
        lg := store.lg
        // Parse predicate data.
        var err error
@@ -1239,14 +1261,14 @@ func (store *dataStore) HandleQuery(query 
*common.Query) ([]*common.Span, error)
        for i := range query.Predicates {
                preds[i], err = loadPredicateData(&query.Predicates[i])
                if err != nil {
-                       return nil, err
+                       return nil, err, nil
                }
        }
        // Get a source of rows.
        var src *source
        src, err = store.obtainSource(&preds, query.Prev)
        if err != nil {
-               return nil, err
+               return nil, err, nil
        }
        defer src.Close()
        if lg.DebugEnabled() {
@@ -1254,13 +1276,25 @@ func (store *dataStore) HandleQuery(query 
*common.Query) ([]*common.Span, error)
        }
 
        // Filter the spans through the remaining predicates.
-       ret := make([]*common.Span, 0, 32)
+       reserved := 32
+       if query.Lim < reserved {
+               reserved = query.Lim
+       }
+       ret := make([]*common.Span, 0, reserved)
        for {
                if len(ret) >= query.Lim {
+                       if lg.DebugEnabled() {
+                               lg.Debugf("HandleQuery %s: hit query limit 
after obtaining " +
+                                       "%d results. %s\n.", query, query.Lim, 
src.getStats())
+                       }
                        break // we hit the result size limit
                }
                span := src.next()
                if span == nil {
+                       if lg.DebugEnabled() {
+                               lg.Debugf("HandleQuery %s: found %d result(s), 
which are " +
+                                       "all that exist. %s\n", query, 
len(ret), src.getStats())
+                       }
                        break // the source has no more spans to give
                }
                if lg.DebugEnabled() {
@@ -1277,7 +1311,7 @@ func (store *dataStore) HandleQuery(query *common.Query) 
([]*common.Span, error)
                        ret = append(ret, span)
                }
        }
-       return ret, nil
+       return ret, nil, src.numRead
 }
 
 func (store *dataStore) ServerStats() *common.ServerStats {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/2210541c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index a693874..281ee2d 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -28,6 +28,7 @@ import (
        "org/apache/htrace/conf"
        "org/apache/htrace/test"
        "os"
+       "reflect"
        "sort"
        "testing"
        "time"
@@ -122,10 +123,15 @@ func TestDatastoreWriteAndRead(t *testing.T) {
 }
 
 func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
-       expectedSpans []common.Span) {
-       spans, err := ht.Store.HandleQuery(query)
+               expectedSpans []common.Span) {
+       testQueryExt(t, ht, query, expectedSpans, nil)
+}
+
+func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query,
+       expectedSpans []common.Span, expectedNumScanned []int) {
+       spans, err, numScanned := ht.Store.HandleQuery(query)
        if err != nil {
-               t.Fatalf("First query failed: %s\n", err.Error())
+               t.Fatalf("Query %s failed: %s\n", query.String(), err.Error())
        }
        expectedBuf := new(bytes.Buffer)
        dec := json.NewEncoder(expectedBuf)
@@ -142,6 +148,12 @@ func testQuery(t *testing.T, ht *MiniHTraced, query 
*common.Query,
        t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
                len(expectedSpans))
        common.ExpectStrEqual(t, string(expectedBuf.Bytes()), 
string(spansBuf.Bytes()))
+       if expectedNumScanned != nil {
+               if !reflect.DeepEqual(expectedNumScanned, numScanned) {
+                       t.Fatalf("Invalid values for numScanned: got %v, 
expected %v\n",
+                                       expectedNumScanned, numScanned)
+               }
+       }
 }
 
 // Test queries on the datastore.
@@ -721,3 +733,29 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
                Prev: &SIMPLE_TEST_SPANS[1],
        }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
 }
+
+// TestQueryRowsScanned builds a MiniHTraced, writes SIMPLE_TEST_SPANS via
+// createSpans, and then runs a single EQUALS query on the span ID.  It
+// checks both the returned span and the per-query numScanned slice that
+// HandleQuery reports.
+func TestQueryRowsScanned(t *testing.T) {
+       t.Parallel()
+       bld := &MiniHTracedBuilder{
+               Name:         "TestQueryRowsScanned",
+               WrittenSpans: common.NewSemaphore(0),
+       }
+       ht, err := bld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
+
+       // Query for exactly one span, identified by its ID.
+       spanId := common.TestId("00000000000000000000000000000001")
+       query := &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   spanId.String(),
+                       },
+               },
+               Lim:  100,
+               Prev: nil,
+       }
+       expectedSpans := []common.Span{SIMPLE_TEST_SPANS[0]}
+       expectedNumScanned := []int{2, 1}
+       testQueryExt(t, ht, query, expectedSpans, expectedNumScanned)
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/2210541c/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 74ec0cf..eabeee7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -278,7 +278,7 @@ func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, 
req *http.Request) {
                return
        }
        var results []*common.Span
-       results, err = hand.store.HandleQuery(&query)
+       results, err, _ = hand.store.HandleQuery(&query)
        if err != nil {
                writeError(hand.lg, w, http.StatusInternalServerError,
                        fmt.Sprintf("Internal error processing query %s: %s",

Reply via email to