Repository: incubator-htrace
Updated Branches:
  refs/heads/master 0ca3f8f23 -> af2c084ed


HTRACE-88. Add REST query API to htraced (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/af2c084e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/af2c084e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/af2c084e

Branch: refs/heads/master
Commit: af2c084ed8fe5af6a4cee350d80f154f9e328aa5
Parents: 0ca3f8f
Author: Colin P. Mccabe <[email protected]>
Authored: Wed Jan 14 19:41:10 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Tue Jan 27 16:51:28 2015 -0800

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/client/client.go   |  19 +
 .../go/src/org/apache/htrace/common/query.go    |  87 ++++
 .../src/go/src/org/apache/htrace/common/span.go |  11 +-
 .../src/org/apache/htrace/htraced/datastore.go  | 417 ++++++++++++++++++-
 .../org/apache/htrace/htraced/datastore_test.go | 155 +++++++
 .../go/src/org/apache/htrace/htraced/rest.go    |  43 +-
 6 files changed, 718 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go 
b/htrace-core/src/go/src/org/apache/htrace/client/client.go
index fbcdcc6..52fe78e 100644
--- a/htrace-core/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go
@@ -106,6 +106,25 @@ func (hcl *Client) FindChildren(sid common.SpanId, lim 
int) ([]common.SpanId, er
        return spanIds, nil
 }
 
+// Query sends a query to htraced and returns the spans that match it.
+//
+// The query is marshalled to JSON and sent as the body of a GET request to
+// the "query" REST endpoint.  The response body is unmarshalled into a slice
+// of spans.  An error is returned if marshalling the query, performing the
+// request, or unmarshalling the response fails.
+func (hcl *Client) Query(query *common.Query) ([]common.Span, error) {
+       in, err := json.Marshal(query)
+       if err != nil {
+               return nil, fmt.Errorf("Error marshalling query: %s",
+                       err.Error())
+       }
+       out, _, err := hcl.makeRestRequest("GET", "query", bytes.NewReader(in))
+       if err != nil {
+               return nil, err
+       }
+       var spans []common.Span
+       if err = json.Unmarshal(out, &spans); err != nil {
+               return nil, fmt.Errorf("Error unmarshalling results: %s",
+                       err.Error())
+       }
+       return spans, nil
+}
+
 func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
        return hcl.makeRestRequest("GET", reqName, nil)
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/common/query.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go 
b/htrace-core/src/go/src/org/apache/htrace/common/query.go
new file mode 100644
index 0000000..0c909a1
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+       "encoding/json"
+)
+
+//
+// Represents queries that can be sent to htraced.
+//
+// Each query consists of a set of predicates that will be 'AND'ed together to
+// return a set of spans.  Predicates contain an operation, a field, and a
+// value.
+//
+// For example, a query might be "return the first 100 spans between 5:00pm
+// and 5:01pm".  This query would have two predicates: time greater than or
+// equal to 5:00pm, and time less than or equal to 5:01pm.
+// In HTrace, times are always expressed in milliseconds since the Epoch.
+// Predicate values are always sent as JSON strings.  So this would become:
+// { "lim" : 100, "pred" : [
+//   { "op" : "ge", "field" : "begin", "val" : "1234" },
+//   { "op" : "le", "field" : "begin", "val" : "5678" }
+// ] }
+//
+// Where '1234' and '5678' were replaced by times since the epoch in
+// milliseconds.
+//
+
+// Op is the comparison operation of a predicate, identified by a short
+// string code.
+type Op string
+
+// The operations that can appear in a predicate.
+const (
+       CONTAINS               Op = "cn"
+       EQUALS                 Op = "eq"
+       LESS_THAN_OR_EQUALS    Op = "le"
+       GREATER_THAN_OR_EQUALS Op = "ge"
+)
+
+// IsDescending reports whether this operation is LESS_THAN_OR_EQUALS, the
+// only operation whose results are walked from larger values to smaller
+// ones.
+func (op Op) IsDescending() bool {
+       switch op {
+       case LESS_THAN_OR_EQUALS:
+               return true
+       default:
+               return false
+       }
+}
+
+// Field names a span attribute that a predicate can test.
+type Field string
+
+// The span fields that can appear in a predicate.
+const (
+       SPAN_ID     Field = "spanid"
+       DESCRIPTION Field = "description"
+       BEGIN_TIME  Field = "begin"
+       END_TIME    Field = "end"
+       DURATION    Field = "duration"
+)
+
+// Predicate is a single condition within a query: the span field named by
+// Field is compared against Val using the operation Op.  Val is always a
+// string, whatever the type of the field it is compared against.
+type Predicate struct {
+       Op    Op     `json:"op"`
+       Field Field  `json:"field"`
+       Val   string `json:"val"`
+}
+
+// Query is a request for spans.  Predicates holds the conditions to apply
+// and Lim holds the maximum number of spans to return.
+type Query struct {
+       Predicates []Predicate `json:"pred"`
+       Lim        int         `json:"lim"`
+}
+
+// String renders the query as JSON.  It panics if the query cannot be
+// marshalled.
+func (query *Query) String() string {
+       encoded, err := json.Marshal(query)
+       if err == nil {
+               return string(encoded)
+       }
+       panic(err)
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/common/span.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go 
b/htrace-core/src/go/src/org/apache/htrace/common/span.go
index 36e716a..64975d2 100644
--- a/htrace-core/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go
@@ -80,7 +80,11 @@ func (id *SpanId) UnmarshalJSON(b []byte) error {
        if b[len(b)-1] != DOUBLE_QUOTE {
                return errors.New("Expected spanID to end with a string quote.")
        }
-       v, err := strconv.ParseUint(string(b[1:len(b)-1]), 16, 64)
+       return id.FromString(string(b[1 : len(b)-1]))
+}
+
+func (id *SpanId) FromString(str string) error {
+       v, err := strconv.ParseUint(str, 16, 64)
        if err != nil {
                return err
        }
@@ -111,3 +115,8 @@ func (span *Span) ToJson() []byte {
        }
        return jbytes
 }
+
+// Compute the span duration, which is the end time minus the begin time.
+// We ignore overflow since we never deal with negative times.
+func (span *Span) Duration() int64 {
+       return span.End - span.Begin
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
index 40678bd..523b7ab 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -22,10 +22,13 @@ package main
 import (
        "bytes"
        "encoding/gob"
+       "errors"
+       "fmt"
        "github.com/jmhodges/levigo"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "os"
+       "strconv"
        "strings"
        "sync/atomic"
        "syscall"
@@ -49,14 +52,23 @@ import (
 // Schema
 // m -> dataStoreMetadata
 // s[8-byte-big-endian-sid] -> SpanData
+// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
+// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
+// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
 // p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
-// t[8-byte-big-endian-time][8-byte-big-endian-child-sid] -> {}
 //
 
 const DATA_STORE_VERSION = 1
 
 var EMPTY_BYTE_BUF []byte = []byte{}
 
+// Single-byte key prefixes identifying which index a datastore key belongs
+// to (see the schema above).
+const SPAN_ID_INDEX_PREFIX = 's'
+const BEGIN_TIME_INDEX_PREFIX = 'b'
+const END_TIME_INDEX_PREFIX = 'e'
+const DURATION_INDEX_PREFIX = 'd'
+const PARENT_ID_INDEX_PREFIX = 'p'
+
+// Used to signal that a field has no index.
+const INVALID_INDEX_PREFIX = 0
+
 type Statistics struct {
        NumSpansWritten uint64
 }
@@ -190,15 +202,21 @@ func (shd *shard) writeSpan(span *common.Span) error {
        if err != nil {
                return err
        }
-       batch.Put(makeKey('s', span.Id.Val()), spanDataBuf.Bytes())
+       batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), 
spanDataBuf.Bytes())
 
        // Add this to the parent index.
        for parentIdx := range span.Parents {
-               batch.Put(makeSecondaryKey('p', span.Parents[parentIdx].Val(), 
span.Id.Val()), EMPTY_BYTE_BUF)
+               batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
+                       span.Parents[parentIdx].Val(), span.Id.Val()), 
EMPTY_BYTE_BUF)
        }
 
-       // Add this to the timeline index.
-       batch.Put(makeSecondaryKey('t', span.Begin, span.Id.Val()), 
EMPTY_BYTE_BUF)
+       // Add to the other secondary indices.
+       batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin,
+               span.Id.Val()), EMPTY_BYTE_BUF)
+       batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End,
+               span.Id.Val()), EMPTY_BYTE_BUF)
+       batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(),
+               span.Id.Val()), EMPTY_BYTE_BUF)
 
        err = shd.ldb.Write(shd.store.writeOpts, batch)
        if err != nil {
@@ -407,21 +425,30 @@ func (shd *shard) FindSpan(sid int64) *common.Span {
                        shd.path, sid, err.Error())
                return nil
        }
