Chris Sampson created NIFI-9050:
-----------------------------------

             Summary: ProcessSession State methods don't always persist state 
between consecutive sessions
                 Key: NIFI-9050
                 URL: https://issues.apache.org/jira/browse/NIFI-9050
             Project: Apache NiFi
          Issue Type: Bug
    Affects Versions: 1.14.0
            Reporter: Chris Sampson


During the development of NIFI-8002, the {{ProcessSession}} methods 
{{clearState}}, {{getState}} and {{setState}} were used in the 
{{SearchElasticsearch}} processor, which uses {{State.LOCAL}}, is annotated 
with {{@TriggerSerially}}, and may take one of several business-logic flows 
through its {{onTrigger}} method:

# New Query (no existing state)
** a paginated Elasticsearch query is constructed and executed
** details of the query are stored in {{State.LOCAL}} after the page of data 
has been processed
** the session completes and commits in {{AbstractProcessor}} with a call to 
{{commitAsync}}
*** (this async call could be the problem)
** a new session starts when the processor re-triggers *before* the 
Elasticsearch paginated query has expired (Keep Alive configuration)
** existing state is read and used to query the next page of data from 
Elasticsearch for the existing query

# Expire Query (existing state)
** processor triggers and reads existing state
** processor determines that the previous query has expired (the 
{{State.LOCAL}} "expiration timeout" is prior to {{Instant.now}})
** {{State.LOCAL}} is cleared
** a new paginated query should be started
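To make the two flows above concrete, here is a minimal, NiFi-free sketch of the 
decision logic. It assumes hypothetical state keys ({{queryId}}, {{pageCount}}, 
{{expirationTimestamp}}) and models the state as a plain {{Map}}; it is not the 
actual {{SearchElasticsearch}} code. What it illustrates is that each trigger 
decides what to do solely from the state it reads at the start, so it relies on 
the previous trigger's state change already being persisted.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, NiFi-free model of the SearchElasticsearch trigger flows.
// State keys, the Action enum and the keep-alive handling are illustrative
// assumptions, not the processor's actual implementation.
class PaginationFlowSketch {

    enum Action { NEW_QUERY, NEXT_PAGE, EXPIRE_QUERY }

    // Hypothetical state keys
    static final String QUERY_ID = "queryId";
    static final String PAGE = "pageCount";
    static final String EXPIRES_AT = "expirationTimestamp";

    // Decides the flow purely from the state read at the start of the trigger.
    static Action decide(Map<String, String> state, Instant now) {
        if (state.isEmpty()) {
            return Action.NEW_QUERY;
        }
        Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(state.get(EXPIRES_AT)));
        return expiresAt.isBefore(now) ? Action.EXPIRE_QUERY : Action.NEXT_PAGE;
    }

    // Runs one trigger and returns the state that should be persisted afterwards.
    static Map<String, String> trigger(Map<String, String> state, Instant now, long keepAliveMillis) {
        Map<String, String> next = new HashMap<>();
        Action action = decide(state, now);
        if (action == Action.EXPIRE_QUERY) {
            // Clear the state; the next trigger should then start a new query.
            return next;
        }
        if (action == Action.NEW_QUERY) {
            next.put(QUERY_ID, "query-" + now.toEpochMilli());
            next.put(PAGE, "1");
        } else {
            // Continue the existing query with the next page.
            next.put(QUERY_ID, state.get(QUERY_ID));
            next.put(PAGE, String.valueOf(Integer.parseInt(state.get(PAGE)) + 1));
        }
        // Each fetched page refreshes the keep-alive deadline.
        next.put(EXPIRES_AT, String.valueOf(now.plusMillis(keepAliveMillis).toEpochMilli()));
        return next;
    }
}
```

Feeding the returned map back into {{trigger}} within the keep-alive yields page 2 
of the same query; once the deadline has passed, {{trigger}} returns an empty map 
(state cleared) and the following call starts a new query. If a trigger reads a 
stale map instead, it takes the wrong branch.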

However, what [~jgresock] experienced [in his 
review|https://github.com/apache/nifi/pull/5193#discussion_r675650631] was 
that when the processor was scheduled to run in quick succession (every 1 
second), the state was not always persisted correctly. This led to the wrong 
page of data sometimes being requested and/or old query details being left in 
the State after they should have been cleared, resulting in errors in the logs 
and either no data or a repeated first page of data being output.

I think this may be a timing issue: because {{AbstractProcessor}} uses 
{{commitAsync}}, the state may not yet have been persisted to the state store 
when the next session starts, so the new session sees stale {{State}} values, 
which can result in the wrong data being processed.
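The suspected race can be reproduced deterministically in a small model. This is 
a hypothetical sketch, not NiFi code and not a claim about NiFi internals: 
{{Session}} buffers state changes (as the {{ProcessSession}} state methods are 
assumed to), {{commitAsync()}} merely queues the write, and the queue is only 
drained after the next trigger has already read the state. The synchronous 
{{commit()}} path stands in for writing through {{StateManager}} directly.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, deterministic model of the suspected race; it is not NiFi code.
class StateRaceSketch {

    // Committed state, as the next trigger would read it.
    static final class StateStore {
        private Map<String, String> committed = new HashMap<>();
        Map<String, String> read() { return new HashMap<>(committed); }
        void write(Map<String, String> state) { committed = new HashMap<>(state); }
    }

    // Buffers state changes until the session is committed (an assumption).
    static final class Session {
        private final StateStore store;
        private final Queue<Runnable> pendingCommits;
        private Map<String, String> buffered; // null = no state change in this session

        Session(StateStore store, Queue<Runnable> pendingCommits) {
            this.store = store;
            this.pendingCommits = pendingCommits;
        }

        Map<String, String> getState() {
            return buffered != null ? new HashMap<>(buffered) : store.read();
        }
        void setState(Map<String, String> state) { buffered = new HashMap<>(state); }
        void clearState() { buffered = new HashMap<>(); }

        // Synchronous: the state is visible to the next session once this returns.
        void commit() {
            if (buffered != null) store.write(buffered);
        }

        // Asynchronous: the state becomes visible only when the queued task runs.
        void commitAsync() {
            Map<String, String> snapshot = buffered;
            if (snapshot != null) pendingCommits.add(() -> store.write(snapshot));
        }
    }

    // Two back-to-back triggers: the first stores page=1, the second reads the page.
    static String secondTriggerSees(boolean async) {
        StateStore store = new StateStore();
        Queue<Runnable> pending = new ArrayDeque<>();

        Session first = new Session(store, pending);
        first.setState(Collections.singletonMap("page", "1"));
        if (async) first.commitAsync(); else first.commit();

        // The processor re-triggers before any pending commit has completed.
        Session second = new Session(store, pending);
        String page = second.getState().getOrDefault("page", "<none>");

        // Only now does the queued asynchronous commit land.
        pending.forEach(Runnable::run);
        pending.clear();
        return page;
    }
}
```

In this model {{secondTriggerSees(true)}} returns {{<none>}} (the first trigger's 
state is not yet visible), while {{secondTriggerSees(false)}} returns {{1}}.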

Switching back to the older approach of using the {{StateManager}} via the 
{{ProcessContext}} (which NIFI-8136 tried to simplify by adding the state 
methods to {{ProcessSession}}), which commits the {{State}} immediately 
(synchronously), fixed the problem for this processor configuration. A call to 
{{session.commit}} at the correct place in the processors might also have 
worked, but because of the Elasticsearch processor abstractions and their 
differing uses and business logic, it could have been tricky to get right; for 
example, committing the session after clearing the state of a previously 
expired query might cause issues in subsequent method calls.

Using the {{ProcessContext}} approach instead of the {{ProcessSession}} methods 
isn't wrong, but perhaps the {{ProcessSession}} methods should be documented 
more clearly for developers, e.g. noting that timing issues must be considered 
if there is a reliance on State being passed between consecutive sessions. 
Alternatively, should {{AbstractProcessor}} use the synchronous 
{{session.commit}} instead (although I imagine it was changed to asynchronous 
for a reason)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
