Repository: nifi
Updated Branches:
  refs/heads/master 9de73fbe1 -> f394c874e


http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index 95d0b54..5838f37 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -318,6 +318,19 @@ public class StandardProcessorDAO extends ComponentDAO 
implements ProcessorDAO {
     }
 
     @Override
+    public void verifyTerminate(final String processorId) {
+        final ProcessorNode processor = locateProcessor(processorId);
+        processor.verifyCanTerminate();
+    }
+
+    @Override
+    public void terminate(final String processorId) {
+        final ProcessorNode processor = locateProcessor(processorId);
+        processor.getProcessGroup().terminateProcessor(processor);
+    }
+
+
+    @Override
     public void verifyUpdate(final ProcessorDTO processorDTO) {
         verifyUpdate(locateProcessor(processorDTO.getId()), processorDTO);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
index 6e55157..93047b7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
@@ -206,7 +206,22 @@ public class DebugFlow extends AbstractProcessor {
         .allowableValues("true", "false")
         .defaultValue("false")
         .build();
-
+    static final PropertyDescriptor ON_TRIGGER_SLEEP_TIME = new 
PropertyDescriptor.Builder()
+        .name("OnTrigger Pause Time")
+        .description("Specifies how long the processor should sleep in the 
onTrigger() method, so that the processor can be forced to take a long time to 
perform its task")
+        .required(true)
+        .defaultValue("0 sec")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+    static final PropertyDescriptor IGNORE_INTERRUPTS = new 
PropertyDescriptor.Builder()
+        .name("Ignore Interrupts When Paused")
+        .description("If the Processor's thread(s) are sleeping (due to one of 
the \"Pause Time\" properties above), and the thread is interrupted, "
+            + "this indicates whether the Processor should ignore the 
interrupt and continue sleeping or if it should allow itself to be 
interrupted.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     private volatile Integer flowFileMaxSuccess = 0;
     private volatile Integer flowFileMaxFailure = 0;
@@ -273,6 +288,8 @@ public class DebugFlow extends AbstractProcessor {
                 propList.add(ON_UNSCHEDULED_FAIL);
                 propList.add(ON_STOPPED_SLEEP_TIME);
                 propList.add(ON_STOPPED_FAIL);
+                propList.add(ON_TRIGGER_SLEEP_TIME);
+                propList.add(IGNORE_INTERRUPTS);
 
                 propertyDescriptors.compareAndSet(null, 
Collections.unmodifiableList(propList));
             }
@@ -297,25 +314,42 @@ public class DebugFlow extends AbstractProcessor {
         flowFileExceptionClass = (Class<? extends RuntimeException>) 
Class.forName(context.getProperty(FF_EXCEPTION_CLASS).toString());
         noFlowFileExceptionClass = (Class<? extends RuntimeException>) 
Class.forName(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
 
-        
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+        
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
+            context.getProperty(IGNORE_INTERRUPTS).asBoolean());
         fail(context.getProperty(ON_SCHEDULED_FAIL).asBoolean(), 
OnScheduled.class);
     }
 
     @OnUnscheduled
     public void onUnscheduled(final ProcessContext context) throws 
InterruptedException {
-        
sleep(context.getProperty(ON_UNSCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+        
sleep(context.getProperty(ON_UNSCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
+            context.getProperty(IGNORE_INTERRUPTS).asBoolean());
         fail(context.getProperty(ON_UNSCHEDULED_FAIL).asBoolean(), 
OnUnscheduled.class);
     }
 
     @OnStopped
     public void onStopped(final ProcessContext context) throws 
InterruptedException {
-        
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+        
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
+            context.getProperty(IGNORE_INTERRUPTS).asBoolean());
+
         fail(context.getProperty(ON_STOPPED_FAIL).asBoolean(), 
OnStopped.class);
     }
 
-    private void sleep(final long millis) throws InterruptedException {
+    private void sleep(final long millis, final boolean ignoreInterrupts) 
throws InterruptedException {
         if (millis > 0L) {
-            Thread.sleep(millis);
+            final long endSleep = System.currentTimeMillis() + millis;
+            while (System.currentTimeMillis() < endSleep) {
+                // Use Math.max(1, <sleep duration>) here in case
+                // System.currentTimeMillis() has changed since the check above
+                // and subtracting it from endSleep would now result in a 
value <= 0.
+                try {
+                    Thread.sleep(Math.max(1L, endSleep - 
System.currentTimeMillis()));
+                } catch (final InterruptedException ie) {
+                    if (!ignoreInterrupts) {
+                        Thread.currentThread().interrupt();
+                        throw ie;
+                    }
+                }
+            }
         }
     }
 
@@ -332,164 +366,181 @@ public class DebugFlow extends AbstractProcessor {
 
         FlowFile ff = session.get();
 
-        // Make up to 2 passes to allow rollover from last cycle to first.
-        // (This could be "while(true)" since responses should break out  if 
selected, but this
-        //  prevents endless loops in the event of unexpected errors or future 
changes.)
-        for (int pass = 2; pass > 0; pass--) {
-            if (ff == null) {
-                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
-                    if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
-                        noFlowFileCurrSkip += 1;
-                        logger.info("DebugFlow skipping with no flow file");
-                        return;
-                    } else {
-                        noFlowFileCurrSkip = 0;
-                        curr_noff_resp.getNextCycle();
+        try {
+            // Make up to 2 passes to allow rollover from last cycle to first.
+            // (This could be "while(true)" since responses should break out  
if selected, but this
+            //  prevents endless loops in the event of unexpected errors or 
future changes.)
+            for (int pass = 2; pass > 0; pass--) {
+                if (ff == null) {
+                    if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
+                        if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
+                            noFlowFileCurrSkip += 1;
+                            logger.info("DebugFlow skipping with no flow 
file");
+                            return;
+                        } else {
+                            noFlowFileCurrSkip = 0;
+                            curr_noff_resp.getNextCycle();
+                        }
                     }
-                }
-                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
-                    if (noFlowFileCurrException < noFlowFileMaxException) {
-                        noFlowFileCurrException += 1;
-                        logger.info("DebugFlow throwing NPE with no flow 
file");
-                        String message = "forced by " + 
this.getClass().getName();
-                        RuntimeException rte;
-                        try {
-                            rte = 
noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
-                            throw rte;
-                        } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-                            if (logger.isErrorEnabled()) {
-                                logger.error("{} unexpected exception throwing 
DebugFlow exception: {}",
-                                        new Object[]{this, e});
+
+                    if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
+                        if (noFlowFileCurrException < noFlowFileMaxException) {
+                            noFlowFileCurrException += 1;
+                            logger.info("DebugFlow throwing NPE with no flow 
file");
+                            String message = "forced by " + 
this.getClass().getName();
+                            RuntimeException rte;
+                            try {
+                                rte = 
noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
+                                throw rte;
+                            } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+                                if (logger.isErrorEnabled()) {
+                                    logger.error("{} unexpected exception 
throwing DebugFlow exception: {}",
+                                        new Object[] {this, e});
+                                }
                             }
+                        } else {
+                            noFlowFileCurrException = 0;
+                            curr_noff_resp.getNextCycle();
                         }
-                    } else {
-                        noFlowFileCurrException = 0;
-                        curr_noff_resp.getNextCycle();
                     }
-                }
-                if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) {
-                    if (noFlowFileCurrYield < noFlowFileMaxYield) {
-                        noFlowFileCurrYield += 1;
-                        logger.info("DebugFlow yielding with no flow file");
-                        context.yield();
-                        break;
-                    } else {
-                        noFlowFileCurrYield = 0;
-                        curr_noff_resp.getNextCycle();
+
+                    if (curr_noff_resp.state() == 
NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) {
+                        if (noFlowFileCurrYield < noFlowFileMaxYield) {
+                            noFlowFileCurrYield += 1;
+                            logger.info("DebugFlow yielding with no flow 
file");
+                            context.yield();
+                            break;
+                        } else {
+                            noFlowFileCurrYield = 0;
+                            curr_noff_resp.getNextCycle();
+                        }
                     }
-                }
-                return;
-            } else {
-                final int writeIterations = 
context.getProperty(WRITE_ITERATIONS).asInteger();
-                if (writeIterations > 0 && pass == 1) {
-                    final Random random = new Random();
-
-                    for (int i = 0; i < writeIterations; i++) {
-                        final byte[] data = new 
byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()];
-                        random.nextBytes(data);
-
-                        ff = session.write(ff, new OutputStreamCallback() {
-                            @Override
-                            public void process(final OutputStream out) throws 
IOException {
-                                out.write(data);
-                            }
-                        });
+
+                    return;
+                } else {
+                    final int writeIterations = 
context.getProperty(WRITE_ITERATIONS).asInteger();
+                    if (writeIterations > 0 && pass == 1) {
+                        final Random random = new Random();
+
+                        for (int i = 0; i < writeIterations; i++) {
+                            final byte[] data = new 
byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()];
+                            random.nextBytes(data);
+
+                            ff = session.write(ff, new OutputStreamCallback() {
+                                @Override
+                                public void process(final OutputStream out) 
throws IOException {
+                                    out.write(data);
+                                }
+                            });
+                        }
                     }
-                }
 
-                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_SUCCESS_RESPONSE) {
-                    if (flowFileCurrSuccess < flowFileMaxSuccess) {
-                        flowFileCurrSuccess += 1;
-                        logger.info("DebugFlow transferring to success file={} 
UUID={}",
-                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
-                                        
ff.getAttribute(CoreAttributes.UUID.key())});
-                        session.transfer(ff, REL_SUCCESS);
-                        session.commit();
-                        break;
-                    } else {
-                        flowFileCurrSuccess = 0;
-                        curr_ff_resp.getNextCycle();
+                    if (curr_ff_resp.state() == 
FlowFileResponseState.FF_SUCCESS_RESPONSE) {
+                        if (flowFileCurrSuccess < flowFileMaxSuccess) {
+                            flowFileCurrSuccess += 1;
+                            logger.info("DebugFlow transferring to success 
file={} UUID={}",
+                                new Object[] 
{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                    
ff.getAttribute(CoreAttributes.UUID.key())});
+                            session.transfer(ff, REL_SUCCESS);
+                            break;
+                        } else {
+                            flowFileCurrSuccess = 0;
+                            curr_ff_resp.getNextCycle();
+                        }
                     }
-                }
-                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_FAILURE_RESPONSE) {
-                    if (flowFileCurrFailure < flowFileMaxFailure) {
-                        flowFileCurrFailure += 1;
-                        logger.info("DebugFlow transferring to failure file={} 
UUID={}",
-                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
-                                        
ff.getAttribute(CoreAttributes.UUID.key())});
-                        session.transfer(ff, REL_FAILURE);
-                        session.commit();
-                        break;
-                    } else {
-                        flowFileCurrFailure = 0;
-                        curr_ff_resp.getNextCycle();
+
+                    if (curr_ff_resp.state() == 
FlowFileResponseState.FF_FAILURE_RESPONSE) {
+                        if (flowFileCurrFailure < flowFileMaxFailure) {
+                            flowFileCurrFailure += 1;
+                            logger.info("DebugFlow transferring to failure 
file={} UUID={}",
+                                new Object[] 
{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                    
ff.getAttribute(CoreAttributes.UUID.key())});
+                            session.transfer(ff, REL_FAILURE);
+                            break;
+                        } else {
+                            flowFileCurrFailure = 0;
+                            curr_ff_resp.getNextCycle();
+                        }
                     }
-                }
-                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_ROLLBACK_RESPONSE) {
-                    if (flowFileCurrRollback < flowFileMaxRollback) {
-                        flowFileCurrRollback += 1;
-                        logger.info("DebugFlow rolling back (no penalty) 
file={} UUID={}",
-                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
-                                        
ff.getAttribute(CoreAttributes.UUID.key())});
-                        session.rollback();
-                        session.commit();
-                        break;
-                    } else {
-                        flowFileCurrRollback = 0;
-                        curr_ff_resp.getNextCycle();
+
+                    if (curr_ff_resp.state() == 
FlowFileResponseState.FF_ROLLBACK_RESPONSE) {
+                        if (flowFileCurrRollback < flowFileMaxRollback) {
+                            flowFileCurrRollback += 1;
+                            logger.info("DebugFlow rolling back (no penalty) 
file={} UUID={}",
+                                new Object[] 
{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                    
ff.getAttribute(CoreAttributes.UUID.key())});
+                            session.rollback();
+                            break;
+                        } else {
+                            flowFileCurrRollback = 0;
+                            curr_ff_resp.getNextCycle();
+                        }
                     }
-                }
-                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_YIELD_RESPONSE) {
-                    if (flowFileCurrYield < flowFileMaxYield) {
-                        flowFileCurrYield += 1;
-                        logger.info("DebugFlow yielding file={} UUID={}",
-                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
-                                        
ff.getAttribute(CoreAttributes.UUID.key())});
-                        session.rollback();
-                        context.yield();
-                        return;
-                    } else {
-                        flowFileCurrYield = 0;
-                        curr_ff_resp.getNextCycle();
+
+                    if (curr_ff_resp.state() == 
FlowFileResponseState.FF_YIELD_RESPONSE) {
+                        if (flowFileCurrYield < flowFileMaxYield) {
+                            flowFileCurrYield += 1;
+                            logger.info("DebugFlow yielding file={} UUID={}",
+                                new Object[] 
{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                    
ff.getAttribute(CoreAttributes.UUID.key())});
+                            session.rollback();
+                            context.yield();
+                            return;
+                        } else {
+                            flowFileCurrYield = 0;
+                            curr_ff_resp.getNextCycle();
+                        }
                     }
-                }
-                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_PENALTY_RESPONSE) {
-                    if (flowFileCurrPenalty < flowFileMaxPenalty) {
-                        flowFileCurrPenalty += 1;
-                        logger.info("DebugFlow rolling back (with penalty) 
file={} UUID={}",
-                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
-                                        
ff.getAttribute(CoreAttributes.UUID.key())});
-                        session.rollback(true);
-                        session.commit();
-                        break;
-                    } else {
-                        flowFileCurrPenalty = 0;
-                        curr_ff_resp.getNextCycle();
+
+                    if (curr_ff_resp.state() == 
FlowFileResponseState.FF_PENALTY_RESPONSE) {
+                        if (flowFileCurrPenalty < flowFileMaxPenalty) {
+                            flowFileCurrPenalty += 1;
+                            logger.info("DebugFlow rolling back (with penalty) 
file={} UUID={}",
+                                new Object[] 
{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                    
ff.getAttribute(CoreAttributes.UUID.key())});
+                            session.rollback(true);
+                            break;
+                        } else {
+                            flowFileCurrPenalty = 0;
+                            curr_ff_resp.getNextCycle();
+                        }
                     }
-                }
-                if (curr_ff_resp.state() == 
FlowFileResponseState.FF_EXCEPTION_RESPONSE) {
-                    if (flowFileCurrException < flowFileMaxException) {
-                        flowFileCurrException += 1;
-                        String message = "forced by " + 
this.getClass().getName();
-                        logger.info("DebugFlow throwing NPE file={} UUID={}",
-                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
-                                        
ff.getAttribute(CoreAttributes.UUID.key())});
-                        RuntimeException rte;
-                        try {
-                            rte = 
flowFileExceptionClass.getConstructor(String.class).newInstance(message);
-                            throw rte;
-                        } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-                            if (logger.isErrorEnabled()) {
-                                logger.error("{} unexpected exception throwing 
DebugFlow exception: {}",
-                                        new Object[]{this, e});
+
+                    if (curr_ff_resp.state() == 
FlowFileResponseState.FF_EXCEPTION_RESPONSE) {
+                        if (flowFileCurrException < flowFileMaxException) {
+                            flowFileCurrException += 1;
+                            String message = "forced by " + 
this.getClass().getName();
+                            logger.info("DebugFlow throwing NPE file={} 
UUID={}",
+                                new Object[] 
{ff.getAttribute(CoreAttributes.FILENAME.key()),
+                                    
ff.getAttribute(CoreAttributes.UUID.key())});
+                            RuntimeException rte;
+                            try {
+                                rte = 
flowFileExceptionClass.getConstructor(String.class).newInstance(message);
+                                throw rte;
+                            } catch (InstantiationException | 
IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+                                if (logger.isErrorEnabled()) {
+                                    logger.error("{} unexpected exception 
throwing DebugFlow exception: {}",
+                                        new Object[] {this, e});
+                                }
                             }
+                        } else {
+                            flowFileCurrException = 0;
+                            curr_ff_resp.getNextCycle();
                         }
-                    } else {
-                        flowFileCurrException = 0;
-                        curr_ff_resp.getNextCycle();
                     }
                 }
             }
+        } finally {
+            final long sleepMillis = 
context.getProperty(ON_TRIGGER_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+
+            try {
+                if (sleepMillis > 0) {
+                    sleep(sleepMillis, 
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
+                    getLogger().info("DebugFlow finishes sleeping at 
completion of its onTrigger() method");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 

Reply via email to