[
https://issues.apache.org/jira/browse/NIFI-1829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332383#comment-15332383
]
ASF GitHub Bot commented on NIFI-1829:
--------------------------------------
Github user mosermw commented on a diff in the pull request:
https://github.com/apache/nifi/pull/458#discussion_r67230370
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
---
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.http.annotation.ThreadSafe;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+@ThreadSafe()
+@EventDriven()
+@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
+@CapabilityDescription("The DebugFlow processor aids testing and debugging
the FlowFile framework by allowing various "
+ + "responses to be explicitly triggered in response to the receipt
of a FlowFile or a timer event without a "
+ + "FlowFile if using timer or cron based scheduling. It can force
responses needed to exercise or test "
+ + "various failure modes that can occur when a processor runs.\n"
+ + "\n"
+ + "When triggered, the processor loops through the appropriate
response list (based on whether or not it "
+ + "received a FlowFile). A response is produced the configured
number of times for each pass through its"
+ + "response list, as long as the processor is running.\n"
+ + "\n"
+ + "Triggered by a FlowFile, the processor can produce the
following responses."
+ + " 1. transfer FlowFile to success relationship.\n"
+ + " 2. transfer FlowFile to failure relationship.\n"
+ + " 3. rollback the FlowFile without penalty.\n"
+ + " 4. rollback the FlowFile and yield the context.\n"
+ + " 5. rollback the FlowFile with penalty.\n"
+ + " 6. throw an exception.\n"
+ + "\n"
+ + "Triggered without a FlowFile, the processor can produce the
following responses."
+ + " 1. do nothing and return.\n"
+ + " 2. throw an exception.\n"
+ + " 3. yield the context.\n")
+public class DebugFlow extends AbstractProcessor {
+
+ private final AtomicReference<Set<Relationship>> relationships = new
AtomicReference<>();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles processed successfully.")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that failed to process.")
+ .build();
+
+ private final AtomicReference<List<PropertyDescriptor>>
propertyDescriptors = new AtomicReference<>();
+
+ static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Success Iterations")
+ .description("Number of FlowFiles to forward to success
relationship.")
+ .required(true)
+ .defaultValue("1")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor FF_FAILURE_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Failure Iterations")
+ .description("Number of FlowFiles to forward to failure
relationship.")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Rollback Iterations")
+ .description("Number of FlowFiles to roll back (without
penalty).")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Rollback Yield Iterations")
+ .description("Number of FlowFiles to roll back and yield.")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Rollback Penalty Iterations")
+ .description("Number of FlowFiles to roll back with penalty.")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Exception Iterations")
+ .description("Number of FlowFiles to throw exception.")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor FF_EXCEPTION_CLASS = new
PropertyDescriptor.Builder()
+ .name("FlowFile Exception Class")
+ .description("Exception class to be thrown (must extend
java.lang.RuntimeException).")
+ .required(true)
+ .defaultValue("java.lang.RuntimeException")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String
input, ValidationContext context) {
+ Class<? extends RuntimeException> klass =
classNameToRuntimeExceptionClass(input);
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(klass != null &&
(RuntimeException.class.isAssignableFrom(klass)))
+ .explanation(subject + " class must exist and
extend java.lang.RuntimeException")
+ .build();
+ }
+ })
+ .build();
+
+ static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("No FlowFile Skip Iterations")
+ .description("Number of times to skip onTrigger if no
FlowFile.")
+ .required(true)
+ .defaultValue("1")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("No FlowFile Exception Iterations")
+ .description("Number of times to throw NPE exception if no
FlowFile.")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new
PropertyDescriptor.Builder()
+ .name("No FlowFile Yield Iterations")
+ .description("Number of times to yield if no FlowFile.")
+ .required(true)
+ .defaultValue("0")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+ static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new
PropertyDescriptor.Builder()
+ .name("No FlowFile Exception Class")
+ .description("Exception class to be thrown if no FlowFile
(must extend java.lang.RuntimeException).")
+ .required(true)
+ .defaultValue("java.lang.RuntimeException")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String
input, ValidationContext context) {
+ Class<? extends RuntimeException> klass =
classNameToRuntimeExceptionClass(input);
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(klass != null &&
(RuntimeException.class.isAssignableFrom(klass)))
+ .explanation(subject + " class must exist and
extend java.lang.RuntimeException")
+ .build();
+ }
+ })
+ .build();
+
+
+ private volatile Integer flowFileMaxSuccess = 0;
+ private volatile Integer flowFileMaxFailure = 0;
+ private volatile Integer flowFileMaxRollback = 0;
+ private volatile Integer flowFileMaxYield = 0;
+ private volatile Integer flowFileMaxPenalty = 0;
+ private volatile Integer flowFileMaxException = 0;
+
+ private volatile Integer noFlowFileMaxSkip = 0;
+ private volatile Integer noFlowFileMaxException = 0;
+ private volatile Integer noFlowFileMaxYield = 0;
+
+ private volatile Integer flowFileCurrSuccess = 0;
+ private volatile Integer flowFileCurrFailure = 0;
+ private volatile Integer flowFileCurrRollback = 0;
+ private volatile Integer flowFileCurrYield = 0;
+ private volatile Integer flowFileCurrPenalty = 0;
+ private volatile Integer flowFileCurrException = 0;
+
+ private volatile Integer noFlowFileCurrSkip = 0;
+ private volatile Integer noFlowFileCurrException = 0;
+ private volatile Integer noFlowFileCurrYield = 0;
+
+ private volatile Class<? extends RuntimeException>
flowFileExceptionClass = null;
+ private volatile Class<? extends RuntimeException>
noFlowFileExceptionClass= null;
+
+ private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
+ private final NoFlowFileResponse curr_noff_resp = new
NoFlowFileResponse();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ synchronized (relationships) {
+ if (relationships.get() == null) {
+ HashSet<Relationship> relSet = new HashSet<>();
+ relSet.add(REL_SUCCESS);
+ relSet.add(REL_FAILURE);
+ relationships.compareAndSet(null,
Collections.unmodifiableSet(relSet));
+ }
+ return relationships.get();
+ }
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ synchronized (propertyDescriptors) {
+ if (propertyDescriptors.get() == null) {
+ ArrayList<PropertyDescriptor> propList = new ArrayList<>();
+ propList.add(FF_SUCCESS_ITERATIONS);
+ propList.add(FF_FAILURE_ITERATIONS);
+ propList.add(FF_ROLLBACK_ITERATIONS);
+ propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
+ propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
+ propList.add(FF_EXCEPTION_ITERATIONS);
+ propList.add(FF_EXCEPTION_CLASS);
+ propList.add(NO_FF_EXCEPTION_ITERATIONS);
+ propList.add(NO_FF_YIELD_ITERATIONS);
+ propList.add(NO_FF_SKIP_ITERATIONS);
+ propList.add(NO_FF_EXCEPTION_CLASS);
+ propertyDescriptors.compareAndSet(null,
Collections.unmodifiableList(propList));
+ }
+ return propertyDescriptors.get();
+ }
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ flowFileMaxSuccess =
context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
+ flowFileMaxFailure =
context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
+ flowFileMaxYield =
context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
+ flowFileMaxRollback =
context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
+ flowFileMaxPenalty =
context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
+ flowFileMaxException =
context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
+ noFlowFileMaxException =
context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
+ noFlowFileMaxYield =
context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
+ noFlowFileMaxSkip =
context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
+ curr_ff_resp.reset();
+ curr_noff_resp.reset();
+ flowFileExceptionClass =
classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString());
+ noFlowFileExceptionClass =
classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final ComponentLog logger = getLogger();
+
+ FlowFile ff = session.get();
+
+ // Make up to 2 passes to allow rollover from last cycle to first.
+ // (This could be "while(true)" since responses should break out
if selected, but this
+ // prevents endless loops in the event of unexpected errors or
future changes.)
+ int pass = 2;
+ while (pass > 0) {
+ pass -= 1;
+ if (ff == null) {
+ if (curr_noff_resp.state() ==
NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
+ if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
+ noFlowFileCurrSkip += 1;
+ logger.info("DebugFlow skipping with no flow
file");
+ return;
+ } else {
+ noFlowFileCurrSkip = 0;
+ curr_noff_resp.getNextCycle();
+ }
+ }
+ if (curr_noff_resp.state() ==
NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
+ if (noFlowFileCurrException < noFlowFileMaxException) {
+ noFlowFileCurrException += 1;
+ logger.info("DebugFlow throwing NPE with no flow
file");
+ String message = "forced by " +
this.getClass().getName();
+ RuntimeException rte;
+ try {
+ rte =
noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
+ throw rte;
+ } catch (InstantiationException |
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ e.printStackTrace();
--- End diff --
Eeek, e.printStackTrace() should be changed to use the logger (the ComponentLog already obtained via getLogger() at the top of onTrigger).
> Create FlowDebugger processor
> -----------------------------
>
> Key: NIFI-1829
> URL: https://issues.apache.org/jira/browse/NIFI-1829
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Joe Skora
> Assignee: Joe Skora
> Priority: Minor
> Fix For: 1.0.0, 0.7.0
>
>
> The FlowDebugger processor allows a variety of Processor responses and
> failures to be simulated for testing, debugging, and troubleshooting the
> framework.
> Responses it can produce on receipt of a flowfile include Transfer to
> Success, Transfer to Failure, Rollback without Penalty, Rollback and Yield,
> Rollback with Penalty, and Throw an Exception. The properties indicate how
> many times each response should be produced; for example, if configured with
> Success=10 and Failure=40, it will transfer the first 10 flowfiles to
> Success, transfer the next 40 to Failure, and then repeat.
> Similarly, responses it can produce when triggered without a flowfile include
> Throw an Exception, Yield, and Return (do nothing).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)