Repository: incubator-htrace Updated Branches: refs/heads/master 0ca3f8f23 -> af2c084ed
HTRACE-88. Add REST query API to htraced (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/af2c084e Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/af2c084e Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/af2c084e Branch: refs/heads/master Commit: af2c084ed8fe5af6a4cee350d80f154f9e328aa5 Parents: 0ca3f8f Author: Colin P. Mccabe <[email protected]> Authored: Wed Jan 14 19:41:10 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Tue Jan 27 16:51:28 2015 -0800 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/client/client.go | 19 + .../go/src/org/apache/htrace/common/query.go | 87 ++++ .../src/go/src/org/apache/htrace/common/span.go | 11 +- .../src/org/apache/htrace/htraced/datastore.go | 417 ++++++++++++++++++- .../org/apache/htrace/htraced/datastore_test.go | 155 +++++++ .../go/src/org/apache/htrace/htraced/rest.go | 43 +- 6 files changed, 718 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go index fbcdcc6..52fe78e 100644 --- a/htrace-core/src/go/src/org/apache/htrace/client/client.go +++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go @@ -106,6 +106,25 @@ func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, er return spanIds, nil } +// Make a query +func (hcl *Client) Query(query *common.Query) ([]common.Span, error) { + in, err := json.Marshal(query) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error marshalling query: %s", err.Error())) + } + var out []byte + out, _, err = hcl.makeRestRequest("GET", "query", bytes.NewReader(in)) + if err != nil { + return nil, err + } + var spans []common.Span + err = json.Unmarshal(out, &spans) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error unmarshalling results: %s", err.Error())) + } + return spans, nil +} + func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) { return hcl.makeRestRequest("GET", reqName, nil) } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/common/query.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go b/htrace-core/src/go/src/org/apache/htrace/common/query.go new file mode 100644 index 0000000..0c909a1 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "encoding/json" +) + +// +// Represents queries that can be sent to htraced. +// +// Each query consists of set of predicates that will be 'AND'ed together to +// return a set of spans. Predicates contain an operation, a field, and a +// value. +// +// For example, a query might be "return the first 100 spans between 5:00pm +// and 5:01pm" This query would have two predicates: time greater than or +// equal to 5:00pm, and time less than or equal to 5:01pm. +// In HTrace, times are always expressed in milliseconds since the Epoch. +// So this would become: +// { "lim" : 100, "pred" : [ +// { "op" : "ge", "field" : "begin", "val" : 1234 }, +// { "op" : "le", "field" : "begin", "val" : 5678 }, +// ] } +// +// Where '1234' and '5678' were replaced by times since the epoch in +// milliseconds. +// + +type Op string + +const ( + CONTAINS Op = "cn" + EQUALS Op = "eq" + LESS_THAN_OR_EQUALS Op = "le" + GREATER_THAN_OR_EQUALS Op = "ge" +) + +func (op Op) IsDescending() bool { + return op == LESS_THAN_OR_EQUALS +} + +type Field string + +const ( + SPAN_ID Field = "spanid" + DESCRIPTION Field = "description" + BEGIN_TIME Field = "begin" + END_TIME Field = "end" + DURATION Field = "duration" +) + +type Predicate struct { + Op Op `json:"op"` + Field Field `json:"field"` + Val string `val:"val"` +} + +type Query struct { + Predicates []Predicate `json:"pred"` + Lim int `json:"lim"` +} + +func (query *Query) String() string { + buf, err := json.Marshal(query) + if err != nil { + panic(err) + } + return string(buf) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/common/span.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go b/htrace-core/src/go/src/org/apache/htrace/common/span.go index 36e716a..64975d2 100644 --- a/htrace-core/src/go/src/org/apache/htrace/common/span.go +++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go @@ -80,7 +80,11 @@ func (id *SpanId) UnmarshalJSON(b []byte) error { if b[len(b)-1] != DOUBLE_QUOTE { return errors.New("Expected spanID to end with a string quote.") } - v, err := strconv.ParseUint(string(b[1:len(b)-1]), 16, 64) + return id.FromString(string(b[1 : len(b)-1])) +} + +func (id *SpanId) FromString(str string) error { + v, err := strconv.ParseUint(str, 16, 64) if err != nil { return err } @@ -111,3 +115,8 @@ func (span *Span) ToJson() []byte { } return jbytes } + +// Compute the span duration. We ignore overflow since we never deal with negative times. +func (span *Span) Duration() int64 { + return span.End - span.Begin +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go index 40678bd..523b7ab 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go @@ -22,10 +22,13 @@ package main import ( "bytes" "encoding/gob" + "errors" + "fmt" "github.com/jmhodges/levigo" "org/apache/htrace/common" "org/apache/htrace/conf" "os" + "strconv" "strings" "sync/atomic" "syscall" @@ -49,14 +52,23 @@ import ( // Schema // m -> dataStoreMetadata // s[8-byte-big-endian-sid] -> SpanData +// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {} +// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {} +// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {} // p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {} -// t[8-byte-big-endian-time][8-byte-big-endian-child-sid] -> {} // const DATA_STORE_VERSION = 1 var EMPTY_BYTE_BUF []byte = []byte{} +const SPAN_ID_INDEX_PREFIX = 's' +const BEGIN_TIME_INDEX_PREFIX = 'b' +const END_TIME_INDEX_PREFIX = 'e' +const DURATION_INDEX_PREFIX = 'd' +const PARENT_ID_INDEX_PREFIX = 'p' +const INVALID_INDEX_PREFIX = 0 + type Statistics struct { NumSpansWritten uint64 } @@ -190,15 +202,21 @@ func (shd *shard) writeSpan(span *common.Span) error { if err != nil { return err } - batch.Put(makeKey('s', span.Id.Val()), spanDataBuf.Bytes()) + batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes()) // Add this to the parent index. for parentIdx := range span.Parents { - batch.Put(makeSecondaryKey('p', span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF) + batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX, + span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF) } - // Add this to the timeline index. - batch.Put(makeSecondaryKey('t', span.Begin, span.Id.Val()), EMPTY_BYTE_BUF) + // Add to the other secondary indices. + batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin, + span.Id.Val()), EMPTY_BYTE_BUF) + batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End, + span.Id.Val()), EMPTY_BYTE_BUF) + batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(), + span.Id.Val()), EMPTY_BYTE_BUF) err = shd.ldb.Write(shd.store.writeOpts, batch) if err != nil { @@ -407,21 +425,30 @@ func (shd *shard) FindSpan(sid int64) *common.Span { shd.path, sid, err.Error()) return nil } - r := bytes.NewBuffer(buf) - decoder := gob.NewDecoder(r) - data := common.SpanData{} - err = decoder.Decode(&data) + var span *common.Span + span, err = shd.decodeSpan(sid, buf) if err != nil { lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n", shd.path, sid, err.Error()) return nil } + return span +} + +func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + data := common.SpanData{} + err := decoder.Decode(&data) + if err != nil { + return nil, err + } // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with // non-nil slices. if data.Parents == nil { data.Parents = []common.SpanId{} } - return &common.Span{Id: common.SpanId(sid), SpanData: data} + return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil } // Find the children of a given span id. @@ -453,5 +480,371 @@ func (store *dataStore) FindChildren(sid int64, lim int32) []common.SpanId { return childIds } -//func (store *dataStore) FindByTimeRange(startTime int64, endTime int64, lim int32) []int64 { -//} +type predicateData struct { + *common.Predicate + intKey int64 + strKey string +} + +func loadPredicateData(pred *common.Predicate) (*predicateData, error) { + p := predicateData{Predicate: pred} + + // Parse the input value given to make sure it matches up with the field + // type. + switch pred.Field { + case common.SPAN_ID: + // Span IDs are sent as hex strings. + var id common.SpanId + if err := id.FromString(pred.Val); err != nil { + return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s", + pred.Val, err.Error())) + } + p.intKey = id.Val() + break + case common.DESCRIPTION: + // Any string is valid for a description. + p.strKey = pred.Val + break + case common.BEGIN_TIME, common.END_TIME, common.DURATION: + // Base-10 numeric fields. + v, err := strconv.ParseInt(pred.Val, 10, 64) + if err != nil { + return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s", + pred.Field, pred.Val, err.Error())) + } + p.intKey = v + break + default: + return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field)) + } + + // Validate the predicate operation. + switch pred.Op { + case common.EQUALS, common.LESS_THAN_OR_EQUALS, common.GREATER_THAN_OR_EQUALS: + break + case common.CONTAINS: + if p.fieldIsNumeric() { + return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+ + "numeric field like '%s'", pred.Field)) + } + default: + return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'", + pred.Op)) + } + + return &p, nil +} + +// Get the index prefix for this predicate, or 0 if it is not indexed. +func (pred *predicateData) getIndexPrefix() byte { + switch pred.Field { + case common.SPAN_ID: + return SPAN_ID_INDEX_PREFIX + case common.BEGIN_TIME: + return BEGIN_TIME_INDEX_PREFIX + case common.END_TIME: + return END_TIME_INDEX_PREFIX + case common.DURATION: + return DURATION_INDEX_PREFIX + default: + return INVALID_INDEX_PREFIX + } +} + +// Returns true if the predicate type is numeric. +func (pred *predicateData) fieldIsNumeric() bool { + switch pred.Field { + case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION: + return true + default: + return false + } +} + +// Get the values that this predicate cares about for a given span. +func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, string) { + switch pred.Field { + case common.SPAN_ID: + return span.Id.Val(), "" + case common.DESCRIPTION: + return 0, span.Description + case common.BEGIN_TIME: + return span.Begin, "" + case common.END_TIME: + return span.End, "" + case common.DURATION: + return span.Duration(), "" + default: + panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field)) + } +} + +func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool { + // nil is after everything. + if a == nil { + if b == nil { + return false + } + return false + } else if b == nil { + return true + } + // Compare the spans according to this predicate. + aInt, aStr := pred.extractRelevantSpanData(a) + bInt, bStr := pred.extractRelevantSpanData(b) + if pred.fieldIsNumeric() { + if pred.Op.IsDescending() { + return aInt > bInt + } else { + return aInt < bInt + } + } else { + if pred.Op.IsDescending() { + return aStr > bStr + } else { + return aStr < bStr + } + } +} + +// Returns true if the predicate is satisfied by the given span. +func (pred *predicateData) satisfiedBy(span *common.Span) bool { + intVal, strVal := pred.extractRelevantSpanData(span) + if pred.fieldIsNumeric() { + switch pred.Op { + case common.EQUALS: + return intVal == pred.intKey + case common.LESS_THAN_OR_EQUALS: + return intVal <= pred.intKey + case common.GREATER_THAN_OR_EQUALS: + return intVal >= pred.intKey + default: + panic(fmt.Sprintf("unknown Op type %s should have been caught "+ + "during normalization", pred.Op)) + } + } else { + switch pred.Op { + case common.CONTAINS: + return strings.Contains(strVal, pred.strKey) + case common.EQUALS: + return strVal == pred.strKey + case common.LESS_THAN_OR_EQUALS: + return strVal <= pred.strKey + case common.GREATER_THAN_OR_EQUALS: + return strVal >= pred.strKey + default: + panic(fmt.Sprintf("unknown Op type %s should have been caught "+ + "during normalization", pred.Op)) + } + } +} + +func (pred *predicateData) createSource(store *dataStore) (*source, error) { + var ret *source + src := source{store: store, + pred: pred, + iters: make([]*levigo.Iterator, 0, len(store.shards)), + nexts: make([]*common.Span, len(store.shards)), + numRead: make([]int, len(store.shards)), + keyPrefix: pred.getIndexPrefix(), + } + if src.keyPrefix == INVALID_INDEX_PREFIX { + return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+ + "predicate on field %s", pred.Field)) + } + defer func() { + if ret == nil { + src.Close() + } + }() + for shardIdx := range store.shards { + shd := store.shards[shardIdx] + src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) + } + searchKey := makeKey(src.keyPrefix, pred.intKey) + for i := range src.iters { + src.iters[i].Seek(searchKey) + } + ret = &src + return ret, nil +} + +// A source of spans. +type source struct { + store *dataStore + pred *predicateData + iters []*levigo.Iterator + nexts []*common.Span + numRead []int + keyPrefix byte +} + +// Fill in the entry in the 'next' array for a specific shard. +func (src *source) populateNextFromShard(shardIdx int) { + lg := src.store.lg + var err error + iter := src.iters[shardIdx] + if iter == nil { + lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx) + return // There are no more entries in this shard. + } + if src.nexts[shardIdx] != nil { + lg.Debugf("No need to populate shard %d\n", shardIdx) + return // We already have a valid entry for this shard. + } + for { + if !iter.Valid() { + lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx) + break // Can't read past end of DB + } + src.numRead[shardIdx]++ + key := iter.Key() + if !bytes.HasPrefix(key, []byte{src.keyPrefix}) { + lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s", + shardIdx, string(src.keyPrefix)) + break // Can't read past end of indexed section + } + var span *common.Span + var sid int64 + if src.keyPrefix == SPAN_ID_INDEX_PREFIX { + // The span id maps to the span itself. + sid = keyToInt(key[1:]) + span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value()) + if err != nil { + lg.Debugf("Internal error decoding span %016x in shard %d: %s\n", + sid, shardIdx, err.Error()) + break + } + } else { + // With a secondary index, we have to look up the span by id. + sid = keyToInt(key[9:]) + span = src.store.shards[shardIdx].FindSpan(sid) + if span == nil { + lg.Debugf("Internal error rehydrating span %016x in shard %d\n", + sid, shardIdx) + break + } + } + if src.pred.Op.IsDescending() { + iter.Prev() + } else { + iter.Next() + } + if src.pred.satisfiedBy(span) { + lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx) + src.nexts[shardIdx] = span // Found valid entry + return + } else { + lg.Debugf("Span %016x from shard %d does not satisfy the predicate.\n", + sid, shardIdx) + if src.numRead[shardIdx] <= 1 && src.pred.Op.IsDescending() { + // When dealing with descending predicates, the first span we read might not satisfy + // the predicate, even though subsequent ones will. This is because the iter.Seek() + // function "moves the iterator the position of the key given or, if the key doesn't + // exist, the next key that does exist in the database." So if we're on that "next + // key" it will not satisfy the predicate, but the keys previous to it might. + continue + } + // This and subsequent entries don't satisfy predicate + break + } + } + lg.Debugf("Closing iterator for shard %d.\n", shardIdx) + iter.Close() + src.iters[shardIdx] = nil +} + +func (src *source) next() *common.Span { + for shardIdx := range src.iters { + src.populateNextFromShard(shardIdx) + } + var best *common.Span + bestIdx := -1 + for shardIdx := range src.iters { + span := src.nexts[shardIdx] + if src.pred.spanPtrIsBefore(span, best) { + best = span + bestIdx = shardIdx + } + } + if bestIdx >= 0 { + src.nexts[bestIdx] = nil + } + return best +} + +func (src *source) Close() { + for i := range src.iters { + if src.iters[i] != nil { + src.iters[i].Close() + } + } + src.iters = nil +} + +func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) { + // Read spans from the first predicate that is indexed. + p := *preds + for i := range p { + pred := p[i] + if pred.getIndexPrefix() != INVALID_INDEX_PREFIX { + *preds = append(p[0:i], p[i+1:]...) + return pred.createSource(store) + } + } + // If there are no predicates that are indexed, read rows in order of span id. + spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "0000000000000000", + } + spanIdPredData, err := loadPredicateData(&spanIdPred) + if err != nil { + return nil, err + } + return spanIdPredData.createSource(store) +} + +func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) { + lg := store.lg + // Parse predicate data. + var err error + preds := make([]*predicateData, len(query.Predicates)) + for i := range query.Predicates { + preds[i], err = loadPredicateData(&query.Predicates[i]) + if err != nil { + return nil, err + } + } + // Get a source of rows. + var src *source + src, err = store.obtainSource(&preds) + if err != nil { + return nil, err + } + defer src.Close() + lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src) + + // Filter the spans through the remaining predicates. + ret := make([]*common.Span, 0, 32) + for { + if len(ret) >= query.Lim { + break // we hit the result size limit + } + span := src.next() + if span == nil { + break // the source has no more spans to give + } + lg.Debugf("src.next returned span %s\n", span.ToJson()) + satisfied := true + for predIdx := range preds { + if !preds[predIdx].satisfiedBy(span) { + satisfied = false + break + } + } + if satisfied { + ret = append(ret, span) + } + } + return ret, nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go index f0449fe..3330723 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go @@ -20,6 +20,8 @@ package main import ( + "bytes" + "encoding/json" "math/rand" "org/apache/htrace/common" "org/apache/htrace/test" @@ -116,6 +118,159 @@ func TestDatastoreWriteAndRead(t *testing.T) { } } +func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, + expectedSpans []common.Span) { + spans, err := ht.Store.HandleQuery(query) + if err != nil { + t.Fatalf("First query failed: %s\n", err.Error()) + } + expectedBuf := new(bytes.Buffer) + dec := json.NewEncoder(expectedBuf) + err = dec.Encode(expectedSpans) + if err != nil { + t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error()) + } + spansBuf := new(bytes.Buffer) + dec = json.NewEncoder(spansBuf) + err = dec.Encode(spans) + if err != nil { + t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error()) + } + t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans), + len(expectedSpans)) + common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes())) +} + +// Test queries on the datastore. +func TestSimpleQuery(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) +} + +func TestQueries2(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + common.Predicate{ + Op: common.EQUALS, + Field: common.DESCRIPTION, + Val: "getFileDescriptors", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[0]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.DESCRIPTION, + Val: "getFileDescriptors", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[0]}) +} + +func TestQueries3(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.CONTAINS, + Field: common.DESCRIPTION, + Val: "Fd", + }, + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "100", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "0", + }, + }, + Lim: 200, + }, []common.Span{}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: "2", + }, + }, + Lim: 200, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) +} + func BenchmarkDatastoreWrites(b *testing.B) { htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", WrittenSpans: make(chan *common.Span, b.N)} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go index efc89e1..39e5744 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go @@ -141,7 +141,9 @@ func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ children := hand.store.FindChildren(sid, lim) jbytes, err := json.Marshal(children) if err != nil { - panic(err) + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("Error marshalling children: %s", err.Error())) + return } w.Write(jbytes) } @@ -173,6 +175,42 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } } +type queryHandler struct { + dataStoreHandler +} + +func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + _, ok := hand.getReqField32("lim", w, req) + if !ok { + return + } + var query common.Query + dec := json.NewDecoder(req.Body) + err := dec.Decode(&query) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Error parsing query: %s", err.Error())) + return + } + var results []*common.Span + results, err = hand.store.HandleQuery(&query) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("Internal error processing query %s: %s", + query.String(), err.Error())) + return + } + var jbytes []byte + jbytes, err = json.Marshal(results) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("Error marshalling results: %s", err.Error())) + return + } + w.Write(jbytes) +} + type defaultServeHandler struct { lg *common.Logger } @@ -225,6 +263,9 @@ func CreateRestServer(cnf *conf.Config, store *dataStore) (*RestServer, error) { store: store, lg: rsv.lg}} r.Handle("/writeSpans", writeSpansH).Methods("POST") + queryH := &queryHandler{dataStoreHandler: dataStoreHandler{store: store}} + r.Handle("/query", queryH).Methods("GET") + span := r.PathPrefix("/span").Subrouter() findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg}} span.Handle("/{id}", findSidH).Methods("GET")
