A little late to the game (look what I get for not reading my email first thing 
in the morning!), but in response to Jon’s initial question as to whether this 
would conflict with METRON-322:

METRON-322, as currently being worked on, uses Storm's Tick Tuple settings to do 
periodic checks on the internal queue ages of Writer Bolts that use internal 
queues (specifically, the BulkMessageWriterBolt), and flushes those queues if 
their contents are older than a configured timeout.
Note that these internal queues are not related to the input and output queues 
maintained for each Component by Storm.
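
The age-based flush logic described above can be sketched roughly as follows. This is just the idea in Python with hypothetical names; the real BulkMessageWriterBolt is Java/Storm code, and the actual 322 work may differ:

```python
import time

class FlushingQueue:
    """Sketch of a writer bolt's internal queue that flushes either when
    full or when its oldest entry exceeds a configured age (the age check
    would be driven by Storm tick tuples). Names are hypothetical."""

    def __init__(self, batch_size, max_age_seconds, writer):
        self.batch_size = batch_size
        self.max_age = max_age_seconds
        self.writer = writer          # callable that persists a batch
        self.queue = []
        self.oldest_ts = None

    def add(self, message, now=None):
        now = time.time() if now is None else now
        if not self.queue:
            self.oldest_ts = now
        self.queue.append(message)
        if len(self.queue) >= self.batch_size:
            self.flush()

    def on_tick(self, now=None):
        # Called on each tick tuple: flush if the batch has gone stale.
        now = time.time() if now is None else now
        if self.queue and (now - self.oldest_ts) >= self.max_age:
            self.flush()

    def flush(self):
        self.writer(list(self.queue))
        self.queue.clear()
        self.oldest_ts = None
```
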

I can think of some delay scenarios in which it would be appropriate to use a 
Writer Bolt’s internal queue as part of the buffering mechanism for the delayed 
data.  If you end up writing such a custom Bolt, I would be happy to share my 
work in progress on 322.  I expect to be posting it as a PR in the next week or 
two anyway.

That said, here are my thoughts on delays:
It seems to me there are several scenarios of interest, including the one you 
mentioned (#1 below):
1. Constant-time delay of all messages in a stream, to allow some other stream 
to be a constant amount “ahead”.
a. Example: enrichment of the first stream by an HBase Enricher being fed from 
a processed second stream
2. Delay of a stream until a readiness event happens 
a. Example: enrichment of the stream by a Threat Intel Enricher that lazily 
downloads a large data set from a slow source in response to certain messages, 
and we want to stall the stream until we’ve received it
3. Artificial delay simulating either of the above, or other behaviors, for 
testing purposes
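
For scenario 1, one plausible shape (a sketch only, not anything Metron ships) is a buffer that releases a message only once it has aged a fixed number of seconds:

```python
from collections import deque

class ConstantDelayBuffer:
    """Sketch for scenario 1: hold every message for a fixed delay so a
    companion stream stays a constant amount 'ahead'. Names hypothetical."""

    def __init__(self, delay_seconds):
        self.delay = delay_seconds
        self.buffer = deque()  # (arrival_ts, message); arrival order = time order

    def offer(self, message, now):
        self.buffer.append((now, message))

    def drain_ready(self, now):
        # Release messages from the head that have aged past the delay.
        ready = []
        while self.buffer and now - self.buffer[0][0] >= self.delay:
            ready.append(self.buffer.popleft()[1])
        return ready
```
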

Each of these might want a different implementation, and each will interact 
differently with the Storm back-pressure mechanism.
Are there other use cases?

Cheers,
--Matt 


On 11/4/16, 10:16 AM, "Carolyn Duby" <[email protected]> wrote:

    You need to plan your Kafka retention and Storm capacity to support spikes in traffic.  You have to do this regardless of whether you delay or not.
    
    Thanks
    Carolyn
    
    
    
    
    On 11/4/16, 1:14 PM, "James Sirota" <[email protected]> wrote:
    
    >We need to get with the Storm team and see what the new back-pressure features in Storm can support.  My concern is what happens when you are operating on millions of messages per second and all of a sudden some of your messages are paused and replayed and your pipeline backs up into Kafka.  That generally has not worked well.
    >
    >04.11.2016, 07:57, "Carolyn Duby" <[email protected]>:
    >> Perhaps this has already been suggested. I have not been following the thread that closely, but you could approach it this way:
    >>
    >> 1. Enrichment parsers consume immediately from the queue.
    >> 2. Parsers using an enrichment inspect the first message in the queue. If it is less than <delay> minutes old, the parser waits to consume it. After the delay, the parser consumes the message.
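
Carolyn's two steps amount to a consumer that peeks at the head of its queue and only consumes messages older than the delay. A minimal sketch (hypothetical names, not Metron code):

```python
def consume_aged(queue, delay_seconds, now):
    """Consume messages from the head of `queue` (a list of
    (event_ts, message) pairs in arrival order) that are at least
    `delay_seconds` old; younger messages stay queued for a later pass."""
    consumed = []
    while queue and now - queue[0][0] >= delay_seconds:
        consumed.append(queue.pop(0)[1])
    return consumed
```
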
    >>
    >> Thanks
    >> Carolyn
    >>
    >> On 11/4/16, 10:50 AM, "Nick Allen" <[email protected]> wrote:
    >>
    >>>>  A sleep/wait cycle is another way to do this (simply delay everything in xyz topology by 30 seconds) which isn't as nice, but is also probably way less complicated to implement.
    >>>
    >>> It definitely has the advantage of being simpler to implement. But doing a sleep in a message-passing architecture like Storm doesn't feel right to me. Just my gut.
    >>>
    >>> On Fri, Nov 4, 2016 at 10:43 AM, [email protected] <[email protected]> wrote:
    >>>
    >>>>  I think we've come to a better way to do this, which is sort of a waitUntil(exists || timeout), but the issue is checking if something exists, because it requires some sort of timestamp to avoid collisions (due to source port reuse, etc.). I don't know the best way to do this offhand.
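
The waitUntil(exists || timeout) idea can be sketched as a simple polling loop. This is only an illustration of the shape (the `exists` check would be something like an HBase key lookup; names are hypothetical):

```python
import time

def wait_until(exists, timeout_seconds, poll_seconds=0.05):
    """Poll `exists()` until it returns True or the timeout elapses.
    Returns True if the data showed up, False on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if exists():
            return True
        time.sleep(poll_seconds)
    return exists()  # one last check at the deadline
```
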
    >>>>  Here's a general scenario:
    >>>>
    >>>>  1) ssh syslog comes in -> parses -> insert to HBase {ip_login_src, src_port, ip_login_dst, ip_login_dst_hostname, account, timestamp, success_bool} via streaming enrichment
    >>>>
    >>>>  2) Network logs come in saying ip_src_addr logged into ip_dst_addr -> parses -> enriches (checks for whitelists, then if appropriate sets is_alert = T) -> indexes
    >>>>
    >>>>  What I want is something for the network logs to get enriched with the ssh hbase data (almost exactly this use case <https://cwiki.apache.org/confluence/display/METRON/2016/06/16/Metron+Tutorial+-+Fundamentals+Part+6%3A+Streaming+Enrichment>), using ip_src_addr, src_port, ip_dst_addr, account, and maybe some sort of fuzzy timestamp? Then we can hash them all together and use it as a lookup key, but not sure how to handle timestamps without having 3 identifiers (1 for current time +- 3 mins, 1 for the previous 3-minute segment, one for the future 3-minute segment).
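
Jon's three-identifier idea can be sketched as bucketing the timestamp into fixed windows and probing the previous, current, and next bucket keys, so two events that straddle a boundary still share a key. The hash scheme and field handling here are purely illustrative:

```python
import hashlib

def candidate_keys(ip_src_addr, src_port, ip_dst_addr, account,
                   timestamp, bucket_seconds=180):
    """Build three lookup keys: the identifying fields hashed together
    with the previous, current, and next 3-minute time bucket."""
    bucket = int(timestamp) // bucket_seconds
    keys = []
    for b in (bucket - 1, bucket, bucket + 1):
        raw = "|".join([ip_src_addr, str(src_port), ip_dst_addr,
                        account, str(b)])
        keys.append(hashlib.sha256(raw.encode()).hexdigest())
    return keys
```

On write, the ssh record would be stored under its current-bucket key; on read, the network-log side probes all three candidates, so events up to one bucket apart can still match.
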
    >>>>
    >>>>  A sleep/wait cycle is another way to do this (simply delay everything in xyz topology by 30 seconds) which isn't as nice, but is also probably way less complicated to implement.
    >>>>
    >>>>  We're discussing this in IRC (soon to be slack ^.^) as well.
    >>>>
    >>>>  Jon
    >>>>
    >>>>  On Fri, Nov 4, 2016 at 10:28 AM Otto Fowler <[email protected]> wrote:
    >>>>
    >>>>  So spout orchestration/gating?
    >>>>
    >>>>  Spout checks for external state flag
    >>>>
    >>>>  if CURRENT - process
    >>>>  if UPDATING - wait
    >>>>
    >>>>  With the ingesting agent setting the flag to UPDATING while it runs?
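
Otto's spout-gating idea, as a sketch (flag states and names hypothetical; in practice the flag would likely live somewhere external like ZooKeeper):

```python
import time

def gated_next_tuple(get_flag, emit, poll_seconds=0.1):
    """Sketch of spout gating: emit only when the externally managed
    flag reads CURRENT; while it reads UPDATING, wait and re-check."""
    while get_flag() == "UPDATING":
        time.sleep(poll_seconds)
    if get_flag() == "CURRENT":
        emit()
        return True
    return False
```
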
    >>>>
    >>>>  On November 4, 2016 at 09:29:16, [email protected] ([email protected])
    >>>>  wrote:
    >>>>
    >>>>  Is there a good method (i.e. something using Stellar/ZK) to implement an intentional processing delay to all tuples in a specific topology? I plan to do some custom enrichments, but the data used to do the enrichment *may* be ingested at roughly the same time the data to be enriched is (it also may not ever be sent). So I'd like to add a delay in my cluster that applies to certain parser topologies.
    >>>>
    >>>>  I took a look around in the documentation and in JIRA and didn't find anything available or being worked on, but I did see that this may conflict with METRON-322. Essentially what I'm considering is a {sleep,delay,wait} Stellar function, but it could also be a delay in a parser's Kafka spout (much less of a fan of the second option).
    >>>>
    >>>>  I'm looking for feedback on the best way to approach this, and I'd be happy to do the work myself (if necessary) when it gets to that point. I did consider implementing this delay upstream (in the sensor itself), but after looking in more detail it doesn't seem as feasible.
    >>>>
    >>>>  Jon
    >>>>  Sent from my mobile device
    >>>
    >>> --
    >>> Nick Allen <[email protected]>
    >
    >------------------- 
    >Thank you,
    >
    >James Sirota
    >PPMC- Apache Metron (Incubating)
    >jsirota AT apache DOT org
    >
    

