Repository: nifi
Updated Branches:
  refs/heads/master 6e74c10f4 -> 4723f8e24


NIFI-1829 - Create new DebugFlow processor.

Signed-off-by: Mike Moser <mose...@apache.org>
This closes #458


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4723f8e2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4723f8e2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4723f8e2

Branch: refs/heads/master
Commit: 4723f8e24c83b3fda68a091be7deabfd84f2b40a
Parents: 6e74c10
Author: Joe Skora <jsk...@apache.org>
Authored: Fri May 20 11:25:03 2016 -0400
Committer: Mike Moser <mose...@apache.org>
Committed: Thu Jun 16 17:23:19 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/DebugFlow.java     | 486 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      |  48 ++
 .../nifi/processors/standard/TestDebugFlow.java | 361 ++++++++++++++
 4 files changed, 896 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4723f8e2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
new file mode 100644
index 0000000..1374c10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.http.annotation.ThreadSafe;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+@ThreadSafe()
+@EventDriven()
+@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
+@CapabilityDescription("The DebugFlow processor aids testing and debugging the 
FlowFile framework by allowing various "
+        + "responses to be explicitly triggered in response to the receipt of 
a FlowFile or a timer event without a "
+        + "FlowFile if using timer or cron based scheduling.  It can force 
responses needed to exercise or test "
+        + "various failure modes that can occur when a processor runs.")
+public class DebugFlow extends AbstractProcessor {
+
+    private final AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles processed successfully.")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to process.")
+            .build();
+
+    private final AtomicReference<List<PropertyDescriptor>> 
propertyDescriptors = new AtomicReference<>();
+
+    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Success Iterations")
+            .description("Number of FlowFiles to forward to success 
relationship.")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Failure Iterations")
+            .description("Number of FlowFiles to forward to failure 
relationship.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Rollback Iterations")
+            .description("Number of FlowFiles to roll back (without penalty).")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Rollback Yield Iterations")
+            .description("Number of FlowFiles to roll back and yield.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Rollback Penalty Iterations")
+            .description("Number of FlowFiles to roll back with penalty.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Exception Iterations")
+            .description("Number of FlowFiles to throw exception.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor FF_EXCEPTION_CLASS = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Exception Class")
+            .description("Exception class to be thrown (must extend 
java.lang.RuntimeException).")
+            .required(true)
+            .defaultValue("java.lang.RuntimeException")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(new Validator() {
+                @Override
+                public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                    Class<? extends RuntimeException> klass = 
classNameToRuntimeExceptionClass(input);
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .valid(klass != null && 
(RuntimeException.class.isAssignableFrom(klass)))
+                            .explanation(subject + " class must exist and 
extend java.lang.RuntimeException")
+                            .build();
+                }
+            })
+            .build();
+
+    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("No FlowFile Skip Iterations")
+            .description("Number of times to skip onTrigger if no FlowFile.")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("No FlowFile Exception Iterations")
+            .description("Number of times to throw NPE exception if no 
FlowFile.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new 
PropertyDescriptor.Builder()
+            .name("No FlowFile Yield Iterations")
+            .description("Number of times to yield if no FlowFile.")
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new 
PropertyDescriptor.Builder()
+            .name("No FlowFile Exception Class")
+            .description("Exception class to be thrown if no FlowFile (must 
extend java.lang.RuntimeException).")
+            .required(true)
+            .defaultValue("java.lang.RuntimeException")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(new Validator() {
+                @Override
+                public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                    Class<? extends RuntimeException> klass = 
classNameToRuntimeExceptionClass(input);
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .valid(klass != null && 
(RuntimeException.class.isAssignableFrom(klass)))
+                            .explanation(subject + " class must exist and 
extend java.lang.RuntimeException")
+                            .build();
+                }
+            })
+            .build();
+
+    private volatile Integer flowFileMaxSuccess = 0;
+    private volatile Integer flowFileMaxFailure = 0;
+    private volatile Integer flowFileMaxRollback = 0;
+    private volatile Integer flowFileMaxYield = 0;
+    private volatile Integer flowFileMaxPenalty = 0;
+    private volatile Integer flowFileMaxException = 0;
+
+    private volatile Integer noFlowFileMaxSkip = 0;
+    private volatile Integer noFlowFileMaxException = 0;
+    private volatile Integer noFlowFileMaxYield = 0;
+
+    private volatile Integer flowFileCurrSuccess = 0;
+    private volatile Integer flowFileCurrFailure = 0;
+    private volatile Integer flowFileCurrRollback = 0;
+    private volatile Integer flowFileCurrYield = 0;
+    private volatile Integer flowFileCurrPenalty = 0;
+    private volatile Integer flowFileCurrException = 0;
+
+    private volatile Integer noFlowFileCurrSkip = 0;
+    private volatile Integer noFlowFileCurrException = 0;
+    private volatile Integer noFlowFileCurrYield = 0;
+
+    private volatile Class<? extends RuntimeException> flowFileExceptionClass 
= null;
+    private volatile Class<? extends RuntimeException> 
noFlowFileExceptionClass= null;
+
+    private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
+    private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse();
+
+    /**
+     * Returns the relationships this processor exposes, building and caching
+     * the unmodifiable set the first time it is requested.
+     *
+     * @return unmodifiable set holding the success and failure relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        synchronized (relationships) {
+            final Set<Relationship> cached = relationships.get();
+            if (cached != null) {
+                return cached;
+            }
+            // First request: build the set while holding the lock, then publish it.
+            final Set<Relationship> built = new HashSet<>();
+            built.add(REL_SUCCESS);
+            built.add(REL_FAILURE);
+            relationships.compareAndSet(null, Collections.unmodifiableSet(built));
+            return relationships.get();
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        synchronized (propertyDescriptors) {
+            if (propertyDescriptors.get() == null) {
+                ArrayList<PropertyDescriptor> propList = new ArrayList<>();
+                propList.add(FF_SUCCESS_ITERATIONS);
+                propList.add(FF_FAILURE_ITERATIONS);
+                propList.add(FF_ROLLBACK_ITERATIONS);
+                propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
+                propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
+                propList.add(FF_EXCEPTION_ITERATIONS);
+                propList.add(FF_EXCEPTION_CLASS);
+                propList.add(NO_FF_SKIP_ITERATIONS);
+                propList.add(NO_FF_EXCEPTION_ITERATIONS);
+                propList.add(NO_FF_YIELD_ITERATIONS);
+                propList.add(NO_FF_EXCEPTION_CLASS);
+                propertyDescriptors.compareAndSet(null, 
Collections.unmodifiableList(propList));
+            }
+            return propertyDescriptors.get();
+        }
+    }
+
+    /**
+     * Captures the configured iteration limits and exception classes, and
+     * rewinds both response cycles to their first state, each time the
+     * processor is scheduled.
+     *
+     * @param context supplies the current property values
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
+        flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
+        flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
+        flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
+        flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
+        flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
+        noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
+        noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
+        noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
+        // NOTE(review): only the cycle position is rewound here; the per-response
+        // counters keep whatever value they had when the processor was last stopped.
+        // Confirm whether they should be zeroed as well.
+        curr_ff_resp.reset();
+        curr_noff_resp.reset();
+        // Read the configured class names through getValue() rather than relying on
+        // the string form produced by PropertyValue.toString().
+        flowFileExceptionClass =
+                classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).getValue());
+        noFlowFileExceptionClass =
+                classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).getValue());
+    }
+
+    /**
+     * Produces the next configured response.
+     *
+     * A FlowFile is requested from the session. When none is returned the
+     * no-FlowFile cycle (skip, exception, yield) is consulted; otherwise the
+     * FlowFile cycle (success, failure, rollback, yield, penalty, exception)
+     * is consulted. A response is produced while its counter is below the
+     * configured maximum; once the maximum is reached the counter is zeroed
+     * and the cycle advances to the next state.
+     *
+     * NOTE(review): the counters are volatile Integer fields updated with
+     * "+= 1", which is a read-modify-write and not atomic if this method is
+     * ever triggered concurrently.
+     *
+     * @param context used to yield the processor
+     * @param session used to obtain, transfer and roll back the FlowFile
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final ComponentLog logger = getLogger();
+
+        FlowFile ff = session.get();
+
+        // Make up to 2 passes to allow rollover from last cycle to first.
+        // (This could be "while(true)" since responses should break out  if 
selected, but this
+        //  prevents endless loops in the event of unexpected errors or future 
changes.)
+        int pass = 2;
+        while (pass > 0) {
+            pass -= 1;
+            if (ff == null) {
+                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
+                    if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
+                        noFlowFileCurrSkip += 1;
+                        logger.info("DebugFlow skipping with no flow file");
+                        return;
+                    } else {
+                        noFlowFileCurrSkip = 0;
+                        curr_noff_resp.getNextCycle();
+                    }
+                }
+                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
+                    if (noFlowFileCurrException < noFlowFileMaxException) {
+                        noFlowFileCurrException += 1;
+                        // NOTE(review): the log text says "NPE", but the type thrown is whatever
+                        // noFlowFileExceptionClass holds.
+                        logger.info("DebugFlow throwing NPE with no flow 
file");
+                        String message = "forced by " + 
this.getClass().getName();
+                        RuntimeException rte;
+                        try {
+                            rte = 
noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
+                            throw rte;
+                        } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+                            // NOTE(review): if the exception cannot be built reflectively the error is
+                            // logged and control falls through to the yield check below.
+                            if (logger.isErrorEnabled()) {
+                                logger.error("{} unexpected exception throwing 
DebugFlow exception: {}",
+                                        new Object[]{this, e});
+                            }
+                        }
+                    } else {
+                        noFlowFileCurrException = 0;
+                        curr_noff_resp.getNextCycle();
+                    }
+                }
+                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) {
+                    if (noFlowFileCurrYield < noFlowFileMaxYield) {
+                        noFlowFileCurrYield += 1;
+                        logger.info("DebugFlow yielding with no flow file");
+                        context.yield();
+                        break;
+                    } else {
+                        noFlowFileCurrYield = 0;
+                        curr_noff_resp.getNextCycle();
+                    }
+                }
+                // No response was selected on this pass; the no-FlowFile branch returns here
+                // and therefore never makes a second pass.
+                return;
+            } else {
+                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_SUCCESS_RESPONSE) {
+                    if (flowFileCurrSuccess < flowFileMaxSuccess) {
+                        flowFileCurrSuccess += 1;
+                        logger.info("DebugFlow transferring to success file={} 
UUID={}",
+                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                        
ff.getAttribute(CoreAttributes.UUID.key())});
+                        session.transfer(ff, REL_SUCCESS);
+                        session.commit();
+                        break;
+                    } else {
+                        flowFileCurrSuccess = 0;
+                        curr_ff_resp.getNextCycle();
+                    }
+                }
+                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_FAILURE_RESPONSE) {
+                    if (flowFileCurrFailure < flowFileMaxFailure) {
+                        flowFileCurrFailure += 1;
+                        logger.info("DebugFlow transferring to failure file={} 
UUID={}",
+                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                        
ff.getAttribute(CoreAttributes.UUID.key())});
+                        session.transfer(ff, REL_FAILURE);
+                        session.commit();
+                        break;
+                    } else {
+                        flowFileCurrFailure = 0;
+                        curr_ff_resp.getNextCycle();
+                    }
+                }
+                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_ROLLBACK_RESPONSE) {
+                    if (flowFileCurrRollback < flowFileMaxRollback) {
+                        flowFileCurrRollback += 1;
+                        logger.info("DebugFlow rolling back (no penalty) 
file={} UUID={}",
+                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                        
ff.getAttribute(CoreAttributes.UUID.key())});
+                        // NOTE(review): commit() directly after rollback() - presumably has nothing
+                        // left to commit; confirm it is needed.
+                        session.rollback();
+                        session.commit();
+                        break;
+                    } else {
+                        flowFileCurrRollback = 0;
+                        curr_ff_resp.getNextCycle();
+                    }
+                }
+                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_YIELD_RESPONSE) {
+                    if (flowFileCurrYield < flowFileMaxYield) {
+                        flowFileCurrYield += 1;
+                        logger.info("DebugFlow yielding file={} UUID={}",
+                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                        
ff.getAttribute(CoreAttributes.UUID.key())});
+                        session.rollback();
+                        context.yield();
+                        return;
+                    } else {
+                        flowFileCurrYield = 0;
+                        curr_ff_resp.getNextCycle();
+                    }
+                }
+                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_PENALTY_RESPONSE) {
+                    if (flowFileCurrPenalty < flowFileMaxPenalty) {
+                        flowFileCurrPenalty += 1;
+                        logger.info("DebugFlow rolling back (with penalty) 
file={} UUID={}",
+                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                        
ff.getAttribute(CoreAttributes.UUID.key())});
+                        session.rollback(true);
+                        session.commit();
+                        break;
+                    } else {
+                        flowFileCurrPenalty = 0;
+                        curr_ff_resp.getNextCycle();
+                    }
+                }
+                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_EXCEPTION_RESPONSE) {
+                    if (flowFileCurrException < flowFileMaxException) {
+                        flowFileCurrException += 1;
+                        String message = "forced by " + 
this.getClass().getName();
+                        // NOTE(review): the log text says "NPE", but the type thrown is whatever
+                        // flowFileExceptionClass holds.
+                        logger.info("DebugFlow throwing NPE file={} UUID={}",
+                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                        
ff.getAttribute(CoreAttributes.UUID.key())});
+                        RuntimeException rte;
+                        try {
+                            rte = 
flowFileExceptionClass.getConstructor(String.class).newInstance(message);
+                            throw rte;
+                        } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+                            // NOTE(review): if the exception cannot be built reflectively the error is
+                            // logged and the loop continues; the FlowFile has not been transferred or
+                            // rolled back at this point - confirm this is intended.
+                            if (logger.isErrorEnabled()) {
+                                logger.error("{} unexpected exception throwing 
DebugFlow exception: {}",
+                                        new Object[]{this, e});
+                            }
+                        }
+                    } else {
+                        flowFileCurrException = 0;
+                        curr_ff_resp.getNextCycle();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Resolves a fully qualified class name to a {@link RuntimeException} type.
+     *
+     * @param name fully qualified class name; may be null
+     * @return the class if it can be loaded and is RuntimeException or a subclass of it,
+     *         otherwise null (null name, unknown class, class that fails to link or
+     *         initialize, or a class that is not a RuntimeException)
+     */
+    private static Class<? extends RuntimeException> classNameToRuntimeExceptionClass(String name) {
+        if (name == null) {
+            return null;
+        }
+        try {
+            // asSubclass performs the type check and the narrowing in one step, so no
+            // unchecked cast is needed; it throws ClassCastException for unrelated types.
+            return Class.forName(name).asSubclass(RuntimeException.class);
+        } catch (ClassNotFoundException | ClassCastException | LinkageError e) {
+            // Class.forName may also fail with a LinkageError (including
+            // ExceptionInInitializerError); for validation purposes that is simply
+            // "not a usable exception class".
+            return null;
+        }
+    }
+
+    private enum FlowFileResponseState {
+        FF_SUCCESS_RESPONSE,
+        FF_FAILURE_RESPONSE,
+        FF_ROLLBACK_RESPONSE,
+        FF_YIELD_RESPONSE,
+        FF_PENALTY_RESPONSE,
+        FF_EXCEPTION_RESPONSE;
+
+        private FlowFileResponseState nextState;
+        static {
+            FF_SUCCESS_RESPONSE.nextState = FF_FAILURE_RESPONSE;
+            FF_FAILURE_RESPONSE.nextState = FF_ROLLBACK_RESPONSE;
+            FF_ROLLBACK_RESPONSE.nextState = FF_YIELD_RESPONSE;
+            FF_YIELD_RESPONSE.nextState = FF_PENALTY_RESPONSE;
+            FF_PENALTY_RESPONSE.nextState = FF_EXCEPTION_RESPONSE;
+            FF_EXCEPTION_RESPONSE.nextState = FF_SUCCESS_RESPONSE;
+        }
+        FlowFileResponseState next() {
+            return nextState;
+        }
+    }
+
+    /**
+     * Holds the current position in the FlowFile response cycle. All access
+     * goes through synchronized methods.
+     */
+    private class FlowFileResponse {
+        private final AtomicReference<FlowFileResponseState> current =
+                new AtomicReference<>(FlowFileResponseState.FF_SUCCESS_RESPONSE);
+
+        FlowFileResponse() {
+            // The field initializer already places the cycle at its first state.
+        }
+
+        /** @return the state the cycle is currently in */
+        synchronized FlowFileResponseState state() {
+            return current.get();
+        }
+
+        /** Advances the cycle to the state following the current one. */
+        synchronized void getNextCycle() {
+            final FlowFileResponseState following = current.get().next();
+            current.set(following);
+        }
+
+        /** Rewinds the cycle to its first state. */
+        synchronized void reset() {
+            current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE);
+        }
+    }
+
+    private enum NoFlowFileResponseState {
+        NO_FF_SKIP_RESPONSE,
+        NO_FF_EXCEPTION_RESPONSE,
+        NO_FF_YIELD_RESPONSE;
+
+        private NoFlowFileResponseState nextState;
+        static {
+            NO_FF_SKIP_RESPONSE.nextState = NO_FF_EXCEPTION_RESPONSE;
+            NO_FF_EXCEPTION_RESPONSE.nextState = NO_FF_YIELD_RESPONSE;
+            NO_FF_YIELD_RESPONSE.nextState = NO_FF_SKIP_RESPONSE;
+        }
+        NoFlowFileResponseState next() {
+            return nextState;
+        }
+    }
+
+    /**
+     * Holds the current position in the no-FlowFile response cycle. All access
+     * goes through synchronized methods.
+     */
+    private class NoFlowFileResponse {
+        private final AtomicReference<NoFlowFileResponseState> current =
+                new AtomicReference<>(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE);
+
+        NoFlowFileResponse() {
+            // The field initializer already places the cycle at its first state.
+        }
+
+        /** @return the state the cycle is currently in */
+        synchronized NoFlowFileResponseState state() {
+            return current.get();
+        }
+
+        /** Advances the cycle to the state following the current one. */
+        synchronized void getNextCycle() {
+            final NoFlowFileResponseState following = current.get().next();
+            current.set(following);
+        }
+
+        /** Rewinds the cycle to its first state. */
+        synchronized void reset() {
+            current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4723f8e2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a50fd86..bf95078 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.CompressContent
 org.apache.nifi.processors.standard.ControlRate
 org.apache.nifi.processors.standard.ConvertCharacterSet
 org.apache.nifi.processors.standard.ConvertJSONToSQL
+org.apache.nifi.processors.standard.DebugFlow
 org.apache.nifi.processors.standard.DetectDuplicate
 org.apache.nifi.processors.standard.DistributeLoad
 org.apache.nifi.processors.standard.DuplicateFlowFile

http://git-wip-us.apache.org/repos/asf/nifi/blob/4723f8e2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html
new file mode 100644
index 0000000..f771d54
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8"/>
+        <title>DebugFlow</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css"/>
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <p>
+            When triggered, the processor loops through the appropriate 
response list (based on whether or not it
+            received a FlowFile).  A response is produced the configured 
number of times for each pass through its
+            response list, as long as the processor is running.
+        </p><p>
+            Triggered by a FlowFile, the processor can produce the following responses.
+        </p>
+        <ol>
+            <li>Transfer the FlowFile to the success relationship.</li>
+            <li>Transfer the FlowFile to the failure relationship.</li>
+            <li>Roll back the FlowFile without penalty.</li>
+            <li>Roll back the FlowFile and yield the context.</li>
+            <li>Roll back the FlowFile with penalty.</li>
+            <li>Throw an exception.</li>
+        </ol>
+        <p>
+            Triggered without a FlowFile, the processor can produce the following responses.
+        </p>
+        <ol>
+            <li>Do nothing and return.</li>
+            <li>Throw an exception.</li>
+            <li>Yield the context.</li>
+        </ol>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4723f8e2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
new file mode 100644
index 0000000..8eb53aa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestDebugFlow {
+
+    // Processor under test; recreated for every test in setup().
+    private DebugFlow debugFlow;
+    // Mock framework runner wrapping debugFlow.
+    private TestRunner runner;
+    // Extra session obtained from the runner's session factory in setup(); used to peek at the queue.
+    private ProcessSession session;
+
+    // Sample payloads keyed by zero-based index (0..5).
+    private final Map<Integer, String> contents = new HashMap<>();
+    // FlowFile attributes (filename and uuid) keyed by the same index as contents.
+    private final Map<Integer, Map<String, String>> attribs = new HashMap<>();
+    // Reverse lookup from generated filename to its payload; never reassigned, so final like its siblings.
+    private final Map<String, String> namesToContent = new HashMap<>();
+
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    /**
+     * Prepares the fixtures shared by every test: six sample payloads with matching
+     * filename/uuid attributes, a fresh processor, runner and session, and every
+     * iteration-count property explicitly set to "0".
+     */
+    @Before
+    public void setup() throws IOException {
+        for (int index = 0; index < 6; index++) {
+            final int fileNumber = index + 1;
+            final String filename = "testFile" + fileNumber + ".txt";
+            final String content = "Hello World " + fileNumber + "!";
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), filename);
+            attributes.put(CoreAttributes.UUID.key(), "TESTING-FILE-" + fileNumber + "-TESTING");
+
+            contents.put(index, content);
+            attribs.put(index, attributes);
+            namesToContent.put(filename, content);
+        }
+
+        debugFlow = new DebugFlow();
+        runner = TestRunners.newTestRunner(debugFlow);
+        session = runner.getProcessSessionFactory().createSession();
+
+        // Responses used when a FlowFile is available.
+        runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "0");
+
+        // Responses used when no FlowFile is available.
+        runner.setProperty(DebugFlow.NO_FF_SKIP_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "0");
+        runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0");
+    }
+
+    /** Checks that the processor exposes exactly eleven property descriptors. */
+    @Test
+    public void testGetSupportedPropertyDescriptors() throws Exception {
+        final int descriptorCount = debugFlow.getPropertyDescriptors().size();
+        assertEquals(11, descriptorCount);
+    }
+
+    /** Checks that the processor exposes exactly two relationships. */
+    @Test
+    public void testGetRelationships() throws Exception {
+        final int relationshipCount = debugFlow.getRelationships().size();
+        assertEquals(2, relationshipCount);
+    }
+
+    /**
+     * Reports whether the given bytes decode to one of the sample payloads built in setup().
+     *
+     * @param content raw FlowFile content to look up
+     * @return true if the decoded text equals any value stored in {@code contents}
+     */
+    private boolean isInContents(byte[] content) {
+        // Decoded with the platform default charset, mirroring the getBytes() calls that enqueue the payloads.
+        return contents.containsValue(new String(content));
+    }
+
+    /**
+     * With only the success response enabled, all six enqueued FlowFiles must be
+     * transferred to success and each must carry one of the known payloads.
+     */
+    @Test
+    public void testFlowFileSuccess() {
+        runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(7);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 6);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
+
+        // The transfer count asserted above guarantees exactly six FlowFiles here.
+        for (final MockFlowFile transferred : runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS)) {
+            assertTrue(isInContents(transferred.toByteArray()));
+        }
+    }
+
+    /**
+     * With only the failure response enabled, all six enqueued FlowFiles must be
+     * transferred to failure, in enqueue order, with their content unchanged.
+     */
+    @Test
+    public void testFlowFileFailure() {
+        runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(7);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 6);
+
+        for (int index = 0; index < 6; index++) {
+            final MockFlowFile failed = runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(index);
+            failed.assertContentEquals(contents.get(index));
+        }
+    }
+
+    /**
+     * With success and failure each enabled for one iteration, the six enqueued
+     * FlowFiles must alternate: even-indexed payloads on success, odd-indexed on failure.
+     */
+    @Test
+    public void testFlowFileSuccessAndFailure() {
+        runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(7);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 3);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 3);
+
+        // Pair k covers the k-th success (payload 2k) followed by the k-th failure (payload 2k + 1).
+        for (int pair = 0; pair < 3; pair++) {
+            final int evenIndex = 2 * pair;
+            final int oddIndex = evenIndex + 1;
+            runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(pair).assertContentEquals(contents.get(evenIndex));
+            runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(pair).assertContentEquals(contents.get(oddIndex));
+        }
+    }
+
+    /**
+     * With only the rollback response enabled, nothing may be transferred and all six
+     * FlowFiles must remain queued; the next FlowFile taken from the queue must still
+     * hold the payload that belongs to its filename attribute.
+     */
+    @Test
+    public void testFlowFileRollback() throws IOException {
+        runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(7);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
+
+        runner.assertQueueNotEmpty();
+        assertEquals(6, runner.getQueueSize().getObjectCount());
+
+        final MockFlowFile queued = (MockFlowFile) session.get();
+        assertNotNull(queued);
+        final String expectedContent = namesToContent.get(queued.getAttribute(CoreAttributes.FILENAME.key()));
+        final String actualContent = new String(queued.toByteArray());
+        assertEquals(expectedContent, actualContent);
+        // Roll back the session used to take the FlowFile above.
+        session.rollback();
+    }
+
+    /**
+     * With only the rollback-and-yield response enabled, nothing may be transferred
+     * and all six FlowFiles must remain queued.
+     */
+    @Test
+    public void testFlowFileRollbackYield() {
+        runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(7);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
+
+        runner.assertQueueNotEmpty();
+        final int stillQueued = runner.getQueueSize().getObjectCount();
+        assertEquals(6, stillQueued);
+    }
+
+    /**
+     * With only the rollback-with-penalty response enabled, nothing may be transferred
+     * and all six FlowFiles must remain queued.
+     */
+    @Test
+    public void testFlowFileRollbackPenalty() {
+        runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(7);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
+
+        runner.assertQueueNotEmpty();
+        final int stillQueued = runner.getQueueSize().getObjectCount();
+        assertEquals(6, stillQueued);
+    }
+
+    /**
+     * With the exception response enabled and the exception class property left unset,
+     * running against one FlowFile must fail with a message naming DebugFlow and a
+     * RuntimeException cause.
+     */
+    @Test
+    public void testFlowFileDefaultException() {
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
+        runner.assertValid();
+
+        final byte[] payload = contents.get(0).getBytes();
+        runner.enqueue(payload, attribs.get(0));
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(RuntimeException.class));
+        runner.run(2);
+    }
+
+    /**
+     * Same as the default-exception case, but with the exception class explicitly
+     * configured as java.lang.RuntimeException.
+     */
+    @Test
+    public void testFlowFileNonDefaultException() {
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.RuntimeException");
+        runner.assertValid();
+
+        final byte[] payload = contents.get(0).getBytes();
+        runner.enqueue(payload, attribs.get(0));
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(RuntimeException.class));
+        runner.run(2);
+    }
+
+    /**
+     * With the exception class configured as java.lang.NullPointerException, running
+     * against one FlowFile must fail with a NullPointerException cause.
+     */
+    @Test
+    public void testFlowFileNPEException() {
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.NullPointerException");
+        runner.assertValid();
+
+        final byte[] payload = contents.get(0).getBytes();
+        runner.enqueue(payload, attribs.get(0));
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(NullPointerException.class));
+        runner.run(2);
+    }
+
+    /**
+     * Configuring a FlowFile exception class name that cannot be resolved must make
+     * the processor configuration invalid.
+     */
+    @Test
+    public void testFlowFileBadException() {
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
+        // Deliberately names a class that does not exist.
+        runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.NonExistantException");
+        runner.assertNotValid();
+    }
+
+    /**
+     * With the exception response configured for two iterations and six FlowFiles
+     * queued, the run must fail with a message naming DebugFlow and a RuntimeException cause.
+     */
+    @Test
+    public void testFlowFileExceptionRollover() {
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "2");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(RuntimeException.class));
+        runner.run(8);
+    }
+
+    /**
+     * Enables one iteration of every FlowFile response and enqueues six FlowFiles.
+     * After run(5) exactly one FlowFile must be on success, one on failure, and four
+     * must remain queued; the runner is then run twice more without further assertions.
+     */
+    @Test
+    public void testFlowFileAll() {
+        runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
+        runner.assertValid();
+
+        for (int index = 0; index < 6; index++) {
+            final byte[] payload = contents.get(index).getBytes();
+            runner.enqueue(payload, attribs.get(index));
+        }
+
+        runner.run(5);
+        runner.assertTransferCount(DebugFlow.REL_SUCCESS, 1);
+        runner.assertTransferCount(DebugFlow.REL_FAILURE, 1);
+
+        final int stillQueued = runner.getQueueSize().getObjectCount();
+        assertEquals(4, stillQueued);
+
+        final MockFlowFile succeeded = runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0);
+        assertTrue(isInContents(succeeded.toByteArray()));
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0);
+        assertTrue(isInContents(failed.toByteArray()));
+
+        runner.run(2);
+    }
+
+    /**
+     * Runs the processor with nothing enqueued and every iteration count left at the
+     * "0" assigned in setup(); the test passes as long as run(4) raises nothing.
+     */
+    @Test
+    public void testNoFlowFileZeroIterations() {
+        runner.run(4);
+    }
+
+    /**
+     * Enables only the no-FlowFile skip response and runs with nothing enqueued;
+     * the test passes as long as the configuration is valid and run(4) raises nothing.
+     */
+    @Test
+    public void testNoFlowFileSkip() {
+        runner.setProperty(DebugFlow.NO_FF_SKIP_ITERATIONS, "1");
+        runner.assertValid();
+
+        runner.run(4);
+    }
+
+    /**
+     * With the no-FlowFile exception response enabled and the exception class property
+     * left unset, running with nothing enqueued must fail with a message naming
+     * DebugFlow and a RuntimeException cause.
+     */
+    @Test
+    public void testNoFlowFileDefaultException() {
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
+        runner.assertValid();
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(RuntimeException.class));
+        runner.run(3);
+    }
+
+    /**
+     * Same as the no-FlowFile default-exception case, but with the exception class
+     * explicitly configured as java.lang.RuntimeException.
+     */
+    @Test
+    public void testNoFlowFileNonDefaultException() {
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.RuntimeException");
+        runner.assertValid();
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(RuntimeException.class));
+        runner.run(3);
+    }
+
+    /**
+     * With the no-FlowFile exception class configured as java.lang.NullPointerException,
+     * running with nothing enqueued must fail with a NullPointerException cause.
+     */
+    @Test
+    public void testNoFlowFileOtherException() {
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.NullPointerException");
+        runner.assertValid();
+
+        final String expectedFragment = "forced by org.apache.nifi.processors.standard.DebugFlow";
+        exception.expectMessage(CoreMatchers.containsString(expectedFragment));
+        exception.expectCause(CoreMatchers.isA(NullPointerException.class));
+        runner.run(3);
+    }
+
+    /**
+     * Configuring a no-FlowFile exception class name that cannot be resolved must
+     * make the processor configuration invalid.
+     */
+    @Test
+    public void testNoFlowFileBadException() {
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
+        // Deliberately names a class that does not exist.
+        runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.NonExistantException");
+        runner.assertNotValid();
+    }
+
+    /**
+     * Enables only the no-FlowFile yield response and runs with nothing enqueued;
+     * the test passes as long as the configuration is valid and run(4) raises nothing.
+     */
+    @Test
+    public void testNoFlowFileYield() {
+        runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "1");
+        runner.assertValid();
+
+        runner.run(4);
+    }
+}

Reply via email to