Repository: incubator-htrace
Updated Branches:
  refs/heads/master 44397a051 -> 2210541c4
HTRACE-328. htraced continues scanning in some cases even when no more results are possible (Colin Patrick McCabe via iwasakims) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/2210541c Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/2210541c Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/2210541c Branch: refs/heads/master Commit: 2210541c411d7920cf36d7da5f5ba706833a8af6 Parents: 44397a0 Author: Masatake Iwasaki <[email protected]> Authored: Thu Dec 17 23:48:38 2015 +0900 Committer: Masatake Iwasaki <[email protected]> Committed: Thu Dec 17 23:48:38 2015 +0900 ---------------------------------------------------------------------- .../src/org/apache/htrace/htraced/datastore.go | 68 +++++++++++++++----- .../org/apache/htrace/htraced/datastore_test.go | 44 ++++++++++++- .../go/src/org/apache/htrace/htraced/rest.go | 2 +- 3 files changed, 93 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/2210541c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index 596b652..82fb7b5 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -44,11 +44,11 @@ import ( // // The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data // coming from many daemons. Durability is not as big a concern as in some data stores, since -// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package +// losing a little bit of trace data if htraced goes down is not critical. 
We use msgpack // for serialization. We assume that there will be many more writes than reads. // // Schema -// m -> dataStoreVersion +// w -> ShardInfo // s[8-byte-big-endian-sid] -> SpanData // b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {} // e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {} @@ -890,6 +890,19 @@ const ( SATISFIED = iota ) +func (r satisfiedByReturn) String() string { + switch (r) { + case NOT_SATISFIED: + return "NOT_SATISFIED" + case NOT_YET_SATISFIED: + return "NOT_YET_SATISFIED" + case SATISFIED: + return "SATISFIED" + default: + return "(unknown)" + } +} + // Determine whether the predicate is satisfied by the given span. func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn { val := pred.extractRelevantSpanData(span) @@ -910,7 +923,7 @@ func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn { if bytes.Compare(val, pred.key) <= 0 { return SATISFIED } else { - return NOT_SATISFIED + return NOT_YET_SATISFIED } case common.GREATER_THAN_OR_EQUALS: if bytes.Compare(val, pred.key) >= 0 { @@ -920,9 +933,7 @@ func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn { } case common.GREATER_THAN: cmp := bytes.Compare(val, pred.key) - if cmp < 0 { - return NOT_SATISFIED - } else if cmp == 0 { + if cmp <= 0 { return NOT_YET_SATISFIED } else { return SATISFIED @@ -1143,17 +1154,16 @@ func (src *source) populateNextFromShard(shardIdx int) { iter.Next() } ret = src.pred.satisfiedBy(span) - switch ret { - case NOT_SATISFIED: - break // This and subsequent entries don't satisfy predicate - case SATISFIED: + if ret == SATISFIED { if lg.DebugEnabled() { lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath) } src.nexts[shardIdx] = span // Found valid entry return - case NOT_YET_SATISFIED: - continue // try again + } + if ret == NOT_SATISFIED { + // This and subsequent entries don't satisfy predicate + break } } lg.Debugf("Closing iterator for shard 
%s.\n", shdPath) @@ -1209,6 +1219,18 @@ func (src *source) Close() { src.iters = nil } +func (src *source) getStats() string { + ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String()) + prefix := ". " + for shardIdx := range src.shards { + next := fmt.Sprintf("%sRead %d spans from %s", prefix, + src.numRead[shardIdx], src.shards[shardIdx].path) + prefix = ", " + ret = ret + next + } + return ret +} + func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) { // Read spans from the first predicate that is indexed. p := *preds @@ -1231,7 +1253,7 @@ func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) return spanIdPredData.createSource(store, span) } -func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) { +func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error, []int) { lg := store.lg // Parse predicate data. var err error @@ -1239,14 +1261,14 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) for i := range query.Predicates { preds[i], err = loadPredicateData(&query.Predicates[i]) if err != nil { - return nil, err + return nil, err, nil } } // Get a source of rows. var src *source src, err = store.obtainSource(&preds, query.Prev) if err != nil { - return nil, err + return nil, err, nil } defer src.Close() if lg.DebugEnabled() { @@ -1254,13 +1276,25 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) } // Filter the spans through the remaining predicates. - ret := make([]*common.Span, 0, 32) + reserved := 32 + if query.Lim < reserved { + reserved = query.Lim + } + ret := make([]*common.Span, 0, reserved) for { if len(ret) >= query.Lim { + if lg.DebugEnabled() { + lg.Debugf("HandleQuery %s: hit query limit after obtaining " + + "%d results. 
%s\n.", query, query.Lim, src.getStats()) + } break // we hit the result size limit } span := src.next() if span == nil { + if lg.DebugEnabled() { + lg.Debugf("HandleQuery %s: found %d result(s), which are " + + "all that exist. %s\n", query, len(ret), src.getStats()) + } break // the source has no more spans to give } if lg.DebugEnabled() { @@ -1277,7 +1311,7 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) ret = append(ret, span) } } - return ret, nil + return ret, nil, src.numRead } func (store *dataStore) ServerStats() *common.ServerStats { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/2210541c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index a693874..281ee2d 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -28,6 +28,7 @@ import ( "org/apache/htrace/conf" "org/apache/htrace/test" "os" + "reflect" "sort" "testing" "time" @@ -122,10 +123,15 @@ func TestDatastoreWriteAndRead(t *testing.T) { } func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, - expectedSpans []common.Span) { - spans, err := ht.Store.HandleQuery(query) + expectedSpans []common.Span) { + testQueryExt(t, ht, query, expectedSpans, nil) +} + +func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query, + expectedSpans []common.Span, expectedNumScanned []int) { + spans, err, numScanned := ht.Store.HandleQuery(query) if err != nil { - t.Fatalf("First query failed: %s\n", err.Error()) + t.Fatalf("Query %s failed: %s\n", query.String(), err.Error()) } expectedBuf := new(bytes.Buffer) dec := json.NewEncoder(expectedBuf) @@ -142,6 +148,12 @@ func testQuery(t *testing.T, ht *MiniHTraced, query 
*common.Query, t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans), len(expectedSpans)) common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes())) + if expectedNumScanned != nil { + if !reflect.DeepEqual(expectedNumScanned, numScanned) { + t.Fatalf("Invalid values for numScanned: got %v, expected %v\n", + expectedNumScanned, numScanned) + } + } } // Test queries on the datastore. @@ -721,3 +733,29 @@ func TestQueriesWithContinuationTokens1(t *testing.T) { Prev: &SIMPLE_TEST_SPANS[1], }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]}) } + +func TestQueryRowsScanned(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueryRowsScanned", + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) + testQueryExt(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.SPAN_ID, + Val: common.TestId("00000000000000000000000000000001").String(), + }, + }, + Lim: 100, + Prev: nil, + }, []common.Span{SIMPLE_TEST_SPANS[0]}, + []int{2, 1}) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/2210541c/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go index 74ec0cf..eabeee7 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go @@ -278,7 +278,7 @@ func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } var results []*common.Span - results, err = hand.store.HandleQuery(&query) + results, err, _ = hand.store.HandleQuery(&query) if err != nil { writeError(hand.lg, 
w, http.StatusInternalServerError, fmt.Sprintf("Internal error processing query %s: %s",