-       r := bytes.NewBuffer(buf)
-       decoder := gob.NewDecoder(r)
-       data := common.SpanData{}
-       err = decoder.Decode(&data)
+       var span *common.Span
+       span, err = shd.decodeSpan(sid, buf)
        if err != nil {
                lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n",
                        shd.path, sid, err.Error())
                return nil
        }
+       return span
+}
+
+// Decode a gob-encoded SpanData buffer into a Span carrying the given span
+// id.  Returns an error if the buffer cannot be decoded.
+func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) {
+       r := bytes.NewBuffer(buf)
+       decoder := gob.NewDecoder(r)
+       data := common.SpanData{}
+       err := decoder.Decode(&data)
+       if err != nil {
+               return nil, err
+       }
        // Gob encoding translates empty slices to nil.  Reverse this so that 
we're always dealing with
        // non-nil slices.
        if data.Parents == nil {
                data.Parents = []common.SpanId{}
        }
-       return &common.Span{Id: common.SpanId(sid), SpanData: data}
+       return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
 }
 
 // Find the children of a given span id.
@@ -453,5 +480,371 @@ func (store *dataStore) FindChildren(sid int64, lim 
int32) []common.SpanId {
        return childIds
 }
 
-//func (store *dataStore) FindByTimeRange(startTime int64, endTime int64, lim 
int32) []int64 {
-//}
+// A query predicate together with its value parsed into the form used for
+// comparisons.
+type predicateData struct {
+       *common.Predicate
+       // Parsed value for the span id, begin, end and duration fields.
+       intKey int64
+       // Value for the description field.
+       strKey string
+}
+
+// Build a predicateData from a query predicate.
+//
+// The predicate's value is parsed according to the type of its field, and
+// its operation is validated.  An error is returned if the value can't be
+// parsed, if the field or operation is unknown, or if CONTAINS is used on a
+// numeric field.
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+       p := predicateData{Predicate: pred}
+
+       // Convert the textual value into the key type used by the field.
+       switch pred.Field {
+       case common.SPAN_ID:
+               // Span IDs arrive as hex strings.
+               var id common.SpanId
+               err := id.FromString(pred.Val)
+               if err != nil {
+                       return nil, errors.New(fmt.Sprintf(
+                               "Unable to parse span id '%s': %s",
+                               pred.Val, err.Error()))
+               }
+               p.intKey = id.Val()
+       case common.DESCRIPTION:
+               // Descriptions are free-form, so any string is accepted.
+               p.strKey = pred.Val
+       case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+               // These fields are base-10 integers.
+               v, err := strconv.ParseInt(pred.Val, 10, 64)
+               if err != nil {
+                       return nil, errors.New(fmt.Sprintf(
+                               "Unable to parse %s '%s': %s",
+                               pred.Field, pred.Val, err.Error()))
+               }
+               p.intKey = v
+       default:
+               return nil, errors.New(fmt.Sprintf("Unknown field %s",
+                       pred.Field))
+       }
+
+       // Check that the operation is known and applies to this field.
+       switch pred.Op {
+       case common.EQUALS, common.LESS_THAN_OR_EQUALS,
+               common.GREATER_THAN_OR_EQUALS:
+               // Valid for every field.
+       case common.CONTAINS:
+               if p.fieldIsNumeric() {
+                       return nil, errors.New(fmt.Sprintf(
+                               "Can't use CONTAINS on a numeric field like '%s'",
+                               pred.Field))
+               }
+       default:
+               return nil, errors.New(fmt.Sprintf(
+                       "Unknown predicate operation '%s'", pred.Op))
+       }
+
+       return &p, nil
+}
+
+// Get the index prefix for this predicate, or 0 if it is not indexed.
+func (pred *predicateData) getIndexPrefix() byte {
+       switch pred.Field {
+       case common.SPAN_ID:
+               return SPAN_ID_INDEX_PREFIX
+       case common.BEGIN_TIME:
+               return BEGIN_TIME_INDEX_PREFIX
+       case common.END_TIME:
+               return END_TIME_INDEX_PREFIX
+       case common.DURATION:
+               return DURATION_INDEX_PREFIX
+       default:
+               return INVALID_INDEX_PREFIX
+       }
+}
+
+// Report whether the predicate's field is one of the numeric fields (span
+// id, begin time, end time or duration).
+func (pred *predicateData) fieldIsNumeric() bool {
+       f := pred.Field
+       return f == common.SPAN_ID || f == common.BEGIN_TIME ||
+               f == common.END_TIME || f == common.DURATION
+}
+
+// Get the values that this predicate cares about for a given span.
+//
+// Returns the span's value for the predicate's field as an (integer, string)
+// pair: numeric fields fill in the integer and leave the string empty, while
+// the description fills in the string and leaves the integer zero.  Panics
+// if the field is not one of the known fields.
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, string) {
+       switch pred.Field {
+       case common.SPAN_ID:
+               return span.Id.Val(), ""
+       case common.DESCRIPTION:
+               return 0, span.Description
+       case common.BEGIN_TIME:
+               return span.Begin, ""
+       case common.END_TIME:
+               return span.End, ""
+       case common.DURATION:
+               return span.Duration(), ""
+       default:
+               // Reaching this point means the field is unknown, not merely
+               // non-numeric, so say so.
+               panic(fmt.Sprintf("Unknown field type %s.", pred.Field))
+       }
+}
+
+// Report whether span a should be returned before span b under this
+// predicate's ordering.
+//
+// A nil span sorts after every non-nil span, and two nil spans are not
+// before each other.  Otherwise the spans are compared on the predicate's
+// field: ascending normally, descending when the operation is descending.
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
+       // nil is after everything, so a nil span is never before anything.
+       if a == nil {
+               return false
+       }
+       if b == nil {
+               return true
+       }
+       // Compare the spans according to this predicate.
+       aInt, aStr := pred.extractRelevantSpanData(a)
+       bInt, bStr := pred.extractRelevantSpanData(b)
+       descending := pred.Op.IsDescending()
+       if pred.fieldIsNumeric() {
+               if descending {
+                       return aInt > bInt
+               }
+               return aInt < bInt
+       }
+       if descending {
+               return aStr > bStr
+       }
+       return aStr < bStr
+}
+
+// Returns true if the predicate is satisfied by the given span.
+func (pred *predicateData) satisfiedBy(span *common.Span) bool {
+       intVal, strVal := pred.extractRelevantSpanData(span)
+       if pred.fieldIsNumeric() {
+               switch pred.Op {
+               case common.EQUALS:
+                       return intVal == pred.intKey
+               case common.LESS_THAN_OR_EQUALS:
+                       return intVal <= pred.intKey
+               case common.GREATER_THAN_OR_EQUALS:
+                       return intVal >= pred.intKey
+               default:
+                       panic(fmt.Sprintf("unknown Op type %s should have been 
caught "+
+                               "during normalization", pred.Op))
+               }
+       } else {
+               switch pred.Op {
+               case common.CONTAINS:
+                       return strings.Contains(strVal, pred.strKey)
+               case common.EQUALS:
+                       return strVal == pred.strKey
+               case common.LESS_THAN_OR_EQUALS:
+                       return strVal <= pred.strKey
+               case common.GREATER_THAN_OR_EQUALS:
+                       return strVal >= pred.strKey
+               default:
+                       panic(fmt.Sprintf("unknown Op type %s should have been 
caught "+
+                               "during normalization", pred.Op))
+               }
+       }
+}
+
+// Create a source that reads spans from the index covering this predicate.
+//
+// One iterator is opened per shard and positioned at the entry where
+// reading should start.  For ascending operations that is the first key at
+// or after the predicate value.  For descending operations it is the last
+// key at or before the predicate value, since the source then walks
+// backwards with Prev().
+//
+// Returns an error if the predicate's field has no index.
+func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+       var ret *source
+       src := source{store: store,
+               pred:      pred,
+               iters:     make([]*levigo.Iterator, 0, len(store.shards)),
+               nexts:     make([]*common.Span, len(store.shards)),
+               numRead:   make([]int, len(store.shards)),
+               keyPrefix: pred.getIndexPrefix(),
+       }
+       if src.keyPrefix == INVALID_INDEX_PREFIX {
+               return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
+                       "predicate on field %s", pred.Field))
+       }
+       defer func() {
+               if ret == nil {
+                       src.Close()
+               }
+       }()
+       for shardIdx := range store.shards {
+               shd := store.shards[shardIdx]
+               src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
+       }
+       searchKey := makeKey(src.keyPrefix, pred.intKey)
+       descending := pred.Op.IsDescending()
+       if descending && src.keyPrefix != SPAN_ID_INDEX_PREFIX {
+               // Secondary index keys carry an 8-byte span id after the
+               // indexed value.  Extend the search key with the largest
+               // possible id so that every entry whose value equals the
+               // predicate value sorts at or before it.
+               searchKey = append(searchKey,
+                       0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff)
+       }
+       for i := range src.iters {
+               iter := src.iters[i]
+               iter.Seek(searchKey)
+               if !descending {
+                       continue
+               }
+               // Seek() lands on the first key >= searchKey.  A descending
+               // scan must instead start from the last key <= searchKey.
+               if !iter.Valid() {
+                       // Every key is smaller than searchKey; start at the end.
+                       iter.SeekToLast()
+               } else if bytes.Compare(iter.Key(), searchKey) > 0 {
+                       iter.Prev()
+               }
+       }
+       ret = &src
+       return ret, nil
+}
+
+// A source of spans.
+//
+// A source reads the index selected by one predicate across all shards.
+// The iters, nexts and numRead slices are indexed by shard.
+type source struct {
+       store     *dataStore
+       // The predicate that spans read from this source must satisfy.
+       pred      *predicateData
+       // One iterator per shard; set to nil once that shard is exhausted.
+       iters     []*levigo.Iterator
+       // The next span buffered from each shard, or nil if none is buffered.
+       nexts     []*common.Span
+       // The number of index entries read so far from each shard.
+       numRead   []int
+       // The key prefix of the index being read.
+       keyPrefix byte
+}
+
+// Fill in the entry in the 'next' array for a specific shard.
+//
+// Reads entries from the shard's iterator until one satisfies the source
+// predicate and stores that span in src.nexts[shardIdx].  If the iterator
+// becomes invalid, leaves the indexed key range, fails to decode or look up
+// a span, or reaches an entry that does not satisfy the predicate, the
+// iterator is closed and src.iters[shardIdx] is set to nil.  The one
+// exception is the first entry read under a descending predicate, which is
+// skipped rather than treated as the end of the scan.
+func (src *source) populateNextFromShard(shardIdx int) {
+       lg := src.store.lg
+       var err error
+       iter := src.iters[shardIdx]
+       if iter == nil {
+               lg.Debugf("Can't populate: No more entries in shard %d\n", 
shardIdx)
+               return // There are no more entries in this shard.
+       }
+       if src.nexts[shardIdx] != nil {
+               lg.Debugf("No need to populate shard %d\n", shardIdx)
+               return // We already have a valid entry for this shard.
+       }
+       for {
+               if !iter.Valid() {
+                       lg.Debugf("Can't populate: Iterator for shard %d is no 
longer valid.\n", shardIdx)
+                       break // Can't read past end of DB
+               }
+               src.numRead[shardIdx]++
+               key := iter.Key()
+               if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
+                       lg.Debugf("Can't populate: Iterator for shard %d does 
not have prefix %s",
+                               shardIdx, string(src.keyPrefix))
+                       break // Can't read past end of indexed section
+               }
+               var span *common.Span
+               var sid int64
+               if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
+                       // The span id maps to the span itself.
+                       sid = keyToInt(key[1:])
+                       span, err = src.store.shards[shardIdx].decodeSpan(sid, 
iter.Value())
+                       if err != nil {
+                               lg.Debugf("Internal error decoding span %016x 
in shard %d: %s\n",
+                                       sid, shardIdx, err.Error())
+                               break
+                       }
+               } else {
+                       // With a secondary index, we have to look up the span 
by id.
+                       sid = keyToInt(key[9:])
+                       span = src.store.shards[shardIdx].FindSpan(sid)
+                       if span == nil {
+                               lg.Debugf("Internal error rehydrating span 
%016x in shard %d\n",
+                                       sid, shardIdx)
+                               break
+                       }
+               }
+               // Advance the iterator before testing the span, so that it
+               // already points at the following entry on return.
+               if src.pred.Op.IsDescending() {
+                       iter.Prev()
+               } else {
+                       iter.Next()
+               }
+               if src.pred.satisfiedBy(span) {
+                       lg.Debugf("Populated valid span %016x from shard 
%d.\n", sid, shardIdx)
+                       src.nexts[shardIdx] = span // Found valid entry
+                       return
+               } else {
+                       lg.Debugf("Span %016x from shard %d does not satisfy 
the predicate.\n",
+                               sid, shardIdx)
+                       if src.numRead[shardIdx] <= 1 && 
src.pred.Op.IsDescending() {
+                               // When dealing with descending predicates, the 
first span we read might not satisfy
+                               // the predicate, even though subsequent ones 
will.  This is because the iter.Seek()
+                               // function "moves the iterator the position of 
the key given or, if the key doesn't
+                               // exist, the next key that does exist in the 
database."  So if we're on that "next
+                               // key" it will not satisfy the predicate, but 
the keys previous to it might.
+                               continue
+                       }
+                       // This and subsequent entries don't satisfy predicate
+                       break
+               }
+       }
+       // Every 'break' above lands here: this shard has nothing more to give.
+       lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+       iter.Close()
+       src.iters[shardIdx] = nil
+}
+
+// Return the next span from the source, or nil when no shard has a span
+// left to offer.
+//
+// Every shard is asked to buffer its next candidate.  The candidate that
+// sorts first under the predicate's ordering is removed from its buffer and
+// returned.
+func (src *source) next() *common.Span {
+       for shardIdx := range src.iters {
+               src.populateNextFromShard(shardIdx)
+       }
+       var winner *common.Span
+       winnerIdx := -1
+       for shardIdx := range src.iters {
+               candidate := src.nexts[shardIdx]
+               if !src.pred.spanPtrIsBefore(candidate, winner) {
+                       continue
+               }
+               winner, winnerIdx = candidate, shardIdx
+       }
+       if winnerIdx < 0 {
+               return nil
+       }
+       // Consume the winner so its shard is refilled on the next call.
+       src.nexts[winnerIdx] = nil
+       return winner
+}
+
+// Close every iterator that is still open and drop the iterator slice.
+func (src *source) Close() {
+       for _, iter := range src.iters {
+               if iter == nil {
+                       continue
+               }
+               iter.Close()
+       }
+       src.iters = nil
+}
+
+// Pick a source of spans for a set of predicates.
+//
+// The first predicate with an index becomes the source and is removed from
+// *preds, leaving only the predicates that still need to be checked against
+// each span.  If no predicate is indexed, the source reads all spans in
+// order of span id and *preds is left untouched.
+func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
+       all := *preds
+       for i, pred := range all {
+               if pred.getIndexPrefix() == INVALID_INDEX_PREFIX {
+                       continue
+               }
+               *preds = append(all[0:i], all[i+1:]...)
+               return pred.createSource(store)
+       }
+       // Nothing is indexed: fall back to scanning the span id index from
+       // its smallest key.
+       fallback := &common.Predicate{
+               Op:    common.GREATER_THAN_OR_EQUALS,
+               Field: common.SPAN_ID,
+               Val:   "0000000000000000",
+       }
+       fallbackData, err := loadPredicateData(fallback)
+       if err != nil {
+               return nil, err
+       }
+       return fallbackData.createSource(store)
+}
+
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, 
error) {
+       lg := store.lg
+       // Parse predicate data.
+       var err error
+       preds := make([]*predicateData, len(query.Predicates))
+       for i := range query.Predicates {
+               preds[i], err = loadPredicateData(&query.Predicates[i])
+               if err != nil {
+                       return nil, err
+               }
+       }
+       // Get a source of rows.
+       var src *source
+       src, err = store.obtainSource(&preds)
+       if err != nil {
+               return nil, err
+       }
+       defer src.Close()
+       lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
+
+       // Filter the spans through the remaining predicates.
+       ret := make([]*common.Span, 0, 32)
+       for {
+               if len(ret) >= query.Lim {
+                       break // we hit the result size limit
+               }
+               span := src.next()
+               if span == nil {
+                       break // the source has no more spans to give
+               }
+               lg.Debugf("src.next returned span %s\n", span.ToJson())
+               satisfied := true
+               for predIdx := range preds {
+                       if !preds[predIdx].satisfiedBy(span) {
+                               satisfied = false
+                               break
+                       }
+               }
+               if satisfied {
+                       ret = append(ret, span)
+               }
+       }
+       return ret, nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
index f0449fe..3330723 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -20,6 +20,8 @@
 package main
 
 import (
+       "bytes"
+       "encoding/json"
        "math/rand"
        "org/apache/htrace/common"
        "org/apache/htrace/test"
@@ -116,6 +118,159 @@ func TestDatastoreWriteAndRead(t *testing.T) {
        }
 }
 
+func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
+       expectedSpans []common.Span) {
+       spans, err := ht.Store.HandleQuery(query)
+       if err != nil {
+               t.Fatalf("First query failed: %s\n", err.Error())
+       }
+       expectedBuf := new(bytes.Buffer)
+       dec := json.NewEncoder(expectedBuf)
+       err = dec.Encode(expectedSpans)
+       if err != nil {
+               t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", 
err.Error())
+       }
+       spansBuf := new(bytes.Buffer)
+       dec = json.NewEncoder(spansBuf)
+       err = dec.Encode(spans)
+       if err != nil {
+               t.Fatalf("Failed to encode result spans to JSON: %s\n", 
err.Error())
+       }
+       t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
+               len(expectedSpans))
+       common.ExpectStrEqual(t, string(expectedBuf.Bytes()), 
string(spansBuf.Bytes()))
+}
+
+// Test queries on the datastore.
+//
+// Writes SIMPLE_TEST_SPANS to a fresh MiniHTraced and checks that a single
+// "begin >= 125" predicate returns SIMPLE_TEST_SPANS[1] followed by
+// SIMPLE_TEST_SPANS[2].
+func TestSimpleQuery(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.GREATER_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+}
+
+// Test a descending predicate, a descending predicate combined with an
+// unindexed description predicate, and a description predicate on its own.
+func TestQueries2(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       // "begin <= 125": results are expected in descending order.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+
+       // The same time predicate, further filtered by description.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "125",
+                       },
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.DESCRIPTION,
+                               Val:   "getFileDescriptors",
+                       },
+               },
+               Lim: 2,
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+       // A query whose only predicate is on the unindexed description field.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.EQUALS,
+                               Field: common.DESCRIPTION,
+                               Val:   "getFileDescriptors",
+                       },
+               },
+               Lim: 2,
+       }, []common.Span{SIMPLE_TEST_SPANS[0]})
+}
+
+// Test a CONTAINS predicate combined with a time predicate, and descending
+// predicates on the span id.
+func TestQueries3(t *testing.T) {
+       t.Parallel()
+       htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+               WrittenSpans: make(chan *common.Span, 100)}
+       ht, err := htraceBld.Build()
+       if err != nil {
+               panic(err)
+       }
+       defer ht.Close()
+       createSpans(SIMPLE_TEST_SPANS, ht.Store)
+       if ht.Store.GetStatistics().NumSpansWritten < 
uint64(len(SIMPLE_TEST_SPANS)) {
+               t.Fatal()
+       }
+       // The description predicate comes first but is unindexed, so the
+       // begin-time predicate is the one used as the source.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.CONTAINS,
+                               Field: common.DESCRIPTION,
+                               Val:   "Fd",
+                       },
+                       common.Predicate{
+                               Op:    common.GREATER_THAN_OR_EQUALS,
+                               Field: common.BEGIN_TIME,
+                               Val:   "100",
+                       },
+               },
+               Lim: 5,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+       // "spanid <= 0" is expected to match nothing.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "0",
+                       },
+               },
+               Lim: 200,
+       }, []common.Span{})
+
+       // "spanid <= 2": results are expected in descending order.
+       testQuery(t, ht, &common.Query{
+               Predicates: []common.Predicate{
+                       common.Predicate{
+                               Op:    common.LESS_THAN_OR_EQUALS,
+                               Field: common.SPAN_ID,
+                               Val:   "2",
+                       },
+               },
+               Lim: 200,
+       }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+}
+
 func BenchmarkDatastoreWrites(b *testing.B) {
        htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
                WrittenSpans: make(chan *common.Span, b.N)}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index efc89e1..39e5744 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -141,7 +141,9 @@ func (hand *findChildrenHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Requ
        children := hand.store.FindChildren(sid, lim)
        jbytes, err := json.Marshal(children)
        if err != nil {
-               panic(err)
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("Error marshalling children: %s", 
err.Error()))
+               return
        }
        w.Write(jbytes)
 }
