Mark Payne created NIFI-8136:
--------------------------------
Summary: Allow State Management to be tied to Process Session
Key: NIFI-8136
URL: https://issues.apache.org/jira/browse/NIFI-8136
Project: Apache NiFi
Issue Type: New Feature
Components: Core Framework
Reporter: Mark Payne
Assignee: Mark Payne
Many processors currently store state using NiFi's built-in state management
capabilities. To do this, a processor needs to do something like:
{code:java}
Map<String, String> state = new HashMap<>();
state.put("key", "value1");
state.put("key2", "value2");
if (flowFile != null) {
    ...
    state.put("key2", updatedValue2);
}
// The state map must be held until the session commit succeeds.
session.commit();
context.getStateManager().setState(state, Scope.LOCAL);{code}
This is not a terrible API, but it comes with a few drawbacks:
If using a processor that has the @SupportsBatching annotation, calls to
ProcessContext.getStateManager().getState(Scope) can be costly to invoke for
each FlowFile. To avoid this, processors typically end up having to cache the
values themselves.
Depending on the code, management of the state map can be difficult, and there
is no way to roll back state changes once they are applied, because the call to
setState() immediately updates the remote state.
If executing within a different context, in which we want to store state
atomically with the FlowFiles that resulted in the state change, there's no way
to do that currently.
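As a rough illustration of the caching workaround mentioned in the first point, a processor might read the remote state once and then serve a local copy. This is only a sketch: CountingStateStore and CachedState are hypothetical stand-ins invented for this example, not NiFi classes, and the read counter exists purely to show the cost being avoided.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a remote state store; counts reads for illustration. */
class CountingStateStore {
    private final Map<String, String> remote = new HashMap<>();
    int reads = 0;

    Map<String, String> getState() {
        reads++; // each call models one costly remote lookup
        return new HashMap<>(remote);
    }

    void setState(Map<String, String> state) {
        remote.clear();
        remote.putAll(state);
    }
}

/** Hypothetical processor-side cache: read remote state once, then reuse the copy. */
class CachedState {
    private final CountingStateStore store;
    private Map<String, String> cached; // null until the first read

    CachedState(CountingStateStore store) {
        this.store = store;
    }

    Map<String, String> get() {
        if (cached == null) {
            cached = store.getState();
        }
        return new HashMap<>(cached);
    }

    void set(Map<String, String> state) {
        // Write through immediately, then refresh the local copy.
        store.setState(state);
        cached = new HashMap<>(state);
    }
}
```

Every processor that wants this behavior has to carry its own version of such a cache, which is part of the burden this issue describes.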
To overcome these problems, we should allow for setting, getting, clearing, and
replacing state to be done via the ProcessSession, in addition to the State
Manager. I.e., a Processor developer may do either of:
{code:java}
context.getStateManager().setState(...);{code}
Or
{code:java}
session.setState(...);{code}
The former would behave as it does now, immediately updating state on the
remote system (ZooKeeper, for example). The latter would simply update an
in-memory copy of the state in the Process Session. When
ProcessSession.commit() is called, it would push the new state to the remote
system. If the session is rolled back, it would simply not update the state.
This allows the state to be set in the middle of the processor's algorithm,
rather than requiring that it be held onto until after session commit is
successful. If the session is then checkpointed (via session.commit() while
running with a Run Duration greater than 0 ms), the session checkpoint will
keep the state. Rolling back the session but not the checkpoint would then
result in the checkpointed state still being pushed out.
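The proposed semantics can be modeled with a small, self-contained sketch. SessionStateSketch and all of its methods are hypothetical names chosen for this example, not the eventual NiFi API. Here checkpoint() stands in for a session.commit() under a non-zero Run Duration, and commit() stands in for the final push to the remote system.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of session-scoped state; not NiFi's actual implementation. */
class SessionStateSketch {
    // Stand-in for the remote store (ZooKeeper, for example).
    private final Map<String, String> remote = new HashMap<>();
    // State captured by a checkpoint, waiting to be pushed; null if none.
    private Map<String, String> checkpointed = null;
    // State set in the current session but not yet committed; null if untouched.
    private Map<String, String> pending = null;

    void setState(Map<String, String> state) {
        pending = new HashMap<>(state); // in-memory only, nothing remote changes
    }

    Map<String, String> getState() {
        if (pending != null) return new HashMap<>(pending);
        if (checkpointed != null) return new HashMap<>(checkpointed);
        return new HashMap<>(remote);
    }

    void checkpoint() {
        if (pending != null) {
            checkpointed = pending;
            pending = null;
        }
    }

    void rollback() {
        pending = null; // discards uncommitted state, keeps any checkpoint
    }

    void commit() {
        checkpoint();
        if (checkpointed != null) {
            remote.clear();
            remote.putAll(checkpointed);
            checkpointed = null;
        }
    }

    Map<String, String> remoteState() {
        return new HashMap<>(remote);
    }

    public static void main(String[] args) {
        SessionStateSketch session = new SessionStateSketch();
        session.setState(Collections.singletonMap("key", "value1"));
        session.checkpoint();
        session.setState(Collections.singletonMap("key", "value2"));
        session.rollback(); // drops value2, checkpointed value1 survives
        session.commit();
        System.out.println(session.remoteState()); // prints {key=value1}
    }
}
```

In this model a rollback only discards state set since the last checkpoint, which matches the behavior described above: the checkpointed state is still pushed out.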