Hi,
I have created the following repository with a sample project that shows the
error-prone situation I mentioned before. I would be glad if you could
have a look at it.

https://github.com/emowhiz/heka.git

Thank you.

Regards,
Udara Chathuranga

On Tue, Dec 29, 2015 at 11:39 PM, Rob Miller <[email protected]> wrote:

> Hrm. I'm not sure why you'd see the output in a different order. You
> definitely shouldn't be; Heka does guarantee that data from a single stream
> will be processed in the order received, and AFAICT there's nothing you've
> done here that would impact the order of the output.
>
> I notice that the `process_message` function in your output is missing a
> return code, which is weird, because I'd expect that to not work at all.
>
> One thing you might try as a debugging step is to use a FileOutput instead
> of a SandboxOutput to see if that gives you any better results. You'll need
> a SandboxEncoder to insert the count into your output. If that gives you
> consistent ordering, then we've at least narrowed the problem down.
>
> -r
>
>
> On 12/23/2015 12:26 AM, Udara Chathuranga wrote:
>
>> Hello,
>>
>> I have implemented a Heka pipeline, which has an input, decoder, filter, and
>> output.
>>
>> The log file contains two entries related to two customers
>> (name, source, event, destination):
>>
>> 1. john,09400000900, reg1 , 09400000800
>>
>> 2. sam,09400000901, reg2 , 09400000801
>>
>>
>> When these go through the pipeline they are written to an output file
>> again (the pipeline and sandbox code are below).
>>
>> The output log should show:
>>
>> 094000008000
>>
>> 094000009011
>>
>> It doesn't always show them in that order. Each time I restart the pipeline,
>> the order of the two events changes, even though it is the same log file
>> (I remove the sandbox preservation and log streamer directories between runs).
>>
>> E.g., sometimes it shows the following, even though the only thing done is
>> restarting the pipeline:
>>
>> 094000009010
>>
>> 094000008001
>>
>> Is there a race condition somewhere between the filter and
>> output plug-ins? Any help would be much appreciated.
>>
>> _heka.toml_
>>
>> # input and decoder are here #
>>
>> [event-filter]
>> type = "SandboxFilter"
>> message_matcher = "Fields[type] == 'event_decode'"
>> filename = "lua/counter/consumer.lua"
>> preserve_data = true
>> memory_limit = 20000000
>>
>> [mongo_summary_output]
>> type = "SandboxOutput"
>> message_matcher =  "Fields[type] == 'summary_aggregated'"
>> filename = "lua/output/summary-output.lua"
>>
>> _Filter_
>>
>> local message = {
>>     Payload = nil,
>>     Fields = {}
>> }
>>
>> function process_message()
>>     message.Fields = {}
>>
>>     if read_message("Fields[event-type]") == "reg1" then
>>         message.Fields["consumer"] = read_message("Fields[destination-address]")
>>     elseif read_message("Fields[event-type]") == "reg2" then
>>         message.Fields["consumer"] = read_message("Fields[source-address]")
>>     end
>>
>>     message.Fields.type = "summary_aggregated"
>>     if not pcall(inject_message, message) then return -1 end
>>
>>     return 0
>> end
>>
>>
>> _Output_
>>
>> local count = 0
>> local buffer_file = "/home/eshan/buffer.log"
>>
>> function process_message()
>>     local outputbatch = read_message("Fields[consumer]") .. " " .. count
>>
>>     local backup_file, e = io.open(buffer_file, "a+")
>>     backup_file:write(outputbatch)
>>     backup_file:close()
>>     count = count + 1
>> end
>>
>> Regards,
>>
>> Udara Chathuranga
>>
>>
>>
>> _______________________________________________
>> Heka mailing list
>> [email protected]
>> https://mail.mozilla.org/listinfo/heka
>>
>>
>