Chris Sampson created NIFI-9050:
-----------------------------------
Summary: ProcessSession State methods don't always persist state
between consecutive sessions
Key: NIFI-9050
URL: https://issues.apache.org/jira/browse/NIFI-9050
Project: Apache NiFi
Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Chris Sampson
During the development of NIFI-8002, the {{ProcessSession}} {{clearState}},
{{getState}} and {{setState}} methods were used in the {{SearchElasticsearch}}
processor, which uses {{State.LOCAL}}, is set to {{TriggerSerially}} and may
follow one of several business logic flows through the {{onTrigger}} method:
# New Query (no existing state)
** a paginated Elasticsearch query is constructed and executed
** Details of the query are stored in {{State.LOCAL}} after the page of data
has been processed
** the session completes and commits in {{AbstractProcessor}} with a call to
{{commitAsync}}
*** (this async call could be the problem)
** a new session starts when the processor re-triggers *before* the
Elasticsearch paginated query has expired (Keep Alive configuration)
** existing state is read and used to query the next page of data from
Elasticsearch for the existing query
# Expire Query (existing state)
** processor triggers and reads existing state
** processor determines that the previous query has expired (the
{{State.LOCAL}} "expiration timeout" is prior to {{Instant.now}})
** {{State.LOCAL}} is cleared
** a new paginated query should be started
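The branching between these flows can be sketched in plain Java as below. This is only an illustration of the decision described above, not the processor's actual code: the state key name {{expirationTimestamp}}, its epoch-millisecond encoding, and the {{QueryStateFlow}} class are all assumptions made for the example.

```java
import java.time.Instant;
import java.util.Map;

public class QueryStateFlow {
    // Hypothetical state key; the real processor's key names may differ.
    static final String EXPIRY_KEY = "expirationTimestamp";

    enum Action { NEW_QUERY, NEXT_PAGE, EXPIRE_AND_RESTART }

    // Decide which flow applies, given the state read at the start of a session.
    static Action decide(Map<String, String> state, Instant now) {
        String expiry = state.get(EXPIRY_KEY);
        if (expiry == null) {
            // No existing state: start a new paginated query.
            return Action.NEW_QUERY;
        }
        // Assumption: the expiration is stored as epoch milliseconds.
        Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(expiry));
        // Expired when the stored expiration is prior to "now".
        return expiresAt.isBefore(now) ? Action.EXPIRE_AND_RESTART : Action.NEXT_PAGE;
    }

    public static void main(String[] args) {
        Instant now = Instant.ofEpochMilli(10_000L);
        System.out.println(decide(Map.of(), now));                    // NEW_QUERY
        System.out.println(decide(Map.of(EXPIRY_KEY, "20000"), now)); // NEXT_PAGE
        System.out.println(decide(Map.of(EXPIRY_KEY, "5000"), now));  // EXPIRE_AND_RESTART
    }
}
```

Every branch depends on the state read at the start of the session, so a stale read selects the wrong flow.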
However, what [~jgresock] experienced [in his
review|https://github.com/apache/nifi/pull/5193#discussion_r675650631] was that
when the processor was scheduled to run in quick succession (1 second), the
state was not being persisted correctly. This led to the wrong page of data
being requested at times and/or old query details being left in the State
after they should have been cleared, resulting in errors in the logs and
either no data or a repeated first page of data being output.
I think this may be a timing issue: because {{AbstractProcessor}} uses
{{commitAsync}}, the state may not have been persisted to the remote store
before the next session starts. The new session then sees stale {{State}}
values, which could result in the wrong data being processed, etc.
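The suspected timing issue can be modelled with a small, deterministic, self-contained sketch. This is not NiFi code and does not show how NiFi actually implements {{commitAsync}}; the {{Store}} class, the {{page}} key and the explicit {{flush}} step are hypothetical stand-ins used only to illustrate the hypothesis that a deferred write lets the next session read stale state.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class AsyncCommitRace {
    // Hypothetical stand-in for a persisted state store (not the NiFi StateManager).
    static final class Store {
        private Map<String, String> persisted = new HashMap<>();
        private final Queue<Runnable> pendingWrites = new ArrayDeque<>();

        // Return a copy of whatever has actually been persisted so far.
        Map<String, String> read() {
            return new HashMap<>(persisted);
        }

        // Synchronous commit: the write is visible before the call returns.
        void commitSync(Map<String, String> newState) {
            persisted = new HashMap<>(newState);
        }

        // Asynchronous commit: the write is only queued, not yet visible.
        void commitAsync(Map<String, String> newState) {
            Map<String, String> snapshot = new HashMap<>(newState);
            pendingWrites.add(() -> persisted = snapshot);
        }

        // Apply all queued writes (models the async commit completing later).
        void flush() {
            while (!pendingWrites.isEmpty()) {
                pendingWrites.poll().run();
            }
        }
    }

    // One "session": read state, request the next page, store the new page number.
    static int runSession(Store store, boolean async) {
        Map<String, String> state = store.read();
        int page = Integer.parseInt(state.getOrDefault("page", "0")) + 1;
        state.put("page", Integer.toString(page));
        if (async) {
            store.commitAsync(state);
        } else {
            store.commitSync(state);
        }
        return page;
    }

    public static void main(String[] args) {
        Store asyncStore = new Store();
        System.out.println(runSession(asyncStore, true));  // 1
        System.out.println(runSession(asyncStore, true));  // 1 (first page repeated)
        asyncStore.flush();

        Store syncStore = new Store();
        System.out.println(runSession(syncStore, false));  // 1
        System.out.println(runSession(syncStore, false));  // 2
    }
}
```

In this model the second asynchronous session starts before the first session's write has been applied, so it requests page 1 again, which matches the repeated first page described above.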
Changing to the older approach of using {{StateManager}} via the
{{ProcessContext}} (which NIFI-8136 tried to simplify via the
{{ProcessSession}}), which immediately (synchronously) commits the {{State}} to
the remote, fixed the problem with this processor configuration. A call to
{{session.commit}} at the correct place in the processors may also have
achieved this, but because of the Elasticsearch processor abstractions and
different uses/business logic, it could have been tricky to get right; e.g.
committing the session after clearing the state of a previously expired query
might cause issues in subsequent method calls.
Using the {{ProcessContext}} approach instead of the {{ProcessSession}} methods
isn't wrong, but maybe the use of the {{ProcessSession}} methods should come
with further clarity for developers, i.e. timing issues should be considered if
there's a reliance on State being passed between consecutive sessions, etc. Or
should {{AbstractProcessor}} use {{session.commit}} (synchronous) instead (but
I imagine that was changed to asynchronous for a reason)?