Mark,

Regarding your last comments:

> Not really related, but on the note of things you may not realize, with
> that code snippet :)
> If you have access to update the processor mentioned here, you should
> avoid using session.putAttribute many times.
> Under the hood, in order to maintain object immutability it has to create
> a new FlowFile object (and a new HashMap of all attributes!)
> for every call to putAttribute. So if there are 100 attributes that match
> that’s potentially a huge amount of garbage getting created.
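To put rough numbers on the quoted cost, here is a minimal sketch in plain Java. It assumes hypothetical MiniFlowFile and MiniSession classes (not NiFi's real API or internals) that follow the copy-on-write behavior Mark describes: every update copies the whole attribute map into a new FlowFile object. The helpers copyOneAtATime and copyBatched are also illustrative. They compare one putAttribute call per matching attribute against a single putAllAttributes call, as in Mark's suggested snippet.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AttributeCopyDemo {

    /** Hypothetical, simplified immutable FlowFile; NOT the real NiFi class. */
    static final class MiniFlowFile {
        final Map<String, String> attributes;

        MiniFlowFile(Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(attributes);
        }
    }

    /** Hypothetical session that counts the FlowFiles and map entries it copies. */
    static final class MiniSession {
        int flowFilesCreated = 0;
        long entriesCopied = 0;

        // Every call copies the full attribute map and allocates a new FlowFile.
        MiniFlowFile putAttribute(MiniFlowFile ff, String key, String value) {
            Map<String, String> copy = new HashMap<>(ff.attributes);
            entriesCopied += ff.attributes.size();
            copy.put(key, value);
            flowFilesCreated++;
            return new MiniFlowFile(copy);
        }

        // One copy and one new FlowFile, no matter how many keys are updated.
        MiniFlowFile putAllAttributes(MiniFlowFile ff, Map<String, String> updates) {
            Map<String, String> copy = new HashMap<>(ff.attributes);
            entriesCopied += ff.attributes.size();
            copy.putAll(updates);
            flowFilesCreated++;
            return new MiniFlowFile(copy);
        }
    }

    /** Builds a FlowFile carrying n attributes named foo0..foo(n-1). */
    static MiniFlowFile seeded(int n) {
        Map<String, String> attrs = new HashMap<>();
        for (int i = 0; i < n; i++) {
            attrs.put("foo" + i, "v" + i);
        }
        return new MiniFlowFile(attrs);
    }

    /** One putAttribute call per matching attribute. */
    static MiniSession copyOneAtATime(int n) {
        MiniSession session = new MiniSession();
        MiniFlowFile flowFile = seeded(n);
        Map<String, String> original = flowFile.attributes;
        for (Map.Entry<String, String> e : original.entrySet()) {
            if (e.getKey().startsWith("foo")) {
                flowFile = session.putAttribute(flowFile, "original-" + e.getKey(), e.getValue());
            }
        }
        return session;
    }

    /** Collect the updates first, then one putAllAttributes call. */
    static MiniSession copyBatched(int n) {
        MiniSession session = new MiniSession();
        MiniFlowFile flowFile = seeded(n);
        Map<String, String> updates = new HashMap<>();
        flowFile.attributes.forEach((key, value) -> {
            if (key.startsWith("foo")) {
                updates.put("original-" + key, value);
            }
        });
        flowFile = session.putAllAttributes(flowFile, updates);
        return session;
    }

    public static void main(String[] args) {
        MiniSession slow = copyOneAtATime(100);
        MiniSession fast = copyBatched(100);
        // prints "putAttribute per match: 100 FlowFiles, 14950 map entries copied"
        System.out.println("putAttribute per match: " + slow.flowFilesCreated
                + " FlowFiles, " + slow.entriesCopied + " map entries copied");
        // prints "single putAllAttributes: 1 FlowFile, 100 map entries copied"
        System.out.println("single putAllAttributes: " + fast.flowFilesCreated
                + " FlowFile, " + fast.entriesCopied + " map entries copied");
    }
}
```

In this model the per-call copying grows with the map, because each copy also carries the attributes added by earlier calls (100 + 101 + ... + 199 entries). The same accounting would apply per child in a split loop: k separate putAttribute calls for fragment attributes mean k intermediate FlowFile objects per split under this model, versus one with putAllAttributes.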
I noticed that some of the split processors (SplitJson, SplitXml, and
SplitAvro) have loops that create a new flow file for each split and call
putAttribute more than once for each flow file created, in order to
populate the split attributes (FRAGMENT_ID, FRAGMENT_INDEX, etc.). Would
these all suffer from "a huge amount of garbage getting created", since
putAttribute is called multiple times in each iteration of the loop?

On Wed, May 8, 2024 at 5:27 PM Michael Moser <moser...@gmail.com> wrote:

> Oh yeah, I do love this behavior of ProcessSession. And thanks for the
> tip, it's easy to forget that there are efficiencies to be gained by using
> different parts of an API.
>
> -- Mike
>
>
> On Wed, May 8, 2024 at 11:04 AM Mark Payne <marka...@hotmail.com> wrote:
>
> > Yeah, that was something that kinda flew under the radar. Definitely
> > improved the API.
> >
> > Not really related, but on the note of things you may not realize, with
> > that code snippet :)
> > If you have access to update the processor mentioned here, you should
> > avoid using session.putAttribute many times.
> > Under the hood, in order to maintain object immutability it has to create
> > a new FlowFile object (and a new HashMap of all attributes!)
> > for every call to putAttribute. So if there are 100 attributes that match
> > that’s potentially a huge amount of garbage getting created. Instead,
> > you could just use:
> >
> > ```
> > final Map<String, String> attributes = new HashMap<>();
> > flowFile.getAttributes().forEach((key, value) -> {
> >     if (key.startsWith("foo")) {
> >         attributes.put("original-" + key, value);
> >     }
> > });
> >
> > flowFile = session.putAllAttributes(flowFile, attributes);
> > ```
> >
> > Thanks
> > -Mark
> >
> >
> > > On May 8, 2024, at 10:30 AM, Michael Moser <moser...@gmail.com> wrote:
> > >
> > > Wow, thanks for this information!
> > > Just last week I saw code that modified
> > > attributes in a stream:
> > >
> > > flowFile.getAttributes().entrySet().stream().filter(e ->
> > > e.getKey().startsWith("foo"))
> > > .forEach(e -> session.putAttribute(flowFile, "original-" + e.getKey(),
> > > e.getValue()));
> > >
> > > and I wondered how that could possibly work since the return value of
> > > session.putAttribute is ignored! Now I know.
> > >
> > > -- Mike
> > >
> > > On Tue, May 7, 2024 at 3:02 PM Russell Bateman <r...@windofkeltia.com>
> > > wrote:
> > >
> > >> Yes, what you described is what was happening, Mark. I didn't show
> > >> all of the code that calls the session methods, and I did re-read the
> > >> incoming flowfile for purposes other than those I had already read
> > >> and written it for. So, I wasn't helpful enough. In the end, however,
> > >> I had forgotten, immediately after the call to
> > >> session.putAllAttributes(), to update my reference to the resulting
> > >> flowfile before passing it to session.transfer(). Fixing that solved
> > >> it for 1.1.2; it wasn't necessary for 1.13.2 or later versions. Being
> > >> helpful, the newer versions made me a spoiled, entitled child and I
> > >> will repent immediately.
> > >>
> > >> Thanks, guys! DevOps are happy they don't have to upgrade the customers
> > >> to NiFi 1.13.2. (In a way, I'm unhappy about that, but...)
> > >>
> > >> Best regards,
> > >>
> > >> Russ
> > >>
> > >> On 5/7/24 11:53, Mark Payne wrote:
> > >>> The call to session.putAttribute would throw an Exception because you
> > >>> provided an outdated version of the flowFile (you did not capture the
> > >>> result of calling session.write).
> > >>>
> > >>> Now, as NiFi matured, we found that:
> > >>> (a) for more complex processors that aren’t just a series of sequential
> > >>> steps, it becomes difficult to manage all of that bookkeeping.
> > >>> (b) it was not intuitive to require this
> > >>> (c) the ProcessSession already had more or less what it needed in order
> > >>> to determine what the most up-to-date version of the FlowFile was.
> > >>>
> > >>> So we updated the ProcessSession to automatically grab the latest
> > >>> version of the FlowFile for these methods. But since you’re trying to
> > >>> run an old version, you’ll need to make sure that you capture all of
> > >>> those outputs and always keep track of the most recent version of a
> > >>> FlowFile.
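The bookkeeping Mark describes can be illustrated with a small model. This is a minimal sketch, assuming hypothetical MiniFlowFile and MiniSession classes that are not NiFi's ProcessSession and do not reflect how NiFi actually implements it. In strict mode the session rejects a stale FlowFile reference, roughly like the older behavior described in the thread; in lenient mode it substitutes the latest version it knows about, roughly like the newer behavior. The helpers tracked and untracked are also hypothetical. Only the tracked pattern, which reassigns the variable from every call that returns a FlowFile, works in both modes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class LatestVersionDemo {

    /** Hypothetical immutable FlowFile: id + version + attributes. NOT the NiFi class. */
    static final class MiniFlowFile {
        final long id;
        final int version;
        final Map<String, String> attributes;

        MiniFlowFile(long id, int version, Map<String, String> attributes) {
            this.id = id;
            this.version = version;
            this.attributes = Collections.unmodifiableMap(attributes);
        }
    }

    /** Hypothetical session; strict = true rejects stale references. */
    static final class MiniSession {
        private final boolean strict;
        private final Map<Long, MiniFlowFile> latest = new HashMap<>();
        private long nextId = 1;

        MiniSession(boolean strict) {
            this.strict = strict;
        }

        MiniFlowFile create() {
            MiniFlowFile ff = new MiniFlowFile(nextId++, 0, new HashMap<>());
            latest.put(ff.id, ff);
            return ff;
        }

        // Stand-in for session.write(...): any content change yields a new version.
        MiniFlowFile write(MiniFlowFile ff) {
            return newVersion(resolve(ff), Collections.<String, String>emptyMap());
        }

        MiniFlowFile putAllAttributes(MiniFlowFile ff, Map<String, String> updates) {
            return newVersion(resolve(ff), updates);
        }

        // Returns the version that would actually be transferred.
        MiniFlowFile transfer(MiniFlowFile ff) {
            return resolve(ff);
        }

        // Strict: throw on an outdated reference. Lenient: use the latest version.
        private MiniFlowFile resolve(MiniFlowFile ff) {
            MiniFlowFile current = latest.get(ff.id);
            if (current.version == ff.version) {
                return ff;
            }
            if (strict) {
                throw new IllegalStateException("stale FlowFile: got version "
                        + ff.version + ", latest is " + current.version);
            }
            return current;
        }

        private MiniFlowFile newVersion(MiniFlowFile current, Map<String, String> updates) {
            Map<String, String> attrs = new HashMap<>(current.attributes);
            attrs.putAll(updates);
            MiniFlowFile next = new MiniFlowFile(current.id, current.version + 1, attrs);
            latest.put(next.id, next);
            return next;
        }
    }

    // Keeps every returned reference, as the thread recommends for old versions.
    static MiniFlowFile tracked(MiniSession session) {
        MiniFlowFile flowFile = session.create();
        flowFile = session.write(flowFile);
        flowFile = session.putAllAttributes(flowFile, Collections.singletonMap("original-foo", "bar"));
        return session.transfer(flowFile);
    }

    // Discards returned references; only works if the session resolves them itself.
    static MiniFlowFile untracked(MiniSession session) {
        MiniFlowFile flowFile = session.create();
        session.write(flowFile);
        session.putAllAttributes(flowFile, Collections.singletonMap("original-foo", "bar"));
        return session.transfer(flowFile);
    }

    public static void main(String[] args) {
        System.out.println("tracked, strict:    v" + tracked(new MiniSession(true)).version);
        System.out.println("untracked, lenient: v" + untracked(new MiniSession(false)).version);
        try {
            untracked(new MiniSession(true));
        } catch (IllegalStateException e) {
            System.out.println("untracked, strict:  " + e.getMessage());
        }
    }
}
```

In the strict run of untracked, the failure surfaces at putAllAttributes, because the result of write was never captured; that mirrors the situation Mark describes for the older release.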