Hrm. I'm not sure why you'd see the output in a different order. You definitely
shouldn't be; Heka does guarantee that data from a single stream will be
processed in the order received, and AFAICT there's nothing you've done here
that would impact the order of the output.
I notice that the `process_message` function in your output is missing a return
code, which is weird, because I'd expect that to not work at all.
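For reference, here is a minimal sketch of the output's `process_message` with explicit return codes. It assumes the same convention your filter already follows: 0 for success and -1 for failure. The `read_message` stub and the `os.tmpname()` path are placeholders so the snippet runs outside Heka; inside the sandbox you would drop the stub and use your real buffer path.

```lua
-- Stub so the sketch runs outside Heka; inside the sandbox Heka provides read_message.
if not read_message then
    function read_message(field)
        return "09400000800"
    end
end

local count = 0
local buffer_file = os.tmpname() -- placeholder path, for illustration only

function process_message()
    local consumer = read_message("Fields[consumer]")
    if not consumer then return -1 end      -- nothing to write: report failure

    local backup_file = io.open(buffer_file, "a+")
    if not backup_file then return -1 end   -- could not open the file: report failure

    backup_file:write(consumer, " ", count, "\n")
    backup_file:close()
    count = count + 1
    return 0                                -- explicit success code
end
```

Checking the result of `io.open` also means a bad path shows up as a failed message rather than a Lua error on the `write` call.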
One thing you might try as a debugging step is to use a FileOutput instead of a
SandboxOutput to see if that gives you any better results. You'll need a
SandboxEncoder to insert the count into your output. If that gives you
consistent ordering, then we've at least narrowed the problem down.
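A rough sketch of what such an encoder might look like follows. The section names, file paths, and the `captured_payloads` helper are hypothetical, and the stubs exist only so the snippet can run outside Heka. It assumes the encoder emits its output through `inject_payload`, as the sandbox encoders shipped with Heka do.

```lua
-- Hypothetical wiring in heka.toml (section names and paths are placeholders):
--   [count_encoder]
--   type = "SandboxEncoder"
--   filename = "lua/encoders/count-encoder.lua"
--
--   [debug_file_output]
--   type = "FileOutput"
--   message_matcher = "Fields[type] == 'summary_aggregated'"
--   path = "/tmp/heka-debug.log"
--   encoder = "count_encoder"

-- Stubs so the sketch runs outside Heka; the sandbox provides the real functions.
if not read_message then
    local captured = {}
    function read_message(field)
        return "09400000800"
    end
    function inject_payload(payload_type, payload_name, data)
        captured[#captured + 1] = data
    end
    function captured_payloads()
        return captured
    end
end

local count = 0

function process_message()
    local consumer = read_message("Fields[consumer]")
    if not consumer then return -1 end
    -- Emit "<consumer> <count>" as the encoded form of this message.
    inject_payload("txt", "", string.format("%s %d\n", consumer, count))
    count = count + 1
    return 0
end
```

If the FileOutput log comes out in a stable order across restarts, that points at the SandboxOutput side; if it still varies, the reordering is happening earlier in the pipeline.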
-r
On 12/23/2015 12:26 AM, Udara Chathuranga wrote:
Hello,
I have implemented a Heka pipeline, which has an input, decoder, filter, and output.
The log file contains two entries related to two customers (name,
source, event, destination):
1. john,09400000900, reg1 , 09400000800
2. sam,09400000901, reg2 , 09400000801
When these go through the pipeline, they are written to an output file again
(pipeline and sandbox code are below).
The output log should show:
094000008000
094000009011
It doesn't always show in that order. When I start the pipeline again
and again, the order of the two events changes each time, even though
it is the same log file. (I remove the sandbox preservation and log streamer
directories.)
E.g., sometimes it shows the following, even though the only thing done is
restarting the pipeline:
094000009010
094000008001
Is there a race condition happening somewhere between the filter and output
plugins? Any help would be much appreciated.
_heka.toml_
# input and decoder are here #
[event-filter]
type = "SandboxFilter"
message_matcher = "Fields[type] == 'event_decode'"
filename = "lua/counter/consumer.lua"
preserve_data = true
memory_limit = 20000000
[mongo_summary_output]
type = "SandboxOutput"
message_matcher = "Fields[type] == 'summary_aggregated'"
filename = "lua/output/summary-output.lua"
_Filter_
local message = {
    Payload = nil,
    Fields = {}
}
function process_message()
    message.Fields = {}
    if read_message("Fields[event-type]") == "reg1" then
        message.Fields["consumer"] = read_message("Fields[destination-address]")
    elseif read_message("Fields[event-type]") == "reg2" then
        message.Fields["consumer"] = read_message("Fields[source-address]")
    end
    message.Fields.type = "summary_aggregated"
    if not pcall(inject_message, message) then return -1 end
    return 0
end
_Output_
local count = 0
local buffer_file = "/home/eshan/buffer.log"
function process_message()
    local outputbatch = read_message("Fields[consumer]") .. " " .. count
    local backup_file, e = io.open(buffer_file, "a+")
    backup_file:write(outputbatch)
    backup_file:close()
    count = count + 1
end
Regards,
Udara Chathuranga
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka