Hi,
I have updated the repository with a minimal example. I would be glad if you guys can have a look at it.
https://github.com/emowhiz/heka

Thank you,
Regards,
Udara Chathuranga

On Mon, Jan 18, 2016 at 5:44 PM, Udara Chathuranga <[email protected]> wrote:

> Hi,
> Since the given repository is having unnecessary code I'll provide a
> minimal example which reproduces the issue as soon as possible. Sorry for
> the inconvenience.
>
> Thank you.
>
> Regards,
> Udara Chathuranga
>
> On Mon, Jan 18, 2016 at 5:30 PM, Udara Chathuranga <
> [email protected]> wrote:
>
>> Hi,
>> I have created the following repository with a sample project which shows
>> the error prone situation I have mentioned before. I would be glad if you
>> can have a look at it.
>>
>> https://github.com/emowhiz/heka.git
>>
>> Thank you.
>>
>> Regards,
>> Udara Chathuranga
>>
>> On Tue, Dec 29, 2015 at 11:39 PM, Rob Miller <[email protected]> wrote:
>>
>>> Hrm. I'm not sure why you'd see the output in a different order. You
>>> definitely shouldn't be; Heka does guarantee that data from a single stream
>>> will be processed in the order received, and AFAICT there's nothing you've
>>> done here that would impact the order of the output.
>>>
>>> I notice that the `process_message` function in your output is missing a
>>> return code, which is weird, because I'd expect that to not work at all.
>>>
>>> One thing you might try as a debugging step is to use a FileOutput
>>> instead of a SandboxOutput to see if that gives you any better results.
>>> You'll need a SandboxEncoder to insert the count into your output. If that
>>> gives you consistent ordering, then we've at least narrowed the problem
>>> down.
>>>
>>> -r
>>>
>>>
>>> On 12/23/2015 12:26 AM, Udara Chathuranga wrote:
>>>
>>>> Hello,
>>>>
>>>> I have implemented a heka pipeline, which has an input, decoder, filter,
>>>> output.
>>>>
>>>> In the log file there are two entries related to two customers.
>>>> (name, source, event, destination)
>>>>
>>>> 1. john,09400000900, reg1 , 09400000800
>>>>
>>>> 2. sam,09400000901, reg2 , 09400000801
>>>>
>>>>
>>>> When these go through the pipeline they are written to an output file
>>>> again. (pipeline and sandbox codes are below).
>>>>
>>>> At the output log event it should show;
>>>>
>>>> 094000008000
>>>>
>>>> 094000009011
>>>>
>>>> It doesn't always show in that order. When I start the pipeline again
>>>> and again, each time the order of the two events is changing even though
>>>> it is the same log file. (I remove sandbox preservation and log streamer
>>>> directories)
>>>>
>>>> Eg: Sometimes it shows as following even though the only thing done is
>>>> restarting the pipeline.
>>>>
>>>> 094000009010
>>>>
>>>> 094000008001
>>>>
>>>> Is there any race condition happening somewhere between the filter and
>>>> output plug-ins? Any help would be much appreciated
>>>>
>>>> _heka.toml_
>>>>
>>>> # input and decoder are here #
>>>>
>>>> [event-filter]
>>>> type = "SandboxFilter"
>>>> message_matcher = "Fields[type] == 'event_decode'"
>>>> filename = "lua/counter/consumer.lua"
>>>> preserve_data = true
>>>> memory_limit = 20000000
>>>>
>>>> [mongo_summary_output]
>>>> type = "SandboxOutput"
>>>> message_matcher = "Fields[type] == 'summary_aggregated'"
>>>> filename = "lua/output/summary-output.lua"
>>>>
>>>> _Filter_
>>>>
>>>> local message = {
>>>>     Payload = nil,
>>>>     Fields = {}
>>>> }
>>>>
>>>> function process_message()
>>>>
>>>>     message.Fields = {}
>>>>
>>>>     if read_message("Fields[event-type]") == "reg1" then
>>>>
>>>>         message.Fields["consumer"] = read_message("Fields[destination-address]")
>>>>
>>>>     elseif read_message("Fields[event-type]") == "reg2" then
>>>>
>>>>         message.Fields["consumer"] = read_message("Fields[source-address]")
>>>>
>>>>     end
>>>>
>>>>     message.Fields.type = "summary_aggregated"
>>>>     if not pcall(inject_message, message) then return -1 end
>>>>
>>>>     return 0
>>>> end
>>>>
>>>>
>>>> _Output_
>>>>
>>>> local count = 0
>>>>
>>>> local buffer_file = "/home/eshan/buffer.log"
>>>>
>>>> function process_message()
>>>>
>>>>     local outputbatch = read_message("Fields[consumer]") .. " " .. count
>>>>
>>>>     local backup_file, e = io.open(buffer_file, "a+")
>>>>     backup_file:write(outputbatch)
>>>>     backup_file:close()
>>>>     count = count + 1
>>>>
>>>> end
>>>>
>>>> Regards,
>>>>
>>>> Udara Chathuranga
>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> Heka mailing list
>>>> [email protected]
>>>> https://mail.mozilla.org/listinfo/heka
>>>>
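
For anyone who wants to try the FileOutput debugging step Rob suggested in the quoted thread, a minimal config sketch could look like the following. It assumes the same message_matcher as the existing SandboxOutput. The section names `summary_encoder` and `summary_file_output` and the encoder script path are hypothetical, and the encoder script itself (which would have to append the count to each line) is not shown.

```toml
# Hypothetical encoder section; the Lua script at this path is an assumption
# and would need to emit the consumer value plus the running count.
[summary_encoder]
type = "SandboxEncoder"
filename = "lua/encoders/summary-encoder.lua"

# FileOutput used in place of the SandboxOutput, writing to the same file
# the original output plugin appended to.
[summary_file_output]
type = "FileOutput"
message_matcher = "Fields[type] == 'summary_aggregated'"
path = "/home/eshan/buffer.log"
encoder = "summary_encoder"
```

If the order comes out consistent with this setup, the problem is likely in the SandboxOutput script rather than in the filter.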

