That's actually the current behavior: byte slices are passed over a
channel. I reworked things so there is now a use_buffering option. When it
is false, the output sends each batch blindly and discards any batch that
fails. Buffering is on by default.

If you could give this another glance for code quality, that'd be great:
https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch

Also, let me know if there's anything else you'd like to see (unit tests,
changelog edits, etc.) before I submit a pull request. I tested this
against an ES instance and it seems to behave well no matter how I kill the
instance: sends fail while it is down, and once the instance comes back up
sending just resumes. The dashboard also reflects the correct number of
processed messages now.
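
The batching discussed in the quoted thread below (accumulate records, then
hand the buffer one byte slice per batch) can be sketched roughly as follows.
This is an illustration under assumptions, not the plugin's code: batcher,
add, flush, and run are hypothetical names, records are assumed to be
newline-delimited, and the four flush triggers are the ones listed further
down (count, size, timer, shutdown).

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// batcher accumulates newline-delimited records into one byte slice per
// batch. All names here are hypothetical, not part of Heka.
type batcher struct {
	buf      bytes.Buffer
	count    int
	maxCount int
	maxBytes int
}

// add appends one record and returns any batches that became ready.
// Trigger 2 (size): flush first if the new record would exceed maxBytes.
// Trigger 1 (count): flush once maxCount records are queued.
func (b *batcher) add(rec []byte) [][]byte {
	var out [][]byte
	if b.count > 0 && b.buf.Len()+len(rec)+1 > b.maxBytes {
		out = append(out, b.flush())
	}
	b.buf.Write(rec)
	b.buf.WriteByte('\n')
	b.count++
	if b.count >= b.maxCount {
		out = append(out, b.flush())
	}
	return out
}

// flush returns a copy of the pending batch (nil if empty) and resets state.
func (b *batcher) flush() []byte {
	if b.count == 0 {
		return nil
	}
	batch := append([]byte(nil), b.buf.Bytes()...)
	b.buf.Reset()
	b.count = 0
	return batch
}

// run forwards batches to out until in is closed, then closes out.
func run(in <-chan []byte, out chan<- []byte, b *batcher, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer close(out)
	for {
		select {
		case rec, ok := <-in:
			if !ok { // Trigger 4: shutting down, flush what is left.
				if batch := b.flush(); batch != nil {
					out <- batch
				}
				return
			}
			for _, batch := range b.add(rec) {
				out <- batch
			}
		case <-ticker.C: // Trigger 3: timer forces a partial batch out.
			if batch := b.flush(); batch != nil {
				out <- batch
			}
		}
	}
}

func main() {
	in := make(chan []byte)
	out := make(chan []byte, 8)
	go run(in, out, &batcher{maxCount: 2, maxBytes: 1024}, time.Hour)
	for _, r := range []string{`{"a":1}`, `{"a":2}`, `{"a":3}`} {
		in <- []byte(r)
	}
	close(in)
	for batch := range out {
		fmt.Printf("%q\n", batch)
	}
}
```

Each value received from out is a single byte slice, so it could be queued
as one buffer record or sent directly when buffering is off.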

On Mon, Jan 12, 2015 at 3:15 PM, Rob Miller <[email protected]> wrote:

> Oh, one other thought... ultimately the goal is for buffering to be a
> config option. In this case, if the user doesn't want buffering then Run
> could call SendRecord directly. Or, possibly better, spin up another
> goroutine that pulls the batch off of a channel and calls SendRecord, so
> processing new messages isn't blocked on the HTTP request.
>
> -r
>
>
> On 01/12/2015 02:34 PM, Tiru Srikantha wrote:
>
>> Thanks for the code review. I wrote that at 11pm and was going to go
>> over it again later today to clean it up, but you caught a couple things
>> I wouldn't have. queueFull got copied directly from TCPOutput and the
>> names changed, I was already going to go back and rework that for the
>> byte slice.
>>
>> About framing:
>> http://hekad.readthedocs.org/en/v0.8.2/message/index.html#stream-framing
>> doesn't show a "trailer" character and
>> https://github.com/mozilla-services/heka/blob/dev/pipeline/stream_parser.go#L315
>> suggests to me that it will attempt to blindly grab `message_length -
>> header_size` bytes once it finds a header. It's definitely something to
>> make sure of for a more thorough implementation of buffering, I agree.
>>
>> On Mon, Jan 12, 2015 at 2:08 PM, Rob Miller <[email protected]> wrote:
>>
>>     On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
>>
>>         Yeah, the more I think about it the more I'm sure I was
>>         overengineering. Your point was a good one re: complexity. I'm
>>         not sure why we'd be framing the messages anyway, though.
>>         Outer/inner framing actually has no purpose in this context since
>>         the records are JSON entries and the format delimits the
>>         individual records. If we were doing binary buffering, I'd say
>>         that more work was needed to ensure the lengths were read
>>         properly, but for this specific plugin it wouldn't be a problem
>>         unless I totally misunderstand framing.
>>
>>     No, you're understanding correctly, I'm just thinking in the general
>>     case. Our plan is to make the buffering functionality in
>>     BufferedOutput available to all outputs, so any batching support
>>     that's added needs to be able to support generic data streams. We
>>     have to make sure that if someone decides to buffer a batch of
>>     framed, protobuf encoded Heka messages that the buffer's batch
>>     framing and the individual message framing don't interfere with each
>>     other.
>>
>>         I implemented it
>>         (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch)
>>         and it works fine against a test ES instance without framing. I
>>         tried killing the instance and it errored as you might expect,
>>         then immediately started sending data when the instance came back
>>         up (including persisting the data through a Heka restart), so I
>>         consider it a success pending a bit more testing and cleaning up
>>         the unused variables. This also got rid of the receiver/committer
>>         goroutines from `Init()` and changed the behavior from "block if
>>         the buffer is full and we haven't sent the buffer yet" to "write
>>         the buffer to disk and send it when we have time" which I
>>         consider a better/safer behavior than leaving packs in the queue
>>         until there's space to process them.
>>
>>     Notes from a quick peek:
>>
>>     * You don't need an `if` at the end of QueueRecord, just `return
>>     b.QueueBytes(msgBytes)` should be fine.
>>     * If you're storing the config on the ElasticSearchOutput struct,
>>     you probably don't need to copy all of the attributes from the
>>     config struct to the output struct. Is there a reason to set
>>     `o.flushInterval = o.conf.FlushInterval` when you can just refer to
>>     `o.conf.FlushInterval` anywhere?
>>     * You're never checking the results of the QueueBytes call, and
>>     instead you're checking if the result of the or.Encode call is
>>     QueueIsFull, which doesn't really make much sense.
>>     * The queueFull seems a bit off, it should be dealing w/ byte
>>     slices, not a single pack. Also, a single success or failure might
>>     represent an entire batch of messages, not just a single one, w.r.t.
>>     your processMessageCount and dropMessageCount.
>>
>>         Also, the Processed Message Count seems to stay empty in the
>>         dashboard for the ES output - what do I need to implement to have
>>         it properly show how many messages it has sent to ES? I can't
>>         find anything in
>>         https://hekad.readthedocs.org/en/latest/developing/plugin.html
>>         about what interfaces need to be implemented or where I need to
>>         register to have that happen. Just adding a ReportMessage method
>>         to satisfy pipeline.ReportingPlugin didn't seem to do it.
>>
>>     Yes, this is a bit awkward. Right now you have to implement it on a
>>     case by case basis, when really Heka should be doing at least some
>>     of this accounting for you. We have an issue open for it:
>>     https://github.com/mozilla-services/heka/issues/918
>>
>>     Using ReportMessage is the way to make it work, however. You can see
>>     how it's supposed to work in the SandboxFilter implementation:
>>     https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120
>>
>>     Hope this helps,
>>
>>     -r
>>
>>
>>         On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <[email protected]> wrote:
>>
>>              This is true, but this is true no matter how the buffering
>>              is engineered. There might be messages sitting on the
>>              router's channel, or the output matcher's channel, or the
>>              output's channel, all of which would be lost if Heka
>>              crashes. I've imagined the buffering to be protecting
>>              against downtime and/or slowness of the downstream
>>              destination, not as a delivery guarantee mechanism. I wish I
>>              could tell you Heka guarantees message delivery, but it
>>              doesn't. I'd love to add support, but it would need to be
>>              considered within the context of the whole pipeline, not
>>              just a single point.
>>
>>              That being said, harm reduction is reasonable, and I'm not
>>              dead set against being able to buffer partial batches. But I
>>              still shy away from the idea of having a separate
>>              SendRecords function for bulk sending, if possible. Maybe a
>>              buffer that was set up for batching could accumulate
>>              individual records to a short term buffer, adding them to
>>              the end of the send queue when a full batch is acquired? I'm
>>              open to further discussion.
>>
>>              Also, our message header contains a message length, so with
>>              care it should be possible to nest the framing, it's just
>>              that confusing the inner framing for the outer framing is a
>>              pitfall to beware.
>>
>>              -r
>>
>>
>>              On 01/11/2015 11:03 PM, Tiru Srikantha wrote:
>>
>>                  The simple approach means you can lose messages if the
>>                  Heka process dies before the ticker expires or the
>>                  maxcount is hit, as well as the nested framing issues you
>>                  raised. I'm probably mentally over-engineering this,
>>                  though, and if it's good enough for you, then that's what
>>                  you get. I'll send a PR with an implementation based on
>>                  what you recommended - it should be fairly simple given
>>                  those limitations. Thanks for the guidance.
>>
>>                  On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <[email protected]> wrote:
>>
>>
>>
>>                       On 01/11/2015 12:34 AM, Tiru Srikantha wrote:
>>
>>                           Yeah, I'm looking at this as well. I don't like
>>                           losing the bulk because that means it gets slow
>>                           when your load gets high due to HTTP overhead.
>>                           If I didn't care about bulk it'd just be a quick
>>                           rewrite. The other problem I ran into around
>>                           bulk operations on a buffered queue is that
>>                           there are 4 points when you want to flush to the
>>                           output:
>>
>>                           1. Queued messages count equals count to send.
>>                           2. Queued message size with the new message
>>                              added will exceed the max bulk message size,
>>                              even if the count is less than max, due to
>>                              large messages.
>>                           3. A timer expires to force a flush to the
>>                              output target even if the count or max size
>>                              hasn't been hit yet, to get the data into
>>                              the output target in a timely manner.
>>                           4. Plugin is shutting down.
>>
>>                           I'm actually re-writing a lot of the
>>                           plugins/buffered_output.go file in my local fork
>>                           because of this and teasing out the "read the
>>                           next record from the file pile" operation from
>>                           the "send the next record" operation, so what
>>                           I'm aiming for is something like a
>>                           BulkBufferedOutput interface with a
>>                           SendRecords(records [][]byte) method that must
>>                           be implemented in lieu of BufferedOutput's
>>                           SendRecord(record []byte) and some code that's
>>                           shared between them to buffer new messages and
>>                           read buffered messages. SendRecords would not
>>                           advance the cursor until the bulk operation
>>                           succeeded, as you might expect.
>>
>>
>>                       I recommend reading my other message in this thread
>>                       (http://is.gd/kzuhxs) for an alternate approach. I
>>                       think you can achieve what you want with less
>>                       effort, and better separation of concerns, by doing
>>                       the batching *before* you pass data in to the
>>                       BufferedOutput. Ultimately each buffer "record" is
>>                       just a slice of bytes, the buffer doesn't need to
>>                       know or care if that slice contains a single
>>                       serialized message or an accumulated batch of them.
>>
>>                           I'll submit a PR once I finish.
>>
>>
>>                       I always look forward to PRs, but I'll warn you that
>>                       one that uses the approach described above will
>>                       likely be rejected. I'm not very keen on introducing
>>                       a separate buffering interface specifically for
>>                       batches, when a simpler change can solve the same
>>                       problems.
>>
>>                       -r
>>
>>
>>
>>
>>
>>
>>
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka