Related to the thread here
https://groups.google.com/forum/#!topic/influxdb/lDRWWoMxaSY I'm trying to
write a UDF (in Go) that processes each point written to log parser into
multiple points on a separate measurement to track query parameter usage
I have a UDF that looks like this which...
```
/**
* Takes in a stream from logparser and tracks usage of query parameters in the
* URL
*
* query_param,key=key value=value
*/
package main
import (
"errors"
"log"
"os"
"strings"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
)
type queryParserHandler struct {
agent *agent.Agent
begin *udf.BeginBatch
end *udf.EndBatch
}
func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
return &queryParserHandler{agent: a}
}
func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
log.Println("+++++ Info was called +++++")
info := &udf.InfoResponse{
Wants: udf.EdgeType_STREAM,
Provides: udf.EdgeType_BATCH,
Options: map[string]*udf.OptionInfo{},
}
return info, nil
}
func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse,
error) {
log.Println("+++++ Init was called +++++")
init := &udf.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
return &udf.SnapshotResponse{}, nil
}
func (*queryParserHandler) Restore(req *udf.RestoreRequest)
(*udf.RestoreResponse, error) {
return &udf.RestoreResponse{
Success: true,
}, nil
}
func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
log.Println("+++++ BeginBatch was called +++++")
return errors.New("batching not supported")
}
func (q *queryParserHandler) Point(p *udf.Point) error {
log.Println("+++++ Point was called +++++")
value := p.FieldsString["request"]
pos := strings.Index(value, "?")
if pos == -1 {
// No query string? See ya!
return nil
}
// Reset the point so we can reuse it
p.Database = ""
p.RetentionPolicy = ""
p.Group = ""
p.Dimensions = nil
p.FieldsDouble = nil
p.FieldsInt = nil
p.Tags = map[string]string{}
p.FieldsString = map[string]string{}
query := value[pos+1 : len(value)]
params := strings.Split(query, "&")
q.agent.Responses <- &udf.Response{
Message: &udf.Response_Begin{
Begin: q.begin,
},
}
for i := 0; i < len(params); i++ {
parts := strings.Split(params[i], "=")
p.Tags["key"] = parts[0]
p.FieldsString["value"] = parts[1]
log.Println(parts[0])
q.agent.Responses <- &udf.Response{
Message: &udf.Response_Point{
Point: p,
},
}
}
q.agent.Responses <- &udf.Response{
Message: &udf.Response_End{
End: q.end,
},
}
close(q.agent.Responses)
return nil
}
func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
return errors.New("batching not supported")
}
func (q *queryParserHandler) Stop() {
close(q.agent.Responses)
}
func main() {
a := agent.New(os.Stdin, os.Stdout)
h := newQueryParserHandler(a)
a.Handler = h
log.Println("Starting agent")
a.Start()
err := a.Wait()
if err != nil {
log.Fatal(err)
}
}
```
My TICKScript looks like this
```
stream
|from()
.measurement('logparser_grok')
@queryParser()
|influxDBOut()
.database('telegraf')
.retentionPolicy('default')
.measurement('query_param')
```
The points don't get recorded and I wind up with this error message
[task:query_parser] 2016/07/20 21:26:34 E! failed to snapshot task query_parser
keepalive timedout, last keepalive received was: 2016-07-20 21:04:57.81406868
-0400 EDT
I've been able to get Wants - Stream / Provide - Stream to at least write data,
but it only writes the final point, which makes me think my UDF should provide
a batch
```
/**
* Takes in a stream from logparser and tracks usage of query parameters in the
* URL
*
* query_param,key=key value=value
*/
package main
import (
"errors"
"log"
"os"
"strings"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
)
type queryParserHandler struct {
agent *agent.Agent
}
func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
return &queryParserHandler{agent: a}
}
func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
log.Println("+++++ Info was called +++++")
info := &udf.InfoResponse{
Wants: udf.EdgeType_STREAM,
Provides: udf.EdgeType_STREAM,
Options: map[string]*udf.OptionInfo{},
}
return info, nil
}
func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse,
error) {
log.Println("+++++ Init was called +++++")
init := &udf.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
return &udf.SnapshotResponse{}, nil
}
func (*queryParserHandler) Restore(req *udf.RestoreRequest)
(*udf.RestoreResponse, error) {
return &udf.RestoreResponse{
Success: true,
}, nil
}
func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
log.Println("+++++ BeginBatch was called +++++")
return errors.New("batching not supported")
}
func (q *queryParserHandler) Point(p *udf.Point) error {
log.Println("+++++ Point was called +++++")
value := p.FieldsString["request"]
pos := strings.Index(value, "?")
if pos == -1 {
// No query string? See ya!
return nil
}
// Reset the point so we can reuse it
p.Database = ""
p.RetentionPolicy = ""
p.Group = ""
p.Dimensions = nil
p.FieldsDouble = nil
p.FieldsInt = nil
p.Tags = map[string]string{}
p.FieldsString = map[string]string{}
query := value[pos+1 : len(value)]
params := strings.Split(query, "&")
for i := 0; i < len(params); i++ {
parts := strings.Split(params[i], "=")
p.Tags["key"] = parts[0]
p.FieldsString["value"] = parts[1]
log.Println(parts[0])
q.agent.Responses <- &udf.Response{
Message: &udf.Response_Point{
Point: p,
},
}
}
return nil
}
func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
return errors.New("batching not supported")
}
func (q *queryParserHandler) Stop() {
close(q.agent.Responses)
}
func main() {
a := agent.New(os.Stdin, os.Stdout)
h := newQueryParserHandler(a)
a.Handler = h
log.Println("Starting agent")
a.Start()
err := a.Wait()
if err != nil {
log.Fatal(err)
}
}
```
I've spend a good amount of time looking through the examples, Kapacitor source
code, GitHub, Google but not sure what I'm doing wrong. Any help would be much
appreciated.
--
Remember to include the InfluxDB version number with all issue reports
---
You received this message because you are subscribed to the Google Groups
"InfluxDB" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/influxdb.
To view this discussion on the web visit
https://groups.google.com/d/msgid/influxdb/33a13ec9-9a2b-4001-902d-cca588a12d57%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.