A little late to the game (look what I get for not reading my email first thing in the morning!), but in response to Jon’s initial question as to whether this would conflict with METRON-322:
METRON-322, as currently being worked on, uses Tick Tuple settings to do periodic checks on internal queue ages of Writer Bolts that use internal queues (specifically, the BulkMessageWriterBolt), and flush those queues if older than a configured timeout. Note that these internal queues are not related to the input and output queues maintained for each Component by Storm.

I can think of some delay scenarios in which it would be appropriate to use a Writer Bolt’s internal queue as part of the buffering mechanism for the delayed data. If you end up writing such a custom Bolt, I would be happy to share my work in progress on 322. I expect to be posting it as a PR in the next week or two anyway.

That said, here are my thoughts on delays:

It seems to me there are several scenarios of interest, including the one you mentioned (#1 below):

1. Constant-time delay of all messages in a stream, to allow some other stream to be a constant amount “ahead”.
   a. Example: enrichment of the first stream by an HBase Enricher being fed from a processed second stream
2. Delay of a stream until a readiness event happens
   a. Example: enrichment of the stream by a Threat Intel Enricher that lazily downloads a large data set from a slow source in response to certain messages, and we want to stall the stream until we’ve received it
3. Artificial delay simulating either of the above, or other behaviors, for testing purposes

Each of these might want a different implementation, and will have a different interaction with the Storm back-pressure mechanism.

Are there other use cases?

Cheers,
--Matt

On 11/4/16, 10:16 AM, "Carolyn Duby" <[email protected]> wrote:

You need to plan your Kafka retention and storm capacity to support spikes in traffic. You have to do this regardless of whether you delay or not.

Thanks
Carolyn

On 11/4/16, 1:14 PM, "James Sirota" <[email protected]> wrote:

>We need to get with the Storm team and see what the new back pressure features in Storm can support.
>My concern is what happens when you are operating on millions of messages per second and all of a sudden some of your messages are paused and replayed and your pipeline backs up into Kafka. That generally has not worked well.
>
>04.11.2016, 07:57, "Carolyn Duby" <[email protected]>:
>> Perhaps this has already been suggested. I have not been following the thread that closely but you could approach it this way:
>>
>> 1. Enrichment parsers consume immediately from the queue.
>> 2. Parsers using an enrichment inspect the first message in the queue. If it is less than <delay> minutes old, the parser waits to consume it. After the delay the parser consumes the message.
>>
>> Thanks
>> Carolyn
>>
>> On 11/4/16, 10:50 AM, "Nick Allen" <[email protected]> wrote:
>>
>>>> A sleep/wait cycle is another way to do this (simply delay everything in
>>>> xyz topology by 30 seconds) which isn't as nice, but is also probably way
>>>> less complicated to implement.
>>>
>>> It definitely has the advantage of being simpler to implement. But doing a
>>> sleep in a message passing architecture like Storm, doesn't feel right to
>>> me. Just my gut.
>>>
>>> On Fri, Nov 4, 2016 at 10:43 AM, [email protected] <[email protected]> wrote:
>>>
>>>> I think we've come to a better way to do this which is sort of a
>>>> waitUntil(exists || timeout), but the issue is checking if something exists
>>>> because it requires some sort of timestamp to avoid collisions (due to
>>>> source port reuse, etc.). I don't know the best way to do this offhand.
>>>> Here's a general scenario:
>>>>
>>>> 1) ssh syslog comes in -> parses -> insert to HBase {ip_login_src,
>>>> src_port, ip_login_dst, ip_login_dst_hostname, account, timestamp,
>>>> success_bool} via streaming enrichment
>>>>
>>>> 2) Network logs come in saying ip_src_addr logged into ip_dst_addr ->
>>>> parses -> enriches (checks for whitelists, then if appropriate sets
>>>> is_alert = T) -> indexes
>>>>
>>>> What I want is something for the network logs to get enriched with the ssh
>>>> hbase data (almost exactly this use case
>>>> <https://cwiki.apache.org/confluence/display/METRON/2016/06/16/Metron+Tutorial+-+Fundamentals+Part+6%3A+Streaming+Enrichment>),
>>>> using ip_src_addr, src_port, ip_dst_addr, account, and maybe some sort of
>>>> fuzzy timestamp? Then we can hash them all together and use it as a lookup
>>>> key, but not sure how to handle timestamps without having 3 identifiers (1
>>>> for current time +- 3 mins, 1 for previous 3 minute segment, one for future
>>>> 3 minute segment).
>>>>
>>>> A sleep/wait cycle is another way to do this (simply delay everything in
>>>> xyz topology by 30 seconds) which isn't as nice, but is also probably way
>>>> less complicated to implement.
>>>>
>>>> We're discussing this in IRC (soon to be slack ^.^) as well.
>>>>
>>>> Jon
>>>>
>>>> On Fri, Nov 4, 2016 at 10:28 AM Otto Fowler <[email protected]>
>>>> wrote:
>>>>
>>>> So spout orchestration/gating?
>>>>
>>>> Spout checks for external state flag
>>>>
>>>> if CURRENT - process
>>>> if UPDATING - wait
>>>>
>>>> With the ingesting agent sets flag to updating when running?
>>>>
>>>> On November 4, 2016 at 09:29:16, [email protected] ([email protected])
>>>> wrote:
>>>>
>>>> Is there a good method (i.e. something using Stellar/ZK) to implement an
>>>> intentional processing delay to all tuples in a specific topology?
>>>> I plan to do some custom enrichments, but the data used to do the enrichment *may*
>>>> be ingested at roughly the same time the data to be enriched is (it also may
>>>> not ever be sent). So I'd like to add a delay in my cluster that applies
>>>> to certain parser topologies.
>>>>
>>>> I took a look around in the documentation and in JIRA and didn't find
>>>> anything available or being worked on, but I did see that this may conflict
>>>> with METRON-322. Essentially what I'm considering is a {sleep,delay,wait}
>>>> stellar function, but it could also be a delay in a parser's kafka spout
>>>> (much less of a fan of the second option).
>>>>
>>>> I'm looking for feedback on the best way to approach this, and I'd be happy
>>>> to do the work myself (if necessary) when it gets to that point. I did
>>>> consider implementing this delay upstream (in the sensor itself), but after
>>>> looking in more detail it doesn't seem as feasible.
>>>>
>>>> Jon
>>>> --
>>>>
>>>> Jon
>>>>
>>>> --
>>>>
>>>> Jon
>>>>
>>>> Sent from my mobile device
>>>
>>> --
>>> Nick Allen <[email protected]>
>
>-------------------
>Thank you,
>
>James Sirota
>PPMC- Apache Metron (Incubating)
>jsirota AT apache DOT org
>
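
On Jon's question in the quoted thread about handling fuzzy timestamps with three identifiers: one possible reading is to store each ssh record under a single time-bucketed key and have the network-log side probe the previous, current, and next bucket. The sketch below is only an illustration under that assumption. The function names, the SHA-256 hash, and the "|" separator are hypothetical choices, not anything in Metron or Stellar; only the field names and the 3-minute window come from the thread.

```python
import hashlib

# Assumed bucket width: the 3-minute segment mentioned in the thread.
BUCKET_SECONDS = 180


def bucket_of(epoch_seconds):
    """Map a timestamp (seconds since epoch) to its 3-minute bucket index."""
    return int(epoch_seconds) // BUCKET_SECONDS


def lookup_key(ip_src_addr, src_port, ip_dst_addr, account, bucket):
    """Hash the identifying fields plus a bucket index into one lookup key."""
    raw = "|".join([ip_src_addr, str(src_port), ip_dst_addr, account, str(bucket)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def write_key(ip_src_addr, src_port, ip_dst_addr, account, epoch_seconds):
    """Key under which the ssh-side record would be stored (one bucket only)."""
    return lookup_key(ip_src_addr, src_port, ip_dst_addr, account,
                      bucket_of(epoch_seconds))


def candidate_keys(ip_src_addr, src_port, ip_dst_addr, account, epoch_seconds):
    """Keys the network-log side would probe: previous, current, next bucket."""
    current = bucket_of(epoch_seconds)
    return [lookup_key(ip_src_addr, src_port, ip_dst_addr, account, current + delta)
            for delta in (-1, 0, 1)]
```

With this layout, two events less than 180 seconds apart always land in the same or adjacent buckets, so the probe finds the stored key; events up to just under 360 seconds apart may also match, which is the price of the coarse window. It does not by itself address the case where the ssh record has not been written yet when the network log arrives.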
