Revert "NIFI-5448 Added failure relationship to UpdateAttributes to handle bad expression language logic."
This reverts commit 32ee552ada328ed1189ed2bd0a2af18ed213ddc8. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e25b26e9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e25b26e9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e25b26e9 Branch: refs/heads/master Commit: e25b26e9cf860db038da8e23e8e64bdb1b8dba88 Parents: 6b77e7d Author: joewitt <[email protected]> Authored: Fri Oct 12 11:27:48 2018 -0400 Committer: joewitt <[email protected]> Committed: Fri Oct 12 11:27:48 2018 -0400 ---------------------------------------------------------------------- .../processors/attributes/UpdateAttribute.java | 82 ++++++-------------- .../update/attributes/TestUpdateAttribute.java | 29 ------- 2 files changed, 24 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e25b26e9/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index b5ff4ff..cee8d22 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -108,21 +107,17 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .description("All successful FlowFiles are routed to this relationship").name("success").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .description("All flowfiles that cannot be updated are routed to this relationship").name("failure").autoTerminateDefault(true).build(); public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder() .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build(); static { Set<Relationship> tempStatelessSet = new HashSet<>(); tempStatelessSet.add(REL_SUCCESS); - tempStatelessSet.add(REL_FAILURE); statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet); Set<Relationship> tempStatefulSet = new HashSet<>(); tempStatefulSet.add(REL_SUCCESS); - tempStatefulSet.add(REL_FAILURE); tempStatefulSet.add(REL_FAILED_SET_STATE); statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet); @@ -157,21 +152,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } }; - public static final AllowableValue FAIL_STOP = new AllowableValue("stop", "Penalize", "Penalize FlowFiles." + - "This is based on the original behavior of the processor to allow for a smooth transition."); - public static final AllowableValue FAIL_ROUTE = new AllowableValue("route", "Route to Failure Relationship", - "If chosen, failed FlowFiles will be routed to the failure relationship."); - public static final PropertyDescriptor FAILURE_BEHAVIOR = new PropertyDescriptor.Builder() - .name("update-attribute-failure-behavior") - .displayName("Failure Behavior") - .description("Control how to handle errors in Expression Language evaluation. The default behavior is to stop evaluation. It can be " + - "changed by the user to route to a failure relationship instead.") - .allowableValues(FAIL_STOP, FAIL_ROUTE) - .defaultValue(FAIL_STOP.getValue()) - .required(true) - .build(); - - // static properties public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression"; public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder() @@ -225,7 +205,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { descriptors.add(DELETE_ATTRIBUTES); descriptors.add(STORE_STATE); descriptors.add(STATEFUL_VARIABLES_INIT_VALUE); - descriptors.add(FAILURE_BEHAVIOR); return Collections.unmodifiableList(descriptors); } @@ -473,51 +452,39 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { Map<String, Action> defaultActions = this.defaultActions; List<FlowFile> flowFilesToTransfer = new LinkedList<>(); - boolean routeToFailure = context.getProperty(FAILURE_BEHAVIOR).getValue().equals(FAIL_ROUTE.getValue()); - try { - // if there is update criteria specified, evaluate it - if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { - // apply the actions for each rule and transfer the flowfile - for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) { - FlowFile match = entry.getKey(); - final List<Rule> rules = entry.getValue(); - boolean updateWorking = incomingFlowFile.equals(match); - - // execute each matching rule(s) - match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); - - if (updateWorking) { - incomingFlowFile = match; - } + // if there is update criteria specified, evaluate it + if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { + // apply the actions for each rule and transfer the flowfile + for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) { + FlowFile match = entry.getKey(); + final List<Rule> rules = entry.getValue(); + boolean updateWorking = incomingFlowFile.equals(match); - if (debugEnabled) { - logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); - } + // execute each matching rule(s) + match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes); - // add the match to the list to transfer - flowFilesToTransfer.add(match); + if (updateWorking) { + incomingFlowFile = match; } - } else { - // Either we're running without any rules or the FlowFile didn't match any - incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); if (debugEnabled) { - logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); } - // add the flowfile to the list to transfer - flowFilesToTransfer.add(incomingFlowFile); + // add the match to the list to transfer + flowFilesToTransfer.add(match); } - } catch (ProcessException pe) { - if (routeToFailure) { - session.transfer(incomingFlowFile, REL_FAILURE); - getLogger().error("Failed to update flowfile attribute(s).", pe); - return; - } else { - throw pe; + } else { + // Either we're running without any rules or the FlowFile didn't match any + incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes); + + if (debugEnabled) { + logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()}); } - } + // add the flowfile to the list to transfer + flowFilesToTransfer.add(incomingFlowFile); + } if (stateInitialAttributes != null) { try { @@ -779,8 +746,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { final Map<String, Action> defaultActions = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { - if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE - && entry.getKey() != FAILURE_BEHAVIOR) { + if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { final Action action = new Action(); action.setAttribute(entry.getKey().getName()); action.setValue(entry.getValue()); http://git-wip-us.apache.org/repos/asf/nifi/blob/e25b26e9/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 35b5536..50938e6 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -1005,33 +1005,4 @@ public class TestUpdateAttribute { } } - @Test - public void testInvalidExpressionLanguage() { - final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setVariable("test", "Squirrel!!1!"); - runner.setProperty("bad_attr", "${test:toDate('yyyy-MM-dd')}"); - runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_ROUTE); - runner.assertValid(); - - runner.enqueue("Test"); - runner.run(); - - runner.assertTransferCount(UpdateAttribute.REL_SUCCESS, 0); - runner.assertTransferCount(UpdateAttribute.REL_FAILURE, 1); - - runner.clearTransferState(); - - Throwable ex = null; - try { - runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_STOP); - runner.enqueue("Test"); - runner.run(); - } catch (Throwable t) { - ex = t; - } finally { - Assert.assertNotNull(ex); - Assert.assertTrue(ex.getCause() instanceof ProcessException); - runner.assertQueueNotEmpty(); - } - } }
