That's actually what the current behavior is: byte slices being passed back and forth on a channel. I reworked things so there is now a use_buffering option; when buffering is false it sends blindly, discarding failed batches. Buffering is on by default.
If you could give this another glance for code quality, that'd be great: https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch

Also, let me know if there's anything else you'd like to see around this (unit tests, changelog edits, etc.) before I submit a pull request. I did test this against an ES instance, and it seems to perform well no matter how I kill the instance: it fails, then once the instance comes back up it just starts sending again. The dashboard also reflects the correct number of processed messages now.

On Mon, Jan 12, 2015 at 3:15 PM, Rob Miller <[email protected]> wrote:

> Oh, one other thought... ultimately the goal is for buffering to be a config option. In this case, if the user doesn't want buffering then Run could call SendRecord directly. Or, possibly better, spin up another goroutine that pulls the batch off of a channel and calls SendRecord, so processing new messages isn't blocked on the HTTP request.
>
> -r
>
> On 01/12/2015 02:34 PM, Tiru Srikantha wrote:
>
>> Thanks for the code review. I wrote that at 11pm and was going to go over it again later today to clean it up, but you caught a couple of things I wouldn't have. queueFull got copied directly from TCPOutput with the names changed; I was already going to go back and rework that for the byte slice.
>>
>> About framing: http://hekad.readthedocs.org/en/v0.8.2/message/index.html#stream-framing doesn't show a "trailer" character, and https://github.com/mozilla-services/heka/blob/dev/pipeline/stream_parser.go#L315 suggests to me that it will attempt to blindly grab `message_length - header_size` bytes once it finds a header. It's definitely something to make sure of for a more thorough implementation of buffering, I agree.
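Rob's goroutine suggestion quoted above (a second goroutine pulling batches off a channel and calling SendRecord, so the Run loop isn't blocked on HTTP) could be sketched like this. The wiring and the sendRecord stand-in here are hypothetical, not Heka's actual API:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	batches := make(chan []byte, 4) // buffered, so Run rarely blocks
	var sent int
	var wg sync.WaitGroup

	// sendRecord stands in for the real HTTP bulk request; it echoes
	// the SendRecord(record []byte) error shape discussed in the thread.
	sendRecord := func(b []byte) error {
		sent++
		return nil
	}

	wg.Add(1)
	go func() { // committer goroutine: drains batches as they arrive
		defer wg.Done()
		for b := range batches {
			sendRecord(b)
		}
	}()

	// The Run loop just enqueues finished batches and moves on; it only
	// blocks if the channel buffer fills, not for each HTTP round-trip.
	for i := 0; i < 3; i++ {
		batches <- []byte(fmt.Sprintf("batch-%d", i))
	}
	close(batches)
	wg.Wait()
	fmt.Println("sent:", sent)
}
```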
>>
>> On Mon, Jan 12, 2015 at 2:08 PM, Rob Miller <[email protected]> wrote:
>>
>>> On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
>>>
>>>> Yeah, the more I think about it the more I'm sure I was overengineering. Your point was a good one re: complexity. I'm not sure why we'd be framing the messages anyway, though. Outer/inner framing actually has no purpose in this context, since the records are JSON entries and the format delimits the individual records. If we were doing binary buffering, I'd say that more work was needed to ensure the lengths were read properly, but for this specific plugin it wouldn't be a problem, unless I totally misunderstand framing.
>>>
>>> No, you're understanding correctly, I'm just thinking in the general case. Our plan is to make the buffering functionality in BufferedOutput available to all outputs, so any batching support that's added needs to be able to support generic data streams. We have to make sure that if someone decides to buffer a batch of framed, protobuf encoded Heka messages, the buffer's batch framing and the individual message framing don't interfere with each other.
>>>
>>>> I implemented it (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch) and it works fine against a test ES instance without framing. I tried killing the instance and it errored as you might expect, then immediately started sending data when the instance came back up (including persisting the data through a Heka restart), so I consider it a success pending a bit more testing and cleaning up the unused variables.
>>>> This also got rid of the receiver/committer goroutines from `Init()` and changed the behavior from "block if the buffer is full and we haven't sent the buffer yet" to "write the buffer to disk and send it when we have time", which I consider a better/safer behavior than leaving packs in the queue until there's space to process them.
>>>
>>> Notes from a quick peek:
>>>
>>> * You don't need an `if` at the end of QueueRecord; just `return b.QueueBytes(msgBytes)` should be fine.
>>> * If you're storing the config on the ElasticSearchOutput struct, you probably don't need to copy all of the attributes from the config struct to the output struct. Is there a reason to set `o.flushInterval = o.conf.FlushInterval` when you can just refer to `o.conf.FlushInterval` anywhere?
>>> * You're never checking the result of the QueueBytes call; instead you're checking whether the result of the or.Encode call is QueueIsFull, which doesn't really make much sense.
>>> * The queueFull seems a bit off; it should be dealing w/ byte slices, not a single pack. Also, a single success or failure might represent an entire batch of messages, not just a single one, w.r.t. your processMessageCount and dropMessageCount.
>>>
>>>> Also, the Processed Message Count seems to stay empty in the dashboard for the ES output - what do I need to implement to have it properly show how many messages it has sent to ES? I can't find anything in https://hekad.readthedocs.org/en/latest/developing/plugin.html about what interfaces need to be implemented or where I need to register to have that happen. Just adding a ReportMessage method to satisfy pipeline.ReportingPlugin didn't seem to do it.
>>>
>>> Yes, this is a bit awkward. Right now you have to implement it on a case-by-case basis, when really Heka should be doing at least some of this accounting for you.
>>> We have an issue open for it: https://github.com/mozilla-services/heka/issues/918
>>>
>>> Using ReportMessage is the way to make it work, however. You can see how it's supposed to work in the SandboxFilter implementation: https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120
>>>
>>> Hope this helps,
>>>
>>> -r
>>>
>>>> On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <[email protected]> wrote:
>>>>
>>>>> This is true, but this is true no matter how the buffering is engineered. There might be messages sitting on the router's channel, or the output matcher's channel, or the output's channel, all of which would be lost if Heka crashes. I've imagined the buffering to be protecting against downtime and/or slowness of the downstream destination, not as a delivery guarantee mechanism. I wish I could tell you Heka guarantees message delivery, but it doesn't. I'd love to add support, but it would need to be considered within the context of the whole pipeline, not just a single point.
>>>>>
>>>>> That being said, harm reduction is reasonable, and I'm not dead set against being able to buffer partial batches. But I still shy away from the idea of having a separate SendRecords function for bulk sending, if possible. Maybe a buffer that was set up for batching could accumulate individual records in a short-term buffer, adding them to the end of the send queue when a full batch is acquired? I'm open to further discussion.
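The dashboard accounting discussed above (counting a whole batch toward processMessageCount or dropMessageCount rather than just one message, exposed via a ReportMessage-style hook) might look roughly like this sketch. The field and method names are illustrative, not Heka's exact interface; the real mechanism is the ReportingPlugin hook shown in the SandboxFilter link:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// esOutput here is a simplified stand-in: atomic counters incremented
// per delivered/dropped batch, snapshotted by a report method the
// dashboard could poll.
type esOutput struct {
	processMessageCount int64
	dropMessageCount    int64
}

// recordBatch credits the entire batch, not a single message, to the
// appropriate counter, matching the code-review note above.
func (o *esOutput) recordBatch(size int, ok bool) {
	if ok {
		atomic.AddInt64(&o.processMessageCount, int64(size))
	} else {
		atomic.AddInt64(&o.dropMessageCount, int64(size))
	}
}

func (o *esOutput) report() map[string]int64 {
	return map[string]int64{
		"ProcessMessageCount": atomic.LoadInt64(&o.processMessageCount),
		"DropMessageCount":    atomic.LoadInt64(&o.dropMessageCount),
	}
}

func main() {
	o := &esOutput{}
	o.recordBatch(250, true)  // successful bulk insert of 250 messages
	o.recordBatch(250, false) // failed bulk insert of 250 messages
	r := o.report()
	fmt.Println(r["ProcessMessageCount"], r["DropMessageCount"]) // 250 250
}
```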
>>>>>
>>>>> Also, our message header contains a message length, so with care it should be possible to nest the framing; it's just that confusing the inner framing for the outer framing is a pitfall to beware of.
>>>>>
>>>>> -r
>>>>>
>>>>> On 01/11/2015 11:03 PM, Tiru Srikantha wrote:
>>>>>
>>>>>> The simple approach means you can lose messages if the Heka process dies before the ticker expires or the maxcount is hit, as well as the nested framing issues you raised. I'm probably mentally over-engineering this, though, and if it's good enough for you, then that's what you get. I'll send a PR with an implementation based on what you recommended - it should be fairly simple given those limitations. Thanks for the guidance.
>>>>>>
>>>>>> On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <[email protected]> wrote:
>>>>>>
>>>>>>> On 01/11/2015 12:34 AM, Tiru Srikantha wrote:
>>>>>>>
>>>>>>>> Yeah, I'm looking at this as well. I don't like losing the bulk, because that means it gets slow when your load gets high due to HTTP overhead. If I didn't care about bulk it'd just be a quick rewrite. The other problem I ran into around bulk operations on a buffered queue is that there are 4 points when you want to flush to the output:
>>>>>>>>
>>>>>>>> 1. Queued message count equals the count to send.
>>>>>>>> 2. Queued message size with the new message added would exceed the max bulk message size, even if the count is less than the max, due to large messages.
>>>>>>>> 3. A timer expires to force a flush to the output target even if the count or max size hasn't been hit yet, to get the data into the output target in a timely manner.
>>>>>>>> 4. Plugin is shutting down.
>>>>>>>>
>>>>>>>> I'm actually re-writing a lot of the plugins/buffered_output.go file in my local fork because of this, teasing out the "read the next record from the file pile" operation from the "send the next record" operation. What I'm aiming for is something like a BulkBufferedOutput interface with a SendRecords(records [][]byte) method that must be implemented in lieu of BufferedOutput's SendRecord(record []byte), and some code that's shared between them to buffer new messages and read buffered messages. SendRecords would not advance the cursor until the bulk operation succeeded, as you might expect.
>>>>>>>
>>>>>>> I recommend reading my other message in this thread (http://is.gd/kzuhxs) for an alternate approach. I think you can achieve what you want with less effort, and better separation of concerns, by doing the batching *before* you pass data in to the BufferedOutput. Ultimately each buffer "record" is just a slice of bytes; the buffer doesn't need to know or care whether that slice contains a single serialized message or an accumulated batch of them.
>>>>>>>
>>>>>>>> I'll submit a PR once I finish.
>>>>>>>
>>>>>>> I always look forward to PRs, but I'll warn you that the approach described above will likely be rejected. I'm not very keen on introducing a separate buffering interface specifically for batches when a simpler change can solve the same problems.
>>>>>>>
>>>>>>> -r
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka

