On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
Yeah, the more I think about it the more I'm sure I was overengineering. Your point was a good one re: complexity. I'm not sure why we'd be framing the messages anyway, though. Outer/inner framing actually has no purpose in this context since the records are JSON entries and the format delimits the individual records. If we were doing binary buffering, I'd say that more work was needed to ensure the lengths were read properly, but for this specific plugin it wouldn't be a problem unless I totally misunderstand framing.
No, you're understanding correctly, I'm just thinking in the general case. Our plan is to make the buffering functionality in BufferedOutput available to all outputs, so any batching support that's added needs to be able to support generic data streams. We have to make sure that if someone decides to buffer a batch of framed, protobuf encoded Heka messages that the buffer's batch framing and the individual message framing don't interfere with each other.
I implemented it (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch) and it works fine against a test ES instance without framing. I tried killing the instance and it errored as you might expect, then immediately started sending data when the instance came back up (including persisting the data through a Heka restart), so I consider it a success pending a bit more testing and cleaning up the unused variables. This also got rid of the receiver/committer goroutines from `Init()` and changed the behavior from "block if the buffer is full and we haven't sent the buffer yet" to "write the buffer to disk and send it when we have time" which I consider a better/safer behavior than leaving packs in the queue until there's space to process them.
Notes from a quick peek:
* You don't need an `if` at the end of QueueRecord; just `return b.QueueBytes(msgBytes)` should be fine.
* If you're storing the config on the ElasticSearchOutput struct, you probably don't need to copy all of the attributes from the config struct to the output struct. Is there a reason to set `o.flushInterval = o.conf.FlushInterval` when you can just refer to `o.conf.FlushInterval` anywhere?
* You're never checking the result of the QueueBytes call; instead you're checking whether the result of the or.Encode call is QueueIsFull, which doesn't really make sense.
* The queueFull handling seems a bit off; it should be dealing w/ byte slices, not a single pack. Also, a single success or failure might represent an entire batch of messages, not just a single one, w.r.t. your processMessageCount and dropMessageCount.
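To make the first and third points concrete, here's a rough sketch of the intended pattern: QueueRecord just returns the result of QueueBytes, and the *caller* checks that error for the queue-full condition. All the types and names here are illustrative stand-ins, not Heka's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// QueueIsFull is a hypothetical sentinel error standing in for the real
// buffer-full condition.
var QueueIsFull = errors.New("queue is full")

// buffer is a stand-in for a BufferedOutput-style queue with a byte cap.
type buffer struct {
	used, capacity int
}

// QueueBytes appends a record, returning QueueIsFull when no space remains.
func (b *buffer) QueueBytes(msgBytes []byte) error {
	if b.used+len(msgBytes) > b.capacity {
		return QueueIsFull
	}
	b.used += len(msgBytes)
	return nil
}

// QueueRecord shows the simplification suggested above: no trailing `if`,
// just pass QueueBytes' error straight through to the caller.
func (b *buffer) QueueRecord(msgBytes []byte) error {
	return b.QueueBytes(msgBytes)
}

func main() {
	b := &buffer{capacity: 8}
	fmt.Println(b.QueueRecord([]byte("abcd")))   // <nil>
	fmt.Println(b.QueueRecord([]byte("abcdef"))) // queue is full
}
```

The point is that the queue-full check belongs on the QueueBytes result, not on the encoder's return value.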
Also, the Processed Message Count seems to stay empty in the dashboard for the ES output - what do I need to implement to have it properly show how many messages it has sent to ES? I can't find anything in https://hekad.readthedocs.org/en/latest/developing/plugin.html about what interfaces need to be implemented or where I need to register to have that happen. Just adding a ReportMessage method to satisfy pipeline.ReportingPlugin didn't seem to do it.
Yes, this is a bit awkward. Right now you have to implement it on a case-by-case basis, when really Heka should be doing at least some of this accounting for you. We have an issue open for it: https://github.com/mozilla-services/heka/issues/918

Using ReportMessage is the way to make it work, however. You can see how it's supposed to work in the SandboxFilter implementation: https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120

Hope this helps,

-r
On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <[email protected]> wrote:

This is true, but it's true no matter how the buffering is engineered. There might be messages sitting on the router's channel, or the output matcher's channel, or the output's channel, all of which would be lost if Heka crashes. I've imagined the buffering as protecting against downtime and/or slowness of the downstream destination, not as a delivery guarantee mechanism. I wish I could tell you Heka guarantees message delivery, but it doesn't. I'd love to add support, but it would need to be considered within the context of the whole pipeline, not just a single point.

That being said, harm reduction is reasonable, and I'm not dead set against being able to buffer partial batches. But I still shy away from the idea of having a separate SendRecords function for bulk sending, if possible. Maybe a buffer that was set up for batching could accumulate individual records in a short-term buffer, adding them to the end of the send queue when a full batch is acquired? I'm open to further discussion.

Also, our message header contains a message length, so with care it should be possible to nest the framing; it's just that confusing the inner framing for the outer framing is a pitfall to beware of.

-r

On 01/11/2015 11:03 PM, Tiru Srikantha wrote:

The simple approach means you can lose messages if the Heka process dies before the ticker expires or the max count is hit, as well as the nested framing issues you raised. I'm probably mentally over-engineering this, though, and if it's good enough for you, then that's what you get. I'll send a PR with an implementation based on what you recommended; it should be fairly simple given those limitations. Thanks for the guidance.
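The nested-framing point can be illustrated with a toy length-prefix scheme: a batch of individually framed messages becomes the payload of one outer frame, and a reader must peel the outer frame before interpreting any inner lengths. Heka's real header format is richer (protobuf-based); this is only a minimal sketch of why careful length handling makes nesting safe.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frame prefixes a payload with a 4-byte big-endian length — a toy
// stand-in for a real framed-message header.
func frame(payload []byte) []byte {
	out := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(out, uint32(len(payload)))
	copy(out[4:], payload)
	return out
}

// unframe reads exactly one frame, returning its payload and the rest of
// the buffer. Reading the length first is what keeps inner and outer
// frames from being confused with one another.
func unframe(buf []byte) (payload, rest []byte) {
	n := binary.BigEndian.Uint32(buf)
	return buf[4 : 4+n], buf[4+n:]
}

func main() {
	// Two individually framed "messages" batched into one outer frame,
	// i.e. one buffer record containing a batch.
	batch := append(frame([]byte("msg1")), frame([]byte("msg2"))...)
	record := frame(batch)

	// Reading back: peel the outer frame first, then the inner ones.
	inner, _ := unframe(record)
	m1, inner := unframe(inner)
	m2, _ := unframe(inner)
	fmt.Println(string(m1), string(m2)) // msg1 msg2
}
```

The pitfall Rob describes is a reader that treats the first *inner* length as the outer record length, which would truncate the batch.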
On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <[email protected]> wrote:

On 01/11/2015 12:34 AM, Tiru Srikantha wrote:

Yeah, I'm looking at this as well. I don't like losing the bulk support because that means it gets slow when your load gets high due to HTTP overhead. If I didn't care about bulk it'd just be a quick rewrite. The other problem I ran into around bulk operations on a buffered queue is that there are 4 points at which you want to flush to the output:

1. Queued message count equals the count to send.
2. Queued message size with the new message added would exceed the max bulk message size, even if the count is less than the max, due to large messages.
3. A timer expires to force a flush to the output target even if the count or max size hasn't been hit yet, to get the data into the output target in a timely manner.
4. The plugin is shutting down.

I'm actually re-writing a lot of the plugins/buffered_output.go file in my local fork because of this, teasing the "read the next record from the file pile" operation apart from the "send the next record" operation. What I'm aiming for is something like a BulkBufferedOutput interface with a SendRecords(records [][]byte) method that must be implemented in lieu of BufferedOutput's SendRecord(record []byte), plus some code shared between them to buffer new messages and read buffered messages. SendRecords would not advance the cursor until the bulk operation succeeded, as you might expect.

I recommend reading my other message in this thread (http://is.gd/kzuhxs) for an alternate approach. I think you can achieve what you want with less effort, and better separation of concerns, by doing the batching *before* you pass data in to the BufferedOutput. Ultimately each buffer "record" is just a slice of bytes; the buffer doesn't need to know or care whether that slice contains a single serialized message or an accumulated batch of them.
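The batch-before-the-buffer approach, honoring the four flush triggers listed above, might look roughly like this. Everything here (the batcher type, its fields, the flush callback) is an illustrative sketch, not Heka's actual API; in the real plugin the flush callback would be the call that queues the batch as a single buffer record.

```go
package main

import (
	"bytes"
	"fmt"
)

// batcher accumulates individual records and hands each completed batch
// to flush() as one byte slice — i.e. one buffer "record".
type batcher struct {
	buf      bytes.Buffer
	count    int
	maxCount int // trigger 1: max records per batch
	maxBytes int // trigger 2: max batch size in bytes
	flush    func([]byte)
}

// Add covers triggers 1 and 2. It flushes *before* appending when the new
// record would push the batch past maxBytes, so a large record never
// overflows an in-progress batch.
func (b *batcher) Add(rec []byte) {
	if b.count > 0 && b.buf.Len()+len(rec) > b.maxBytes {
		b.Flush()
	}
	b.buf.Write(rec)
	b.count++
	if b.count >= b.maxCount {
		b.Flush()
	}
}

// Flush covers triggers 3 and 4: a real plugin would also call it from a
// ticker goroutine and from its shutdown path. Flushing an empty batch is
// a no-op.
func (b *batcher) Flush() {
	if b.count == 0 {
		return
	}
	out := make([]byte, b.buf.Len())
	copy(out, b.buf.Bytes())
	b.flush(out)
	b.buf.Reset()
	b.count = 0
}

func main() {
	b := &batcher{maxCount: 2, maxBytes: 1024, flush: func(batch []byte) {
		fmt.Printf("queueing %d-byte batch\n", len(batch))
	}}
	b.Add([]byte("{\"a\":1}\n"))
	b.Add([]byte("{\"b\":2}\n")) // hits maxCount, flushes a 16-byte batch
	b.Flush()                   // shutdown flush; no-op here, batch is empty
}
```

This keeps BufferedOutput untouched: the buffer still sees opaque byte slices, and all batching policy lives in front of it, which is the separation of concerns Rob argues for.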
I'll submit a PR once I finish.

I always look forward to PRs, but I'll warn you that one using the approach described above will likely be rejected. I'm not very keen on introducing a separate buffering interface specifically for batches when a simpler change can solve the same problems.

-r
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka

