Repository: nifi Updated Branches: refs/heads/master 9de73fbe1 -> f394c874e
http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index 95d0b54..5838f37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -318,6 +318,19 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { } @Override + public void verifyTerminate(final String processorId) { + final ProcessorNode processor = locateProcessor(processorId); + processor.verifyCanTerminate(); + } + + @Override + public void terminate(final String processorId) { + final ProcessorNode processor = locateProcessor(processorId); + processor.getProcessGroup().terminateProcessor(processor); + } + + + @Override public void verifyUpdate(final ProcessorDTO processorDTO) { verifyUpdate(locateProcessor(processorDTO.getId()), processorDTO); } http://git-wip-us.apache.org/repos/asf/nifi/blob/f394c874/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java index 6e55157..93047b7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java @@ -206,7 +206,22 @@ public class DebugFlow extends AbstractProcessor { .allowableValues("true", "false") .defaultValue("false") .build(); - + static final PropertyDescriptor ON_TRIGGER_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("OnTrigger Pause Time") + .description("Specifies how long the processor should sleep in the onTrigger() method, so that the processor can be forced to take a long time to perform its task") + .required(true) + .defaultValue("0 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor IGNORE_INTERRUPTS = new PropertyDescriptor.Builder() + .name("Ignore Interrupts When Paused") + .description("If the Processor's thread(s) are sleeping (due to one of the \"Pause Time\" properties above), and the thread is interrupted, " + + "this indicates whether the Processor should ignore the interrupt and continue sleeping or if it should allow itself to be interrupted.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); private volatile Integer flowFileMaxSuccess = 0; private volatile Integer flowFileMaxFailure = 0; @@ -273,6 +288,8 @@ public class DebugFlow extends AbstractProcessor { propList.add(ON_UNSCHEDULED_FAIL); propList.add(ON_STOPPED_SLEEP_TIME); propList.add(ON_STOPPED_FAIL); + propList.add(ON_TRIGGER_SLEEP_TIME); + propList.add(IGNORE_INTERRUPTS); propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList)); } @@ -297,25 +314,42 @@ public class DebugFlow extends AbstractProcessor { flowFileExceptionClass = (Class<? extends RuntimeException>) Class.forName(context.getProperty(FF_EXCEPTION_CLASS).toString()); noFlowFileExceptionClass = (Class<? extends RuntimeException>) Class.forName(context.getProperty(NO_FF_EXCEPTION_CLASS).toString()); - sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS)); + sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS), + context.getProperty(IGNORE_INTERRUPTS).asBoolean()); fail(context.getProperty(ON_SCHEDULED_FAIL).asBoolean(), OnScheduled.class); } @OnUnscheduled public void onUnscheduled(final ProcessContext context) throws InterruptedException { - sleep(context.getProperty(ON_UNSCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS)); + sleep(context.getProperty(ON_UNSCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS), + context.getProperty(IGNORE_INTERRUPTS).asBoolean()); fail(context.getProperty(ON_UNSCHEDULED_FAIL).asBoolean(), OnUnscheduled.class); } @OnStopped public void onStopped(final ProcessContext context) throws InterruptedException { - sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS)); + sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS), + context.getProperty(IGNORE_INTERRUPTS).asBoolean()); + fail(context.getProperty(ON_STOPPED_FAIL).asBoolean(), OnStopped.class); } - private void sleep(final long millis) throws InterruptedException { + private void sleep(final long millis, final boolean ignoreInterrupts) throws InterruptedException { if (millis > 0L) { - Thread.sleep(millis); + final long endSleep = System.currentTimeMillis() + millis; + while (System.currentTimeMillis() < endSleep) { + // Use Math.max(1, <sleep duration>) here in case + // System.currentTimeMillis() has changed since the check above + // and subtracting it from endSleep would now result in a value <= 0. + try { + Thread.sleep(Math.max(1L, endSleep - System.currentTimeMillis())); + } catch (final InterruptedException ie) { + if (!ignoreInterrupts) { + Thread.currentThread().interrupt(); + throw ie; + } + } + } } } @@ -332,164 +366,181 @@ public class DebugFlow extends AbstractProcessor { FlowFile ff = session.get(); - // Make up to 2 passes to allow rollover from last cycle to first. - // (This could be "while(true)" since responses should break out if selected, but this - // prevents endless loops in the event of unexpected errors or future changes.) - for (int pass = 2; pass > 0; pass--) { - if (ff == null) { - if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) { - if (noFlowFileCurrSkip < noFlowFileMaxSkip) { - noFlowFileCurrSkip += 1; - logger.info("DebugFlow skipping with no flow file"); - return; - } else { - noFlowFileCurrSkip = 0; - curr_noff_resp.getNextCycle(); + try { + // Make up to 2 passes to allow rollover from last cycle to first. + // (This could be "while(true)" since responses should break out if selected, but this + // prevents endless loops in the event of unexpected errors or future changes.) + for (int pass = 2; pass > 0; pass--) { + if (ff == null) { + if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) { + if (noFlowFileCurrSkip < noFlowFileMaxSkip) { + noFlowFileCurrSkip += 1; + logger.info("DebugFlow skipping with no flow file"); + return; + } else { + noFlowFileCurrSkip = 0; + curr_noff_resp.getNextCycle(); + } } - } - if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) { - if (noFlowFileCurrException < noFlowFileMaxException) { - noFlowFileCurrException += 1; - logger.info("DebugFlow throwing NPE with no flow file"); - String message = "forced by " + this.getClass().getName(); - RuntimeException rte; - try { - rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message); - throw rte; - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - if (logger.isErrorEnabled()) { - logger.error("{} unexpected exception throwing DebugFlow exception: {}", - new Object[]{this, e}); + + if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) { + if (noFlowFileCurrException < noFlowFileMaxException) { + noFlowFileCurrException += 1; + logger.info("DebugFlow throwing NPE with no flow file"); + String message = "forced by " + this.getClass().getName(); + RuntimeException rte; + try { + rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message); + throw rte; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + if (logger.isErrorEnabled()) { + logger.error("{} unexpected exception throwing DebugFlow exception: {}", + new Object[] {this, e}); + } } + } else { + noFlowFileCurrException = 0; + curr_noff_resp.getNextCycle(); } - } else { - noFlowFileCurrException = 0; - curr_noff_resp.getNextCycle(); } - } - if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) { - if (noFlowFileCurrYield < noFlowFileMaxYield) { - noFlowFileCurrYield += 1; - logger.info("DebugFlow yielding with no flow file"); - context.yield(); - break; - } else { - noFlowFileCurrYield = 0; - curr_noff_resp.getNextCycle(); + + if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) { + if (noFlowFileCurrYield < noFlowFileMaxYield) { + noFlowFileCurrYield += 1; + logger.info("DebugFlow yielding with no flow file"); + context.yield(); + break; + } else { + noFlowFileCurrYield = 0; + curr_noff_resp.getNextCycle(); + } } - } - return; - } else { - final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger(); - if (writeIterations > 0 && pass == 1) { - final Random random = new Random(); - - for (int i = 0; i < writeIterations; i++) { - final byte[] data = new byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()]; - random.nextBytes(data); - - ff = session.write(ff, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(data); - } - }); + + return; + } else { + final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger(); + if (writeIterations > 0 && pass == 1) { + final Random random = new Random(); + + for (int i = 0; i < writeIterations; i++) { + final byte[] data = new byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()]; + random.nextBytes(data); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(data); + } + }); + } } - } - if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) { - if (flowFileCurrSuccess < flowFileMaxSuccess) { - flowFileCurrSuccess += 1; - logger.info("DebugFlow transferring to success file={} UUID={}", - new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), - ff.getAttribute(CoreAttributes.UUID.key())}); - session.transfer(ff, REL_SUCCESS); - session.commit(); - break; - } else { - flowFileCurrSuccess = 0; - curr_ff_resp.getNextCycle(); + if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) { + if (flowFileCurrSuccess < flowFileMaxSuccess) { + flowFileCurrSuccess += 1; + logger.info("DebugFlow transferring to success file={} UUID={}", + new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.transfer(ff, REL_SUCCESS); + break; + } else { + flowFileCurrSuccess = 0; + curr_ff_resp.getNextCycle(); + } } - } - if (curr_ff_resp.state() == FlowFileResponseState.FF_FAILURE_RESPONSE) { - if (flowFileCurrFailure < flowFileMaxFailure) { - flowFileCurrFailure += 1; - logger.info("DebugFlow transferring to failure file={} UUID={}", - new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), - ff.getAttribute(CoreAttributes.UUID.key())}); - session.transfer(ff, REL_FAILURE); - session.commit(); - break; - } else { - flowFileCurrFailure = 0; - curr_ff_resp.getNextCycle(); + + if (curr_ff_resp.state() == FlowFileResponseState.FF_FAILURE_RESPONSE) { + if (flowFileCurrFailure < flowFileMaxFailure) { + flowFileCurrFailure += 1; + logger.info("DebugFlow transferring to failure file={} UUID={}", + new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.transfer(ff, REL_FAILURE); + break; + } else { + flowFileCurrFailure = 0; + curr_ff_resp.getNextCycle(); + } } - } - if (curr_ff_resp.state() == FlowFileResponseState.FF_ROLLBACK_RESPONSE) { - if (flowFileCurrRollback < flowFileMaxRollback) { - flowFileCurrRollback += 1; - logger.info("DebugFlow rolling back (no penalty) file={} UUID={}", - new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), - ff.getAttribute(CoreAttributes.UUID.key())}); - session.rollback(); - session.commit(); - break; - } else { - flowFileCurrRollback = 0; - curr_ff_resp.getNextCycle(); + + if (curr_ff_resp.state() == FlowFileResponseState.FF_ROLLBACK_RESPONSE) { + if (flowFileCurrRollback < flowFileMaxRollback) { + flowFileCurrRollback += 1; + logger.info("DebugFlow rolling back (no penalty) file={} UUID={}", + new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(); + break; + } else { + flowFileCurrRollback = 0; + curr_ff_resp.getNextCycle(); + } } - } - if (curr_ff_resp.state() == FlowFileResponseState.FF_YIELD_RESPONSE) { - if (flowFileCurrYield < flowFileMaxYield) { - flowFileCurrYield += 1; - logger.info("DebugFlow yielding file={} UUID={}", - new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), - ff.getAttribute(CoreAttributes.UUID.key())}); - session.rollback(); - context.yield(); - return; - } else { - flowFileCurrYield = 0; - curr_ff_resp.getNextCycle(); + + if (curr_ff_resp.state() == FlowFileResponseState.FF_YIELD_RESPONSE) { + if (flowFileCurrYield < flowFileMaxYield) { + flowFileCurrYield += 1; + logger.info("DebugFlow yielding file={} UUID={}", + new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(); + context.yield(); + return; + } else { + flowFileCurrYield = 0; + curr_ff_resp.getNextCycle(); + } } - } - if (curr_ff_resp.state() == FlowFileResponseState.FF_PENALTY_RESPONSE) { - if (flowFileCurrPenalty < flowFileMaxPenalty) { - flowFileCurrPenalty += 1; - logger.info("DebugFlow rolling back (with penalty) file={} UUID={}", - new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), - ff.getAttribute(CoreAttributes.UUID.key())}); - session.rollback(true); - session.commit(); - break; - } else { - flowFileCurrPenalty = 0; - curr_ff_resp.getNextCycle(); + + if (curr_ff_resp.state() == FlowFileResponseState.FF_PENALTY_RESPONSE) { + if (flowFileCurrPenalty < flowFileMaxPenalty) { + flowFileCurrPenalty += 1; + logger.info("DebugFlow rolling back (with penalty) file={} UUID={}", + new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(true); + break; + } else { + flowFileCurrPenalty = 0; + curr_ff_resp.getNextCycle(); + } } - } - if (curr_ff_resp.state() == FlowFileResponseState.FF_EXCEPTION_RESPONSE) { - if (flowFileCurrException < flowFileMaxException) { - flowFileCurrException += 1; - String message = "forced by " + this.getClass().getName(); - logger.info("DebugFlow throwing NPE file={} UUID={}", - new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), - ff.getAttribute(CoreAttributes.UUID.key())}); - RuntimeException rte; - try { - rte = flowFileExceptionClass.getConstructor(String.class).newInstance(message); - throw rte; - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - if (logger.isErrorEnabled()) { - logger.error("{} unexpected exception throwing DebugFlow exception: {}", - new Object[]{this, e}); + + if (curr_ff_resp.state() == FlowFileResponseState.FF_EXCEPTION_RESPONSE) { + if (flowFileCurrException < flowFileMaxException) { + flowFileCurrException += 1; + String message = "forced by " + this.getClass().getName(); + logger.info("DebugFlow throwing NPE file={} UUID={}", + new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + RuntimeException rte; + try { + rte = flowFileExceptionClass.getConstructor(String.class).newInstance(message); + throw rte; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + if (logger.isErrorEnabled()) { + logger.error("{} unexpected exception throwing DebugFlow exception: {}", + new Object[] {this, e}); + } } + } else { + flowFileCurrException = 0; + curr_ff_resp.getNextCycle(); } - } else { - flowFileCurrException = 0; - curr_ff_resp.getNextCycle(); } } } + } finally { + final long sleepMillis = context.getProperty(ON_TRIGGER_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + + try { + if (sleepMillis > 0) { + sleep(sleepMillis, context.getProperty(IGNORE_INTERRUPTS).asBoolean()); + getLogger().info("DebugFlow finishes sleeping at completion of its onTrigger() method"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } }