On 04/30/2015 02:10 PM, Adriano Santos wrote:
Hi Rob,
Thanks for your promptly response. This is the problem I'm trying
to solve. I'm reading the memstat every 10 seconds and if the free
memory is below 5 mb for three consecutive reads I would like to push a
message to kafka with the latest read.
Great. So you can keep track of that in the filter and only emit a message when
the problem state arises, rather than emitting one for every message you
receive.
As a start I'm trying first to make the filter work and then add
more logic to it.
I just tried to modify my configuration to do not mach the message
type for a quick test and I still get the same issue
When you use `inject_message` from a SandboxFilter the Type value is prepended with
`heka.sandbox.`, for security reasons. So your message_matcher of "Type !=
'counter.stats'" is still matching the message you're injecting.
In cases like this it's much better to set your filter up to positively match on the
messages you want to catch, rather than trying to exclude certain ones. If you're trying
to catch the memstat messages generated by the Linux Memory Stats Decoder, for instance,
you could use "Type == 'stats.memstats'".
-r
[StatFilter]
type = "SandboxFilter"
filename = "%ENV[HOME]/heka/config/lua/stat_filter.lua"
message_matcher = "Type != 'counter.stats'"
--[[
--]]
require "math"
require "string"
require "table"
counter_msg = 0;
local msg = {
Type = "counter.stats",
Payload = nil,
Fields = {}
}
function process_message ()
msg.Payload = read_message("Payload")
msg.Fields = read_message("Fields")
counter_msg = counter_msg + 1
msg.Fields["Count_MemFree"] = counter_msg
inject_msg(msg)
return 0
end
Best,
Adriano
On Thu, Apr 30, 2015 at 3:57 PM, Rob Miller <[email protected]
<mailto:[email protected]>> wrote:
The error message is fairly clear here, I think:
"Plugin 'MemStatFilter' error: attempted to Inject a message to itself"
The issue is that the message that you're trying to inject will be
matched by the message_matcher of the filter doing the injecting.
This is explicitly not allowed because it likely leads to an
infinite loop. Try constructing the message you generate in such a
way that it will not match the filter that is doing the injecting
and you'll have better luck.
Other notes on your implementation:
- You almost certainly don't want to use the separate `timer_event`
version. You might get thousands of messages between each ticker
interval, so in that version the count will be right but you'll only
get the payload and other data of the last message that came through
before each tick.
- You'll get much less GC churn if you define the `local msg =
{...}` table outside of the process_message function, and only
overwrite the individual values you want to change each time, rather
than creating a new table.
- This whole thing smells a bit weird to me, I suspect it's not the
best approach. What is the problem you're trying to solve?
-r
On 04/30/2015 01:38 PM, Adriano Santos wrote:
I also tried this code before and I got the same error:
--[[
--]]
require "math"
require "string"
require "table"
counter_msg = 0;
payload = nil
fields = nil
function process_message ()
local msg = {
Timestamp = nil,
Type = "counter.stats",
Payload = read_message("Payload"),
Fields = read_message("Fields")
}
counter_msg = counter_msg + 1
msg.Fields["Count_MemFree"] = counter_msg
inject_msg(msg)
return 0
end
On Thu, Apr 30, 2015 at 3:18 PM, Adriano Santos
<[email protected] <mailto:[email protected]>
<mailto:[email protected]
<mailto:[email protected]>>> wrote:
Hi Guys,
I wrote a lua sandbox filter plugin to count a field in the
message matcher. I'm trying also to include the content of the
message into the new message that is being injected.
--[[
--]]
require "math"
require "string"
require "table"
counter_msg = 0;
payload = nil
fields = nil
local floor = math.floor
function process_message ()
counter_msg = counter_msg + 1
payload = read_message("Payload")
fields = read_message("Fields")
return 0
end
function timer_event(ns)
local msg = {
Timestamp = nil,
Type = "counter.stats",
Payload = nil,
Fields = nil
}
msg.Payload = payload
msg.Fields = fields
msg.Fields["Count_MemFree"] = counter_msg
inject_msg(msg)
end
When the timer event is triggered I got the following
error:
2015/04/30 13:01:45 Plugin 'MemStatFilter' error: attempted to
Inject a message to itself
2015/04/30 13:01:45 Plugin 'MemStatFilter': stopped
2015/04/30 13:01:45 Plugin 'MemStatFilter': has stopped,
exiting
plugin without shutting down.
2015/04/30 13:01:45
:Timestamp: 2015-04-30 20:01:45.283326565 +0000 UTC
:Type: heka.terminated
:Hostname: slc01hza
:Pid: 26170
:Uuid: cc72c845-93b5-428f-a186-a2f186981efb
:Logger: hekad
:Payload: MemStatFilter (type SandboxFilter) terminated. Error:
Filter unloaded.
:EnvVersion:
:Severity: 7
:Fields:
| name:"plugin" type:string value:"MemStatFilter"
2015/04/30 13:01:45 Plugin 'MemStatFilter' error: Lost/Droppe
I know that modify the message inside a filter is not
allowed. I
thought create a new message using inject_message or
inject_payload
was available. I tried both.
Am I doing something wrong?
Best,
Adriano
_______________________________________________
Heka mailing list
[email protected] <mailto:[email protected]>
https://mail.mozilla.org/listinfo/heka
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka