http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java index 3a5695e..8332082 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java @@ -37,141 +37,144 @@ public class JmsProperties { public static final String MSG_TYPE_EMPTY = "empty"; // Standard JMS Properties - public static final PropertyDescriptor JMS_PROVIDER = new PropertyDescriptor.Builder() - .name("JMS Provider") - .description("The Provider used for the JMS Server") - .required(true) - .allowableValues(ACTIVEMQ_PROVIDER) - .defaultValue(ACTIVEMQ_PROVIDER) - .build(); - public static final PropertyDescriptor URL = new PropertyDescriptor.Builder() - .name("URL") - .description("The URL of the JMS Server") - .addValidator(StandardValidators.URI_VALIDATOR) - .required(true) - .build(); - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait when attempting to receive a message before giving up and assuming failure") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 sec") - .build(); - public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() - .name("Username") - .description("Username used for authentication and authorization") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("Password used for authentication and authorization") - .required(false) - .addValidator(Validator.VALID) - .sensitive(true) - .build(); - public static final PropertyDescriptor CLIENT_ID_PREFIX = new PropertyDescriptor.Builder() - .name("Client ID Prefix") - .description("A human-readable ID that can be used to associate connections with yourself so that the maintainers of the JMS Server know who to contact if problems arise") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + public static final PropertyDescriptor JMS_PROVIDER = new PropertyDescriptor.Builder(). + name("JMS Provider"). + description("The Provider used for the JMS Server"). + required(true). + allowableValues(ACTIVEMQ_PROVIDER). + defaultValue(ACTIVEMQ_PROVIDER). + build(); + public static final PropertyDescriptor URL = new PropertyDescriptor.Builder(). + name("URL"). + description("The URL of the JMS Server"). + addValidator(StandardValidators.URI_VALIDATOR). + required(true). + build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder(). + name("Communications Timeout"). + description("The amount of time to wait when attempting to receive a message before giving up and assuming failure"). + required(true). + addValidator(StandardValidators.TIME_PERIOD_VALIDATOR). + defaultValue("30 sec"). + build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder(). + name("Username"). + description("Username used for authentication and authorization"). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder(). + name("Password"). + description("Password used for authentication and authorization"). + required(false). + addValidator(Validator.VALID). + sensitive(true). + build(); + public static final PropertyDescriptor CLIENT_ID_PREFIX = new PropertyDescriptor.Builder(). + name("Client ID Prefix"). + description("A human-readable ID that can be used to associate connections with yourself so that the maintainers of the JMS Server know who to contact if problems arise"). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); // Topic/Queue determination Properties - public static final PropertyDescriptor DESTINATION_NAME = new PropertyDescriptor.Builder() - .name("Destination Name") - .description("The name of the JMS Topic or queue to use") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() - .name("Destination Type") - .description("The type of the JMS Destination to use") - .required(true) - .allowableValues(DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC) - .defaultValue(DESTINATION_TYPE_QUEUE) - .build(); + public static final PropertyDescriptor DESTINATION_NAME = new PropertyDescriptor.Builder(). + name("Destination Name"). + description("The name of the JMS Topic or queue to use"). + required(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder(). + name("Destination Type"). + description("The type of the JMS Destination to use"). + required(true). + allowableValues(DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC). + defaultValue(DESTINATION_TYPE_QUEUE). + build(); - public static final PropertyDescriptor DURABLE_SUBSCRIPTION = new PropertyDescriptor.Builder() - .name("Use Durable Subscription") - .description("If true, connections to the specified topic will use Durable Subscription so that messages are queued when we are not pulling them") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); + public static final PropertyDescriptor DURABLE_SUBSCRIPTION = new PropertyDescriptor.Builder(). + name("Use Durable Subscription"). + description("If true, connections to the specified topic will use Durable Subscription so that messages are queued when we are not pulling them"). + required(true). + allowableValues("true", "false"). + defaultValue("false"). + build(); // JMS Publisher Properties - public static final PropertyDescriptor ATTRIBUTES_TO_JMS_PROPS = new PropertyDescriptor.Builder() - .name("Copy Attributes to JMS Properties") - .description("Whether or not FlowFile Attributes should be translated into JMS Message Properties. If true, all attributes starting with 'jms.' will be set as Properties on the JMS Message (without the 'jms.' prefix). If an attribute exists that starts with the same value but ends in '.type', that attribute will be used to determine the JMS Message Property type.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + public static final PropertyDescriptor ATTRIBUTES_TO_JMS_PROPS = new PropertyDescriptor.Builder(). + name("Copy Attributes to JMS Properties"). + description("Whether or not FlowFile Attributes should be translated into JMS Message Properties. If true, all " + + "attributes starting with 'jms.' will be set as Properties on the JMS Message (without the 'jms.' prefix). " + + "If an attribute exists that starts with the same value but ends in '.type', that attribute will be used " + + "to determine the JMS Message Property type."). + required(true). + allowableValues("true", "false"). + defaultValue("true"). + build(); // JMS Listener Properties - public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Message Batch Size") - .description("The number of messages to pull/push in a single iteration of the processor") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10") - .build(); - public static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder() - .name("Acknowledgement Mode") - .description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge.") - .required(true) - .allowableValues(ACK_MODE_CLIENT, ACK_MODE_AUTO) - .defaultValue(ACK_MODE_CLIENT) - .build(); - public static final PropertyDescriptor JMS_PROPS_TO_ATTRIBUTES = new PropertyDescriptor.Builder() - .name("Copy JMS Properties to Attributes") - .description("Whether or not the JMS Message Properties should be copied to the FlowFile Attributes; if so, the attribute name will be jms.XXX, where XXX is the JMS Property name") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - public static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder() - .name("Message Selector") - .description("The JMS Message Selector to use in order to narrow the messages that are pulled") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder(). + name("Message Batch Size"). + description("The number of messages to pull/push in a single iteration of the processor"). + required(true). + addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR). + defaultValue("10"). + build(); + public static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder(). + name("Acknowledgement Mode"). + description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide better performance than Client Acknowledge."). + required(true). + allowableValues(ACK_MODE_CLIENT, ACK_MODE_AUTO). + defaultValue(ACK_MODE_CLIENT). + build(); + public static final PropertyDescriptor JMS_PROPS_TO_ATTRIBUTES = new PropertyDescriptor.Builder(). + name("Copy JMS Properties to Attributes"). + description("Whether or not the JMS Message Properties should be copied to the FlowFile Attributes; if so, the attribute name will be jms.XXX, where XXX is the JMS Property name"). + required(true). + allowableValues("true", "false"). + defaultValue("true"). + build(); + public static final PropertyDescriptor MESSAGE_SELECTOR = new PropertyDescriptor.Builder(). + name("Message Selector"). + description("The JMS Message Selector to use in order to narrow the messages that are pulled"). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); // JMS Producer Properties - public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder() - .name("Message Type") - .description("The Type of JMS Message to Construct") - .required(true) - .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY) - .defaultValue(MSG_TYPE_BYTE) - .build(); - public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder() - .name("Message Priority") - .description("The Priority of the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor REPLY_TO_QUEUE = new PropertyDescriptor.Builder() - .name("Reply-To Queue") - .description("The name of the queue to which a reply to should be added") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor MESSAGE_TTL = new PropertyDescriptor.Builder() - .name("Message Time to Live") - .description("The amount of time that the message should live on the destination before being removed; if not specified, the message will never expire.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data that can be buffered for a JMS Message. If a FlowFile's size exceeds this value, the FlowFile will be routed to failure.") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .build(); + public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder(). + name("Message Type"). + description("The Type of JMS Message to Construct"). + required(true). + allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY). + defaultValue(MSG_TYPE_BYTE). + build(); + public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder(). + name("Message Priority"). + description("The Priority of the Message"). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor REPLY_TO_QUEUE = new PropertyDescriptor.Builder(). + name("Reply-To Queue"). + description("The name of the queue to which a reply to should be added"). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor MESSAGE_TTL = new PropertyDescriptor.Builder(). + name("Message Time to Live"). + description("The amount of time that the message should live on the destination before being removed; if not specified, the message will never expire."). + required(false). + addValidator(StandardValidators.TIME_PERIOD_VALIDATOR). + build(); + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder(). + name("Max Buffer Size"). + description("The maximum amount of data that can be buffered for a JMS Message. If a FlowFile's size exceeds this value, the FlowFile will be routed to failure."). + required(true). + addValidator(StandardValidators.DATA_SIZE_VALIDATOR). + defaultValue("1 MB"). + build(); }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonPathExpressionValidator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonPathExpressionValidator.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonPathExpressionValidator.java index 61f9bbe..8a1a056 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonPathExpressionValidator.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonPathExpressionValidator.java @@ -19,7 +19,13 @@ package org.apache.nifi.processors.standard.util; import com.jayway.jsonpath.Filter; import com.jayway.jsonpath.Predicate; import com.jayway.jsonpath.internal.Utils; -import com.jayway.jsonpath.internal.token.*; +import com.jayway.jsonpath.internal.token.ArrayPathToken; +import com.jayway.jsonpath.internal.token.PathToken; +import com.jayway.jsonpath.internal.token.PredicatePathToken; +import com.jayway.jsonpath.internal.token.PropertyPathToken; +import com.jayway.jsonpath.internal.token.RootPathToken; +import com.jayway.jsonpath.internal.token.ScanPathToken; +import com.jayway.jsonpath.internal.token.WildcardPathToken; import org.apache.nifi.util.StringUtils; import java.util.ArrayList; @@ -30,11 +36,13 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; /** - * JsonPathExpressionValidator performs the same execution as com.jayway.jsonpath.internal.PathCompiler, but does not throw - * exceptions when an invalid path segment is found. - * Limited access to create JsonPath objects requires a separate flow of execution in avoiding exceptions. + * JsonPathExpressionValidator performs the same execution as + * com.jayway.jsonpath.internal.PathCompiler, but does not throw exceptions when + * an invalid path segment is found. Limited access to create JsonPath objects + * requires a separate flow of execution in avoiding exceptions. * - * @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a> + * @see + * <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a> */ public class JsonPathExpressionValidator { @@ -47,7 +55,6 @@ public class JsonPathExpressionValidator { private static final char BRACKET_CLOSE = ']'; private static final char SPACE = ' '; - /** * Performs a validation of a provided JsonPath expression. * <p/> @@ -64,8 +71,9 @@ public class JsonPathExpressionValidator { * </pre> * </code> * - * @param path to evaluate for validity - * @param filters applied to path expression; this is typically unused in the context of Processors + * @param path to evaluate for validity + * @param filters applied to path expression; this is typically unused in + * the context of Processors * @return true if the specified path is valid; false otherwise */ public static boolean isValidExpression(String path, Predicate... filters) { @@ -79,7 +87,7 @@ public class JsonPathExpressionValidator { return false; } - LinkedList<Predicate> filterList = new LinkedList<Predicate>(asList(filters)); + LinkedList<Predicate> filterList = new LinkedList<>(asList(filters)); if (path.charAt(0) != '$' && path.charAt(0) != '@') { path = "$." + path; @@ -128,15 +136,16 @@ public class JsonPathExpressionValidator { continue; } else if (positions == 1 && path.charAt(i) == '*') { - fragment = new String("[*]"); + fragment = "[*]"; } else { - fragment = PROPERTY_OPEN + path.substring(i, i + positions) + PROPERTY_CLOSE; + fragment = PROPERTY_OPEN + path. + substring(i, i + positions) + PROPERTY_CLOSE; } i += positions; } break; case ANY: - fragment = new String("[*]"); + fragment = "[*]"; i++; break; default: @@ -151,7 +160,8 @@ public class JsonPathExpressionValidator { * Analyze each component represented by a fragment. If there is a failure to properly evaluate, * a null result is returned */ - PathToken analyzedComponent = PathComponentAnalyzer.analyze(fragment, filterList); + PathToken analyzedComponent = PathComponentAnalyzer. + analyze(fragment, filterList); if (analyzedComponent == null) { return false; } @@ -162,7 +172,6 @@ public class JsonPathExpressionValidator { root.append(analyzedComponent); } - } while (i < path.length()); return true; @@ -210,7 +219,8 @@ public class JsonPathExpressionValidator { static class PathComponentAnalyzer { - private static final Pattern FILTER_PATTERN = Pattern.compile("^\\[\\s*\\?\\s*[,\\s*\\?]*?\\s*]$"); //[?] or [?, ?, ...] + private static final Pattern FILTER_PATTERN = Pattern. + compile("^\\[\\s*\\?\\s*[,\\s*\\?]*?\\s*]$"); //[?] or [?, ?, ...] private int i; private char current; @@ -228,13 +238,18 @@ public class JsonPathExpressionValidator { public PathToken analyze() { - if ("$".equals(pathFragment)) return new RootPathToken(); - else if ("..".equals(pathFragment)) return new ScanPathToken(); - else if ("[*]".equals(pathFragment)) return new WildcardPathToken(); - else if (".*".equals(pathFragment)) return new WildcardPathToken(); - else if ("[?]".equals(pathFragment)) return new PredicatePathToken(filterList.poll()); - - else if (FILTER_PATTERN.matcher(pathFragment).matches()) { + if ("$".equals(pathFragment)) { + return new RootPathToken(); + } else if ("..".equals(pathFragment)) { + return new ScanPathToken(); + } else if ("[*]".equals(pathFragment)) { + return new WildcardPathToken(); + } else if (".*".equals(pathFragment)) { + return new WildcardPathToken(); + } else if ("[?]".equals(pathFragment)) { + return new PredicatePathToken(filterList.poll()); + } else if (FILTER_PATTERN.matcher(pathFragment). + matches()) { final int criteriaCount = Utils.countMatches(pathFragment, "?"); List<Predicate> filters = new ArrayList<>(criteriaCount); for (int i = 0; i < criteriaCount; i++) { @@ -260,14 +275,12 @@ public class JsonPathExpressionValidator { break; } - } while (i < pathFragment.length()); //"Could not analyze path component: " + pathFragment return null; } - public PathToken analyzeCriteriaSequence4() { int[] bounds = findFilterBounds(); if (bounds == null) { @@ -275,7 +288,8 @@ public class JsonPathExpressionValidator { } i = bounds[1]; - return new PredicatePathToken(Filter.parse(pathFragment.substring(bounds[0], bounds[1]))); + return new PredicatePathToken(Filter.parse(pathFragment. + substring(bounds[0], bounds[1]))); } int[] findFilterBounds() { @@ -295,13 +309,19 @@ public class JsonPathExpressionValidator { char c = pathFragment.charAt(curr); switch (c) { case '(': - if (!inProp) openBrackets++; + if (!inProp) { + openBrackets++; + } break; case ')': - if (!inProp) openBrackets--; + if (!inProp) { + openBrackets--; + } break; case '[': - if (!inProp) openSquareBracket++; + if (!inProp) { + openSquareBracket++; + } break; case ']': if (!inProp) { @@ -330,10 +350,9 @@ public class JsonPathExpressionValidator { return new int[]{start, end}; } - //"['foo']" private PathToken analyzeProperty() { - List<String> properties = new ArrayList<String>(); + List<String> properties = new ArrayList<>(); StringBuilder buffer = new StringBuilder(); boolean propertyIsOpen = false; @@ -360,7 +379,6 @@ public class JsonPathExpressionValidator { return new PropertyPathToken(properties); } - //"[-1:]" sliceFrom //"[:1]" sliceTo //"[0:5]" sliceBetween @@ -369,7 +387,7 @@ public class JsonPathExpressionValidator { //"[(@.length - 1)]" private PathToken analyzeArraySequence() { StringBuilder buffer = new StringBuilder(); - List<Integer> numbers = new ArrayList<Integer>(); + List<Integer> numbers = new ArrayList<>(); boolean contextSize = (current == '@'); boolean sliceTo = false; @@ -407,7 +425,6 @@ public class JsonPathExpressionValidator { } else { - while (Character.isDigit(current) || current == ',' || current == ' ' || current == ':' || current == '-') { switch (current) { @@ -444,7 +461,8 @@ public class JsonPathExpressionValidator { sliceFrom = true; } else { sliceBetween = true; - numbers.add(Integer.parseInt(buffer.toString())); + numbers.add(Integer.parseInt(buffer. + toString())); buffer.setLength(0); } } @@ -471,12 +489,19 @@ public class JsonPathExpressionValidator { ArrayPathToken.Operation operation = null; - if (singleIndex) operation = ArrayPathToken.Operation.SINGLE_INDEX; - else if (indexSequence) operation = ArrayPathToken.Operation.INDEX_SEQUENCE; - else if (sliceFrom) operation = ArrayPathToken.Operation.SLICE_FROM; - else if (sliceTo) operation = ArrayPathToken.Operation.SLICE_TO; - else if (sliceBetween) operation = ArrayPathToken.Operation.SLICE_BETWEEN; - else if (contextSize) operation = ArrayPathToken.Operation.CONTEXT_SIZE; + if (singleIndex) { + operation = ArrayPathToken.Operation.SINGLE_INDEX; + } else if (indexSequence) { + operation = ArrayPathToken.Operation.INDEX_SEQUENCE; + } else if (sliceFrom) { + operation = ArrayPathToken.Operation.SLICE_FROM; + } else if (sliceTo) { + operation = ArrayPathToken.Operation.SLICE_TO; + } else if (sliceBetween) { + operation = ArrayPathToken.Operation.SLICE_BETWEEN; + } else if (contextSize) { + operation = ArrayPathToken.Operation.CONTEXT_SIZE; + } assert operation != null; @@ -484,4 +509,4 @@ public class JsonPathExpressionValidator { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java index ad0b54e..b07c320 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java @@ -23,10 +23,6 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -/** - * - * @author - */ public final class SFTPConnection implements Closeable { private static final Log logger = LogFactory.getLog(SFTPConnection.class); @@ -46,11 +42,6 @@ public final class SFTPConnection implements Closeable { return sftp; } - /** - * Call this method to release the connection properly. - * - * @throws IOException - */ @Override public void close() throws IOException { if (null == sftp) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index a294aa4..c8e7b78 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -50,46 +50,46 @@ import com.jcraft.jsch.SftpException; public class SFTPTransfer implements FileTransfer { - public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder() - .name("Private Key Path") - .description("The fully qualified path to the Private Key file") - .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder() - .name("Private Key Passphrase") - .description("Password for the private key") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder() - .name("Host Key File") - .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used") - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .required(false) - .build(); - public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder() - .name("Strict Host Key Checking") - .description("Indicates whether or not strict enforcement of hosts keys should be applied") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); - public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Port") - .description("The port that the remote system is listening on for file transfers") - .addValidator(StandardValidators.PORT_VALIDATOR) - .required(true) - .defaultValue("22") - .build(); - public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder() - .name("Send Keep Alive On Timeout") - .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out") - .allowableValues("true", "false") - .defaultValue("true") - .required(true) - .build(); + public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder(). + name("Private Key Path"). + description("The fully qualified path to the Private Key file"). + required(false). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + build(); + public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder(). + name("Private Key Passphrase"). + description("Password for the private key"). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + sensitive(true). + build(); + public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder(). + name("Host Key File"). + description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used"). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + required(false). + build(); + public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder(). + name("Strict Host Key Checking"). + description("Indicates whether or not strict enforcement of hosts keys should be applied"). + allowableValues("true", "false"). + defaultValue("false"). + required(true). + build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder(). + name("Port"). + description("The port that the remote system is listening on for file transfers"). + addValidator(StandardValidators.PORT_VALIDATOR). + required(true). + defaultValue("22"). + build(); + public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder(). + name("Send Keep Alive On Timeout"). + description("Indicates whether or not to send a single Keep Alive message when SSH socket times out"). + allowableValues("true", "false"). + defaultValue("true"). + required(true). + build(); /** * Dynamic property which is used to decide if the @@ -101,13 +101,13 @@ public class SFTPTransfer implements FileTransfer { * <p> * This property is dynamic until deemed a worthy inclusion as proper. */ - public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder() - .name("Disable Directory Listing") - .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .dynamic(true) - .defaultValue("false") - .build(); + public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder(). + name("Disable Directory Listing"). + description("Disables directory listings before operations which might fail, such as configurations which create directory structures."). + addValidator(StandardValidators.BOOLEAN_VALIDATOR). + dynamic(true). + defaultValue("false"). + build(); private final ProcessorLog logger; @@ -123,8 +123,10 @@ public class SFTPTransfer implements FileTransfer { this.ctx = processContext; this.logger = logger; - final PropertyValue disableListing = processContext.getProperty(DISABLE_DIRECTORY_LISTING); - disableDirectoryListing = disableListing == null ? false : Boolean.TRUE.equals(disableListing.asBoolean()); + final PropertyValue disableListing = processContext. + getProperty(DISABLE_DIRECTORY_LISTING); + disableDirectoryListing = disableListing == null ? false : Boolean.TRUE. + equals(disableListing.asBoolean()); } @Override @@ -134,9 +136,13 @@ public class SFTPTransfer implements FileTransfer { @Override public List<FileInfo> getListing() throws IOException { - final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue(); + final String path = ctx.getProperty(FileTransfer.REMOTE_PATH). + evaluateAttributeExpressions(). + getValue(); final int depth = 0; - final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger(); + final int maxResults = ctx. + getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE). + asInteger(); final List<FileInfo> listing = new ArrayList<>(1000); getListing(path, depth, maxResults, listing); return listing; @@ -148,27 +154,43 @@ public class SFTPTransfer implements FileTransfer { } if (depth >= 100) { - logger.warn(this + " had to stop recursively searching directories at a recursive depth of " + depth + " to avoid memory issues"); + logger. + warn(this + " had to stop recursively searching directories at a recursive depth of " + depth + " to avoid memory issues"); return; } - final boolean ignoreDottedFiles = ctx.getProperty(FileTransfer.IGNORE_DOTTED_FILES).asBoolean(); - final boolean recurse = ctx.getProperty(FileTransfer.RECURSIVE_SEARCH).asBoolean(); - final String fileFilterRegex = ctx.getProperty(FileTransfer.FILE_FILTER_REGEX).getValue(); - final Pattern pattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); - final String pathFilterRegex = ctx.getProperty(FileTransfer.PATH_FILTER_REGEX).getValue(); - final Pattern pathPattern = (!recurse || pathFilterRegex == null) ? null : Pattern.compile(pathFilterRegex); - final String remotePath = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue(); + final boolean ignoreDottedFiles = ctx. + getProperty(FileTransfer.IGNORE_DOTTED_FILES). + asBoolean(); + final boolean recurse = ctx.getProperty(FileTransfer.RECURSIVE_SEARCH). + asBoolean(); + final String fileFilterRegex = ctx. + getProperty(FileTransfer.FILE_FILTER_REGEX). + getValue(); + final Pattern pattern = (fileFilterRegex == null) ? null : Pattern. + compile(fileFilterRegex); + final String pathFilterRegex = ctx. + getProperty(FileTransfer.PATH_FILTER_REGEX). + getValue(); + final Pattern pathPattern = (!recurse || pathFilterRegex == null) ? null : Pattern. + compile(pathFilterRegex); + final String remotePath = ctx.getProperty(FileTransfer.REMOTE_PATH). + evaluateAttributeExpressions(). + getValue(); // check if this directory path matches the PATH_FILTER_REGEX boolean pathFilterMatches = true; if (pathPattern != null) { Path reldir = path == null ? Paths.get(".") : Paths.get(path); if (remotePath != null) { - reldir = Paths.get(remotePath).relativize(reldir); - } - if (reldir != null && !reldir.toString().isEmpty()) { - if (!pathPattern.matcher(reldir.toString().replace("\\", "/")).matches()) { + reldir = Paths.get(remotePath). + relativize(reldir); + } + if (reldir != null && !reldir.toString(). + isEmpty()) { + if (!pathPattern.matcher(reldir.toString(). + replace("\\", "/")). + matches()) { pathFilterMatches = false; } } @@ -197,15 +219,19 @@ public class SFTPTransfer implements FileTransfer { } // if is a directory and we're supposed to recurse - if (recurse && entry.getAttrs().isDir()) { + if (recurse && entry.getAttrs(). + isDir()) { subDirs.add(entry); return LsEntrySelector.CONTINUE; } // if is not a directory and is not a link and it matches // FILE_FILTER_REGEX - then let's add it - if (!entry.getAttrs().isDir() && !entry.getAttrs().isLink() && isPathMatch) { - if (pattern == null || pattern.matcher(entryFilename).matches()) { + if (!entry.getAttrs(). + isDir() && !entry.getAttrs(). + isLink() && isPathMatch) { + if (pattern == null || pattern.matcher(entryFilename). + matches()) { listing.add(newFileInfo(entry, path)); } } @@ -219,7 +245,8 @@ public class SFTPTransfer implements FileTransfer { }; - if (path == null || path.trim().isEmpty()) { + if (path == null || path.trim(). + isEmpty()) { sftp.ls(".", filter); } else { sftp.ls(path, filter); @@ -231,12 +258,14 @@ public class SFTPTransfer implements FileTransfer { for (final LsEntry entry : subDirs) { final String entryFilename = entry.getFilename(); final File newFullPath = new File(path, entryFilename); - final String newFullForwardPath = newFullPath.getPath().replace("\\", "/"); + final String newFullForwardPath = newFullPath.getPath(). + replace("\\", "/"); try { getListing(newFullForwardPath, depth + 1, maxResults, listing); } catch (final IOException e) { - logger.error("Unable to get listing from " + newFullForwardPath + "; skipping this subdirectory"); + logger. + error("Unable to get listing from " + newFullForwardPath + "; skipping this subdirectory"); } } } @@ -246,22 +275,29 @@ public class SFTPTransfer implements FileTransfer { return null; } final File newFullPath = new File(path, entry.getFilename()); - final String newFullForwardPath = newFullPath.getPath().replace("\\", "/"); + final String newFullForwardPath = newFullPath.getPath(). + replace("\\", "/"); - String perms = entry.getAttrs().getPermissionsString(); + String perms = entry.getAttrs(). + getPermissionsString(); if (perms.length() > 9) { perms = perms.substring(perms.length() - 9); } FileInfo.Builder builder = new FileInfo.Builder() - .filename(entry.getFilename()) - .fullPathFileName(newFullForwardPath) - .directory(entry.getAttrs().isDir()) - .size(entry.getAttrs().getSize()) - .lastModifiedTime(entry.getAttrs().getMTime() * 1000L) - .permissions(perms) - .owner(Integer.toString(entry.getAttrs().getUId())) - .group(Integer.toString(entry.getAttrs().getGId())); + .filename(entry.getFilename()). + fullPathFileName(newFullForwardPath). + directory(entry.getAttrs(). + isDir()). + size(entry.getAttrs(). + getSize()). + lastModifiedTime(entry.getAttrs(). + getMTime() * 1000L). + permissions(perms). + owner(Integer.toString(entry.getAttrs(). + getUId())). + group(Integer.toString(entry.getAttrs(). + getGId())); return builder.build(); } @@ -304,7 +340,9 @@ public class SFTPTransfer implements FileTransfer { @Override public void ensureDirectoryExists(final FlowFile flowFile, final File directoryName) throws IOException { final ChannelSftp channel = getChannel(flowFile); - final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", ""); + final String remoteDirectory = directoryName.getAbsolutePath(). + replace("\\", "/"). + replaceAll("^.\\:", ""); // if we disable the directory listing, we just want to blindly perform the mkdir command, // eating any exceptions thrown (like if the directory already exists). @@ -313,7 +351,8 @@ public class SFTPTransfer implements FileTransfer { channel.mkdir(remoteDirectory); } catch (SftpException e) { if (e.id != ChannelSftp.SSH_FX_FAILURE) { - throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e); + throw new IOException("Could not blindly create remote directory due to " + e. + getMessage(), e); } } return; @@ -335,10 +374,13 @@ public class SFTPTransfer implements FileTransfer { if (!exists) { // first ensure parent directories exist before creating this one - if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) { + if (directoryName.getParent() != null && !directoryName. + getParentFile(). + equals(new File(File.separator))) { ensureDirectoryExists(flowFile, directoryName.getParentFile()); } - logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory}); + logger. + debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory}); try { channel.mkdir(remoteDirectory); logger.debug("Created {}", new Object[]{remoteDirectory}); @@ -351,7 +393,9 @@ public class SFTPTransfer implements FileTransfer { private ChannelSftp getChannel(final FlowFile flowFile) throws IOException { if (sftp != null) { String sessionhost = session.getHost(); - String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + String desthost = ctx.getProperty(HOSTNAME). + evaluateAttributeExpressions(flowFile). + getValue(); if (sessionhost.equals(desthost)) { // destination matches so we can keep our current session return sftp; @@ -363,22 +407,35 @@ public class SFTPTransfer implements FileTransfer { final JSch jsch = new JSch(); try { - final Session session = jsch.getSession(ctx.getProperty(USERNAME).getValue(), - ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(), - ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue()); - - final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue(); + final Session session = jsch.getSession(ctx.getProperty(USERNAME). + getValue(), + ctx.getProperty(HOSTNAME). + evaluateAttributeExpressions(flowFile). + getValue(), + ctx.getProperty(PORT). + evaluateAttributeExpressions(flowFile). + asInteger(). + intValue()); + + final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE). + getValue(); if (hostKeyVal != null) { jsch.setKnownHosts(hostKeyVal); } final Properties properties = new Properties(); - properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no"); - properties.setProperty("PreferredAuthentications", "publickey,password"); - - if (ctx.getProperty(FileTransfer.USE_COMPRESSION).asBoolean()) { - properties.setProperty("compression.s2c", "[email protected],zlib,none"); - properties.setProperty("compression.c2s", "[email protected],zlib,none"); + properties.setProperty("StrictHostKeyChecking", ctx. + getProperty(STRICT_HOST_KEY_CHECKING). + asBoolean() ? "yes" : "no"); + properties. + setProperty("PreferredAuthentications", "publickey,password"); + + if (ctx.getProperty(FileTransfer.USE_COMPRESSION). + asBoolean()) { + properties. + setProperty("compression.s2c", "[email protected],zlib,none"); + properties. + setProperty("compression.c2s", "[email protected],zlib,none"); } else { properties.setProperty("compression.s2c", "none"); properties.setProperty("compression.c2s", "none"); @@ -386,32 +443,42 @@ public class SFTPTransfer implements FileTransfer { session.setConfig(properties); - final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).getValue(); + final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH). + getValue(); if (privateKeyFile != null) { - jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).getValue()); + jsch.addIdentity(privateKeyFile, ctx. + getProperty(PRIVATE_KEY_PASSPHRASE). + getValue()); } - final String password = ctx.getProperty(FileTransfer.PASSWORD).getValue(); + final String password = ctx.getProperty(FileTransfer.PASSWORD). + getValue(); if (password != null) { session.setPassword(password); } - session.setTimeout(ctx.getProperty(FileTransfer.CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + session.setTimeout(ctx.getProperty(FileTransfer.CONNECTION_TIMEOUT). + asTimePeriod(TimeUnit.MILLISECONDS). + intValue()); session.connect(); this.session = session; this.closed = false; sftp = (ChannelSftp) session.openChannel("sftp"); sftp.connect(); - session.setTimeout(ctx.getProperty(FileTransfer.DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - if (!ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean()) { + session.setTimeout(ctx.getProperty(FileTransfer.DATA_TIMEOUT). + asTimePeriod(TimeUnit.MILLISECONDS). + intValue()); + if (!ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT). + asBoolean()) { session.setServerAliveCountMax(0); // do not send keepalive message on SocketTimeoutException } this.homeDir = sftp.getHome(); return sftp; } catch (final SftpException | JSchException e) { - throw new IOException("Failed to obtain connection to remote host due to " + e.toString(), e); + throw new IOException("Failed to obtain connection to remote host due to " + e. + toString(), e); } } @@ -421,11 +488,6 @@ public class SFTPTransfer implements FileTransfer { return this.homeDir; } - /** - * Call this method to release the connection properly. - * - * @throws IOException - */ @Override public void close() throws IOException { if (closed) { @@ -438,7 +500,9 @@ public class SFTPTransfer implements FileTransfer { sftp.exit(); } } catch (final Exception ex) { - logger.warn("Failed to close ChannelSftp due to {}", new Object[]{ex.toString()}, ex); + logger. + warn("Failed to close ChannelSftp due to {}", new Object[]{ex. + toString()}, ex); } sftp = null; @@ -447,7 +511,8 @@ public class SFTPTransfer implements FileTransfer { session.disconnect(); } } catch (final Exception ex) { - logger.warn("Failed to close session due to {}", new Object[]{ex.toString()}, ex); + logger.warn("Failed to close session due to {}", new Object[]{ex. + toString()}, ex); } session = null; } @@ -487,7 +552,8 @@ public class SFTPTransfer implements FileTransfer { LsEntry matchingEntry = null; for (final LsEntry entry : vector) { - if (entry.getFilename().equalsIgnoreCase(filename)) { + if (entry.getFilename(). + equalsIgnoreCase(filename)) { matchingEntry = entry; break; } @@ -506,9 +572,12 @@ public class SFTPTransfer implements FileTransfer { : (path.endsWith("/")) ? path + filename : path + "/" + filename; // temporary path + filename - String tempFilename = ctx.getProperty(TEMP_FILENAME).evaluateAttributeExpressions(flowFile).getValue(); + String tempFilename = ctx.getProperty(TEMP_FILENAME). + evaluateAttributeExpressions(flowFile). + getValue(); if (tempFilename == null) { - final boolean dotRename = ctx.getProperty(DOT_RENAME).asBoolean(); + final boolean dotRename = ctx.getProperty(DOT_RENAME). + asBoolean(); tempFilename = dotRename ? "." + filename : filename; } final String tempPath = (path == null) @@ -521,45 +590,61 @@ public class SFTPTransfer implements FileTransfer { throw new IOException("Unable to put content to " + fullPath + " due to " + e, e); } - final String lastModifiedTime = ctx.getProperty(LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue(); - if (lastModifiedTime != null && !lastModifiedTime.trim().isEmpty()) { + final String lastModifiedTime = ctx.getProperty(LAST_MODIFIED_TIME). + evaluateAttributeExpressions(flowFile). + getValue(); + if (lastModifiedTime != null && !lastModifiedTime.trim(). + isEmpty()) { try { final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); final Date fileModifyTime = formatter.parse(lastModifiedTime); int time = (int) (fileModifyTime.getTime() / 1000L); sftp.setMtime(tempPath, time); } catch (final Exception e) { - logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e}); + logger. + error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e}); } } - final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue(); - if (permissions != null && !permissions.trim().isEmpty()) { + final String permissions = ctx.getProperty(PERMISSIONS). + evaluateAttributeExpressions(flowFile). + getValue(); + if (permissions != null && !permissions.trim(). + isEmpty()) { try { int perms = numberPermissions(permissions); if (perms >= 0) { sftp.chmod(perms, tempPath); } } catch (final Exception e) { - logger.error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e}); + logger. + error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e}); } } - final String owner = ctx.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue(); - if (owner != null && !owner.trim().isEmpty()) { + final String owner = ctx.getProperty(REMOTE_OWNER). + evaluateAttributeExpressions(flowFile). + getValue(); + if (owner != null && !owner.trim(). + isEmpty()) { try { sftp.chown(Integer.parseInt(owner), tempPath); } catch (final Exception e) { - logger.error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e}); + logger. + error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e}); } } - final String group = ctx.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue(); - if (group != null && !group.trim().isEmpty()) { + final String group = ctx.getProperty(REMOTE_GROUP). + evaluateAttributeExpressions(flowFile). + getValue(); + if (group != null && !group.trim(). + isEmpty()) { try { sftp.chgrp(Integer.parseInt(group), tempPath); } catch (final Exception e) { - logger.error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e}); + logger. + error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e}); } } @@ -583,7 +668,8 @@ public class SFTPTransfer implements FileTransfer { int number = -1; final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); final Pattern numPattern = Pattern.compile("\\d+"); - if (rwxPattern.matcher(perms).matches()) { + if (rwxPattern.matcher(perms). + matches()) { number = 0; if (perms.charAt(0) == 'r') { number |= 0x100; @@ -612,7 +698,8 @@ public class SFTPTransfer implements FileTransfer { if (perms.charAt(8) == 'x') { number |= 0x1; } - } else if (numPattern.matcher(perms).matches()) { + } else if (numPattern.matcher(perms). + matches()) { try { number = Integer.parseInt(perms, 8); } catch (NumberFormatException ignore) { @@ -630,7 +717,8 @@ public class SFTPTransfer implements FileTransfer { @Override public void log(int level, String message) { - LoggerFactory.getLogger(SFTPTransfer.class).debug("SFTP Log: {}", message); + LoggerFactory.getLogger(SFTPTransfer.class). + debug("SFTP Log: {}", message); } }); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java index ea59232..9121089 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java @@ -38,86 +38,82 @@ import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; -/** - * - * @author developer - */ public class SFTPUtils { - public static final PropertyDescriptor SFTP_PRIVATEKEY_PATH = new PropertyDescriptor.Builder() - .required(false) - .description("sftp.privatekey.path") - .defaultValue(null) - .name("sftp.privatekey.path") - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .sensitive(false) - .build(); - public static final PropertyDescriptor REMOTE_PASSWORD = new PropertyDescriptor.Builder() - .required(false) - .description("remote.password") - .defaultValue(null) - .name("remote.password") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - public static final PropertyDescriptor SFTP_PRIVATEKEY_PASSPHRASE = new PropertyDescriptor.Builder() - .required(false) - .description("sftp.privatekey.passphrase") - .defaultValue(null) - .name("sftp.privatekey.passphrase") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - public static final PropertyDescriptor SFTP_PORT = new PropertyDescriptor.Builder() - .required(false) - .description("sftp.port") - .defaultValue(null) - .name("sftp.port") - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .sensitive(false) - .build(); - public static final PropertyDescriptor NETWORK_DATA_TIMEOUT = new PropertyDescriptor.Builder() - .required(false) - .description("network.data.timeout") - .defaultValue(null) - .name("network.data.timeout") - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .sensitive(false) - .build(); - public static final PropertyDescriptor SFTP_HOSTKEY_FILENAME = new PropertyDescriptor.Builder() - .required(false) - .description("sftp.hostkey.filename") - .defaultValue(null) - .name("sftp.hostkey.filename") - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .sensitive(false) - .build(); - public static final PropertyDescriptor NETWORK_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() - .required(false) - .description("network.connection.timeout") - .defaultValue(null) - .name("network.connection.timeout") - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .sensitive(false) - .build(); + public static final PropertyDescriptor SFTP_PRIVATEKEY_PATH = new PropertyDescriptor.Builder(). + required(false). + description("sftp.privatekey.path"). + defaultValue(null). + name("sftp.privatekey.path"). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + sensitive(false). + build(); + public static final PropertyDescriptor REMOTE_PASSWORD = new PropertyDescriptor.Builder(). + required(false). + description("remote.password"). + defaultValue(null). + name("remote.password"). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + sensitive(true). + build(); + public static final PropertyDescriptor SFTP_PRIVATEKEY_PASSPHRASE = new PropertyDescriptor.Builder(). + required(false). + description("sftp.privatekey.passphrase"). + defaultValue(null). + name("sftp.privatekey.passphrase"). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + sensitive(true). + build(); + public static final PropertyDescriptor SFTP_PORT = new PropertyDescriptor.Builder(). + required(false). + description("sftp.port"). + defaultValue(null). + name("sftp.port"). + addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR). + sensitive(false). + build(); + public static final PropertyDescriptor NETWORK_DATA_TIMEOUT = new PropertyDescriptor.Builder(). + required(false). + description("network.data.timeout"). + defaultValue(null). + name("network.data.timeout"). + addValidator(StandardValidators.INTEGER_VALIDATOR). + sensitive(false). + build(); + public static final PropertyDescriptor SFTP_HOSTKEY_FILENAME = new PropertyDescriptor.Builder(). + required(false). + description("sftp.hostkey.filename"). + defaultValue(null). + name("sftp.hostkey.filename"). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + sensitive(false). + build(); + public static final PropertyDescriptor NETWORK_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder(). + required(false). + description("network.connection.timeout"). + defaultValue(null). + name("network.connection.timeout"). + addValidator(StandardValidators.INTEGER_VALIDATOR). + sensitive(false). + build(); // required properties - public static final PropertyDescriptor REMOTE_HOSTNAME = new PropertyDescriptor.Builder() - .required(true) - .description("remote.hostname") - .defaultValue(null) - .name("remote.hostname") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(false) - .build(); - public static final PropertyDescriptor REMOTE_USERNAME = new PropertyDescriptor.Builder() - .required(true) - .description("remote.username") - .defaultValue(null) - .name("remote.username") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(false) - .build(); + public static final PropertyDescriptor REMOTE_HOSTNAME = new PropertyDescriptor.Builder(). + required(true). + description("remote.hostname"). + defaultValue(null). + name("remote.hostname"). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + sensitive(false). + build(); + public static final PropertyDescriptor REMOTE_USERNAME = new PropertyDescriptor.Builder(). + required(true). + description("remote.username"). + defaultValue(null). + name("remote.username"). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + sensitive(false). + build(); private static final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(); @@ -140,14 +136,6 @@ public class SFTPUtils { return propertyDescriptors; } - /** - * - * @param conf - * @return - * @throws JSchException - * @throws SftpException - * @throws IOException - */ public static SFTPConnection connectSftp(final SFTPConfiguration conf) throws JSchException, SftpException, IOException { final JSch jsch = new JSch(); final Session session = SFTPUtils.createSession(conf, jsch); @@ -161,16 +149,22 @@ public class SFTPUtils { File dir = new File(dirPath); String currentWorkingDirectory = null; boolean dirExists = false; - final String forwardPaths = dir.getPath().replaceAll(Matcher.quoteReplacement("\\"), Matcher.quoteReplacement("/")); + final String forwardPaths = dir.getPath(). + replaceAll(Matcher.quoteReplacement("\\"), Matcher. + quoteReplacement("/")); try { currentWorkingDirectory = sftp.pwd(); - logger.debug(proc + " attempting to change directory from " + currentWorkingDirectory + " to " + dir.getPath()); + logger. + debug(proc + " attempting to change directory from " + currentWorkingDirectory + " to " + dir. + getPath()); //always use forward paths for long string attempt sftp.cd(forwardPaths); dirExists = true; - logger.debug(proc + " changed working directory to '" + forwardPaths + "' from '" + currentWorkingDirectory + "'"); + logger. + debug(proc + " changed working directory to '" + forwardPaths + "' from '" + currentWorkingDirectory + "'"); } catch (final SftpException sftpe) { - logger.debug(proc + " could not change directory to '" + forwardPaths + "' from '" + currentWorkingDirectory + "' so trying the hard way."); + logger. + debug(proc + " could not change directory to '" + forwardPaths + "' from '" + currentWorkingDirectory + "' so trying the hard way."); } if (dirExists) { return; @@ -191,12 +185,14 @@ public class SFTPUtils { try { sftp.cd(dirName); } catch (final SftpException sftpe) { - logger.debug(proc + " creating new directory and changing to it " + dirName); + logger. + debug(proc + " creating new directory and changing to it " + dirName); try { sftp.mkdir(dirName); sftp.cd(dirName); } catch (final SftpException e) { - throw new IOException(proc + " could not make/change directory to [" + dirName + "] [" + e.getLocalizedMessage() + "]", e); + throw new IOException(proc + " could not make/change directory to [" + dirName + "] [" + e. + getLocalizedMessage() + "]", e); } } } @@ -209,7 +205,8 @@ public class SFTPUtils { final Hashtable<String, String> newOptions = new Hashtable<>(); - Session session = jsch.getSession(conf.username, conf.hostname, conf.port); + Session session = jsch. + getSession(conf.username, conf.hostname, conf.port); final String hostKeyVal = conf.hostkeyFile; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java index ad2cca5..84f431d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java @@ -109,7 +109,8 @@ public class UDPStreamConsumer implements StreamConsumer { } // time to make a new flow file newFlowFile = session.create(); - newFlowFile = session.putAttribute(newFlowFile, "source.stream.identifier", uniqueId); + newFlowFile = session. + putAttribute(newFlowFile, "source.stream.identifier", uniqueId); newFlowFile = session.write(newFlowFile, udpCallback); if (newFlowFile.getSize() == 0) { session.remove(newFlowFile); @@ -122,7 +123,8 @@ public class UDPStreamConsumer implements StreamConsumer { try { session.remove(newFlowFile); } catch (final Exception ex2) { - logger.warn("Unable to delete partial flow file due to: ", ex2); + logger. + warn("Unable to delete partial flow file due to: ", ex2); } } throw new IOException("Problem while processing data stream", ex); @@ -156,17 +158,21 @@ public class UDPStreamConsumer implements StreamConsumer { return false; } UDPStreamConsumer rhs = (UDPStreamConsumer) obj; - return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals(); + return new EqualsBuilder().appendSuper(super.equals(obj)). + append(uniqueId, rhs.uniqueId). + isEquals(); } @Override public final int hashCode() { - return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode(); + return new HashCodeBuilder(17, 37).append(uniqueId). + toHashCode(); } @Override public final String toString() { - return new ToStringBuilder(this).append(uniqueId).toString(); + return new ToStringBuilder(this).append(uniqueId). + toString(); } public static final class UDPConsumerCallback implements OutputStreamCallback { @@ -188,9 +194,11 @@ public class UDPStreamConsumer implements StreamConsumer { public void process(final OutputStream out) throws IOException { try { long totalBytes = 0L; - try (WritableByteChannel wbc = Channels.newChannel(new BufferedOutputStream(out))) { + try (WritableByteChannel wbc = Channels. + newChannel(new BufferedOutputStream(out))) { ByteBuffer buffer = null; - while ((buffer = filledBuffers.poll(50, TimeUnit.MILLISECONDS)) != null) { + while ((buffer = filledBuffers. + poll(50, TimeUnit.MILLISECONDS)) != null) { int bytesWrittenThisPass = 0; try { while (buffer.hasRemaining()) { @@ -201,7 +209,8 @@ public class UDPStreamConsumer implements StreamConsumer { break;// this is enough data } } finally { - bufferPool.returnBuffer(buffer, bytesWrittenThisPass); + bufferPool. + returnBuffer(buffer, bytesWrittenThisPass); } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java index f7ebeab..692947d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java @@ -22,7 +22,6 @@ import java.io.InputStream; import java.util.Arrays; import org.apache.commons.codec.binary.Base32; - /** * An InputStream that throws an IOException if any byte is read that is not a * valid Base32 character. Whitespace is considered valid. http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java index fca6a70..7d16b73 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageConsumer.java @@ -55,19 +55,22 @@ public class WrappedMessageConsumer { try { connection.close(); } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); + logger. + warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); } try { session.close(); } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); + logger. + warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); } try { consumer.close(); } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); + logger. + warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java index fc01b02..a2d7459 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/WrappedMessageProducer.java @@ -55,19 +55,22 @@ public class WrappedMessageProducer { try { connection.close(); } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); + logger. + warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); } try { session.close(); } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); + logger. + warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); } try { producer.close(); } catch (final JMSException e) { - logger.warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); + logger. + warn("unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", e); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/XmlSplitterSaxParser.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/XmlSplitterSaxParser.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/XmlSplitterSaxParser.java index d012ae0..d053f14 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/XmlSplitterSaxParser.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/XmlSplitterSaxParser.java @@ -71,7 +71,9 @@ public class XmlSplitterSaxParser extends DefaultHandler { @Override public void endElement(final String uri, final String localName, final String qName) throws SAXException { // Add the element end tag. - sb.append("</").append(qName).append(">"); + sb.append("</"). + append(qName). + append(">"); // We have finished processing this element. Decrement the depth. int newDepth = depth.decrementAndGet(); @@ -102,7 +104,12 @@ public class XmlSplitterSaxParser extends DefaultHandler { for (int i = 0; i < attCount; i++) { String attName = atts.getQName(i); String attValue = atts.getValue(i); - sb.append(" ").append(attName).append("=").append("\"").append(attValue).append("\""); + sb.append(" "). + append(attName). + append("="). + append("\""). + append(attValue). + append("\""); } sb.append(">"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateXQuery/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateXQuery/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateXQuery/additionalDetails.html index 99c530e..e67923f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateXQuery/additionalDetails.html +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.EvaluateXQuery/additionalDetails.html @@ -27,9 +27,9 @@ <!-- Processor Documentation ================================================== --> - <p> - <strong>Examples:</strong> - </p> + <p> + <strong>Examples:</strong> + </p> <p>This processor produces one attribute or FlowFile per XQueryResult. If only one attribute or FlowFile is desired, the http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.IdentifyMimeType/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.IdentifyMimeType/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.IdentifyMimeType/additionalDetails.html index ec3179a..7166777 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.IdentifyMimeType/additionalDetails.html +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.IdentifyMimeType/additionalDetails.html @@ -55,5 +55,5 @@ <li>application/zip</li> <li>application/x-lzh</li> </ul> -</body> + </body> </html>
