I figured this out. https://github.com/mpchadwick/kapacitor_utils/blob/c0b4f250e3725f30475a05806fe0f1f677631c81/query_parser/query_parser.go
In the end it was me being a Go newbie: rather than reusing the existing point in the loop, I needed to instantiate a new point in each iteration.

On Wednesday, July 20, 2016 at 9:30:40 PM UTC-4, [email protected] wrote:

> Related to the thread here https://groups.google.com/forum/#!topic/influxdb/lDRWWoMxaSY — I'm trying to write a UDF (in Go) that processes each point written by logparser into multiple points on a separate measurement, to track query parameter usage.
>
> I have a UDF that looks like this:

```go
/**
 * Takes in a stream from logparser and tracks usage of query parameters in the
 * URL
 *
 * query_param,key=key value=value
 */
package main

import (
	"errors"
	"log"
	"os"
	"strings"

	"github.com/influxdata/kapacitor/udf"
	"github.com/influxdata/kapacitor/udf/agent"
)

type queryParserHandler struct {
	agent *agent.Agent
	begin *udf.BeginBatch
	end   *udf.EndBatch
}

func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
	return &queryParserHandler{agent: a}
}

func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
	log.Println("+++++ Info was called +++++")
	info := &udf.InfoResponse{
		Wants:    udf.EdgeType_STREAM,
		Provides: udf.EdgeType_BATCH,
		Options:  map[string]*udf.OptionInfo{},
	}
	return info, nil
}

func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse, error) {
	log.Println("+++++ Init was called +++++")
	init := &udf.InitResponse{
		Success: true,
		Error:   "",
	}
	return init, nil
}

func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
	return &udf.SnapshotResponse{}, nil
}

func (*queryParserHandler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) {
	return &udf.RestoreResponse{
		Success: true,
	}, nil
}

func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
	log.Println("+++++ BeginBatch was called +++++")
	return errors.New("batching not supported")
}

func (q *queryParserHandler) Point(p *udf.Point) error {
	log.Println("+++++ Point was called +++++")
	value := p.FieldsString["request"]
	pos := strings.Index(value, "?")
	if pos == -1 {
		// No query string? See ya!
		return nil
	}

	// Reset the point so we can reuse it
	p.Database = ""
	p.RetentionPolicy = ""
	p.Group = ""
	p.Dimensions = nil
	p.FieldsDouble = nil
	p.FieldsInt = nil
	p.Tags = map[string]string{}
	p.FieldsString = map[string]string{}

	query := value[pos+1 : len(value)]
	params := strings.Split(query, "&")

	q.agent.Responses <- &udf.Response{
		Message: &udf.Response_Begin{
			Begin: q.begin,
		},
	}

	for i := 0; i < len(params); i++ {
		parts := strings.Split(params[i], "=")
		p.Tags["key"] = parts[0]
		p.FieldsString["value"] = parts[1]

		log.Println(parts[0])

		q.agent.Responses <- &udf.Response{
			Message: &udf.Response_Point{
				Point: p,
			},
		}
	}

	q.agent.Responses <- &udf.Response{
		Message: &udf.Response_End{
			End: q.end,
		},
	}

	close(q.agent.Responses)

	return nil
}

func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
	return errors.New("batching not supported")
}

func (q *queryParserHandler) Stop() {
	close(q.agent.Responses)
}

func main() {
	a := agent.New(os.Stdin, os.Stdout)
	h := newQueryParserHandler(a)
	a.Handler = h

	log.Println("Starting agent")
	a.Start()
	err := a.Wait()
	if err != nil {
		log.Fatal(err)
	}
}
```

> My TICKScript looks like this:

```
stream
    |from()
        .measurement('logparser_grok')
    @queryParser()
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('query_param')
```

> The points don't get recorded and I wind up with this error message:

```
[task:query_parser] 2016/07/20 21:26:34 E! failed to snapshot task query_parser keepalive timedout, last keepalive received was: 2016-07-20 21:04:57.81406868 -0400 EDT
```

> I've been able to get Wants: Stream / Provides: Stream to at least write data, but it only writes the final point, which makes me think my UDF should provide a batch:

```go
/**
 * Takes in a stream from logparser and tracks usage of query parameters in the
 * URL
 *
 * query_param,key=key value=value
 */
package main

import (
	"errors"
	"log"
	"os"
	"strings"

	"github.com/influxdata/kapacitor/udf"
	"github.com/influxdata/kapacitor/udf/agent"
)

type queryParserHandler struct {
	agent *agent.Agent
}

func newQueryParserHandler(a *agent.Agent) *queryParserHandler {
	return &queryParserHandler{agent: a}
}

func (*queryParserHandler) Info() (*udf.InfoResponse, error) {
	log.Println("+++++ Info was called +++++")
	info := &udf.InfoResponse{
		Wants:    udf.EdgeType_STREAM,
		Provides: udf.EdgeType_STREAM,
		Options:  map[string]*udf.OptionInfo{},
	}
	return info, nil
}

func (q *queryParserHandler) Init(r *udf.InitRequest) (*udf.InitResponse, error) {
	log.Println("+++++ Init was called +++++")
	init := &udf.InitResponse{
		Success: true,
		Error:   "",
	}
	return init, nil
}

func (*queryParserHandler) Snaphost() (*udf.SnapshotResponse, error) {
	return &udf.SnapshotResponse{}, nil
}

func (*queryParserHandler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) {
	return &udf.RestoreResponse{
		Success: true,
	}, nil
}

func (*queryParserHandler) BeginBatch(begin *udf.BeginBatch) error {
	log.Println("+++++ BeginBatch was called +++++")
	return errors.New("batching not supported")
}

func (q *queryParserHandler) Point(p *udf.Point) error {
	log.Println("+++++ Point was called +++++")
	value := p.FieldsString["request"]
	pos := strings.Index(value, "?")
	if pos == -1 {
		// No query string? See ya!
		return nil
	}

	// Reset the point so we can reuse it
	p.Database = ""
	p.RetentionPolicy = ""
	p.Group = ""
	p.Dimensions = nil
	p.FieldsDouble = nil
	p.FieldsInt = nil
	p.Tags = map[string]string{}
	p.FieldsString = map[string]string{}

	query := value[pos+1 : len(value)]
	params := strings.Split(query, "&")

	for i := 0; i < len(params); i++ {
		parts := strings.Split(params[i], "=")
		p.Tags["key"] = parts[0]
		p.FieldsString["value"] = parts[1]

		log.Println(parts[0])

		q.agent.Responses <- &udf.Response{
			Message: &udf.Response_Point{
				Point: p,
			},
		}
	}

	return nil
}

func (*queryParserHandler) EndBatch(end *udf.EndBatch) error {
	return errors.New("batching not supported")
}

func (q *queryParserHandler) Stop() {
	close(q.agent.Responses)
}

func main() {
	a := agent.New(os.Stdin, os.Stdout)
	h := newQueryParserHandler(a)
	a.Handler = h

	log.Println("Starting agent")
	a.Start()
	err := a.Wait()
	if err != nil {
		log.Fatal(err)
	}
}
```

> I've spent a good amount of time looking through the examples, the Kapacitor source code, GitHub, and Google, but I'm not sure what I'm doing wrong. Any help would be much appreciated.
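The root cause — sending the same `*udf.Point` through the responses channel on every loop iteration, so every queued response ends up referencing the last key/value written — can be sketched generically without the Kapacitor dependencies. This is a minimal illustration, not the actual fix from the linked repo: `Point` here is a hypothetical stand-in for `udf.Point`, and the important line is the fresh allocation inside the loop.

```go
package main

import (
	"fmt"
	"strings"
)

// Point is a hypothetical stand-in for udf.Point, for illustration only.
type Point struct {
	Tags         map[string]string
	FieldsString map[string]string
}

// parseParams splits a query string into one Point per key=value pair.
// Allocating a new Point on each iteration is the key: reusing a single
// pointer would make every emitted point alias the same struct, so all of
// them would carry the values from the final iteration.
func parseParams(query string) []*Point {
	var points []*Point
	for _, param := range strings.Split(query, "&") {
		parts := strings.SplitN(param, "=", 2)
		if len(parts) != 2 {
			continue // skip malformed pairs like "flag" with no value
		}
		p := &Point{ // fresh Point per iteration, not a reused one
			Tags:         map[string]string{"key": parts[0]},
			FieldsString: map[string]string{"value": parts[1]},
		}
		points = append(points, p)
	}
	return points
}

func main() {
	for _, p := range parseParams("a=1&b=2") {
		fmt.Println(p.Tags["key"], p.FieldsString["value"])
	}
}
```

In the UDF's `Point` handler, the same pattern applies: build a new `udf.Point` inside the `for` loop before sending it on `q.agent.Responses`, instead of resetting and resending the incoming point.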
