ijokarumawak commented on a change in pull request #3541: NIFI-6387 Implemented RetryFlowFile URL: https://github.com/apache/nifi/pull/3541#discussion_r297062025
########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import 
org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Tags({"Retry", "FlowFile"}) +@CapabilityDescription("FlowFiles passed to this Processor have a 'Retry Attribute' value checked against a " + + "configured 'Maximum Retries' value. If the current attribute value is below the configured maximum, the " + + "FlowFile is passed to a retry relationship. The FlowFile may or may not be penalized in that condition. " + + "If the FlowFile's attribute value exceeds the configured maximum, the FlowFile will be passed to a " + + "'retries_exceeded' relationship. WARNING: If the incoming FlowFile has a non-numeric value in the " + + "configured 'Retry Attribute' attribute, it will be reset to '1'. You may choose to fail the FlowFile " + + "instead of performing the reset. Additional dynamic properties can be defined for any attributes you " + + "wish to add to the FlowFiles transferred to 'retries_exceeded'. 
These attributes support attribute " + + "expression language.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@SupportsBatching +@SideEffectFree +@DefaultSettings(penaltyDuration = "2 min") +@ReadsAttribute(attribute = "Retry Attribute", + description = "Will read the attribute or attribute expression language result as defined in 'Retry Attribute'") +@WritesAttribute(attribute = "Retry Attribute", + description = "User defined retry attribute is updated with the current retry count") +@DynamicProperty(name = "Exceeded FlowFile Attribute Key", + value = "The value of the attribute added to the FlowFile", + description = "One or more dynamic properties can be used to add attributes to FlowFiles passed to " + + "the 'retries_exceeded' relationship", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +public class RetryFlowFile extends AbstractProcessor { + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private String retryAttribute; + private Integer maximumRetries; + private Boolean penalizeRetried; + private Boolean failOnOverwrite; + + public static final PropertyDescriptor RETRY_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Retry Attribute") + .description("The name of the attribute that contains the current retry count for the FlowFile. " + + "WARNING: If the name matches an attribute already on the FlowFile that does not contain a " + + "numerical value, the processor will either overwrite that attribute with '1' or fail " + + "based on configuration.") Review comment: Just an idea. I imagine users may experience unexpected behavior when a FlowFile passes through multiple RetryFlowFile processors during its lifetime. If those RetryFlowFile processors are configured to use the same `Retry Attribute` at different places in the flow, the later RetryFlowFile will retry fewer times than expected, because the retry count accumulates across them.
To avoid that (and to make the UX easier, since users could then rely on the default attribute name), I'd suggest adding another property such as `Reset Previous Retry Attribute`, defaulting to `true`. Add some code to write another FlowFile attribute that tracks the identifier of the current RetryFlowFile instance (via `getIdentifier()`). The attribute name can be static, e.g. `retryflowfile.processor.id`. Then, at the beginning of `onTrigger()`, check whether the current identifier is the same as the one stored in that attribute. If it differs, reset the `Retry Attribute` value to 1. The only use case I can imagine where a user would need to disable this is when RetryFlowFile processors are nested. In such a case, each instance must be configured with a different `Retry Attribute`. What do you think? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
