patricker commented on a change in pull request #3514: NIFI-6344 Add
UpdateAttribute Failure Relationship
URL: https://github.com/apache/nifi/pull/3514#discussion_r291016209
##########
File path:
nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
##########
@@ -474,38 +496,49 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
Map<String, Action> defaultActions = this.defaultActions;
List<FlowFile> flowFilesToTransfer = new LinkedList<>();
- // if there is update criteria specified, evaluate it
- if (criteria != null && evaluateCriteria(session, context, criteria,
incomingFlowFile, matchedRules, stateInitialAttributes)) {
- // apply the actions for each rule and transfer the flowfile
- for (final Map.Entry<FlowFile, List<Rule>> entry :
matchedRules.entrySet()) {
- FlowFile match = entry.getKey();
- final List<Rule> rules = entry.getValue();
- boolean updateWorking = incomingFlowFile.equals(match);
+ try {
+ // if there is update criteria specified, evaluate it
+ if (criteria != null && evaluateCriteria(session, context,
criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
+ // apply the actions for each rule and transfer the flowfile
+ for (final Map.Entry<FlowFile, List<Rule>> entry :
matchedRules.entrySet()) {
+ FlowFile match = entry.getKey();
+ final List<Rule> rules = entry.getValue();
+ boolean updateWorking = incomingFlowFile.equals(match);
+
+ // execute each matching rule(s)
+ match = executeActions(session, context, rules,
defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
+
+ if (updateWorking) {
+ incomingFlowFile = match;
+ }
- // execute each matching rule(s)
- match = executeActions(session, context, rules,
defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
+ if (debugEnabled) {
+ logger.debug("Updated attributes for {}; transferring
to '{}'", new Object[]{match, REL_SUCCESS.getName()});
+ }
- if (updateWorking) {
- incomingFlowFile = match;
+ // add the match to the list to transfer
+ flowFilesToTransfer.add(match);
}
+ } else {
+ // Either we're running without any rules or the FlowFile
didn't match any
+ incomingFlowFile = executeActions(session, context, null,
defaultActions, incomingFlowFile, stateInitialAttributes,
stateWorkingAttributes);
if (debugEnabled) {
- logger.debug("Updated attributes for {}; transferring to
'{}'", new Object[]{match, REL_SUCCESS.getName()});
+ logger.debug("Updated attributes for {}; transferring to
'{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
}
- // add the match to the list to transfer
- flowFilesToTransfer.add(match);
+ // add the flowfile to the list to transfer
+ flowFilesToTransfer.add(incomingFlowFile);
}
- } else {
- // Either we're running without any rules or the FlowFile didn't
match any
- incomingFlowFile = executeActions(session, context, null,
defaultActions, incomingFlowFile, stateInitialAttributes,
stateWorkingAttributes);
+ } catch (Exception e){
+ final String failureAction =
context.getProperty(FAILURE_ACTION).getValue();
- if (debugEnabled) {
- logger.debug("Updated attributes for {}; transferring to
'{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
+ if(failureAction.equals(FAILURE_ACTION_ROLLBACK)) {
+ throw e;
}
- // add the flowfile to the list to transfer
- flowFilesToTransfer.add(incomingFlowFile);
+ session.penalize(incomingFlowFile);
Review comment:
I'll remove it from here. In a rollback scenario, penalization is handled
automatically at the session level by throwing the exception (you'll note it is
not something I moved/removed).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services