@@ -173,6 +175,42 @@ func (hand *writeSpansHandler) ServeHTTP(w 
http.ResponseWriter, req *http.Reques
        }
 }
 
+// HTTP handler for the query endpoint.  Embeds dataStoreHandler for access
+// to the datastore and logger.
+type queryHandler struct {
+       dataStoreHandler
+}
+
+// ServeHTTP handles a query request.
+//
+// The request body must contain a JSON-encoded common.Query.  The spans
+// matching the query are written back as JSON.  A body that can't be parsed
+// yields 400 Bad Request; a failure while processing the query or
+// marshalling the results yields 500 Internal Server Error.
+func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       setResponseHeaders(w.Header())
+       // The result limit travels inside the query body (query.Lim), so no
+       // separate "lim" request field is required.
+       var query common.Query
+       if err := json.NewDecoder(req.Body).Decode(&query); err != nil {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Error parsing query: %s", err.Error()))
+               return
+       }
+       results, err := hand.store.HandleQuery(&query)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("Internal error processing query %s: %s",
+                               query.String(), err.Error()))
+               return
+       }
+       jbytes, err := json.Marshal(results)
+       if err != nil {
+               writeError(hand.lg, w, http.StatusInternalServerError,
+                       fmt.Sprintf("Error marshalling results: %s",
+                               err.Error()))
+               return
+       }
+       w.Write(jbytes)
+}
+
 type defaultServeHandler struct {
        lg *common.Logger
 }
@@ -225,6 +263,9 @@ func CreateRestServer(cnf *conf.Config, store *dataStore) 
(*RestServer, error) {
                store: store, lg: rsv.lg}}
        r.Handle("/writeSpans", writeSpansH).Methods("POST")
 
+       queryH := &queryHandler{dataStoreHandler: dataStoreHandler{store: 
store}}
+       r.Handle("/query", queryH).Methods("GET")
+
        span := r.PathPrefix("/span").Subrouter()
        findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: 
store, lg: rsv.lg}}
        span.Handle("/{id}", findSidH).Methods("GET")

Reply via email to