Hi Mark,

Thanks for the answer. You're right, I was going to use ProcessSession.get(FlowFileFilter), and I was planning to set an expiration date on the flow files so that the queue does not fill up if a standalone instance stops pulling data. But I hadn't thought about the data being swapped out and, you're right, we probably don't want to change that.
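For reference, the kind of filter I had in mind for the output port would look roughly like the sketch below ('requestingHost' is a placeholder for whatever peer identity the S2S server would resolve for the pulling instance, and 's2s.host' is the attribute from my proposal, so both are assumptions rather than existing behavior):

    import java.util.List;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
    import org.apache.nifi.processor.ProcessSession;

    public class HostBasedPullSketch {

        // Pull only the FlowFiles whose 's2s.host' attribute matches the
        // host currently asking for data; leave everything else queued for
        // the other instances.
        public static List<FlowFile> pullFor(final ProcessSession session, final String requestingHost) {
            return session.get(flowFile -> {
                final String targetHost = flowFile.getAttribute("s2s.host");
                return requestingHost.equals(targetHost)
                        ? FlowFileFilterResult.ACCEPT_AND_CONTINUE
                        : FlowFileFilterResult.REJECT_AND_CONTINUE;
            });
        }
    }

The REJECT_AND_CONTINUE result is what would let a single port serve several hosts from one queue, but, as you pointed out, only for the active queue.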
The HTTP approach indeed sounds like a very good option for my use case (I sketched how I picture the setup below the quoted thread). Thanks for mentioning it.

Pierre

On Sat, Sep 15, 2018 at 3:40 PM, Mark Payne <[email protected]> wrote:

> Hey Pierre,
>
> I'm not sure that this is the best route to go down. There are a couple of problems that I think you will run into. The most important is what happens when the data going to that Output Port builds up into a large queue. If a NiFi instance then requests data, I presume that the Output Port would determine which FlowFiles to send by calling ProcessSession.get(FlowFileFilter); but currently, if I'm not mistaken, that method only iterates over the data in the 'active' queue, not data that has been swapped out. As a result, the active queue could fill up with data for nodes that are not pulling, and that would prevent any node from pulling data.
>
> Even if we were to change the get(FlowFileFilter) method to run through swapped-out data, the expense of doing so would likely be cost-prohibitive for this approach, as the disk I/O to constantly scan the swap files would be too expensive. To make that approach feasible, you'd probably also have to change the swap file format so that its "summary" also contains a mapping of s2s.host to the count of FlowFiles for that host. And this is already getting way beyond the scope of what you want to do here.
>
> Additionally, I feel like where this concept is heading is difficult to explain and is designed for a rather specific use case, because it starts to turn this into a sort of quasi-pub-sub mechanism, but not a true pub/sub.
>
> Rather, I would propose that when the desire is to push data to a specific NiFi node, the preferred approach is not to use Site-to-Site (as that's intended to be point-to-point between NiFi instances/clusters for well-established endpoints). Typically, the approach taken for a scenario like this would be to run a ListenHTTP processor on each of the instances. They can push to the central instance using Site-to-Site. Then, rather than using an Output Port, you'd use a PostHTTP processor to push the data back. PostHTTP already supports Expression Language for the URL, and it has a "Send as FlowFile" option that properly packages the FlowFiles together with their attributes. It also handles batching together small FlowFiles, supports two-phase commit to minimize the possibility of data duplication, etc. This was the method used before Site-to-Site was added, and it worked quite well for a long time. Site-to-Site was added for convenience, so that users could just point to a given URL, be provided the list of available ports, and have it auto-load-balance across the cluster (if applicable). But in your use case, neither of these really benefits you, because you don't know the URL to send to a priori and you already know exactly which node to push to.
>
> Thanks
> -Mark
>
>
> On Sep 15, 2018, at 9:05 AM, Pierre Villard <[email protected]> wrote:
> >
> > Hi all,
> >
> > Here is my use case: I have multiple NiFi standalone instances deployed over multiple sites (they could be MiNiFi instances) and a central NiFi cluster. The standalone instances generate data, and the data is sent to the central cluster to be parsed and enriched before being sent back to the standalone instances. The data needs to go back to where it was generated.
> >
> > At the moment, since the RPG cannot be configured using EL and FlowFile attributes, you need one port (or one RPG, if the RPG is on the central NiFi's side) per standalone instance. And I don't think that changing the RPG to handle FlowFile attributes would be a good idea in terms of implementation.
> >
> > Instead, I'd like to change the S2S protocol to allow RPG pulling based on FlowFile attributes.
> >
> > On the standalone instances, we would have: workflow generating data => RPG => workflow receiving enriched data from the central cluster.
> >
> > On the NiFi cluster, we would have: input port => workflow parsing and enriching data => output port.
> >
> > The idea would be that, when configuring an output port in the RPG, it would be possible to enable "host-based pulling" so that only flow files whose 's2s.host' attribute matches the host of the instance hosting the RPG would be pulled. (The s2s.port attribute is already set when data is sent through S2S.)
> >
> > I already started working on that approach and, even though I don't have something fully functional yet, I wanted to discuss it here to be sure it would be interesting for the wider community and, also, that I'm not missing something obvious that would prevent it.
> >
> > Happy to file a JIRA if that sounds interesting.
> >
> > Thanks,
> > Pierre
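As mentioned above, here is how I picture the HTTP-based setup, to make sure I understood the suggestion correctly. Port 9999 is an arbitrary choice of mine, 'contentListener' is ListenHTTP's default base path, and ${s2s.host} relies on the attribute already being set on the FlowFiles:

    On the central cluster:

        input port => parse/enrich => PostHTTP
            URL              = http://${s2s.host}:9999/contentListener
            Send as FlowFile = true

    On each standalone instance:

        workflow generating data => RPG (push to the central cluster)
        ListenHTTP => workflow consuming the enriched data
            Listening Port = 9999
            Base Path      = contentListener

If I read the docs correctly, with "Send as FlowFile" enabled, ListenHTTP should unpack the payload on the receiving side, so the enriched data would come back with its attributes intact.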
