I think you've done a good job of organizing the use cases I have considered, and you even came up with a new one that appears valid.
Jon

On Fri, Nov 4, 2016, 18:01 Matt Foley <[email protected]> wrote:

> A little late to the game (look what I get for not reading my email first thing in the morning!), but in response to Jon’s initial question as to whether this would conflict with METRON-322:
>
> METRON-322, as currently being worked on, uses Tick Tuple settings to do periodic checks on internal queue ages of Writer Bolts that use internal queues (specifically, the BulkMessageWriterBolt), and flush those queues if older than a configured timeout.
> Note that these internal queues are not related to the input and output queues maintained for each Component by Storm.
>
> I can think of some delay scenarios in which it would be appropriate to use a Writer Bolt’s internal queue as part of the buffering mechanism for the delayed data. If you end up writing such a custom Bolt, I would be happy to share my work in progress on 322. I expect to be posting it as a PR in the next week or two anyway.
>
> That said, here are my thoughts on delays:
> It seems to me there are several scenarios of interest, including the one you mentioned (#1 below):
> 1. Constant-time delay of all messages in a stream, to allow some other stream to be a constant amount “ahead”.
>    a. Example: enrichment of the first stream by an HBase Enricher being fed from a processed second stream
> 2. Delay of a stream until a readiness event happens
>    a. Example: enrichment of the stream by a Threat Intel Enricher that lazily downloads a large data set from a slow source in response to certain messages, and we want to stall the stream until we’ve received it
> 3. Artificial delay simulating either of the above, or other behaviors, for testing purposes
>
> Each of these might want a different implementation, and will have a different interaction with the Storm back-pressure mechanism.
> Are there other use cases?
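For use case #2, and the CURRENT/UPDATING spout flag Otto suggests further down the thread, a readiness gate could look roughly like the minimal sketch below. It is hypothetical: `ReadinessGate` and its methods are illustrative names, not Metron or Storm APIs; the flag is kept in memory, whereas a real deployment would presumably keep it in an external store such as ZooKeeper; and time is passed in explicitly so the check can be driven from a tick tuple or a test rather than by sleeping inside a spout or bolt. It implements a "waitUntil(ready || timeout)" check.

```java
/**
 * Hypothetical sketch, not a Metron or Storm API.
 * Tuples are held while the enrichment data is UPDATING and released once it
 * is CURRENT again, or once maxWaitMillis has elapsed since the update began.
 */
class ReadinessGate {
    enum State { CURRENT, UPDATING }

    private final long maxWaitMillis;
    // Assumption: in-memory flag; a real deployment might keep this in ZooKeeper.
    private volatile State state = State.CURRENT;
    private volatile long updatingSinceMillis;

    ReadinessGate(long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Called by the ingesting agent when it starts loading new data. */
    void markUpdating(long nowMillis) {
        updatingSinceMillis = nowMillis; // written before the state so readers see it
        state = State.UPDATING;
    }

    /** Called by the ingesting agent once the data is ready. */
    void markCurrent() {
        state = State.CURRENT;
    }

    /** True if a tuple may be processed now; false means "hold it and check again later". */
    boolean shouldProcess(long nowMillis) {
        if (state == State.CURRENT) {
            return true;
        }
        // Stop waiting after maxWaitMillis so an update that never finishes cannot stall the stream forever.
        return nowMillis - updatingSinceMillis >= maxWaitMillis;
    }

    public static void main(String[] args) {
        ReadinessGate gate = new ReadinessGate(30_000L);
        System.out.println(gate.shouldProcess(0L));      // true: nothing is updating
        gate.markUpdating(1_000L);
        System.out.println(gate.shouldProcess(10_000L)); // false: updating for 9 s
        System.out.println(gate.shouldProcess(31_000L)); // true: 30 s timeout reached
        gate.markCurrent();
        System.out.println(gate.shouldProcess(31_500L)); // true: data is current
    }
}
```

The timeout matters for Jon's case, where the enrichment data "may not ever be sent": without it, a missing update would stall the stream indefinitely.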
>
> Cheers,
> --Matt
>
> On 11/4/16, 10:16 AM, "Carolyn Duby" <[email protected]> wrote:
>
> You need to plan your Kafka retention and Storm capacity to support spikes in traffic. You have to do this regardless of whether you delay or not.
>
> Thanks
> Carolyn
>
> On 11/4/16, 1:14 PM, "James Sirota" <[email protected]> wrote:
>
> >We need to get with the Storm team and see what the new back-pressure features in Storm can support. My concern is what happens when you are operating on millions of messages per second and all of a sudden some of your messages are paused and replayed and your pipeline backs up into Kafka. That generally has not worked well.
> >
> >04.11.2016, 07:57, "Carolyn Duby" <[email protected]>:
> >> Perhaps this has already been suggested. I have not been following the thread that closely, but you could approach it this way:
> >>
> >> 1. Enrichment parsers consume immediately from the queue.
> >> 2. Parsers using an enrichment inspect the first message in the queue. If it is less than <delay> minutes old, the parser waits to consume it. After the delay, the parser consumes the message.
> >>
> >> Thanks
> >> Carolyn
> >>
> >> On 11/4/16, 10:50 AM, "Nick Allen" <[email protected]> wrote:
> >>
> >>>> A sleep/wait cycle is another way to do this (simply delay everything in xyz topology by 30 seconds) which isn't as nice, but is also probably way less complicated to implement.
> >>>
> >>> It definitely has the advantage of being simpler to implement. But doing a sleep in a message-passing architecture like Storm doesn't feel right to me. Just my gut.
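Carolyn's head-of-queue check (which is also one way to get Matt's constant-time delay, use case #1) can be written so that nothing sleeps, which speaks to Nick's concern. A minimal sketch, assuming messages are buffered in memory and arrival times are passed in non-decreasing order; `DelayBuffer` and its methods are hypothetical names, not Metron or Storm APIs:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical constant-delay buffer: inspect the oldest message and release it
 * only once it is at least delayMillis old. Because arrival times are assumed to
 * be non-decreasing, once the head is too young, everything behind it is too.
 * Nothing sleeps; a caller (for example a tick-tuple handler) polls.
 */
class DelayBuffer<T> {
    private static final class Entry<M> {
        final long arrivalMillis;
        final M message;

        Entry(long arrivalMillis, M message) {
            this.arrivalMillis = arrivalMillis;
            this.message = message;
        }
    }

    private final long delayMillis;
    private final Deque<Entry<T>> queue = new ArrayDeque<>();

    DelayBuffer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void add(T message, long arrivalMillis) {
        queue.addLast(new Entry<>(arrivalMillis, message));
    }

    /** Removes and returns, oldest first, every message buffered for at least delayMillis. */
    List<T> pollReady(long nowMillis) {
        List<T> ready = new ArrayList<>();
        while (!queue.isEmpty() && nowMillis - queue.peekFirst().arrivalMillis >= delayMillis) {
            ready.add(queue.pollFirst().message);
        }
        return ready;
    }

    /** Milliseconds until the head message becomes ready, or -1 if the buffer is empty. */
    long millisUntilNextReady(long nowMillis) {
        Entry<T> head = queue.peekFirst();
        if (head == null) {
            return -1L;
        }
        return Math.max(0L, head.arrivalMillis + delayMillis - nowMillis);
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        DelayBuffer<String> buffer = new DelayBuffer<>(30_000L);
        buffer.add("a", 0L);
        buffer.add("b", 10_000L);
        System.out.println(buffer.pollReady(20_000L));            // []
        System.out.println(buffer.millisUntilNextReady(20_000L)); // 10000
        System.out.println(buffer.pollReady(30_000L));            // [a]
        System.out.println(buffer.pollReady(45_000L));            // [b]
    }
}
```

If something like this lived inside a bolt, the buffered tuples would presumably stay un-acked until released, so Storm's `topology.message.timeout.secs` would need to be comfortably larger than the delay or the spout would replay them. That is the replay/back-pressure interaction James raises, and the buffered volume comes on top of the capacity planning Carolyn mentions.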
> >>>
> >>> On Fri, Nov 4, 2016 at 10:43 AM, [email protected] <[email protected]> wrote:
> >>>
> >>>> I think we've come to a better way to do this, which is sort of a waitUntil(exists || timeout), but the issue is checking whether something exists, because that requires some sort of timestamp to avoid collisions (due to source port reuse, etc.). I don't know the best way to do this offhand. Here's a general scenario:
> >>>>
> >>>> 1) ssh syslog comes in -> parses -> insert to HBase {ip_login_src, src_port, ip_login_dst, ip_login_dst_hostname, account, timestamp, success_bool} via streaming enrichment
> >>>>
> >>>> 2) Network logs come in saying ip_src_addr logged into ip_dst_addr -> parses -> enriches (checks for whitelists, then if appropriate sets is_alert = T) -> indexes
> >>>>
> >>>> What I want is something for the network logs to get enriched with the ssh HBase data (almost exactly this use case <https://cwiki.apache.org/confluence/display/METRON/2016/06/16/Metron+Tutorial+-+Fundamentals+Part+6%3A+Streaming+Enrichment>), using ip_src_addr, src_port, ip_dst_addr, account, and maybe some sort of fuzzy timestamp? Then we can hash them all together and use that as a lookup key, but I'm not sure how to handle timestamps without having 3 identifiers (1 for the current time +/- 3 mins, 1 for the previous 3-minute segment, 1 for the next 3-minute segment).
> >>>>
> >>>> A sleep/wait cycle is another way to do this (simply delay everything in xyz topology by 30 seconds) which isn't as nice, but is also probably way less complicated to implement.
> >>>>
> >>>> We're discussing this in IRC (soon to be Slack ^.^) as well.
> >>>>
> >>>> Jon
> >>>>
> >>>> On Fri, Nov 4, 2016 at 10:28 AM Otto Fowler <[email protected]> wrote:
> >>>>
> >>>> So spout orchestration/gating?
> >>>>
> >>>> Spout checks for external state flag:
> >>>>
> >>>> if CURRENT - process
> >>>> if UPDATING - wait
> >>>>
> >>>> With the ingesting agent setting the flag to UPDATING while it runs?
> >>>>
> >>>> On November 4, 2016 at 09:29:16, [email protected] ([email protected]) wrote:
> >>>>
> >>>> Is there a good method (e.g. something using Stellar/ZK) to implement an intentional processing delay for all tuples in a specific topology? I plan to do some custom enrichments, but the data used to do the enrichment *may* be ingested at roughly the same time as the data to be enriched (it also may never be sent). So I'd like to add a delay in my cluster that applies to certain parser topologies.
> >>>>
> >>>> I took a look around in the documentation and in JIRA and didn't find anything available or being worked on, but I did see that this may conflict with METRON-322. Essentially what I'm considering is a {sleep,delay,wait} Stellar function, but it could also be a delay in a parser's Kafka spout (I'm much less of a fan of the second option).
> >>>>
> >>>> I'm looking for feedback on the best way to approach this, and I'd be happy to do the work myself (if necessary) when it gets to that point. I did consider implementing this delay upstream (in the sensor itself), but after looking in more detail it doesn't seem as feasible.
> >>>>
> >>>> Jon
> >>>
> >>> --
> >>> Nick Allen <[email protected]>
> >
> >-------------------
> >Thank you,
> >
> >James Sirota
> >PPMC- Apache Metron (Incubating)
> >jsirota AT apache DOT org

--
Jon

Sent from my mobile device
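For the fuzzy-timestamp lookup key Jon describes, one common approach (assumed here, not something the thread settled on) is to cut time into fixed 3-minute segments. The writer (the ssh streaming enrichment) stores each login under the key for its own segment, and the reader (the network-log enrichment) computes keys for the previous, current, and next segments and looks up all three. The class name, the `|` delimiter, SHA-256 hashing, and the mapping of ip_login_src/ip_login_dst onto ip_src_addr/ip_dst_addr are all assumptions; the HBase reads and writes are not shown.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of time-bucketed lookup keys; not a Metron API. */
class FuzzyLoginKey {
    // Assumption: 3-minute segments, as in Jon's description.
    static final long BUCKET_MILLIS = 3 * 60 * 1000L;

    /** Index of the 3-minute segment containing the timestamp. */
    static long bucketOf(long timestampMillis) {
        return Math.floorDiv(timestampMillis, BUCKET_MILLIS);
    }

    /** Hashes the connection fields plus a segment index into a hex row key. */
    static String key(String ipSrc, int srcPort, String ipDst, String account, long bucket) {
        String joined = String.join("|", ipSrc, Integer.toString(srcPort), ipDst, account, Long.toString(bucket));
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha.digest(joined.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /** Reader side: keys for the previous, current, and next segment. */
    static List<String> candidateKeys(String ipSrc, int srcPort, String ipDst, String account, long timestampMillis) {
        long bucket = bucketOf(timestampMillis);
        List<String> keys = new ArrayList<>();
        for (long b = bucket - 1; b <= bucket + 1; b++) {
            keys.add(key(ipSrc, srcPort, ipDst, account, b));
        }
        return keys;
    }

    public static void main(String[] args) {
        long loginTime = 100_000L;   // ssh login, segment 0
        long networkTime = 200_000L; // network log 100 s later, segment 1
        String stored = key("10.0.0.1", 51000, "10.0.0.2", "alice", bucketOf(loginTime));
        List<String> lookups = candidateKeys("10.0.0.1", 51000, "10.0.0.2", "alice", networkTime);
        System.out.println(lookups.contains(stored)); // true
    }
}
```

Any two timestamps less than 3 minutes apart fall in the same or adjacent segments, so a login recorded within 3 minutes of the network event is always among the three lookups; matches up to just under 6 minutes apart can also occur, so comparing the stored timestamp afterwards may still be worthwhile. This only handles clock skew between the two records; the case where the ssh record has not been written yet is what the delay or gate discussed above is for.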
