http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index c1af0ea..11e75ed 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -91,7 +91,7 @@ import org.apache.nifi.util.StopWatch; @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"}) @CapabilityDescription("Fetches a file via HTTP") -@WritesAttribute(attribute="filename", description="the filename is set to the name of the file on the remote server") +@WritesAttribute(attribute = "filename", description = "the filename is set to the name of the file on the remote server") public class GetHTTP extends AbstractSessionFactoryProcessor { static final int PERSISTENCE_INTERVAL_MSEC = 10000; @@ -112,8 +112,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { .build(); public static final PropertyDescriptor FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() .name("Follow Redirects") - .description( - "If we receive a 3xx HTTP Status Code from the server, indicates whether or not we should follow the redirect that the server specifies") + .description("If we receive a 3xx HTTP Status Code from the server, indicates whether or not we should follow the redirect that the server specifies") .defaultValue("false") .allowableValues("true", "false") .build(); @@ -132,8 +131,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { .build(); public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder() .name("Data Timeout") - .description( - "How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file") + .description("How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file") .required(true) .defaultValue("30 sec") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) @@ -170,8 +168,10 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { .identifiesControllerService(SSLContextService.class) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("All files are transferred to the success relationship").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All files are transferred to the success relationship") + .build(); public static final String LAST_MODIFIED_DATE_PATTERN_RFC1123 = "EEE, dd MMM yyyy HH:mm:ss zzz"; @@ -275,28 +275,23 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { return results; } - - private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, - CertificateException, KeyManagementException, UnrecoverableKeyException - { - final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); + private SSLContext createSSLContext(final SSLContextService service) + throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException, UnrecoverableKeyException { + final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { truststore.load(in, service.getTrustStorePassword().toCharArray()); } - - final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); + + final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { keystore.load(in, service.getKeyStorePassword().toCharArray()); } - - SSLContext sslContext = SSLContexts.custom() - .loadTrustMaterial(truststore, new TrustSelfSignedStrategy()) - .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()) - .build(); - + + SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build(); + return sslContext; } - + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { final ProcessorLog logger = getLogger(); @@ -318,13 +313,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { } catch (URISyntaxException swallow) { // this won't happen as the url has already been validated } - + // get the ssl context service final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - + // create the connection manager final HttpClientConnectionManager conMan; - if ( sslContextService == null ) { + if (sslContextService == null) { conMan = new BasicHttpClientConnectionManager(); } else { final SSLContext sslContext; @@ -333,16 +328,14 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { } catch (final Exception e) { throw new ProcessException(e); } - - final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null, - SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); - - final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() - .register("https", sslsf).build(); - + + final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[]{"TLSv1"}, null, SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); + + final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf).build(); + conMan = new BasicHttpClientConnectionManager(socketFactoryRegistry); } - + try { // build the request configuration final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); @@ -351,25 +344,25 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { requestConfigBuilder.setRedirectsEnabled(false); requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); requestConfigBuilder.setRedirectsEnabled(context.getProperty(FOLLOW_REDIRECTS).asBoolean()); - + // build the http client final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); clientBuilder.setConnectionManager(conMan); - + // include the user agent final String userAgent = context.getProperty(USER_AGENT).getValue(); if (userAgent != null) { clientBuilder.setUserAgent(userAgent); } - + // set the ssl context if necessary if (sslContextService != null) { clientBuilder.setSslcontext(sslContextService.createSSLContext(ClientAuth.REQUIRED)); } - + final String username = context.getProperty(USERNAME).getValue(); final String password = context.getProperty(PASSWORD).getValue(); - + // set the credentials if appropriate if (username != null) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -383,7 +376,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { // create the http client final HttpClient client = clientBuilder.build(); - + // create request final HttpGet get = new HttpGet(url); get.setConfig(requestConfigBuilder.build());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java index ecec781..94b49fe 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java @@ -112,8 +112,8 @@ public class GetJMSTopic extends JmsConsumer { return; } - // decrypt the passwords so the persisted and current properties can be compared... - // we can modify this properties instance since the unsubscribe method will reload + // decrypt the passwords so the persisted and current properties can be compared... + // we can modify this properties instance since the unsubscribe method will reload // the properties from disk decryptPassword(persistedProps, context); decryptPassword(currentProps, context); @@ -168,13 +168,12 @@ public class GetJMSTopic extends JmsConsumer { } /** - * Attempts to locate the password in the specified properties. If found, - * decrypts it using the specified context. + * Attempts to locate the password in the specified properties. If found, decrypts it using the specified context. * - * @param properties - * @param context + * @param properties properties + * @param context context */ - public void decryptPassword(final Properties properties, final ProcessContext context) { + protected void decryptPassword(final Properties properties, final ProcessContext context) { final String encryptedPassword = properties.getProperty(PASSWORD.getName()); // if the is in the properties, decrypt it @@ -192,8 +191,8 @@ public class GetJMSTopic extends JmsConsumer { /** * Persists the subscription details for future use. * - * @param context - * @throws IOException + * @param context context + * @throws IOException ex */ private void persistSubscriptionInfo(final ProcessContext context) throws IOException { final Properties props = getSubscriptionPropertiesFromContext(context); @@ -203,11 +202,10 @@ public class GetJMSTopic extends JmsConsumer { } /** - * Returns the subscription details from the specified context. Note: if a - * password is set, the resulting entry will be encrypted. + * Returns the subscription details from the specified context. Note: if a password is set, the resulting entry will be encrypted. * - * @param context - * @return + * @param context context + * @return Returns the subscription details from the specified context */ private Properties getSubscriptionPropertiesFromContext(final ProcessContext context) { final String unencryptedPassword = context.getProperty(PASSWORD).getValue(); @@ -235,11 +233,10 @@ public class GetJMSTopic extends JmsConsumer { } /** - * Loads the subscription details from disk. Since the details are coming - * from disk, if a password is set, the resulting entry will be encrypted. + * Loads the subscription details from disk. Since the details are coming from disk, if a password is set, the resulting entry will be encrypted. * - * @return - * @throws IOException + * @return properties + * @throws IOException ex */ private Properties getSubscriptionPropertiesFromFile() throws IOException { final Path subscriptionPath = getSubscriptionPath(); @@ -257,11 +254,10 @@ public class GetJMSTopic extends JmsConsumer { } /** - * Loads subscription info from the Subscription File and unsubscribes from - * the subscription, if the file exists; otherwise, does nothing + * Loads subscription info from the Subscription File and unsubscribes from the subscription, if the file exists; otherwise, does nothing * - * @throws IOException - * @throws JMSException + * @throws IOException ex + * @throws JMSException ex */ private void unsubscribe(final ProcessContext context) throws IOException, JMSException { final Properties props = getSubscriptionPropertiesFromFile(); @@ -272,7 +268,8 @@ public class GetJMSTopic extends JmsConsumer { final String serverUrl = props.getProperty(URL.getName()); final String username = props.getProperty(USERNAME.getName()); final String encryptedPassword = props.getProperty(PASSWORD.getName()); - final String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY); + final String subscriptionName = props. + getProperty(SUBSCRIPTION_NAME_PROPERTY); final String jmsProvider = props.getProperty(JMS_PROVIDER.getName()); final String password = encryptedPassword == null ? null : context.decrypt(encryptedPassword); @@ -281,15 +278,6 @@ public class GetJMSTopic extends JmsConsumer { unsubscribe(serverUrl, username, password, subscriptionName, jmsProvider, timeoutMillis); } - /** - * Unsubscribes from a subscription using the supplied parameters - * - * @param url - * @param username - * @param password - * @param subscriptionId - * @throws JMSException - */ private void unsubscribe(final String url, final String username, final String password, final String subscriptionId, final String jmsProvider, final int timeoutMillis) throws JMSException { final Connection connection; if (username == null && password == null) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java index b9c348b..a157316 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java @@ -38,13 +38,18 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @SideEffectFree @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) @CapabilityDescription("Fetches files from an SFTP Server and creates FlowFiles from them") -@WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"), - @WritesAttribute(attribute = "path", description = "The path is set to the path of the file's directory on the remote server. For example, if the <Remote Path> property is set to /tmp, files picked up from /tmp will have the path attribute set to /tmp. If the <Search Recursively> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to /tmp/abc/1/2/3"), +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"), + @WritesAttribute(attribute = "path", description = "The path is set to the path of the file's directory on the remote server. " + + "For example, if the <Remote Path> property is set to /tmp, files picked up from /tmp will have the path attribute set " + + "to /tmp. If the <Search Recursively> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path " + + "attribute will be set to /tmp/abc/1/2/3"), @WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the source file was last modified"), @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"), @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"), @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"), - @WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' attribute is still populated, but may be a relative path")}) + @WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' " + + "attribute is still populated, but may be a relative path")}) @SeeAlso(PutSFTP.class) public class GetSFTP extends GetFileTransfer { @@ -90,7 +95,10 @@ public class GetSFTP extends GetFileTransfer { final boolean privateKeySpecified = context.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).getValue() != null; if (!passwordSpecified && !privateKeySpecified) { - results.add(new ValidationResult.Builder().subject("Password").explanation("Either the Private Key Passphrase or the Password must be supplied").valid(false).build()); + results.add(new ValidationResult.Builder().subject("Password") + .explanation("Either the Private Key Passphrase or the Password must be supplied") + .valid(false) + .build()); } return results; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index 76bfadf..e7f28be 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -76,141 +76,156 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import com.sun.jersey.api.client.ClientResponse.Status; @Tags({"http", "https", "request", "listen", "ingress", "web service"}) -@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. This Processor is designed to be used in conjunction with the HandleHttpResponse Processor in order to create a Web Service") -@WritesAttributes({@WritesAttribute(attribute = "http.context.identifier", description="An identifier that allows the HandleHttpRequest and HandleHttpResponse to coordinate which FlowFile belongs to which HTTP Request/Response."), - @WritesAttribute(attribute = "mime.type", description="The MIME Type of the data, according to the HTTP Header \"Content-Type\""), - @WritesAttribute(attribute = "http.servlet.path", description="The part of the request URL that is considered the Servlet Path"), - @WritesAttribute(attribute = "http.context.path", description="The part of the request URL that is considered to be the Context Path"), - @WritesAttribute(attribute = "http.method", description="The HTTP Method that was used for the request, such as GET or POST"), - @WritesAttribute(attribute = "http.query.string", description="The query string portion of hte Request URL"), - @WritesAttribute(attribute = "http.remote.host", description="The hostname of the requestor"), - @WritesAttribute(attribute = "http.remote.addr", description="The hostname:port combination of the requestor"), - @WritesAttribute(attribute = "http.remote.user", description="The username of the requestor"), - @WritesAttribute(attribute = "http.request.uri", description="The full Request URL"), - @WritesAttribute(attribute = "http.auth.type", description="The type of HTTP Authorization used"), - @WritesAttribute(attribute = "http.principal.name", description="The name of the authenticated user making the request"), - @WritesAttribute(attribute = "http.subject.dn", description="The Distinguished Name of the requestor. This value will not be populated unless the Processor is configured to use an SSLContext Service"), - @WritesAttribute(attribute = "http.issuer.dn", description="The Distinguished Name of the entity that issued the Subject's certificate. This value will not be populated unless the Processor is configured to use an SSLContext Service"), - @WritesAttribute(attribute = "http.headers.XXX", description="Each of the HTTP Headers that is received in the request will be added as an attribute, prefixed with \"http.headers.\" For example, if the request contains an HTTP Header named \"x-my-header\", then the value will be added to an attribute named \"http.headers.x-my-header\"")}) -@SeeAlso(value={HandleHttpResponse.class}, classNames={"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) +@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. " + + "This Processor is designed to be used in conjunction with the HandleHttpResponse Processor in order to create a Web Service") +@WritesAttributes({ + @WritesAttribute(attribute = "http.context.identifier", description = "An identifier that allows the HandleHttpRequest and HandleHttpResponse " + + "to coordinate which FlowFile belongs to which HTTP Request/Response."), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the data, according to the HTTP Header \"Content-Type\""), + @WritesAttribute(attribute = "http.servlet.path", description = "The part of the request URL that is considered the Servlet Path"), + @WritesAttribute(attribute = "http.context.path", description = "The part of the request URL that is considered to be the Context Path"), + @WritesAttribute(attribute = "http.method", description = "The HTTP Method that was used for the request, such as GET or POST"), + @WritesAttribute(attribute = "http.query.string", description = "The query string portion of hte Request URL"), + @WritesAttribute(attribute = "http.remote.host", description = "The hostname of the requestor"), + @WritesAttribute(attribute = "http.remote.addr", description = "The hostname:port combination of the requestor"), + @WritesAttribute(attribute = "http.remote.user", description = "The username of the requestor"), + @WritesAttribute(attribute = "http.request.uri", description = "The full Request URL"), + @WritesAttribute(attribute = "http.auth.type", description = "The type of HTTP Authorization used"), + @WritesAttribute(attribute = "http.principal.name", description = "The name of the authenticated user making the request"), + @WritesAttribute(attribute = "http.subject.dn", description = "The Distinguished Name of the requestor. This value will not be populated " + + "unless the Processor is configured to use an SSLContext Service"), + @WritesAttribute(attribute = "http.issuer.dn", description = "The Distinguished Name of the entity that issued the Subject's certificate. " + + "This value will not be populated unless the Processor is configured to use an SSLContext Service"), + @WritesAttribute(attribute = "http.headers.XXX", description = "Each of the HTTP Headers that is received in the request will be added as an " + + "attribute, prefixed with \"http.headers.\" For example, if the request contains an HTTP Header named \"x-my-header\", then the value " + + "will be added to an attribute named \"http.headers.x-my-header\"")}) +@SeeAlso(value = {HandleHttpResponse.class}, + classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) public class HandleHttpRequest extends AbstractProcessor { + public static final String HTTP_CONTEXT_ID = "http.context.identifier"; - private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&"); - + private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern. + compile("&"); + // Allowable values for client auth - public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication", "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously"); - public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication", "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously"); - public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication", "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore specified in the SSL Context Service"); - - + public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication", + "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously"); + public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication", + "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously"); + public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication", + "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore" + + "specified in the SSL Context Service"); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Listening Port") - .description("The Port to listen on for incoming HTTP requests") - .required(true) - .addValidator(StandardValidators.createLongValidator(0L, 65535L, true)) - .expressionLanguageSupported(false) - .defaultValue("80") - .build(); + .name("Listening Port") + .description("The Port to listen on for incoming HTTP requests") + .required(true) + .addValidator(StandardValidators.createLongValidator(0L, 65535L, true)) + .expressionLanguageSupported(false) + .defaultValue("80") + .build(); public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() - .name("Hostname") - .description("The Hostname to bind to. If not specified, will bind to all hosts") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Hostname") + .description("The Hostname to bind to. If not specified, will bind to all hosts") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor HTTP_CONTEXT_MAP = new PropertyDescriptor.Builder() - .name("HTTP Context Map") - .description("The HTTP Context Map Controller Service to use for caching the HTTP Request Information") - .required(true) - .identifiesControllerService(HttpContextMap.class) - .build(); + .name("HTTP Context Map") + .description("The HTTP Context Map Controller Service to use for caching the HTTP Request Information") + .required(true) + .identifiesControllerService(HttpContextMap.class) + .build(); public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The SSL Context Service to use in order to secure the server. If specified, the server will accept only HTTPS requests; otherwise, the server will accept only HTTP requests") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); + .name("SSL Context Service") + .description("The SSL Context Service to use in order to secure the server. If specified, the server will accept only HTTPS requests; " + + "otherwise, the server will accept only HTTP requests") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final PropertyDescriptor URL_CHARACTER_SET = new PropertyDescriptor.Builder() - .name("Default URL Character Set") - .description("The character set to use for decoding URL parameters if the HTTP Request does not supply one") - .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); + .name("Default URL Character Set") + .description("The character set to use for decoding URL parameters if the HTTP Request does not supply one") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); public static final PropertyDescriptor PATH_REGEX = new PropertyDescriptor.Builder() - .name("Allowed Paths") - .description("A Regular Expression that specifies the valid HTTP Paths that are allowed in the incoming URL Requests. If this value is specified and the path of the HTTP Requests does not match this Regular Expression, the Processor will respond with a 404: NotFound") - .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Allowed Paths") + .description("A Regular Expression that specifies the valid HTTP Paths that are allowed in the incoming URL Requests. If this value is " + + "specified and the path of the HTTP Requests does not match this Regular Expression, the Processor will respond with a " + + "404: NotFound") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor ALLOW_GET = new PropertyDescriptor.Builder() - .name("Allow GET") - .description("Allow HTTP GET Method") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Allow GET") + .description("Allow HTTP GET Method") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); public static final PropertyDescriptor ALLOW_POST = new PropertyDescriptor.Builder() - .name("Allow POST") - .description("Allow HTTP POST Method") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - public static final PropertyDescriptor ALLOW_PUT = new PropertyDescriptor.Builder() - .name("Allow PUT") - .description("Allow HTTP PUT Method") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Allow POST") + .description("Allow HTTP POST Method") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + public static final PropertyDescriptor ALLOW_PUT = new PropertyDescriptor.Builder(). + name("Allow PUT"). + description("Allow HTTP PUT Method"). + required(true). + allowableValues("true", "false"). + defaultValue("true"). + build(); public static final PropertyDescriptor ALLOW_DELETE = new PropertyDescriptor.Builder() - .name("Allow DELETE") - .description("Allow HTTP DELETE Method") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Allow DELETE") + .description("Allow HTTP DELETE Method") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); public static final PropertyDescriptor ALLOW_HEAD = new PropertyDescriptor.Builder() - .name("Allow HEAD") - .description("Allow HTTP HEAD Method") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); + .name("Allow HEAD") + .description("Allow HTTP HEAD Method") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); public static final PropertyDescriptor ALLOW_OPTIONS = new PropertyDescriptor.Builder() - .name("Allow OPTIONS") - .description("Allow HTTP OPTIONS Method") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); + .name("Allow OPTIONS") + .description("Allow HTTP OPTIONS Method") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); public static final PropertyDescriptor ADDITIONAL_METHODS = new PropertyDescriptor.Builder() - .name("Additional HTTP Methods") - .description("A comma-separated list of non-standard HTTP Methods that should be allowed") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Additional HTTP Methods") + .description("A comma-separated list of non-standard HTTP Methods that should be allowed") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() - .name("Client Authentication") - .description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") - .required(true) - .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED) - .defaultValue(CLIENT_NONE.getValue()) - .build(); - - + .name("Client Authentication") + .description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> " + + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") + .required(true) + .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED) + .defaultValue(CLIENT_NONE.getValue()) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All content that is received is routed to the 'success' relationship") - .build(); - + .name("success") + .description("All content that is received is routed to the 'success' relationship") + .build(); + private volatile Server server; private final BlockingQueue<HttpRequestContainer> containerQueue = new LinkedBlockingQueue<>(50); - - + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> descriptors = new ArrayList<>(); @@ -231,39 +246,38 @@ public class HandleHttpRequest extends AbstractProcessor { return descriptors; } - + @Override public Set<Relationship> getRelationships() { return Collections.singleton(REL_SUCCESS); } - - + @OnScheduled public void initializeServer(final ProcessContext context) throws Exception { final String host = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); - + final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); final boolean need; final boolean want; - if ( CLIENT_NEED.equals(clientAuthValue) ) { + if (CLIENT_NEED.equals(clientAuthValue)) { need = true; want = false; - } else if ( CLIENT_WANT.equals(clientAuthValue) ) { + } else if (CLIENT_WANT.equals(clientAuthValue)) { need = false; want = true; } else { need = false; want = false; } - + final SslContextFactory sslFactory = (sslService == null) ? null : createSslFactory(sslService, need, want); final Server server = new Server(port); - + // create the http configuration final HttpConfiguration httpConfiguration = new HttpConfiguration(); - if ( sslFactory == null ) { + if (sslFactory == null) { // create the connector final ServerConnector http = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration)); @@ -274,7 +288,7 @@ public class HandleHttpRequest extends AbstractProcessor { http.setPort(port); // add this connector - server.setConnectors(new Connector[] {http}); + server.setConnectors(new Connector[]{http}); } else { // add some secure config final HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration); @@ -283,9 +297,7 @@ public class HandleHttpRequest extends AbstractProcessor { httpsConfiguration.addCustomizer(new SecureRequestCustomizer()); // build the connector - final ServerConnector https = new ServerConnector(server, - new SslConnectionFactory(sslFactory, "http/1.1"), - new HttpConnectionFactory(httpsConfiguration)); + final ServerConnector https = new ServerConnector(server, new SslConnectionFactory(sslFactory, "http/1.1"), new HttpConnectionFactory(httpsConfiguration)); // set host and port if (StringUtils.isNotBlank(host)) { @@ -294,87 +306,88 @@ public class HandleHttpRequest extends AbstractProcessor { https.setPort(port); // add this connector - server.setConnectors(new Connector[] {https}); + server.setConnectors(new Connector[]{https}); } - + final Set<String> allowedMethods = new HashSet<>(); - if ( context.getProperty(ALLOW_GET).asBoolean() ) { + if (context.getProperty(ALLOW_GET).asBoolean()) { allowedMethods.add("GET"); } - if ( context.getProperty(ALLOW_POST).asBoolean() ) { + if (context.getProperty(ALLOW_POST).asBoolean()) { allowedMethods.add("POST"); } - if ( context.getProperty(ALLOW_PUT).asBoolean() ) { + if (context.getProperty(ALLOW_PUT).asBoolean()) { allowedMethods.add("PUT"); } - if ( context.getProperty(ALLOW_DELETE).asBoolean() ) { + if (context.getProperty(ALLOW_DELETE).asBoolean()) { allowedMethods.add("DELETE"); } - if ( context.getProperty(ALLOW_HEAD).asBoolean() ) { + if (context.getProperty(ALLOW_HEAD).asBoolean()) { allowedMethods.add("HEAD"); } - if ( context.getProperty(ALLOW_OPTIONS).asBoolean() ) { + if (context.getProperty(ALLOW_OPTIONS).asBoolean()) { allowedMethods.add("OPTIONS"); } - + final String additionalMethods = context.getProperty(ADDITIONAL_METHODS).getValue(); - if ( additionalMethods != null ) { - for ( final String additionalMethod : additionalMethods.split(",") ) { + if (additionalMethods != null) { + for (final String additionalMethod : additionalMethods.split(",")) { final String trimmed = additionalMethod.trim(); - if ( !trimmed.isEmpty() ) { + if (!trimmed.isEmpty()) { allowedMethods.add(trimmed.toUpperCase()); } } } - + final String pathRegex = context.getProperty(PATH_REGEX).getValue(); final Pattern pathPattern = (pathRegex == null) ? null : Pattern.compile(pathRegex); - + server.setHandler(new AbstractHandler() { @Override - public void handle(final String target, final Request baseRequest, final HttpServletRequest request, - final HttpServletResponse response) throws IOException, ServletException { - + public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) + throws IOException, ServletException { + final String requestUri = request.getRequestURI(); - if ( !allowedMethods.contains(request.getMethod().toUpperCase()) ) { - getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}", - new Object[] {request.getRemoteAddr(), request.getMethod(), requestUri}); + if (!allowedMethods.contains(request.getMethod().toUpperCase())) { + getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}", + new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri}); response.sendError(Status.METHOD_NOT_ALLOWED.getStatusCode()); return; } - - if ( pathPattern != null ) { + + if (pathPattern != null) { final URI uri; try { uri = new URI(requestUri); } catch (final URISyntaxException e) { throw new ServletException(e); } - - if ( !pathPattern.matcher(uri.getPath()).matches() ) { + + if (!pathPattern.matcher(uri.getPath()).matches()) { response.sendError(Status.NOT_FOUND.getStatusCode()); - getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}", - new Object[] {request.getRemoteAddr(), request.getMethod(), requestUri}); + getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}", + new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri}); return; } } - + // If destination queues full, send back a 503: Service Unavailable. - if ( context.getAvailableRelationships().isEmpty() ) { + if (context.getAvailableRelationships().isEmpty()) { response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } - + // Right now, that information, though, is only in the ProcessSession, not the ProcessContext, // so it is not known to us. Should see if it can be added to the ProcessContext. final AsyncContext async = baseRequest.startAsync(); final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async)); - - if ( added ) { - getLogger().debug("Added Http Request to queue for {} {} from {}", new Object[] {request.getMethod(), requestUri, request.getRemoteAddr()}); + + if (added) { + getLogger().debug("Added Http Request to queue for {} {} from {}", + new Object[]{request.getMethod(), requestUri, request.getRemoteAddr()}); } else { - getLogger().info("Sending back a SERVICE_UNAVAILABLE response to {}; request was {} {}", - new Object[] {request.getRemoteAddr(), request.getMethod(), request.getRemoteAddr()}); + getLogger().info("Sending back a SERVICE_UNAVAILABLE response to {}; request was {} {}", + new Object[]{request.getRemoteAddr(), request.getMethod(), request.getRemoteAddr()}); response.sendError(Status.SERVICE_UNAVAILABLE.getStatusCode()); response.flushBuffer(); @@ -382,63 +395,63 @@ public class HandleHttpRequest extends AbstractProcessor { } } }); - + this.server = server; server.start(); - + getLogger().info("Server started and listening on port " + getPort()); } - + protected int getPort() { - for ( final Connector connector : server.getConnectors() ) { - if ( connector instanceof ServerConnector ) { + for (final Connector connector : server.getConnectors()) { + if (connector instanceof ServerConnector) { return ((ServerConnector) connector).getLocalPort(); } } - + throw new IllegalStateException("Server is not listening on any ports"); } - + protected int getRequestQueueSize() { return containerQueue.size(); } - + private SslContextFactory createSslFactory(final SSLContextService sslService, final boolean needClientAuth, final boolean wantClientAuth) { final SslContextFactory sslFactory = new SslContextFactory(); - + sslFactory.setNeedClientAuth(needClientAuth); sslFactory.setWantClientAuth(wantClientAuth); - - if ( sslService.isKeyStoreConfigured() ) { + + if (sslService.isKeyStoreConfigured()) { sslFactory.setKeyStorePath(sslService.getKeyStoreFile()); sslFactory.setKeyStorePassword(sslService.getKeyStorePassword()); sslFactory.setKeyStoreType(sslService.getKeyStoreType()); } - if ( sslService.isTrustStoreConfigured() ) { + if (sslService.isTrustStoreConfigured()) { sslFactory.setTrustStorePath(sslService.getTrustStoreFile()); sslFactory.setTrustStorePassword(sslService.getTrustStorePassword()); sslFactory.setTrustStoreType(sslService.getTrustStoreType()); } - + return sslFactory; } - + @OnStopped public void shutdown() throws Exception { - if ( server != null ) { + if (server != null) { getLogger().debug("Shutting down server"); server.stop(); server.destroy(); server.join(); - getLogger().info("Shut down {}", new Object[] {server}); + getLogger().info("Shut down {}", new Object[]{server}); } } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final HttpRequestContainer container = containerQueue.poll(); - if ( container == null ) { + if (container == null) { return; } @@ -448,13 +461,14 @@ public class HandleHttpRequest extends AbstractProcessor { try { flowFile = session.importFrom(request.getInputStream(), flowFile); } catch (final IOException e) { - getLogger().error("Failed to receive content from HTTP Request from {} due to {}", new Object[] {request.getRemoteAddr(), e}); + getLogger().error("Failed to receive content from HTTP Request from {} due to {}", + new Object[]{request.getRemoteAddr(), e}); session.remove(flowFile); return; } - + final String charset = request.getCharacterEncoding() == null ? context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding(); - + final String contextIdentifier = UUID.randomUUID().toString(); final Map<String, String> attributes = new HashMap<>(); try { @@ -465,7 +479,7 @@ public class HandleHttpRequest extends AbstractProcessor { putAttribute(attributes, "http.method", request.getMethod()); putAttribute(attributes, "http.local.addr", request.getLocalAddr()); putAttribute(attributes, "http.local.name", request.getLocalName()); - if ( request.getQueryString() != null ) { + if (request.getQueryString() != null) { putAttribute(attributes, "http.query.string", URLDecoder.decode(request.getQueryString(), charset)); } putAttribute(attributes, "http.remote.host", request.getRemoteHost()); @@ -474,26 +488,26 @@ public class HandleHttpRequest extends AbstractProcessor { putAttribute(attributes, "http.request.uri", request.getRequestURI()); putAttribute(attributes, "http.request.url", request.getRequestURL().toString()); putAttribute(attributes, "http.auth.type", request.getAuthType()); - + putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId()); - if ( request.getDispatcherType() != null ) { + if (request.getDispatcherType() != null) { putAttribute(attributes, "http.dispatcher.type", request.getDispatcherType().name()); } putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding()); putAttribute(attributes, "http.locale", request.getLocale()); putAttribute(attributes, "http.server.name", request.getServerName()); putAttribute(attributes, "http.server.port", request.getServerPort()); - + final Enumeration<String> paramEnumeration = request.getParameterNames(); - while ( paramEnumeration.hasMoreElements() ) { + while (paramEnumeration.hasMoreElements()) { final String paramName = paramEnumeration.nextElement(); final String value = request.getParameter(paramName); attributes.put("http.param." + paramName, value); } - + final Cookie[] cookies = request.getCookies(); - if ( cookies != null ) { - for ( final Cookie cookie : cookies ) { + if (cookies != null) { + for (final Cookie cookie : cookies) { final String name = cookie.getName(); final String cookiePrefix = "http.cookie." + name + "."; attributes.put(cookiePrefix + "value", cookie.getValue()); @@ -504,25 +518,25 @@ public class HandleHttpRequest extends AbstractProcessor { attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure())); } } - + final String queryString = request.getQueryString(); - if ( queryString != null ) { + if (queryString != null) { final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString); - for ( final String keyValueString : params ) { + for (final String keyValueString : params) { final int indexOf = keyValueString.indexOf("="); - if ( indexOf < 0 ) { + if (indexOf < 0) { // no =, then it's just a key with no value attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), ""); } else { final String key = keyValueString.substring(0, indexOf); final String value; - - if ( indexOf == keyValueString.length() - 1 ) { + + if (indexOf == keyValueString.length() - 1) { value = ""; } else { value = keyValueString.substring(indexOf + 1); } - + attributes.put("http.query.param." + URLDecoder.decode(key, charset), URLDecoder.decode(value, charset)); } } @@ -530,79 +544,81 @@ public class HandleHttpRequest extends AbstractProcessor { } catch (final UnsupportedEncodingException uee) { throw new ProcessException("Invalid character encoding", uee); // won't happen because charset has been validated } - + final Enumeration<String> headerNames = request.getHeaderNames(); - while ( headerNames.hasMoreElements() ) { + while (headerNames.hasMoreElements()) { final String headerName = headerNames.nextElement(); final String headerValue = request.getHeader(headerName); putAttribute(attributes, "http.headers." + headerName, headerValue); } - + final Principal principal = request.getUserPrincipal(); - if ( principal != null ) { + if (principal != null) { putAttribute(attributes, "http.principal.name", principal.getName()); } - + final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); final String subjectDn; - if ( certs != null && certs.length > 0 ) { + if (certs != null && certs.length > 0) { final X509Certificate cert = certs[0]; subjectDn = cert.getSubjectDN().getName(); final String issuerDn = cert.getIssuerDN().getName(); - + putAttribute(attributes, "http.subject.dn", subjectDn); putAttribute(attributes, "http.issuer.dn", issuerDn); } else { subjectDn = null; } - + flowFile = session.putAllAttributes(flowFile, attributes); - + final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); - final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext()); - - if ( !registered ) { - getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE", new Object[] {request.getRemoteAddr()}); - + final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext()); + + if (!registered) { + getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE", + new Object[]{request.getRemoteAddr()}); + try { container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode()); container.getResponse().flushBuffer(); container.getContext().complete(); } catch (final Exception e) { - getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}", new Object[] {request.getRemoteAddr(), e}); + getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}", + new Object[]{request.getRemoteAddr(), e}); } - + return; } - + final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); session.getProvenanceReporter().receive(flowFile, request.getRequestURI(), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Transferring {} to 'success'; received from {}", new Object[] {flowFile, request.getRemoteAddr()}); + getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()}); } - + private void putAttribute(final Map<String, String> map, final String key, final Object value) { - if ( value == null ) { + if (value == null) { return; } - + putAttribute(map, key, value.toString()); } - + private void putAttribute(final Map<String, String> map, final String key, final String value) { - if ( value == null ) { + if (value == null) { return; } - + map.put(key, value); } - - + private static class HttpRequestContainer { + private final HttpServletRequest request; private final HttpServletResponse response; private final AsyncContext context; - + public HttpRequestContainer(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) { this.request = request; this.response = response; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java index bfe53ef..0201730 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java @@ -42,38 +42,41 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @Tags({"http", "https", "response", "egress", "web service"}) -@CapabilityDescription("Sends an HTTP Response to the Requestor that generated a FlowFile. This Processor is designed to be used in conjunction with the HandleHttpRequest in order to create a web service.") -@DynamicProperty(name="An HTTP header name", value="An HTTP header value", description="These HTTPHeaders are set in the HTTP Response") -@ReadsAttribute(attribute="http.context.identifier", description="The value of this attribute is used to lookup the HTTP Response so that the proper message can be sent back to the requestor. If this attribute is missing, the FlowFile will be routed to 'failure.'") -@SeeAlso(value={HandleHttpRequest.class}, classNames={"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) +@CapabilityDescription("Sends an HTTP Response to the Requestor that generated a FlowFile. This Processor is designed to be used in conjunction with " + + "the HandleHttpRequest in order to create a web service.") +@DynamicProperty(name = "An HTTP header name", value = "An HTTP header value", description = "These HTTPHeaders are set in the HTTP Response") +@ReadsAttribute(attribute = "http.context.identifier", description = "The value of this attribute is used to lookup the HTTP Response so that the " + + "proper message can be sent back to the requestor. If this attribute is missing, the FlowFile will be routed to 'failure.'") +@SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) public class HandleHttpResponse extends AbstractProcessor { + public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+"); public static final String HTTP_CONTEXT_ID = "http.context.identifier"; public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder() - .name("HTTP Status Code") - .description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.") - .required(true) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("HTTP Status Code") + .description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor HTTP_CONTEXT_MAP = new PropertyDescriptor.Builder() - .name("HTTP Context Map") - .description("The HTTP Context Map Controller Service to use for caching the HTTP Request Information") - .required(true) - .identifiesControllerService(HttpContextMap.class) - .build(); - + .name("HTTP Context Map") + .description("The HTTP Context Map Controller Service to use for caching the HTTP Request Information") + .required(true) + .identifiesControllerService(HttpContextMap.class) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles will be routed to this Relationship after the response has been successfully sent to the requestor") - .build(); + .name("success") + .description("FlowFiles will be routed to this Relationship after the response has been successfully sent to the requestor") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles will be routed to this Relationship if the Processor is unable to respond to the requestor. This may happen, for instance, if the connection times out or if NiFi is restarted before responding to the HTTP Request.") - .build(); + .name("failure") + .description("FlowFiles will be routed to this Relationship if the Processor is unable to respond to the requestor. This may happen, " + + "for instance, if the connection times out or if NiFi is restarted before responding to the HTTP Request.") + .build(); - @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(); @@ -81,7 +84,7 @@ public class HandleHttpResponse extends AbstractProcessor { properties.add(HTTP_CONTEXT_MAP); return properties; } - + @Override public Set<Relationship> getRelationships() { final Set<Relationship> relationships = new HashSet<>(); @@ -89,96 +92,96 @@ public class HandleHttpResponse extends AbstractProcessor { relationships.add(REL_FAILURE); return relationships; } - + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .description("Specifies the value to send for the '" + propertyDescriptorName + "' HTTP Header") - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dynamic(true) - .expressionLanguageSupported(true) - .build(); + .description("Specifies the value to send for the '" + propertyDescriptorName + "' HTTP Header") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); } - - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { return; } - + final String contextIdentifier = flowFile.getAttribute(HTTP_CONTEXT_ID); - if ( contextIdentifier == null ) { + if (contextIdentifier == null) { session.transfer(flowFile, REL_FAILURE); - getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an " - + "'http.context.identifier' attribute", new Object[] {flowFile}); + getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an 'http.context.identifier' attribute", + new Object[]{flowFile}); return; } - + final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue(); - if ( !isNumber(statusCodeValue) ) { + if (!isNumber(statusCodeValue)) { session.transfer(flowFile, REL_FAILURE); - getLogger().error("Failed to response to HTTP request for {} because status code was '{}', which is not a valid number", new Object[] {flowFile, statusCodeValue}); + getLogger().error("Failed to response to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue}); } - + final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); final HttpServletResponse response = contextMap.getResponse(contextIdentifier); - if ( response == null ) { + if (response == null) { session.transfer(flowFile, REL_FAILURE); - getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' " - + "attribute of {} but could not find an HTTP Response Object for this identifier", new Object[] {flowFile, HTTP_CONTEXT_ID, contextIdentifier}); + getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier", + new Object[]{flowFile, HTTP_CONTEXT_ID, contextIdentifier}); return; } - + final int statusCode = Integer.parseInt(statusCodeValue); response.setStatus(statusCode); - - for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) { + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); - if ( descriptor.isDynamic() ) { + if (descriptor.isDynamic()) { final String headerName = descriptor.getName(); final String headerValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); - - if ( !headerValue.trim().isEmpty() ) { + + if (!headerValue.trim().isEmpty()) { response.setHeader(headerName, headerValue); } } } - + try { session.exportTo(flowFile, response.getOutputStream()); response.flushBuffer(); } catch (final IOException ioe) { session.transfer(flowFile, REL_FAILURE); - getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[] {flowFile, ioe}); + getLogger(). + error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, ioe}); return; } - + try { contextMap.complete(contextIdentifier); } catch (final IllegalStateException ise) { - getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[] {flowFile, ise}); + getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ise}); session.transfer(flowFile, REL_FAILURE); return; } - + session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[] {flowFile, statusCode}); + getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode}); } private static boolean isNumber(final String value) { - if ( value.length() == 0 ) { + if (value.length() == 0) { return false; } - - for (int i=0; i < value.length(); i++) { - if ( !Character.isDigit(value.charAt(i)) ) { + + for (int i = 0; i < value.length(); i++) { + if (!Character.isDigit(value.charAt(i))) { return false; } } - + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java index c5493e8..9187aad 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java @@ -115,12 +115,14 @@ import org.apache.nifi.processor.util.StandardValidators; + "and the value of the property is a regular expression that, if matched by the attribute value, will cause that attribute " + "to be used as part of the hash. If the regular expression contains a capturing group, only the value of the capturing " + "group will be used.") -@WritesAttribute(attribute="<Hash Value Attribute Key>", description="This Processor adds an attribute whose value is the result of Hashing the existing FlowFile attributes. The name of this attribute is specified by the <Hash Value Attribute Key> property.") -@DynamicProperty(name="A flowfile attribute key for attribute inspection", value="A Regular Expression", description="This regular expression is evaluated against the " + - "flowfile attribute values. If the regular expression contains a capturing " + - "group, the value of that group will be used when comparing flow file " + - "attributes. Otherwise, the original flow file attribute's value will be used " + - "if and only if the value matches the given regular expression.") +@WritesAttribute(attribute = "<Hash Value Attribute Key>", description = "This Processor adds an attribute whose value is the result of " + + "Hashing the existing FlowFile attributes. The name of this attribute is specified by the <Hash Value Attribute Key> property.") +@DynamicProperty(name = "A flowfile attribute key for attribute inspection", value = "A Regular Expression", + description = "This regular expression is evaluated against the " + + "flowfile attribute values. If the regular expression contains a capturing " + + "group, the value of that group will be used when comparing flow file " + + "attributes. Otherwise, the original flow file attribute's value will be used " + + "if and only if the value matches the given regular expression.") public class HashAttribute extends AbstractProcessor { public static final PropertyDescriptor HASH_VALUE_ATTRIBUTE = new PropertyDescriptor.Builder() @@ -130,12 +132,19 @@ public class HashAttribute extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Used for FlowFiles that have a hash value added").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Used for FlowFiles that are missing required attributes").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Used for FlowFiles that have a hash value added") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Used for FlowFiles that are missing required attributes") + .build(); private Set<Relationship> relationships; private List<PropertyDescriptor> properties; - private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections.<String, Pattern>emptyMap()); + private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections. + <String, Pattern>emptyMap()); @Override protected void init(final ProcessorInitializationContext context) { @@ -162,11 +171,12 @@ public class HashAttribute extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.createRegexValidator(0, 1, false)) - .required(false) - .dynamic(true) - .build(); + .name(propertyDescriptorName). + addValidator(StandardValidators. + createRegexValidator(0, 1, false)). + required(false). + dynamic(true). + build(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java index 99fd58b..526754e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java @@ -51,8 +51,10 @@ import org.apache.nifi.util.ObjectHolder; @EventDriven @SupportsBatching @Tags({"hash", "content", "MD5", "SHA-1", "SHA-256"}) -@CapabilityDescription("Calculates a hash value for the Content of a FlowFile and puts that hash value on the FlowFile as an attribute whose name is determined by the <Hash Attribute Name> property") -@WritesAttribute(attribute="<Hash Attribute Name>", description="This Processor adds an attribute whose value is the result of Hashing the existing FlowFile attributes. The name of this attribute is specified by the <Hash Attribute Name> property") +@CapabilityDescription("Calculates a hash value for the Content of a FlowFile and puts that hash value on the FlowFile as an attribute whose name " + + "is determined by the <Hash Attribute Name> property") +@WritesAttribute(attribute = "<Hash Attribute Name>", description = "This Processor adds an attribute whose value is the result of Hashing the " + + "existing FlowFile attributes. The name of this attribute is specified by the <Hash Attribute Name> property") public class HashContent extends AbstractProcessor { public static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder() @@ -71,10 +73,14 @@ public class HashContent extends AbstractProcessor { .defaultValue("MD5") .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("FlowFiles that are process successfully will be sent to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("Any FlowFile that cannot be processed successfully will be sent to this relationship without any attribute being added").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are process successfully will be sent to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be processed successfully will be sent to this relationship without any attribute being added") + .build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java index e31a465..5f16ff3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/IdentifyMimeType.java @@ -47,17 +47,14 @@ import org.apache.tika.mime.MediaType; import org.apache.tika.mime.MimeType; import org.apache.tika.mime.MimeTypeException; - /** * <p> - * Attempts to detect the MIME Type of a FlowFile by examining its contents. If - * the MIME Type is determined, it is added to an attribute with the name - * mime.type. In addition, mime.extension is set if a common file extension is known. + * Attempts to detect the MIME Type of a FlowFile by examining its contents. If the MIME Type is determined, it is added to an attribute with the name mime.type. In addition, mime.extension is set if + * a common file extension is known. * </p> * * <p> - * MIME Type detection is performed by Apache Tika; more information about - * detection is available at http://tika.apache.org. + * MIME Type detection is performed by Apache Tika; more information about detection is available at http://tika.apache.org. * * <ul> * <li>application/flowfile-v3</li> @@ -73,10 +70,14 @@ import org.apache.tika.mime.MimeTypeException; + "an attribute with the name 'mime.type' is added with the value being the MIME Type. If the MIME Type cannot be determined, " + "the value will be set to 'application/octet-stream'. In addition, the attribute mime.extension will be set if a common file " + "extension for the MIME Type is known.") -@WritesAttribute(attribute="mime.type", description="This Processor sets the FlowFile's mime.type attribute to the detected MIME Type. If unable to detect the MIME Type, the attribute's value will be set to application/octet-stream") +@WritesAttribute(attribute = "mime.type", description = "This Processor sets the FlowFile's mime.type attribute to the detected MIME Type. " + + "If unable to detect the MIME Type, the attribute's value will be set to application/octet-stream") public class IdentifyMimeType extends AbstractProcessor { - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to success").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are routed to success") + .build(); public static final MediaType FLOWFILE_V1 = new MediaType("application", "flowfile-v1"); public static final MediaType FLOWFILE_V3 = new MediaType("application", "flowfile-v3");
