Mark Payne created NIFI-8136:
--------------------------------

             Summary: Allow State Management to be tied to Process Session
                 Key: NIFI-8136
                 URL: https://issues.apache.org/jira/browse/NIFI-8136
             Project: Apache NiFi
          Issue Type: New Feature
          Components: Core Framework
            Reporter: Mark Payne
            Assignee: Mark Payne


We have many processors currently that store state using NiFi's built-in state 
management capabilities. To do this, processors need to do something like:
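{code:java}
// Build the complete state map in memory.
Map<String, String> state = new HashMap<>();
state.put("key", "value1");
state.put("key2", "value2");

if (flowFile != null) {
    // ... process the FlowFile ...
    state.put("key2", updatedValue2);
}

// The map must be held until the session commit succeeds, because
// setState() updates the remote state immediately.
session.commit();
context.getStateManager().setState(state, Scope.LOCAL);{code}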
This is not a terrible API, but it has a few drawbacks.

For a processor that has the @SupportsBatching annotation, invoking
ProcessContext.getStateManager().getState(Scope) for each FlowFile can be
costly. To avoid this, processors typically end up caching the values
themselves.
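To illustrate that caching workaround, here is a minimal sketch. It assumes a
hypothetical RemoteStateStore interface standing in for the state provider
(this is not the NiFi StateManager API) and a hypothetical CachedState helper
that reads the remote state once and serves later lookups from memory. The
processor has to remember to invalidate the cache after writing state.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a remote state provider (e.g. ZooKeeper); not a NiFi API.
interface RemoteStateStore {
    Map<String, String> getState();
    void setState(Map<String, String> state);
}

// In-memory store that counts reads, so the cost of per-FlowFile lookups is visible.
class CountingStateStore implements RemoteStateStore {
    private Map<String, String> state = new HashMap<>();
    int reads = 0;

    @Override
    public Map<String, String> getState() {
        reads++;
        return new HashMap<>(state);
    }

    @Override
    public void setState(Map<String, String> newState) {
        state = new HashMap<>(newState);
    }
}

// Hypothetical processor-side cache: one remote read, then lookups are served from memory.
class CachedState {
    private final RemoteStateStore store;
    private Map<String, String> cached; // null until the first lookup

    CachedState(RemoteStateStore store) {
        this.store = store;
    }

    String get(String key) {
        if (cached == null) {
            cached = store.getState(); // the only remote call until invalidate()
        }
        return cached.get(key);
    }

    // Must be called after the processor writes new state, or later reads go stale.
    void invalidate() {
        cached = null;
    }
}
{code}
This is the sort of bookkeeping that each processor currently ends up
reimplementing on its own.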

Depending on the code, managing the state map can be difficult, and there is
no way to roll back state changes once they are applied, because the call to
setState() immediately updates the remote state.

When running in a different context, in which we want to store state
atomically with the FlowFiles that resulted in the state change, there is
currently no way to do that.

To overcome these problems, we should allow state to be set, retrieved,
cleared, and replaced via the ProcessSession, in addition to the State
Manager. That is, a Processor developer could do either:
{code:java}
context.getStateManager().setState(...);{code}
Or
{code:java}
session.setState(...); {code}
The former would behave as it does now, immediately updating state on the
remote system (ZooKeeper, for example). The latter would simply update an
in-memory copy of the state in the Process Session. When
ProcessSession.commit() is called, the session would push the new state to the
remote system; if the session is rolled back, the state would simply not be
updated. This allows state to be set in the middle of the processor's
algorithm, rather than requiring that it be held until the session commit
succeeds. If the session is instead checkpointed (via session.commit() while
running with a Run Duration greater than 0 ms), the Session Checkpoint will
keep the state. Rolling back the session but not the checkpoint would then
still result in the checkpointed state being pushed out.
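The proposed semantics can be modeled outside of NiFi. The following is a
minimal sketch, not NiFi's implementation: StatefulSession and RemoteStore are
hypothetical classes, commit() stands for a session commit that only
checkpoints (Run Duration greater than 0 ms), and flush() stands for the
framework eventually committing that checkpoint. With a Run Duration of 0 ms,
flush() would effectively follow every commit().
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical remote state store (stand-in for ZooKeeper or a local provider).
class RemoteStore {
    private Map<String, String> state = new HashMap<>();

    Map<String, String> get() { return new HashMap<>(state); }

    void put(Map<String, String> newState) { state = new HashMap<>(newState); }
}

// Hypothetical model of session-scoped state; not NiFi's ProcessSession.
class StatefulSession {
    private final RemoteStore remote;
    private Map<String, String> pending;      // set during the current session, uncommitted
    private Map<String, String> checkpointed; // kept by the session checkpoint

    StatefulSession(RemoteStore remote) { this.remote = remote; }

    // Updates only the in-memory copy; the remote store is untouched.
    void setState(Map<String, String> state) { pending = new HashMap<>(state); }

    // Reads prefer this session's own changes, then the checkpoint, then the remote store.
    Map<String, String> getState() {
        if (pending != null) return new HashMap<>(pending);
        if (checkpointed != null) return new HashMap<>(checkpointed);
        return remote.get();
    }

    // session.commit() with Run Duration > 0 ms: the checkpoint keeps the state.
    void commit() {
        if (pending != null) {
            checkpointed = pending;
            pending = null;
        }
    }

    // Rolls back the session but not the checkpoint: only uncommitted state is dropped.
    void rollback() { pending = null; }

    // The framework committing the checkpoint: the checkpointed state is pushed out.
    void flush() {
        if (checkpointed != null) {
            remote.put(checkpointed);
            checkpointed = null;
        }
    }
}
{code}
In this model, a state of offset=10 set and committed, followed by offset=20
set and rolled back, leaves offset=10 in the checkpoint, and that value is
what reaches the remote store on flush().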

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
