Hi Chris,

You are correct: the Wait processor has to rely on an attribute within a FlowFile to determine the target signal count. I think letting Wait fetch the target signal count from DistributedMapCache would be a nice improvement.
Please create a JIRA for further discussion. I guess we will need to add a property such as "Fetch Target Signal Count from Cache Service" (boolean, defaults to false). If enabled, the Wait processor treats the configured "Target Signal Count" value as a key in the DistributedMapCache, then fetches the value to use as the target count. If the key is not found, the Wait processor transfers the FlowFile to the wait relationship.

https://issues.apache.org/jira/projects/NIFI

Adding FetchDistributedMapCache right before Wait provides the same result. But if the Wait processor can fetch the count itself, we can reduce the number of fetch operations required to process multiple FlowFiles at Wait.

To avoid the race condition in which Wait processes FlowFiles before the counting part finishes, I'd use two keys in the counting part: a temporary one to accumulate the count, and the final one (the signal identifier), written once the counting has finished.

Thanks,
Koji

On Tue, Aug 20, 2019 at 1:08 AM Chris Lundeberg <[email protected]> wrote:
>
> Hi all,
>
> I wanted to throw out a question to the larger community before I went down
> a different path. I might be looking at this wrong or making assumptions I
> shouldn't.
>
> Recently I started working with the Wait and Notify processors a bit more.
> I have a new flow which is a bit more batch in nature and these processors
> seem to work nicely for being able to intelligently wait for chunks or
> files to be processed, before moving on to the next step. I have one
> specific pattern that I haven't solved with the inbuilt functionality,
> which is:
>
> 1. I have an incoming zip file from SFTP. That zip contains n-number of
> files within and each of those files needs to be split in some way. I won't
> know the number of files within the zip.
>
> 2. After they have been split correctly, a few transformations run on each
> of the files.
>
> 3. At the end of the transformation process, these various files will be
> merged into 5 specific outbound file formats, to be sent to an outbound
> SFTP server. *Note*: I am not splitting and merging the same files back
> together (I have looked at the fragment index stuff).
>
> I found a nice solution for being able to count the number of flowfiles
> after the split, so I know exactly how many files should be transformed and
> thus I know what my "Target Signal Count" should be within the Wait
> processor. At the moment I have a counting process to (1) Fetch
> Distributed MapCache, (2) Replace text (incrementing the count number from
> the fetch, if a number is found), and (3) Put Distributed MapCache. This
> process works as expected and I have a valid key/value pair in the MapCache
> for that particular process (I create a BatchID so it's very specific for
> each pull from the SFTP processor). The only way I know how to
> intelligently provide that information back to the Wait processor is to
> pull that value with a Fetch Distributed MapCache right before the flowfile
> enters the Wait processor. In theory each flowfile waiting would have the
> same attribute from the Fetch process and each attribute would be the same
> count. However this doesn't always work because there could exist a
> condition where the transformations happen before the counting has been
> done and published to the MapCache Server. So in this scenario you end up
> with some flowfiles having a lower count than others or just not having the
> "true" count. Now, I can put additional gates in place such as trying to
> slow down the flowfiles at specific sections to try and allow the counting
> to be done first, but it's not a perfect science.
>
> I thought ideally it would be good to allow the Wait processor to pull
> directly from the MapCache if I could provide the key it would need for a
> lookup, within the "Target Signal Count" field. It could use the signal
> coming from Notify to say "I have X number of Notify, for this signal" and
> use the count value I have set in the MapCache to say "This is the total
> number of files I need to see from Notify, for that same signal". This way,
> I could run the Wait processor every few seconds and the chances of running
> into a miscount condition would be far less. Is there any way currently
> where this processor could pull directly from the cache, or does it have to
> rely on an attribute within the flowfile itself? I think it's the latter,
> but I want to make sure someone doesn't have a better idea.
>
> Sorry for the long message. Thanks!
>
>
> Chris Lundeberg
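
For anyone following the thread, here is a rough sketch of the two-key idea from the reply, together with the proposed "key not found means keep waiting" behavior. It is plain Python for illustration only, not NiFi code: `InMemoryMapCache`, `count_flowfile`, `finish_counting`, `wait_decision`, and the `.counting` key suffix are all hypothetical names, and a dict stands in for the DistributedMapCache.

```python
from typing import Dict, Optional


class InMemoryMapCache:
    """Hypothetical stand-in for a map cache service (not the NiFi API)."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def put(self, key: str, value: str) -> None:
        self._data[key] = value

    def remove(self, key: str) -> None:
        self._data.pop(key, None)


def _temp_key(signal_id: str) -> str:
    # Assumed naming convention for the temporary key.
    return signal_id + ".counting"


def count_flowfile(cache: InMemoryMapCache, signal_id: str) -> None:
    """Counting part: accumulate under the temporary key only.

    This read-increment-write is not atomic; the sketch is single-threaded.
    """
    current = int(cache.get(_temp_key(signal_id)) or 0)
    cache.put(_temp_key(signal_id), str(current + 1))


def finish_counting(cache: InMemoryMapCache, signal_id: str) -> None:
    """Publish the total under the final key once counting has finished."""
    total = cache.get(_temp_key(signal_id))
    if total is not None:
        cache.put(signal_id, total)
        cache.remove(_temp_key(signal_id))


def wait_decision(cache: InMemoryMapCache, target_key: str, signals_seen: int) -> str:
    """Wait side: look up the target count by key; a missing key means wait."""
    target = cache.get(target_key)
    if target is None:
        return "wait"
    return "success" if signals_seen >= int(target) else "wait"
```

Because the final key only appears after `finish_counting` runs, a lookup made while counting is still in progress finds nothing and the FlowFile keeps waiting, so it never sees a partial count.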
