Related to the thread here 
https://groups.google.com/forum/#!topic/influxdb/lDRWWoMxaSY I'm trying to 
write a UDF (in Go) that processes each point written to log parser into 
multiple points on a separate measurement to track query parameter usage

I have a UDF that looks like this which...


```
/**
 * Takes in a stream from logparser and tracks usage of query parameters in the
 * URL
 *
 * query_param,key=key value=value
 */
package main

import (
        "errors"
        "log"
        "os"
        "strings"

        "github.com/influxdata/kapacitor/udf"
        "github.com/influxdata/kapacitor/udf/agent"
)

type queryParserHandler struct {
        agent *agent.Agent
        begin *udf.BeginBatch
        end   *udf.EndBatch
}

func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
        return &queryParserHandler{agent: a}
}

func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
        log.Println("+++++ Info was called +++++")
        info := &udf.InfoResponse{
                Wants:    udf.EdgeType_STREAM,
                Provides: udf.EdgeType_BATCH,
                Options:  map[string]*udf.OptionInfo{},
        }
        return info, nil
}

func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse, 
error) {
        log.Println("+++++ Init was called +++++")
        init := &udf.InitResponse{
                Success: true,
                Error:   "",
        }
        return init, nil
}

func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
        return &udf.SnapshotResponse{}, nil
}

func (*queryParserHandler) Restore(req *udf.RestoreRequest) 
(*udf.RestoreResponse, error) {
        return &udf.RestoreResponse{
                Success: true,
        }, nil
}

func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
        log.Println("+++++ BeginBatch was called +++++")
        return errors.New("batching not supported")
}

func (q *queryParserHandler) Point(p *udf.Point) error {
        log.Println("+++++ Point was called +++++")
        value := p.FieldsString["request"]
        pos := strings.Index(value, "?")
        if pos == -1 {
                // No query string? See ya!
                return nil
        }

        // Reset the point so we can reuse it
        p.Database = ""
        p.RetentionPolicy = ""
        p.Group = ""
        p.Dimensions = nil
        p.FieldsDouble = nil
        p.FieldsInt = nil
        p.Tags = map[string]string{}
        p.FieldsString = map[string]string{}

        query := value[pos+1 : len(value)]
        params := strings.Split(query, "&")

        q.agent.Responses <- &udf.Response{
                Message: &udf.Response_Begin{
                        Begin: q.begin,
                },
        }

        for i := 0; i < len(params); i++ {
                parts := strings.Split(params[i], "=")
                p.Tags["key"] = parts[0]
                p.FieldsString["value"] = parts[1]

                log.Println(parts[0])

                q.agent.Responses <- &udf.Response{
                        Message: &udf.Response_Point{
                                Point: p,
                        },
                }
        }

        q.agent.Responses <- &udf.Response{
                Message: &udf.Response_End{
                        End: q.end,
                },
        }

        close(q.agent.Responses)

        return nil
}

func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
        return errors.New("batching not supported")
}

func (q *queryParserHandler) Stop() {
        close(q.agent.Responses)
}

func main() {
        a := agent.New(os.Stdin, os.Stdout)
        h := newQueryParserHandler(a)
        a.Handler = h

        log.Println("Starting agent")
        a.Start()
        err := a.Wait()
        if err != nil {
                log.Fatal(err)
        }
}
```

My TICKScript looks like this

```
stream
    |from()
        .measurement('logparser_grok')
    @queryParser()
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('query_param')

```

The points don't get recorded and I wind up with this error message

[task:query_parser] 2016/07/20 21:26:34 E! failed to snapshot task query_parser 
keepalive timedout, last keepalive received was: 2016-07-20 21:04:57.81406868 
-0400 EDT

I've been able to get Wants - Stream / Provide - Stream to at least write data, 
but it only writes the final point, which makes me think my UDF should provide 
a batch

```
/**
 * Takes in a stream from logparser and tracks usage of query parameters in the
 * URL
 *
 * query_param,key=key value=value
 */
package main

import (
        "errors"
        "log"
        "os"
        "strings"

        "github.com/influxdata/kapacitor/udf"
        "github.com/influxdata/kapacitor/udf/agent"
)

type queryParserHandler struct {
        agent *agent.Agent
}

func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
        return &queryParserHandler{agent: a}
}

func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
        log.Println("+++++ Info was called +++++")
        info := &udf.InfoResponse{
                Wants:    udf.EdgeType_STREAM,
                Provides: udf.EdgeType_STREAM,
                Options:  map[string]*udf.OptionInfo{},
        }
        return info, nil
}

func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse, 
error) {
        log.Println("+++++ Init was called +++++")
        init := &udf.InitResponse{
                Success: true,
                Error:   "",
        }
        return init, nil
}

func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
        return &udf.SnapshotResponse{}, nil
}

func (*queryParserHandler) Restore(req *udf.RestoreRequest) 
(*udf.RestoreResponse, error) {
        return &udf.RestoreResponse{
                Success: true,
        }, nil
}

func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
        log.Println("+++++ BeginBatch was called +++++")
        return errors.New("batching not supported")
}

func (q *queryParserHandler) Point(p *udf.Point) error {
        log.Println("+++++ Point was called +++++")
        value := p.FieldsString["request"]
        pos := strings.Index(value, "?")
        if pos == -1 {
                // No query string? See ya!
                return nil
        }

        // Reset the point so we can reuse it
        p.Database = ""
        p.RetentionPolicy = ""
        p.Group = ""
        p.Dimensions = nil
        p.FieldsDouble = nil
        p.FieldsInt = nil
        p.Tags = map[string]string{}
        p.FieldsString = map[string]string{}

        query := value[pos+1 : len(value)]
        params := strings.Split(query, "&")

        for i := 0; i < len(params); i++ {
                parts := strings.Split(params[i], "=")
                p.Tags["key"] = parts[0]
                p.FieldsString["value"] = parts[1]

                log.Println(parts[0])

                q.agent.Responses <- &udf.Response{
                        Message: &udf.Response_Point{
                                Point: p,
                        },
                }
        }

        return nil
}

func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
        return errors.New("batching not supported")
}

func (q *queryParserHandler) Stop() {
        close(q.agent.Responses)
}

func main() {
        a := agent.New(os.Stdin, os.Stdout)
        h := newQueryParserHandler(a)
        a.Handler = h

        log.Println("Starting agent")
        a.Start()
        err := a.Wait()
        if err != nil {
                log.Fatal(err)
        }
}
```

I've spend a good amount of time looking through the examples, Kapacitor source 
code, GitHub, Google but not sure what I'm doing wrong. Any help would be much 
appreciated.

-- 
Remember to include the InfluxDB version number with all issue reports
--- 
You received this message because you are subscribed to the Google Groups 
"InfluxDB" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/influxdb.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/influxdb/33a13ec9-9a2b-4001-902d-cca588a12d57%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to