Hi,

I have created the following repository with a sample project that shows the error-prone situation I mentioned before. I would be glad if you could have a look at it.
https://github.com/emowhiz/heka.git

Thank you.

Regards,
Udara Chathuranga

On Tue, Dec 29, 2015 at 11:39 PM, Rob Miller <[email protected]> wrote:

> Hrm. I'm not sure why you'd see the output in a different order. You
> definitely shouldn't be; Heka does guarantee that data from a single stream
> will be processed in the order received, and AFAICT there's nothing you've
> done here that would impact the order of the output.
>
> I notice that the `process_message` function in your output is missing a
> return code, which is weird, because I'd expect that to not work at all.
>
> One thing you might try as a debugging step is to use a FileOutput instead
> of a SandboxOutput to see if that gives you any better results. You'll need
> a SandboxEncoder to insert the count into your output. If that gives you
> consistent ordering, then we've at least narrowed the problem down.
>
> -r
>
>
> On 12/23/2015 12:26 AM, Udara Chathuranga wrote:
>
>> Hello,
>>
>> I have implemented a heka pipeline, which has a input, decoder, filter,
>> output.
>>
>> In the log file if there are two entities related to two customers.
>> (name, source, event, destination)
>>
>> 1. john,09400000900, reg1 , 09400000800
>>
>> 2. sam,09400000901, reg2 , 09400000801
>>
>>
>> When these go through the pipeline they are written to a output file
>> again. (pipeline and sandbox codes are below).
>>
>> At the output log event it should show;
>>
>> 094000008000
>>
>> 094000009011
>>
>> It doesn't always show in that order. When i start the pipeline again
>> and again each time the order of the two events are changing even though
>> it is the same log file. (i remove sandbox preservation and log streamer
>> directories)
>>
>> Eg: Sometimes it shows as following even though the only thing done is
>> restarting the pipeline.
>>
>> 094000009010
>>
>> 094000008001
>>
>> Is there any race condition happening somewhere between the filter and
>> output plug-ins ?
>> Any help would be much appreciated
>>
>> _heka.toml_
>>
>> # input and decoder are here #
>>
>> [event-filter]
>> type = "SandboxFilter"
>> message_matcher = "Fields[type] == 'event_decode'"
>> filename = "lua/counter/consumer.lua"
>> preserve_data = true
>> memory_limit = 20000000
>>
>> [mongo_summary_output]
>> type = "SandboxOutput"
>> message_matcher = "Fields[type] == 'summary_aggregated'"
>> filename = "lua/output/summary-output.lua"
>>
>> _Filter_
>>
>> local message = {
>>     Payload = nil,
>>     Fields = {}
>> }
>>
>> function process_message()
>>
>>     message.Fields = {}
>>
>>     if read_message("Fields[event-type]") == "reg1" then
>>
>>         message.Fields["consumer"] = read_message("Fields[destination-address]")
>>
>>     elseif read_message("Fields[event-type]") == "reg2" then
>>
>>         message.Fields["consumer"] = read_message("Fields[source-address]")
>>
>>     end
>>
>>     message.Fields.type = "summary_aggregated"
>>     if not pcall(inject_message, message) then return -1 end
>>
>>     return 0
>> end
>>
>>
>> _Output_
>>
>> local count = 0
>>
>> local buffer_file = "/home/eshan/buffer.log"
>>
>> function process_message()
>>
>>     local outputbatch = read_message("Fields[consumer]") .. " " .. count
>>
>>     local backup_file, e = io.open(buffer_file, "a+")
>>     backup_file:write(outputbatch)
>>     backup_file:close()
>>     count = count + 1
>>
>> end
>>
>> Regards,
>>
>> Udara Chathuranga
>>
>>
>>
>> _______________________________________________
>> Heka mailing list
>> [email protected]
>> https://mail.mozilla.org/listinfo/heka

