Sounds very promising, thank you!! I'll share what I find out :) Are there other group-related use cases? Maybe some non-incremental statistical measures?
Regards,
Matt

> On Nov 6, 2015, at 9:48 PM, Michael Moser wrote:
>
> Matt,
>
> There is the MonitorActivity processor, which "Monitors the flow for
> activity and sends out an indicator when the flow has not had any data for
> some specified amount of time and again when the flow's activity is
> restored". You could look at how MonitorActivity is coded to get ideas for
> how your ReservoirSampling processor can do what you need.
>
> -- Mike
>
>
> On Fri, Nov 6, 2015 at 11:49 AM, Matthew Burgess wrote:
>
>> No, that makes sense, thanks much!
>>
>> So for my case, I'm thinking I'd want another attribute from GetFile called
>> "lastInStream" or something? It would be set once processing of the current
>> directory is complete (for the time being), and reset each time
>> onTrigger is called. At that point it's really more of a "lastInBatch", so
>> maybe instead I could use the batch size somehow as a hint to the
>> ReservoirSampling processor that the current reservoir is ready to send
>> along? The use case is a kind of burst processing (or per-batch
>> filtering), where FlowFiles are available in "groups", and I could sample
>> from the incoming group with equal probability to give a smaller output
>> group.
>>
>>
>> From: Joe Witt
>> Date: Friday, November 6, 2015 at 11:38 AM
>> Subject: Re: End of stream?
>>
>> Matt,
>>
>> For processors in the middle of the flow, the null check is important
>> for race conditions where a processor is told it can run, but by the time
>> it does there are no flowfiles left. In general, though, the framework
>> avoids this because it checks whether there is work to do. So, in
>> short, you can't use that mechanism to know there are no items left to
>> process.
>>
>> The only way to know that a given flowfile was the last in a bunch
>> would be for that fact to be an attribute on a given flow file.
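A minimal plain-Java sketch of the "lastInBatch" idea, assuming something upstream sets such a flag on the last item of each group. The sampler keeps a k-slot reservoir using Algorithm R, so every item in a batch has the same chance of ending up in the sample, and it emits and clears the reservoir when the flagged item arrives. It deliberately avoids the NiFi API: `Item`, its `lastInBatch` field, `BatchReservoirSampler`, and the `emit` callback are hypothetical stand-ins for a FlowFile, an attribute, a processor, and a transfer to a relationship.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

// Hypothetical stand-in for a FlowFile: a payload plus a "lastInBatch" flag
// that some upstream step is assumed to set (not a real NiFi attribute).
final class Item {
    final String payload;
    final boolean lastInBatch;

    Item(String payload, boolean lastInBatch) {
        this.payload = payload;
        this.lastInBatch = lastInBatch;
    }
}

// Per-batch reservoir sampler using Algorithm R. A sketch, not NiFi code.
final class BatchReservoirSampler {
    private final int k;                            // reservoir capacity
    private final Random random;
    private final List<Item> reservoir = new ArrayList<>();
    private int seenInBatch = 0;                    // items offered in the current batch

    BatchReservoirSampler(int k, Random random) {
        if (k <= 0) {
            throw new IllegalArgumentException("k must be positive");
        }
        this.k = k;
        this.random = random;
    }

    // Offers one item. When the item is flagged as last in its batch, the
    // current sample is passed to emit and the sampler resets for the next batch.
    void offer(Item item, Consumer<List<Item>> emit) {
        seenInBatch++;
        if (reservoir.size() < k) {
            reservoir.add(item);                    // fill the first k slots
        } else {
            // Keep the new item with probability k / seenInBatch,
            // overwriting a uniformly chosen slot.
            int j = random.nextInt(seenInBatch);    // uniform in [0, seenInBatch)
            if (j < k) {
                reservoir.set(j, item);
            }
        }
        if (item.lastInBatch) {
            emit.accept(new ArrayList<>(reservoir));
            reservoir.clear();
            seenInBatch = 0;
        }
    }
}
```

Because the flag travels with the data, this does not depend on session.get() ever returning null. If the flagged item never arrives, though, the reservoir is never released; a MonitorActivity-style inactivity timeout could act as a fallback in that case.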
>>
>> There is really no concept of an end of stream, so to speak, from a
>> processor perspective. Processors are either running or not running.
>> You can, as I mentioned before, though, use attributes of flowfiles to
>> annotate their relative position in a stream.
>>
>> Does that help explain it at all or did I make it more confusing?
>>
>> Thanks
>> Joe
>>
>> On Fri, Nov 6, 2015 at 11:32 AM, Matthew Burgess wrote:
>>> Does NiFi have the concept of an "end of stream" or is it designed to
>>> pretty much always be running? For example, if I use a GetFile processor
>>> pointing at a single directory (with remove files = true), once all the
>>> files have been processed, can downstream processors know that?
>>>
>>> I'm working on a ReservoirSampling processor, and I have it successfully
>>> building the reservoir from all incoming FlowFiles. However, it never
>>> gets to the logic that sends the sampled FlowFiles to the downstream
>>> processor (just a PutFile at this point). I have the logic in a block like:
>>>
>>> FlowFile flowFile = session.get();
>>> if (flowFile == null) {
>>>     // send reservoir
>>> } else {
>>>     // build reservoir
>>> }
>>>
>>> But the if-clause never gets entered. Is there a different approach
>>> and/or am I misunderstanding how the data flow works?
>>>
>>> Thanks in advance,
>>> Matt
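Following Joe's suggestion to annotate relative position with attributes, here is a rough sketch of the receiving side, assuming every member of a group carries a group id, its index, and the group size. The collector releases a group once all of its members have arrived, sorted by index, whatever order they came in. The attribute names `batch.id`, `batch.index`, and `batch.count`, and the classes `BatchAttrs` and `BatchCollector`, are hypothetical; plain `Map<String, String>` objects stand in for FlowFile attributes, which are also string key/value pairs. NiFi's split processors write a similar `fragment.identifier`/`fragment.index`/`fragment.count` set.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical attribute names describing a member's position in its group.
final class BatchAttrs {
    static final String ID = "batch.id";
    static final String INDEX = "batch.index";
    static final String COUNT = "batch.count";

    private BatchAttrs() {}

    // Upstream side: annotate every member of a group with its position.
    static List<Map<String, String>> tag(String batchId, List<String> filenames) {
        List<Map<String, String>> tagged = new ArrayList<>();
        for (int i = 0; i < filenames.size(); i++) {
            Map<String, String> attrs = new HashMap<>();
            attrs.put("filename", filenames.get(i));
            attrs.put(ID, batchId);
            attrs.put(INDEX, Integer.toString(i));
            attrs.put(COUNT, Integer.toString(filenames.size()));
            tagged.add(attrs);
        }
        return tagged;
    }
}

// Downstream side: buffers members per group and releases a group once all
// of its members have arrived, so no "end of stream" signal is needed.
final class BatchCollector {
    private final Map<String, List<Map<String, String>>> pending = new HashMap<>();

    // Returns the complete group, sorted by index, when this item is the last
    // member to arrive; otherwise returns Optional.empty().
    Optional<List<Map<String, String>>> accept(Map<String, String> attrs) {
        String id = attrs.get(BatchAttrs.ID);
        int expected = Integer.parseInt(attrs.get(BatchAttrs.COUNT));
        List<Map<String, String>> group =
                pending.computeIfAbsent(id, key -> new ArrayList<>());
        group.add(attrs);
        if (group.size() < expected) {
            return Optional.empty();
        }
        pending.remove(id);
        group.sort(Comparator.comparingInt(m -> Integer.parseInt(m.get(BatchAttrs.INDEX))));
        return Optional.of(group);
    }
}
```

Unlike a reservoir, this buffers every member of a group until the group is complete, which costs memory for large groups but is also what a non-incremental measure such as a median would need.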
