NIFI-421: Deleted unused method; formatted whitespace
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fb8984cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fb8984cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fb8984cf Branch: refs/heads/develop Commit: fb8984cfa55654aeba0c8fe3d14be613af395a94 Parents: ad98ac5 Author: Mark Payne <[email protected]> Authored: Wed Apr 29 14:52:20 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Apr 29 14:52:20 2015 -0400 ---------------------------------------------------------------------- .../processors/standard/ExecuteProcess.java | 113 ++++++++----------- 1 file changed, 46 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb8984cf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java index 2490f0c..f6085e7 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -66,43 +66,43 @@ import org.apache.nifi.processor.util.StandardValidators; public class ExecuteProcess extends AbstractProcessor { public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() - .name("Command") - .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Command") + .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder() - .name("Command Arguments") - .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") - .required(false) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Command Arguments") + .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder() - .name("Working Directory") - .description("The directory to use as the current working directory when executing the command") - .expressionLanguageSupported(false) - .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) - .required(false) - .build(); + .name("Working Directory") + .description("The directory to use as the current working directory when executing the command") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) + .required(false) + .build(); public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder() - .name("Batch Duration") - .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " - + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " - + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") + .name("Batch Duration") + .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " + + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " + + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") .required(false) .expressionLanguageSupported(false) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder() - .name("Redirect Error Stream") - .description("If true will redirect any error stream output of the process to the output stream. " - + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.") + .name("Redirect Error Stream") + .description("If true will redirect any error stream output of the process to the output stream. " + + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.") .required(false) .allowableValues("true", "false") .defaultValue("false") @@ -111,9 +111,9 @@ public class ExecuteProcess extends AbstractProcessor { .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All created FlowFiles are routed to this relationship") - .build(); + .name("success") + .description("All created FlowFiles are routed to this relationship") + .build(); private volatile ExecutorService executor; private Future<?> longRunningProcess; @@ -138,11 +138,11 @@ public class ExecuteProcess extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } static List<String> splitArgs(final String input) { @@ -212,17 +212,16 @@ public class ExecuteProcess extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final long startNanos = System.nanoTime(); - - if (proxyOut==null) + if (proxyOut==null) { proxyOut = new ProxyOutputStream(getLogger()); + } final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); final List<String> commandStrings = createCommandStrings(context); final String commandString = StringUtils.join(commandStrings, " "); - if (longRunningProcess == null || longRunningProcess.isDone()) + if (longRunningProcess == null || longRunningProcess.isDone()) { try { longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut); } catch (final IOException ioe) { @@ -230,8 +229,9 @@ public class ExecuteProcess extends AbstractProcessor { context.yield(); return; } - else + } else { getLogger().info("Read from long running process"); + } if (!isScheduled()) { getLogger().info("User stopped processor; will terminate process immediately"); @@ -239,10 +239,8 @@ public class ExecuteProcess extends AbstractProcessor { return; } - // Create a FlowFile that we can write to and set the OutputStream for - // the FlowFile - // as the delegate for the ProxyOuptutStream, then wait until the - // process finishes + // Create a FlowFile that we can write to and set the OutputStream for the FlowFile + // as the delegate for the ProxyOuptutStream, then wait until the process finishes // or until the specified amount of time FlowFile flowFile = session.create(); flowFile = session.write(flowFile, new OutputStreamCallback() { @@ -252,8 +250,7 @@ public class ExecuteProcess extends AbstractProcessor { proxyOut.setDelegate(out); if (batchNanos == null) { - // we are not creating batches; wait until process - // terminates. + // we are not creating batches; wait until process terminates. // NB!!! Maybe get(long timeout, TimeUnit unit) should // be used to avoid waiting forever. try { @@ -271,7 +268,7 @@ public class ExecuteProcess extends AbstractProcessor { } proxyOut.setDelegate(null); // prevent from writing to this - // stream + // stream } } }); @@ -280,8 +277,7 @@ public class ExecuteProcess extends AbstractProcessor { // If no data was written to the file, remove it session.remove(flowFile); } else if (failure.get()) { - // If there was a failure processing the output of the Process, - // remove the FlowFile + // If there was a failure processing the output of the Process, remove the FlowFile session.remove(flowFile); getLogger().error("Failed to read data from Process, so will not generate FlowFile"); } else { @@ -291,13 +287,11 @@ public class ExecuteProcess extends AbstractProcessor { session.transfer(flowFile, REL_SUCCESS); } - // Commit the session so that the FlowFile is transferred to the next - // processor + // Commit the session so that the FlowFile is transferred to the next processor session.commit(); } protected List<String> createCommandStrings(final ProcessContext context) { - final String command = context.getProperty(COMMAND).getValue(); final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue()); @@ -381,8 +375,7 @@ public class ExecuteProcess extends AbstractProcessor { // setting a batch during means text. // Also, we don't want that text to get split up in the // middle of a line, so we use BufferedReader - // to read lines of text and write them as lines of - // text. + // to read lines of text and write them as lines of text. try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) { String line; @@ -402,12 +395,10 @@ public class ExecuteProcess extends AbstractProcessor { int exitCode; try { exitCode = newProcess.exitValue(); - } catch (Exception e) { + } catch (final Exception e) { exitCode = -99999; } getLogger().info("Process finished with exit code {} ", new Object[] { exitCode }); - // getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", - // new Object[]{exitCode, flowFileCount, millis}); } return null; @@ -417,18 +408,6 @@ public class ExecuteProcess extends AbstractProcessor { return future; } - // NB!!! Currently not used, Future<?> longRunningProcess is used to check whether process is done or not. - private boolean isAlive(final Process process) { - // unfortunately, java provides no straight-forward way to test if a Process is alive. - // In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7, - // so we have this solution in the mean time. - try { - process.exitValue(); - return false; - } catch (final IllegalThreadStateException itse) { - return true; - } - } /** * Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed
