I figured this out. 

https://github.com/mpchadwick/kapacitor_utils/blob/c0b4f250e3725f30475a05806fe0f1f677631c81/query_parser/query_parser.go

In the end it was me being a golang newbie. Rather than reusing the existing 
point in the loop I needed to instantiate a new point in each iteration.

On Wednesday, July 20, 2016 at 9:30:40 PM UTC-4, [email protected] wrote:
> Related to the thread here 
> https://groups.google.com/forum/#!topic/influxdb/lDRWWoMxaSY I'm trying to 
> write a UDF (in Go) that processes each point written to log parser into 
> multiple points on a separate measurement to track query parameter usage
> 
> I have a UDF that looks like this which...
> 
> 
> ```
> /**
>  * Takes in a stream from logparser and tracks usage of query parameters in 
> the
>  * URL
>  *
>  * query_param,key=key value=value
>  */
> package main
> 
> import (
>       "errors"
>       "log"
>       "os"
>       "strings"
> 
>       "github.com/influxdata/kapacitor/udf"
>       "github.com/influxdata/kapacitor/udf/agent"
> )
> 
> type queryParserHandler struct {
>       agent *agent.Agent
>       begin *udf.BeginBatch
>       end   *udf.EndBatch
> }
> 
> func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
>       return &queryParserHandler{agent: a}
> }
> 
> func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
>       log.Println("+++++ Info was called +++++")
>       info := &udf.InfoResponse{
>               Wants:    udf.EdgeType_STREAM,
>               Provides: udf.EdgeType_BATCH,
>               Options:  map[string]*udf.OptionInfo{},
>       }
>       return info, nil
> }
> 
> func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse, 
> error) {
>       log.Println("+++++ Init was called +++++")
>       init := &udf.InitResponse{
>               Success: true,
>               Error:   "",
>       }
>       return init, nil
> }
> 
> func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
>       return &udf.SnapshotResponse{}, nil
> }
> 
> func (*queryParserHandler) Restore(req *udf.RestoreRequest) 
> (*udf.RestoreResponse, error) {
>       return &udf.RestoreResponse{
>               Success: true,
>       }, nil
> }
> 
> func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
>       log.Println("+++++ BeginBatch was called +++++")
>       return errors.New("batching not supported")
> }
> 
> func (q *queryParserHandler) Point(p *udf.Point) error {
>       log.Println("+++++ Point was called +++++")
>       value := p.FieldsString["request"]
>       pos := strings.Index(value, "?")
>       if pos == -1 {
>               // No query string? See ya!
>               return nil
>       }
> 
>       // Reset the point so we can reuse it
>       p.Database = ""
>       p.RetentionPolicy = ""
>       p.Group = ""
>       p.Dimensions = nil
>       p.FieldsDouble = nil
>       p.FieldsInt = nil
>       p.Tags = map[string]string{}
>       p.FieldsString = map[string]string{}
> 
>       query := value[pos+1 : len(value)]
>       params := strings.Split(query, "&")
> 
>       q.agent.Responses <- &udf.Response{
>               Message: &udf.Response_Begin{
>                       Begin: q.begin,
>               },
>       }
> 
>       for i := 0; i < len(params); i++ {
>               parts := strings.Split(params[i], "=")
>               p.Tags["key"] = parts[0]
>               p.FieldsString["value"] = parts[1]
> 
>               log.Println(parts[0])
> 
>               q.agent.Responses <- &udf.Response{
>                       Message: &udf.Response_Point{
>                               Point: p,
>                       },
>               }
>       }
> 
>       q.agent.Responses <- &udf.Response{
>               Message: &udf.Response_End{
>                       End: q.end,
>               },
>       }
> 
>       close(q.agent.Responses)
> 
>       return nil
> }
> 
> func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
>       return errors.New("batching not supported")
> }
> 
> func (q *queryParserHandler) Stop() {
>       close(q.agent.Responses)
> }
> 
> func main() {
>       a := agent.New(os.Stdin, os.Stdout)
>       h := newQueryParserHandler(a)
>       a.Handler = h
> 
>       log.Println("Starting agent")
>       a.Start()
>       err := a.Wait()
>       if err != nil {
>               log.Fatal(err)
>       }
> }
> ```
> 
> My TICKScript looks like this
> 
> ```
> stream
>     |from()
>         .measurement('logparser_grok')
>     @queryParser()
>     |influxDBOut()
>         .database('telegraf')
>         .retentionPolicy('default')
>         .measurement('query_param')
> 
> ```
> 
> The points don't get recorded and I wind up with this error message
> 
> [task:query_parser] 2016/07/20 21:26:34 E! failed to snapshot task 
> query_parser keepalive timedout, last keepalive received was: 2016-07-20 
> 21:04:57.81406868 -0400 EDT
> 
> I've been able to get Wants - Stream / Provide - Stream to at least write 
> data, but it only writes the final point, which makes me think my UDF should 
> provide a batch
> 
> ```
> /**
>  * Takes in a stream from logparser and tracks usage of query parameters in 
> the
>  * URL
>  *
>  * query_param,key=key value=value
>  */
> package main
> 
> import (
>       "errors"
>       "log"
>       "os"
>       "strings"
> 
>       "github.com/influxdata/kapacitor/udf"
>       "github.com/influxdata/kapacitor/udf/agent"
> )
> 
> type queryParserHandler struct {
>       agent *agent.Agent
> }
> 
> func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
>       return &queryParserHandler{agent: a}
> }
> 
> func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
>       log.Println("+++++ Info was called +++++")
>       info := &udf.InfoResponse{
>               Wants:    udf.EdgeType_STREAM,
>               Provides: udf.EdgeType_STREAM,
>               Options:  map[string]*udf.OptionInfo{},
>       }
>       return info, nil
> }
> 
> func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse, 
> error) {
>       log.Println("+++++ Init was called +++++")
>       init := &udf.InitResponse{
>               Success: true,
>               Error:   "",
>       }
>       return init, nil
> }
> 
> func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
>       return &udf.SnapshotResponse{}, nil
> }
> 
> func (*queryParserHandler) Restore(req *udf.RestoreRequest) 
> (*udf.RestoreResponse, error) {
>       return &udf.RestoreResponse{
>               Success: true,
>       }, nil
> }
> 
> func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
>       log.Println("+++++ BeginBatch was called +++++")
>       return errors.New("batching not supported")
> }
> 
> func (q *queryParserHandler) Point(p *udf.Point) error {
>       log.Println("+++++ Point was called +++++")
>       value := p.FieldsString["request"]
>       pos := strings.Index(value, "?")
>       if pos == -1 {
>               // No query string? See ya!
>               return nil
>       }
> 
>       // Reset the point so we can reuse it
>       p.Database = ""
>       p.RetentionPolicy = ""
>       p.Group = ""
>       p.Dimensions = nil
>       p.FieldsDouble = nil
>       p.FieldsInt = nil
>       p.Tags = map[string]string{}
>       p.FieldsString = map[string]string{}
> 
>       query := value[pos+1 : len(value)]
>       params := strings.Split(query, "&")
> 
>       for i := 0; i < len(params); i++ {
>               parts := strings.Split(params[i], "=")
>               p.Tags["key"] = parts[0]
>               p.FieldsString["value"] = parts[1]
> 
>               log.Println(parts[0])
> 
>               q.agent.Responses <- &udf.Response{
>                       Message: &udf.Response_Point{
>                               Point: p,
>                       },
>               }
>       }
> 
>       return nil
> }
> 
> func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
>       return errors.New("batching not supported")
> }
> 
> func (q *queryParserHandler) Stop() {
>       close(q.agent.Responses)
> }
> 
> func main() {
>       a := agent.New(os.Stdin, os.Stdout)
>       h := newQueryParserHandler(a)
>       a.Handler = h
> 
>       log.Println("Starting agent")
>       a.Start()
>       err := a.Wait()
>       if err != nil {
>               log.Fatal(err)
>       }
> }
> ```
> 
> I've spend a good amount of time looking through the examples, Kapacitor 
> source code, GitHub, Google but not sure what I'm doing wrong. Any help would 
> be much appreciated.

-- 
Remember to include the InfluxDB version number with all issue reports
--- 
You received this message because you are subscribed to the Google Groups 
"InfluxDB" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/influxdb.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/influxdb/1ef112c7-edcc-45d9-8080-86612f993296%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to