[ 
https://issues.apache.org/jira/browse/NIFI-1829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332383#comment-15332383
 ] 

ASF GitHub Bot commented on NIFI-1829:
--------------------------------------

Github user mosermw commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r67230370
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 ---
    @@ -0,0 +1,499 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.http.annotation.ThreadSafe;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@ThreadSafe()
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
    +@CapabilityDescription("The DebugFlow processor aids testing and debugging 
the FlowFile framework by allowing various "
    +        + "responses to be explicitly triggered in response to the receipt 
of a FlowFile or a timer event without a "
    +        + "FlowFile if using timer or cron based scheduling.  It can force 
responses needed to exercise or test "
    +        + "various failure modes that can occur when a processor runs.\n"
    +        + "\n"
    +        + "When triggered, the processor loops through the appropriate 
response list (based on whether or not it "
    +        + "received a FlowFile).  A response is produced the configured 
number of times for each pass through its"
    +        + "response list, as long as the processor is running.\n"
    +        + "\n"
    +        + "Triggered by a FlowFile, the processor can produce the 
following responses."
    +        + "  1. transfer FlowFile to success relationship.\n"
    +        + "  2. transfer FlowFile to failure relationship.\n"
    +        + "  3. rollback the FlowFile without penalty.\n"
    +        + "  4. rollback the FlowFile and yield the context.\n"
    +        + "  5. rollback the FlowFile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "Triggered without a FlowFile, the processor can produce the 
following responses."
    +        + "  1. do nothing and return.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. yield the context.\n")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private final AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to process.")
    +            .build();
    +
    +    private final AtomicReference<List<PropertyDescriptor>> 
propertyDescriptors = new AtomicReference<>();
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Success Iterations")
    +            .description("Number of FlowFiles to forward to success 
relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Failure Iterations")
    +            .description("Number of FlowFiles to forward to failure 
relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Iterations")
    +            .description("Number of FlowFiles to roll back (without 
penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Yield Iterations")
    +            .description("Number of FlowFiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Rollback Penalty Iterations")
    +            .description("Number of FlowFiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Iterations")
    +            .description("Number of FlowFiles to throw exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_CLASS = new 
PropertyDescriptor.Builder()
    +            .name("FlowFile Exception Class")
    +            .description("Exception class to be thrown (must extend 
java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String 
input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = 
classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && 
(RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and 
extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("No FlowFile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no 
FlowFile.")
    +            .required(true)
    +            .defaultValue("1")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no 
FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("No FlowFile Yield Iterations")
    +            .description("Number of times to yield if no FlowFile.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new 
PropertyDescriptor.Builder()
    +            .name("No FlowFile Exception Class")
    +            .description("Exception class to be thrown if no FlowFile 
(must extend java.lang.RuntimeException).")
    +            .required(true)
    +            .defaultValue("java.lang.RuntimeException")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String 
input, ValidationContext context) {
    +                    Class<? extends RuntimeException> klass = 
classNameToRuntimeExceptionClass(input);
    +                    return new ValidationResult.Builder()
    +                            .subject(subject)
    +                            .input(input)
    +                            .valid(klass != null && 
(RuntimeException.class.isAssignableFrom(klass)))
    +                            .explanation(subject + " class must exist and 
extend java.lang.RuntimeException")
    +                            .build();
    +                }
    +            })
    +            .build();
    +
    +
    +    private volatile Integer flowFileMaxSuccess = 0;
    +    private volatile Integer flowFileMaxFailure = 0;
    +    private volatile Integer flowFileMaxRollback = 0;
    +    private volatile Integer flowFileMaxYield = 0;
    +    private volatile Integer flowFileMaxPenalty = 0;
    +    private volatile Integer flowFileMaxException = 0;
    +
    +    private volatile Integer noFlowFileMaxSkip = 0;
    +    private volatile Integer noFlowFileMaxException = 0;
    +    private volatile Integer noFlowFileMaxYield = 0;
    +
    +    private volatile Integer flowFileCurrSuccess = 0;
    +    private volatile Integer flowFileCurrFailure = 0;
    +    private volatile Integer flowFileCurrRollback = 0;
    +    private volatile Integer flowFileCurrYield = 0;
    +    private volatile Integer flowFileCurrPenalty = 0;
    +    private volatile Integer flowFileCurrException = 0;
    +
    +    private volatile Integer noFlowFileCurrSkip = 0;
    +    private volatile Integer noFlowFileCurrException = 0;
    +    private volatile Integer noFlowFileCurrYield = 0;
    +
    +    private volatile Class<? extends RuntimeException> 
flowFileExceptionClass = null;
    +    private volatile Class<? extends RuntimeException> 
noFlowFileExceptionClass= null;
    +
    +    private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
    +    private final NoFlowFileResponse curr_noff_resp = new 
NoFlowFileResponse();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        synchronized (relationships) {
    +            if (relationships.get() == null) {
    +                HashSet<Relationship> relSet = new HashSet<>();
    +                relSet.add(REL_SUCCESS);
    +                relSet.add(REL_FAILURE);
    +                relationships.compareAndSet(null, 
Collections.unmodifiableSet(relSet));
    +            }
    +            return relationships.get();
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        synchronized (propertyDescriptors) {
    +            if (propertyDescriptors.get() == null) {
    +                ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +                propList.add(FF_SUCCESS_ITERATIONS);
    +                propList.add(FF_FAILURE_ITERATIONS);
    +                propList.add(FF_ROLLBACK_ITERATIONS);
    +                propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +                propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +                propList.add(FF_EXCEPTION_ITERATIONS);
    +                propList.add(FF_EXCEPTION_CLASS);
    +                propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +                propList.add(NO_FF_YIELD_ITERATIONS);
    +                propList.add(NO_FF_SKIP_ITERATIONS);
    +                propList.add(NO_FF_EXCEPTION_CLASS);
    +                propertyDescriptors.compareAndSet(null, 
Collections.unmodifiableList(propList));
    +            }
    +            return propertyDescriptors.get();
    +        }
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        flowFileMaxSuccess = 
context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        flowFileMaxFailure = 
context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        flowFileMaxYield = 
context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        flowFileMaxRollback = 
context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        flowFileMaxPenalty = 
context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        flowFileMaxException = 
context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxException = 
context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        noFlowFileMaxYield = 
context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        noFlowFileMaxSkip = 
context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp.reset();
    +        curr_noff_resp.reset();
    +        flowFileExceptionClass = 
classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString());
    +        noFlowFileExceptionClass = 
classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        final ComponentLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  
if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or 
future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
    +                    if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
    +                        noFlowFileCurrSkip += 1;
    +                        logger.info("DebugFlow skipping with no flow 
file");
    +                        return;
    +                    } else {
    +                        noFlowFileCurrSkip = 0;
    +                        curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (noFlowFileCurrException < noFlowFileMaxException) {
    +                        noFlowFileCurrException += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow 
file");
    +                        String message = "forced by " + 
this.getClass().getName();
    +                        RuntimeException rte;
    +                        try {
    +                            rte = 
noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
    +                            throw rte;
    +                        } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
    +                            e.printStackTrace();
    --- End diff --
    
    Eeek e.printStackTrace() should be changed to use logger.


> Create FlowDebugger processor
> -----------------------------
>
>                 Key: NIFI-1829
>                 URL: https://issues.apache.org/jira/browse/NIFI-1829
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joe Skora
>            Assignee: Joe Skora
>            Priority: Minor
>             Fix For: 1.0.0, 0.7.0
>
>
> The FlowDebugger processor allows a variety of Processor responses and 
> failures to be simulated for testing, debugging, and troubleshooting the 
> framework.
> Responses it can produce on receipt of a flowfile include Transfer to 
> Success, Transfer to Failure, Rollback without Penalty, Rollback and Yield, 
> Rollback with Penalty, and Throw an Exception.  The properties indicate how 
> many times that response should be thrown, for example if configured with 
> Success=10 and Failure=40, it will transfer the first 10 flowfiles to 
> Success, transfer the next 40 to Failure, and then repeat.
> Similarly, responses it can produce when triggered without a flowfile include 
> Throw an Exception, Yield, and Return (do nothing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to