[ https://issues.apache.org/jira/browse/NIFI-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15785927#comment-15785927 ]

ASF GitHub Bot commented on NIFI-1582:
--------------------------------------

Github user brosander commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1371#discussion_r94170726
  
    --- Diff: nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java ---
    @@ -424,59 +419,104 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             // because is the original flowfile is used for all matching rules. in this
             // case the order of the matching rules is preserved in the list
             final Map<FlowFile, List<Rule>> matchedRules = new HashMap<>();
    -        Map<String, String> statefulAttributes = null;
    +
    +        final Map<String, String> stateInitialAttributes;
    +        final Map<String, String> stateWorkingAttributes;
    +        StateMap stateMap = null;
     
             matchedRules.clear();
     
             try {
                 if (stateful) {
    -                statefulAttributes = new HashMap<>(context.getStateManager().getState(Scope.LOCAL).toMap());
    +                stateMap = context.getStateManager().getState(Scope.LOCAL);
    +                stateInitialAttributes = stateMap.toMap();
    +                stateWorkingAttributes = new  HashMap<>(stateMap.toMap());
                 } else {
    -                statefulAttributes = null;
    +                stateInitialAttributes = null;
    +                stateWorkingAttributes = null;
                 }
             } catch (IOException e) {
    -            logger.error("Failed to update attributes for {} due to failing to get state; transferring FlowFile back to '{}'", new Object[]{flowFile, Relationship.SELF.getName()}, e);
    -            session.transfer(flowFile);
    +            logger.error("Failed to get the initial state when processing {}; transferring FlowFile back to it's incoming queue", new Object[]{incomingFlowFile, Relationship.SELF.getName()}, e);
    --- End diff --
    
    Typo nitpick: "it's" should be "its" in the new log message.


> New processor to update attributes with state
> ---------------------------------------------
>
>                 Key: NIFI-1582
>                 URL: https://issues.apache.org/jira/browse/NIFI-1582
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>             Fix For: 1.2.0
>
>
> This idea was sparked by a thread on the user list and should allow basic 
> data science:
> I expect that in the future I’ll need something a little more sophisticated 
> but for now my problem is very simple:
> I want to be able to trigger an alert (only once) when an attribute in an 
> incoming stream, for instance, goes over a predefined threshold. The 
> Processor should then trigger (only once again) another trigger when the 
> signal goes back to normal (below threshold). Basically a RouteByAttribute 
> but with memory.
> Thanks 
> Claudio
> ------------------------------------------------
> Hello Claudio,
> Your use-case could actually leverage a couple of recently added features to 
> create a really cool open-source processor. The two key features that were 
> added are State Management and the ability to reference processor-specific 
> variables in Expression Language. You can take a look at RouteText to see 
> both in action. 
> By utilizing both you can create a processor that is configured with multiple 
> Expression Language expressions. There would be dynamic properties which 
> would accept Expression Language and then store the evaluated value via state 
> management. Then there would be a routing property (that supports Expression 
> Language) that could simply add an attribute to the flowfile with the 
> evaluated value, which would allow it to be used by following processors for 
> routing.
> This would allow you to do your use-case where you store the value for the 
> incoming stream and route differently once you go over a threshold. It could 
> even allow more complex use-cases. One that I believe would be possible is 
> to keep a running average and standard deviation and route data to 
> different locations based on its standard deviation.
> You can think of this like an UpdateAttribute with the ability to store and 
> calculate variables using expression language.
> Joe
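
For readers skimming the issue description, the "route by attribute, but with memory" idea can be sketched in plain Java. This is only an illustration under stated assumptions, not the code in the pull request: the class name ThresholdAlerter, the state key "above.threshold", and the returned labels are all hypothetical, and an ordinary Map<String, String> stands in for the processor's stored state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of NiFi): edge-triggered threshold alerts backed by stored state. */
final class ThresholdAlerter {
    // Hypothetical key under which the previous "above threshold" flag is remembered.
    static final String STATE_KEY = "above.threshold";

    private final double threshold;

    ThresholdAlerter(double threshold) {
        this.threshold = threshold;
    }

    /**
     * Returns "alert" only when the value first rises above the threshold,
     * "recovered" only when it first drops back to or below it, and "none" otherwise.
     * The state map is updated in place so the next call sees the new flag.
     */
    String evaluate(double value, Map<String, String> state) {
        boolean wasAbove = Boolean.parseBoolean(state.getOrDefault(STATE_KEY, "false"));
        boolean isAbove = value > threshold;
        state.put(STATE_KEY, Boolean.toString(isAbove));
        if (isAbove && !wasAbove) {
            return "alert";
        }
        if (!isAbove && wasAbove) {
            return "recovered";
        }
        return "none";
    }

    public static void main(String[] args) {
        ThresholdAlerter alerter = new ThresholdAlerter(100.0);
        Map<String, String> state = new HashMap<>(); // stand-in for stored processor state
        for (double v : new double[] {50.0, 120.0, 130.0, 90.0}) {
            System.out.println(v + " -> " + alerter.evaluate(v, state));
        }
    }
}
```

With a threshold of 100, the sequence 50, 120, 130, 90 yields none, alert, none, recovered: each transition is reported once, which is the behaviour Claudio asked for. A real processor would read and write this flag through state management rather than an in-memory map.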



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
