Hi,
I have updated the repository with a minimal example. I would be glad if
you could have a look at it.

https://github.com/emowhiz/heka

Thank you,
Regards,
Udara Chathuranga

On Mon, Jan 18, 2016 at 5:44 PM, Udara Chathuranga <[email protected]>
wrote:

> Hi,
> Since the repository I shared contains unnecessary code, I'll provide a
> minimal example that reproduces the issue as soon as possible. Sorry for
> the inconvenience.
>
> Thank you.
>
> Regards,
> Udara Chathuranga
>
> On Mon, Jan 18, 2016 at 5:30 PM, Udara Chathuranga <
> [email protected]> wrote:
>
>> Hi,
>> I have created the following repository with a sample project that shows the
>> error-prone situation I mentioned before. I would be glad if you could
>> have a look at it.
>>
>> https://github.com/emowhiz/heka.git
>>
>> Thank you.
>>
>> Regards,
>> Udara Chathuranga
>>
>> On Tue, Dec 29, 2015 at 11:39 PM, Rob Miller <[email protected]> wrote:
>>
>>> Hrm. I'm not sure why you'd see the output in a different order. You
>>> definitely shouldn't be; Heka does guarantee that data from a single stream
>>> will be processed in the order received, and AFAICT there's nothing you've
>>> done here that would impact the order of the output.
>>>
>>> I notice that the `process_message` function in your output is missing a
>>> return code, which is weird, because I'd expect that to not work at all.
>>>
>>> One thing you might try as a debugging step is to use a FileOutput
>>> instead of a SandboxOutput to see if that gives you any better results.
>>> You'll need a SandboxEncoder to insert the count into your output. If that
>>> gives you consistent ordering, then we've at least narrowed the problem
>>> down.
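>>>
>>> A rough sketch of that setup, assuming a hypothetical encoder script at
>>> lua/encoders/count.lua; the section names are placeholders too, and the
>>> matcher and path are copied from the config quoted below:
>>>
>>> ```toml
>>> # Sketch only: section names and the encoder script path are hypothetical.
>>> [count_encoder]
>>> type = "SandboxEncoder"
>>> filename = "lua/encoders/count.lua"  # hypothetical script that adds the count
>>>
>>> [summary_file_output]
>>> type = "FileOutput"
>>> message_matcher = "Fields[type] == 'summary_aggregated'"
>>> path = "/home/eshan/buffer.log"
>>> encoder = "count_encoder"
>>> ```
>>>
>>> The encoder script itself, which would keep the counter and emit the
>>> consumer value plus the count, is not shown here.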
>>>
>>> -r
>>>
>>>
>>> On 12/23/2015 12:26 AM, Udara Chathuranga wrote:
>>>
>>>> Hello,
>>>>
>>>> I have implemented a Heka pipeline that has an input, a decoder, a filter,
>>>> and an output.
>>>>
>>>> The log file contains two entries, one for each of two customers, in the
>>>> format (name, source, event, destination):
>>>>
>>>> 1. john,09400000900, reg1 , 09400000800
>>>>
>>>> 2. sam,09400000901, reg2 , 09400000801
>>>>
>>>>
>>>> When these go through the pipeline, they are written to an output file
>>>> (pipeline and sandbox code are below).
>>>>
>>>> The output log should show:
>>>>
>>>> 094000008000
>>>>
>>>> 094000009011
>>>>
>>>> It doesn't always appear in that order. Each time I restart the pipeline,
>>>> the order of the two events changes, even though the log file is the same.
>>>> (I remove the sandbox preservation and log streamer directories each
>>>> time.)
>>>>
>>>> E.g., sometimes it shows the following, even though the only thing done is
>>>> restarting the pipeline:
>>>>
>>>> 094000009010
>>>>
>>>> 094000008001
>>>>
>>>> Is there a race condition somewhere between the filter and output
>>>> plug-ins? Any help would be much appreciated.
>>>>
>>>> _heka.toml_
>>>>
>>>> # input and decoder are here #
>>>>
>>>> [event-filter]
>>>> type = "SandboxFilter"
>>>> message_matcher = "Fields[type] == 'event_decode'"
>>>> filename = "lua/counter/consumer.lua"
>>>> preserve_data = true
>>>> memory_limit = 20000000
>>>>
>>>> [mongo_summary_output]
>>>> type = "SandboxOutput"
>>>> message_matcher = "Fields[type] == 'summary_aggregated'"
>>>> filename = "lua/output/summary-output.lua"
>>>>
>>>> _Filter_
>>>>
>>>> local message = {
>>>>     Payload = nil,
>>>>     Fields = {}
>>>> }
>>>>
>>>> function process_message()
>>>>     message.Fields = {}
>>>>
>>>>     if read_message("Fields[event-type]") == "reg1" then
>>>>         message.Fields["consumer"] = read_message("Fields[destination-address]")
>>>>     elseif read_message("Fields[event-type]") == "reg2" then
>>>>         message.Fields["consumer"] = read_message("Fields[source-address]")
>>>>     end
>>>>
>>>>     message.Fields.type = "summary_aggregated"
>>>>     if not pcall(inject_message, message) then return -1 end
>>>>
>>>>     return 0
>>>> end
>>>>
>>>>
>>>> _Output_
>>>>
>>>> local count = 0
>>>> local buffer_file = "/home/eshan/buffer.log"
>>>>
>>>> function process_message()
>>>>     local outputbatch = read_message("Fields[consumer]") .. " " .. count
>>>>
>>>>     local backup_file, e = io.open(buffer_file, "a+")
>>>>     backup_file:write(outputbatch)
>>>>     backup_file:close()
>>>>     count = count + 1
>>>> end
>>>>
>>>> Regards,
>>>>
>>>> Udara Chathuranga
>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> Heka mailing list
>>>> [email protected]
>>>> https://mail.mozilla.org/listinfo/heka
>>>>
>>>>
>>>
>>
>
