Thanks for the feedback, Koji - I appreciate it.  I will submit a Jira for
the below and implement an alternate solution for the potential race
condition.

Thanks!

Chris Lundeberg




On Mon, Aug 19, 2019 at 8:16 PM Koji Kawamura <[email protected]>
wrote:

> Hi Chris,
>
> You are correct, Wait processor has to rely on an attribute within a
> FlowFile to determine target signal count.
> I think the idea of making Wait be able to fetch target signal count
> from DistributedMapCache is a nice improvement.
>
> Please create a JIRA for further discussion. I guess we will need to
> add a property such as "Fetch Target Signal Count from Cache Service",
> boolean, defaults to false. If enabled, the Wait processor treats the
> configured "Target Signal Count" value as a key in the
> DistributedMapCache, then fetches the value to use as the target count.
> If the key is not found, the Wait processor transfers the FlowFile to
> the wait relationship.
> https://issues.apache.org/jira/projects/NIFI
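
A minimal sketch of the lookup behaviour proposed above, written in Python
rather than as NiFi code. The function names, the `fetch_from_cache` flag, and
the use of a plain dict in place of the DistributedMapCache are all
assumptions for illustration; none of this is an existing NiFi API, and the
real Wait processor has more relationships than the two shown here.

```python
from typing import Dict, Optional


def resolve_target_count(
    configured_value: str,
    cache: Dict[str, str],
    fetch_from_cache: bool = False,
) -> Optional[int]:
    """Return the target signal count, or None if it is not available yet."""
    if not fetch_from_cache:
        # Default: the configured value is the count itself.
        return int(configured_value)
    # Proposed option: the configured value is a key into the cache.
    cached = cache.get(configured_value)
    return int(cached) if cached is not None else None


def route(signal_count: int, target: Optional[int]) -> str:
    """Pick a relationship name for a FlowFile (simplified to two outcomes)."""
    if target is None or signal_count < target:
        # Key not found yet, or not enough signals: keep waiting.
        return "wait"
    return "success"
```

With the flag enabled and no entry in the cache, `resolve_target_count`
returns `None` and `route` sends the FlowFile to "wait", which matches the
"key is not found" case described above.
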
>
> Adding FetchDistributedMapCache right before Wait provides the same
> result. But if the Wait processor can fetch it itself, we can reduce the
> number of fetch operations required to process multiple FlowFiles at Wait.
>
> To avoid the race condition where Wait processes FlowFiles before the
> counting part finishes, I'd use two keys in the counting part: a
> temporary one to accumulate the count, and the final one (the signal
> identifier), written once the counting has finished.
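
The two-key idea can be sketched as follows. This is only an illustration in
Python with a dict standing in for the map cache; the `.counting` key suffix
and both function names are hypothetical, and in a real flow the steps would
be separate processors rather than one function.

```python
from typing import Dict, Iterable, Optional


def count_then_publish(
    cache: Dict[str, str], batch_id: str, items: Iterable[object]
) -> int:
    """Accumulate under a temporary key, then publish under the final key."""
    temp_key = f"{batch_id}.counting"  # hypothetical temporary key name
    final_key = batch_id               # the key the waiting side reads

    cache[temp_key] = "0"
    for _ in items:
        # Partial counts only ever live under the temporary key.
        cache[temp_key] = str(int(cache[temp_key]) + 1)

    # Counting is done: remove the temporary key and publish the total.
    total = int(cache.pop(temp_key))
    cache[final_key] = str(total)
    return total


def read_target(cache: Dict[str, str], batch_id: str) -> Optional[int]:
    """What the waiting side sees: None until the final key is published."""
    value = cache.get(batch_id)
    return int(value) if value is not None else None
```

Because the final key is written only once, a reader never observes a partial
count; it either finds nothing (and keeps waiting) or finds the full total.
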
>
> Thanks,
> Koji
>
> On Tue, Aug 20, 2019 at 1:08 AM Chris Lundeberg <[email protected]>
> wrote:
> >
> > Hi all,
> >
> > I wanted to throw out a question to the larger community before I went
> > down a different path.  I might be looking at this wrong or making
> > assumptions I shouldn't.
> >
> > Recently I started working with the Wait and Notify processors a bit
> > more.  I have a new flow which is a bit more batch in nature and these
> > processors seem to work nicely for being able to intelligently wait for
> > chunks or files to be processed, before moving on to the next step.  I
> > have one specific pattern that I haven't solved with the inbuilt
> > functionality, which is:
> >
> > 1. I have an incoming zip file from SFTP.  That zip contains n-number of
> > files within and each of those files needs to be split in some way.  I
> > won't know the number of files within the zip.
> >
> > 2.  After they have been split correctly, a few transformations run on
> > each of the files.
> >
> > 3.  At the end of the transformation process, these various files will be
> > merged into 5 specific outbound file formats, to be sent to an outbound
> > SFTP server.  *Note*: I am not splitting and merging the same files back
> > together (I have looked at the fragment index stuff).
> >
> > I found a nice solution for being able to count the number of flowfiles
> > after the split, so I know exactly how many files should be transformed
> > and thus I know what my "Target Signal Count" should be within the Wait
> > processor.  At the moment I have a counting process to (1) Fetch
> > Distributed MapCache, (2) Replace text (incrementing the count number
> > from the fetch, if a number is found), and (3) Put Distributed MapCache.
> > This process works as expected and I have a valid key/value pair in the
> > MapCache for that particular process (I create a BatchID so it's very
> > specific for each pull from the SFTP processor).  The only way I know how
> > to intelligently provide that information back to the Wait processor is
> > to pull that value with a Fetch Distributed MapCache right before the
> > flowfile enters the Wait processor.  In theory each flowfile waiting
> > would have the same attribute from the Fetch process and each attribute
> > would be the same count.  However this doesn't always work because there
> > could exist a condition where the transformations happen before the
> > counting has been done and published to the MapCache Server.  So in this
> > scenario you end up with some flowfiles having a lower count than others
> > or just not having the "true" count.  Now, I can put additional gates in
> > place such as trying to slow down the flowfiles at specific sections to
> > try and allow the counting to be done first, but it's not a perfect
> > science.
> >
> > I thought ideally it would be good to allow the Wait processor to pull
> > directly from the MapCache if I could provide the key it would need for a
> > lookup, within the "Target Signal Count" field.  It could use the signal
> > coming from Notify to say "I have X number of Notify, for this signal"
> > and use the count value I have set in the MapCache to say "This is the
> > total number of files I need to see from Notify, for that same signal".
> > This way, I could run the Wait processor every few seconds and the
> > chances of running into a miscount condition would be far less.  Is there
> > any way currently where this processor could pull directly from the
> > cache, or does it have to rely on an attribute within the flowfile
> > itself?  I think it's the latter, but I want to make sure someone doesn't
> > have a better idea.
> >
> > Sorry for the long message. Thanks!
> >
> >
> > Chris Lundeberg
>
