NIFI-1086 Provide refactoring of InvokeHTTP
NIFI-980 Add support for HTTP Digest authentication to InvokeHttp
NIFI-1080 Provide additional InvokeHttp unit tests
NIFI-1133 InvokeHTTP Processor does not save Location header for 3xx responses
NIFI-1009 InvokeHTTP should be able to be scheduled without any incoming connection for GET operations
NIFI-61 Multiple improvements for InvokeHTTP inclusive of providing unique tx.id across clusters, dynamic HTTP header properties
Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8c2323dc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8c2323dc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8c2323dc Branch: refs/heads/master Commit: 8c2323dc8d0e107f1a99898370c7515fa9603122 Parents: fb335ea Author: Joseph Percivall <joeperciv...@yahoo.com> Authored: Mon Nov 2 15:45:20 2015 -0500 Committer: Aldrin Piri <ald...@apache.org> Committed: Thu Nov 19 01:40:21 2015 -0500 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 52 +- .../nifi/processors/standard/InvokeHTTP.java | 1119 ++++++++++-------- .../processors/standard/TestInvokeHTTP.java | 91 +- .../processors/standard/TestInvokeHttpSSL.java | 3 +- .../standard/util/TestInvokeHttpCommon.java | 1029 ++++++++++++---- pom.xml | 38 + 6 files changed, 1603 insertions(+), 729 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 76f9daf..0427927 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -126,11 +126,6 @@ language governing permissions and limitations under the License. 
--> </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> <artifactId>nifi-socket-utils</artifactId> </dependency> <dependency> @@ -138,11 +133,6 @@ language governing permissions and limitations under the License. --> <artifactId>nifi-load-distribution-service-api</artifactId> </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-distributed-cache-client-service</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> @@ -154,21 +144,11 @@ language governing permissions and limitations under the License. --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-core</artifactId> <version>1.7</version> @@ -189,7 +169,37 @@ language governing permissions and limitations under the License. 
--> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> </dependency> - + <dependency> + <groupId>com.squareup.okhttp</groupId> + <artifactId>okhttp</artifactId> + <version>2.5.0</version> + </dependency> + <dependency> + <groupId>com.burgstaller</groupId> + <artifactId>okhttp-digest</artifactId> + <version>0.4</version> + <type>jar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index d827658..2a9760d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -18,13 +18,8 @@ package org.apache.nifi.processors.standard; import static org.apache.commons.lang3.StringUtils.trimToEmpty; -import 
java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.Proxy; import java.net.Proxy.Type; @@ -41,30 +36,49 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; -import org.apache.commons.codec.binary.Base64; +import com.burgstaller.okhttp.AuthenticationCacheInterceptor; +import com.burgstaller.okhttp.CachingAuthenticatorDecorator; +import com.burgstaller.okhttp.DispatchingAuthenticator; +import com.burgstaller.okhttp.digest.CachingAuthenticator; +import com.burgstaller.okhttp.digest.DigestAuthenticator; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; + +import com.squareup.okhttp.ResponseBody; +import okio.BufferedSink; +import org.apache.commons.io.input.TeeInputStream; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.WritesAttribute; import 
org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -72,118 +86,61 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.apache.nifi.stream.io.StreamUtils; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @SupportsBatching @Tags({"http", "https", "rest", "client"}) -@InputRequirement(Requirement.INPUT_REQUIRED) +@InputRequirement(Requirement.INPUT_ALLOWED) +@TriggerWhenEmpty @CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.") @WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"), @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"), - @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"), + @WritesAttribute(attribute = "invokehttp.response.body", description = "In the 
instance where the status code received is not a success (2xx) " + + "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."), @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"), @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"), - @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")}) -@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted " - + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") + @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server"), + @WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to " + + "will become the attribute key and the value would be the body of the HTTP response.")}) +@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", supportsExpressionLanguage = true, description = "Send request header " + + "with a key matching the Dynamic Property Key and a value created by evaluating the Attribute Expression Language set in the value " + + "of the Dynamic Property.") public final class InvokeHTTP extends AbstractProcessor { - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return Config.PROPERTIES; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { - if (Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName)) { - return Config.PROP_TRUSTED_HOSTNAME; - } - return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName); - } - - @Override - public 
Set<Relationship> getRelationships() { - return Config.RELATIONSHIPS; - } - - private volatile Pattern attributesToSend = null; - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - final String trimmedValue = StringUtils.trimToEmpty(newValue); - - // compile the attributes-to-send filter pattern - if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) { - if (newValue.isEmpty()) { - attributesToSend = null; - } else { - attributesToSend = Pattern.compile(trimmedValue); - } - } - } - - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(1); - final boolean proxyHostSet = validationContext.getProperty(Config.PROP_PROXY_HOST).isSet(); - final boolean proxyPortSet = validationContext.getProperty(Config.PROP_PROXY_PORT).isSet(); - - if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { - results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build()); - } - - return results; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final SSLContextService sslService = context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE); - - Transaction transaction = new Transaction(getLogger(), sslContext, attributesToSend, context, session, flowFile); - transaction.process(); - } - - /** - * Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc. 
- */ - public interface Config { - // flowfile attribute keys returned after reading the response - String STATUS_CODE = "invokehttp.status.code"; - String STATUS_MESSAGE = "invokehttp.status.message"; - String RESPONSE_BODY = "invokehttp.response.body"; - String REQUEST_URL = "invokehttp.request.url"; - String TRANSACTION_ID = "invokehttp.tx.id"; - String REMOTE_DN = "invokehttp.remote.dn"; - - // Set of flowfile attributes which we generally always ignore during - // processing, including when converting http headers, copying attributes, etc. - // This set includes our strings defined above as well as some standard flowfile - // attributes. - public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + // flowfile attribute keys returned after reading the response + public final static String STATUS_CODE = "invokehttp.status.code"; + public final static String STATUS_MESSAGE = "invokehttp.status.message"; + public final static String RESPONSE_BODY = "invokehttp.response.body"; + public final static String REQUEST_URL = "invokehttp.request.url"; + public final static String TRANSACTION_ID = "invokehttp.tx.id"; + public final static String REMOTE_DN = "invokehttp.remote.dn"; + + // Set of flowfile attributes which we generally always ignore during + // processing, including when converting http headers, copying attributes, etc. + // This set includes our strings defined above as well as some standard flowfile + // attributes. 
+ public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN, "uuid", "filename", "path"))); - // properties - public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() + // properties + public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() .name("HTTP Method") - .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).") + .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported but will " + + "be sent without a message body.") .required(true) .defaultValue("GET") .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) .build(); - public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder() .name("Remote URL") .description("Remote URL which will be connected to, including scheme, host, port, path.") .required(true) @@ -191,7 +148,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.URL_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder() .name("Connection Timeout") .description("Max wait time for connection to remote service.") .required(true) @@ -199,7 +156,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_READ_TIMEOUT = new 
PropertyDescriptor.Builder() .name("Read Timeout") .description("Max wait time for response from remote service.") .required(true) @@ -207,7 +164,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder() .name("Include Date Header") .description("Include an RFC-2616 Date header in the request.") .required(true) @@ -216,7 +173,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() .name("Follow Redirects") .description("Follow HTTP redirects issued by remote server.") .required(true) @@ -225,48 +182,50 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder() .name("Attributes to Send") .description("Regular expression that defines which attributes to send as HTTP headers in the request. " - + "If not defined, no attributes are sent as headers.") + + "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. 
" + + "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression " + + "language will be the header value.") .required(false) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") .required(false) .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder() .name("Proxy Host") .description("The fully qualified hostname or IP address of the proxy server") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder() .name("Proxy Port") .description("The port of the proxy server") .required(false) .addValidator(StandardValidators.PORT_VALIDATOR) .build(); - // Per RFC 7235, 2617, and 2616. - // basic-credentials = base64-user-pass - // base64-user-pass = userid ":" password - // userid = *<TEXT excluding ":"> - // password = *TEXT - // - // OCTET = <any 8-bit sequence of data> - // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)> - // LWS = [CRLF] 1*( SP | HT ) - // TEXT = <any OCTET except CTLs but including LWS> - // - // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs. - public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() + // Per RFC 7235, 2617, and 2616. 
+ // basic-credentials = base64-user-pass + // base64-user-pass = userid ":" password + // userid = *<TEXT excluding ":"> + // password = *TEXT + // + // OCTET = <any 8-bit sequence of data> + // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)> + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = <any OCTET except CTLs but including LWS> + // + // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs. + public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() .name("Basic Authentication Username") .displayName("Basic Authentication Username") .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).") @@ -274,7 +233,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) .build(); - public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() .name("Basic Authentication Password") .displayName("Basic Authentication Password") .description("The password to be used by the client to authenticate against the Remote URL.") @@ -283,7 +242,64 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) .build(); - public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Put Response Body In Attribute") + .description("If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate " + + "FlowFile. 
The attribute key to put to is determined by evaluating value of this property. ") + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder() + .name("Max Length To Put In Attribute") + .description("If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" " + + "property or by receiving an error status code), the number of characters put to the attribute value will be at " + + "most this amount. This is important because attributes are held in memory and large attributes will quickly " + + "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. " + + "Consider making this smaller if able.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("256") + .build(); + + public static final PropertyDescriptor PROP_DIGEST_AUTH = new PropertyDescriptor.Builder() + .name("Digest Authentication") + .displayName("Use Digest Authentication") + .description("Whether to communicate with the website using Digest Authentication. 'Basic Authentication Username' and 'Basic Authentication Password' are used " + + "for authentication.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder() + .name("Always Output Response") + .description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is " + + "or if the processor is configured to put the server response body in the request attribute. 
In the later configuration a request FlowFile with the " + + "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder() + .name("Trusted Hostname") + .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. " + + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based " + + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder() + .name("Add Response Headers to Request") + .description("Enabling this property saves all the response headers to the original request. This may be when the response headers are needed " + + "but a response is not generated due to the status code received.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_METHOD, PROP_URL, PROP_SSL_CONTEXT_SERVICE, @@ -295,499 +311,626 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_BASIC_AUTH_USERNAME, PROP_BASIC_AUTH_PASSWORD, PROP_PROXY_HOST, - PROP_PROXY_PORT)); - - // property to allow the hostname verifier to be overridden - // this is a "hidden" property - it's configured using a dynamic user property - public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder() - .name("Trusted Hostname") - .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. 
" - + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based " - + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dynamic(true) - .build(); - - // relationships - public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() + PROP_PROXY_PORT, + PROP_PUT_OUTPUT_IN_ATTRIBUTE, + PROP_PUT_ATTRIBUTE_MAX_LENGTH, + PROP_DIGEST_AUTH, + PROP_OUTPUT_RESPONSE_REGARDLESS, + PROP_TRUSTED_HOSTNAME, + PROP_ADD_HEADERS_TO_REQUEST)); + + // relationships + public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() .name("Original") - .description("Original FlowFile will be routed upon success (2xx status codes).") + .description("The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the " + + "success of the request.") .build(); - public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder() + public static final Relationship REL_RESPONSE = new Relationship.Builder() .name("Response") - .description("Response FlowFile will be routed upon success (2xx status codes).") + .description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response Regardless' property " + + "is true then the response will be sent to this relationship regardless of the status code received.") .build(); - public static final Relationship REL_RETRY = new Relationship.Builder() + public static final Relationship REL_RETRY = new Relationship.Builder() .name("Retry") - .description("FlowFile will be routed on any status code that can be retried (5xx status codes).") + .description("The original FlowFile will be routed on any status code that can be retried (5xx status codes). 
It will have new " + + "attributes detailing the request.") .build(); - public static final Relationship REL_NO_RETRY = new Relationship.Builder() + public static final Relationship REL_NO_RETRY = new Relationship.Builder() .name("No Retry") - .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).") + .description("The original FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes). " + + "It will have new attributes detailing the request.") .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("Failure") - .description("FlowFile will be routed on any type of connection failure, timeout or general exception.") + .description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. " + + "It will have new attributes detailing the request.") .build(); - public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE))); + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE))); - } + private volatile Set<String> dynamicPropertyNames = new HashSet<>(); /** - * A single invocation of an HTTP request/response from the InvokeHTTP processor. This class encapsulates the entirety of the flowfile processing. - * <p> - * This class is not thread safe and is created new for every flowfile processed. + * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date + * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string). 
*/ - private static class Transaction implements Config { - - /** - * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date - * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string). - */ - private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'"; - private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC(); - - /** - * Every request/response cycle from this client has a unique transaction id which will be stored as a flowfile attribute. This generator is used to create the id. - */ - private static final AtomicLong txIdGenerator = new AtomicLong(); - - private static final Charset utf8 = Charset.forName("UTF-8"); - - private final ProcessorLog logger; - private final SSLContext sslContext; - private final Pattern attributesToSend; - private final ProcessContext context; - private final ProcessSession session; - - private final long txId = txIdGenerator.incrementAndGet(); - private final long startNanos = System.nanoTime(); - - private FlowFile request; - private FlowFile response; - private HttpURLConnection conn; - - private int statusCode; - private String statusMessage; - - public Transaction( - final ProcessorLog logger, - final SSLContext sslContext, - final Pattern attributesToSend, - final ProcessContext context, - final ProcessSession session, - final FlowFile request) { - - this.logger = logger; - this.sslContext = sslContext; - this.attributesToSend = attributesToSend; - this.context = context; - this.session = session; - this.request = request; - } + private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'"; + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC(); + private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = 
new AtomicReference<>(); - public void process() { - try { - openConnection(); - sendRequest(); - readResponse(); - transfer(); - } catch (final Exception e) { - // log exception - logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e); - - // penalize - request = session.penalize(request); + public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; - // transfer original to failure - session.transfer(request, REL_FAILURE); + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } - // cleanup response flowfile, if applicable - try { - if (response != null) { - session.remove(response); - } - } catch (final Exception e1) { - logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1); + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private volatile Pattern regexAttributesToSend = null; + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.isDynamic()) { + final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames); + if (newValue == null) { + newDynamicPropertyNames.remove(descriptor.getName()); + } else if (oldValue == null) { // new property + newDynamicPropertyNames.add(descriptor.getName()); + } + this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames); + } else { + // compile the attributes-to-send filter pattern + if 
(PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) { + if (newValue.isEmpty()) { + regexAttributesToSend = null; + } else { + final String trimmedValue = StringUtils.trimToEmpty(newValue); + regexAttributesToSend = Pattern.compile(trimmedValue); } } } + } - private void openConnection() throws IOException { - // read the url property from the context - final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue()); - final URL url = new URL(urlstr); - final String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); - final String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); - - String authstrencoded = null; - if (!authuser.isEmpty()) { - String authstr = authuser + ":" + authpass; - byte[] bytestrencoded = Base64.encodeBase64(authstr.getBytes(StandardCharsets.UTF_8)); - authstrencoded = new String(bytestrencoded, StandardCharsets.UTF_8); - } + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(1); + final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet(); + final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet(); - // create the connection - final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue(); - final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger(); - if (proxyHost == null || proxyPort == null) { - conn = (HttpURLConnection) url.openConnection(); - } else { - final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); - conn = (HttpURLConnection) url.openConnection(proxy); - } + if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { + results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, 
both must be set").build()); + } - if (authstrencoded != null) { - conn.setRequestProperty("Authorization", "Basic " + authstrencoded); - } + return results; + } - // set the request method - String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase(); - conn.setRequestMethod(method); + @OnScheduled + public void setUpClient(final ProcessContext context) throws IOException { + okHttpClientAtomicReference.set(null); - // set timeouts - conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + OkHttpClient okHttpClient = new OkHttpClient(); - // set whether to follow redirects - conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean()); + // Add a proxy if set + final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue(); + final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger(); + if (proxyHost != null && proxyPort != null) { + final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + okHttpClient.setProxy(proxy); + } - // special handling for https - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection sconn = (HttpsURLConnection) conn; + // Set timeouts + okHttpClient.setConnectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); + okHttpClient.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); - // check if the ssl context is set - if (sslContext != null) { - sconn.setSSLSocketFactory(sslContext.getSocketFactory()); - } + // Set whether to follow redirects + okHttpClient.setFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean()); - // check the trusted hostname property and 
override the HostnameVerifier - String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue()); - if (!trustedHostname.isEmpty()) { - sconn.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier())); - } - } + final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE); + // check if the ssl context is set and add the factory if so + if (sslContext != null) { + okHttpClient.setSslSocketFactory(sslContext.getSocketFactory()); } - private void sendRequest() throws IOException { - // set the http request properties using flowfile attribute values - setRequestProperties(); + // check the trusted hostname property and override the HostnameVerifier + String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue()); + if (!trustedHostname.isEmpty()) { + okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier())); + } - // log request - logRequest(); + final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); - // we only stream data for POST and PUT requests - String method = conn.getRequestMethod().toUpperCase(); - if ("POST".equals(method) || "PUT".equals(method)) { - conn.setDoOutput(true); - conn.setFixedLengthStreamingMode(request.getSize()); + // If the username/password properties are set then check if digest auth is being used + if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { + final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); - // write the flowfile contents to the output stream - try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) { - session.exportTo(request, os); - } + /* + * Currently 
OkHttp doesn't have built-in Digest Auth Support. The ticket for adding it is here: + * https://github.com/square/okhttp/issues/205#issuecomment-154047052 + * Once added this should be refactored to use the built in support. For now, a third party lib is needed. + */ + final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>(); + + com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass); + + final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials); + + DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder() + .with("Digest", digestAuthenticator) + .build(); + + okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache)); + okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache)); + } - // emit provenance event - session.getProvenanceReporter().send(request, conn.getURL().toExternalForm()); + okHttpClientAtomicReference.set(okHttpClient); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + OkHttpClient okHttpClient = okHttpClientAtomicReference.get(); + + FlowFile requestFlowFile = session.get(); + + // Checking to see if the property to put the body of the response in an attribute was set + boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet(); + if (requestFlowFile == null) { + if(context.hasNonLoopConnection()){ + return; } + String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase(); + if ("POST".equals(request) || "PUT".equals(request)) { + return; + } else if (putToAttribute) { + requestFlowFile = session.create(); + } } - private void readResponse() throws IOException { + // Setting some initial variables + final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); + final ProcessorLog logger = 
getLogger(); + + // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute. + final UUID txId = UUID.randomUUID(); + + FlowFile responseFlowFile = null; + try { + // read the url property from the context + final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue()); + final URL url = new URL(urlstr); + + Request httpRequest = configureRequest(context, session, requestFlowFile, url); + + // log request + logRequest(logger, httpRequest); + + // emit send provenance event if successfully sent to the server + if (httpRequest.body() != null) { + session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true); + } + + final long startNanos = System.nanoTime(); + Response responseHttp = okHttpClient.newCall(httpRequest).execute(); // output the raw response headers (DEBUG level only) - logResponse(); + logResponse(logger, url, responseHttp); // store the status code and message - statusCode = conn.getResponseCode(); - statusMessage = conn.getResponseMessage(); + int statusCode = responseHttp.code(); + String statusMessage = responseHttp.message(); + + if (statusCode == 0) { + throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); + } + + // Create a map of the status attributes that are always written to the request and reponse FlowFiles + Map<String, String> statusAttributes = new HashMap<>(); + statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); + statusAttributes.put(STATUS_MESSAGE, statusMessage); + statusAttributes.put(REQUEST_URL, url.toExternalForm()); + statusAttributes.put(TRANSACTION_ID, txId.toString()); - // always write the status attributes to the request flowfile - request = writeStatusAttributes(request); + if (requestFlowFile != null) { + requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes); + } - // read from the appropriate input stream - try (InputStream 
is = getResponseStream()) { + // If the property to add the response headers to the request flowfile is true then add them + if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) { + // write the response headers as attributes + // this will overwrite any existing flowfile attributes + requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp)); + } - // if not successful, store the response body into a flowfile attribute - if (!isSuccess()) { - String body = trimToEmpty(toString(is, utf8)); - request = session.putAttribute(request, RESPONSE_BODY, body); + boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; + boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean(); + ResponseBody responseBody = responseHttp.body(); + boolean bodyExists = responseBody != null; + + InputStream responseBodyStream = null; + SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null; + TeeInputStream teeInputStream = null; + try { + responseBodyStream = bodyExists ? responseBody.byteStream() : null; + if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) { + outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize); + teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute); } - // if successful, store the response body as the flowfile payload - // we include additional flowfile attributes including the reponse headers - // and the status codes. - if (isSuccess()) { + if (outputBodyToResponseContent) { + /* + * If successful and putting to response flowfile, store the response body as the flowfile payload + * we include additional flowfile attributes including the response headers and the status codes. 
+ */ + // clone the flowfile to capture the response - response = session.create(request); + if (requestFlowFile != null) { + responseFlowFile = session.create(requestFlowFile); + } else { + responseFlowFile = session.create(); + } // write the status attributes - response = writeStatusAttributes(response); + responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes); // write the response headers as attributes // this will overwrite any existing flowfile attributes - response = session.putAllAttributes(response, convertAttributesFromHeaders()); + responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp)); // transfer the message body to the payload // can potentially be null in edge cases - if (is != null) { - response = session.importFrom(is, response); + if (bodyExists) { + if (teeInputStream != null) { + responseFlowFile = session.importFrom(teeInputStream, responseFlowFile); + } else { + responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile); + } // emit provenance event final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis); + session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); + } + } + + // if not successful and request flowfile is not null, store the response body into a flowfile attribute + if (outputBodyToRequestAttribute && bodyExists) { + String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue(); + if (attributeKey == null) { + attributeKey = RESPONSE_BODY; } + byte[] outputBuffer; + int size; + + if (outputStreamToRequestAttribute != null) { + outputBuffer = outputStreamToRequestAttribute.getBuffer(); + size = outputStreamToRequestAttribute.size(); + } else { + outputBuffer = new byte[maxAttributeSize]; + size = 
StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false); + } + String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType())); + requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to " + + url.toExternalForm() + ". It took " + millis + "millis,"); + } + } finally { + if(outputStreamToRequestAttribute != null){ + outputStreamToRequestAttribute.close(); + outputStreamToRequestAttribute = null; } + if(teeInputStream != null){ + teeInputStream.close(); + teeInputStream = null; + } else if(responseBodyStream != null){ + responseBodyStream.close(); + responseBodyStream = null; + } + } + route(requestFlowFile, responseFlowFile, session, context, statusCode); + } catch (final Exception e) { + // penalize or yield + if (requestFlowFile != null) { + logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e); + requestFlowFile = session.penalize(requestFlowFile); + // transfer original to failure + session.transfer(requestFlowFile, REL_FAILURE); + } else { + logger.error("Yielding processor due to exception encountered as a source processor: {}", e); + context.yield(); } - } - private void transfer() throws IOException { - // check if we should penalize the request - if (!isSuccess()) { - request = session.penalize(request); + // cleanup response flowfile, if applicable + try { + if (responseFlowFile != null) { + session.remove(responseFlowFile); + } + } catch (final Exception e1) { + logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1); } + } + } - // log the status codes from the response - logger.info("Request to {} returned status code {} for {}", - new Object[] 
{conn.getURL().toExternalForm(), statusCode, request}); - // transfer to the correct relationship - // 2xx -> SUCCESS - if (isSuccess()) { - // we have two flowfiles to transfer - session.transfer(request, REL_SUCCESS_REQ); - session.transfer(response, REL_SUCCESS_RESP); + private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) { + Request.Builder requestBuilder = new Request.Builder(); - // 5xx -> RETRY - } else if (statusCode / 100 == 5) { - session.transfer(request, REL_RETRY); + requestBuilder = requestBuilder.url(url); + final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); - // 1xx, 3xx, 4xx -> NO RETRY - } else { - session.transfer(request, REL_NO_RETRY); - } + // If the username/password properties are set then check if digest auth is being used + if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { + final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); + String credential = com.squareup.okhttp.Credentials.basic(authUser, authPass); + requestBuilder = requestBuilder.header("Authorization", credential); } - private void setRequestProperties() { + // set the request method + String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase(); + switch (method) { + case "GET": + requestBuilder = requestBuilder.get(); + break; + case "POST": + RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile); + requestBuilder = requestBuilder.post(requestBody); + break; + case "PUT": + requestBody = getRequestBodyToSend(session, requestFlowFile); + requestBuilder = requestBuilder.put(requestBody); + break; + case "HEAD": + requestBuilder = requestBuilder.head(); + break; + case "DELETE": + requestBuilder = requestBuilder.delete(); + break; + default: + requestBuilder = 
requestBuilder.method(method, null); + } - // check if we should send the a Date header with the request - if (context.getProperty(PROP_DATE_HEADER).asBoolean()) { - conn.setRequestProperty("Date", getDateValue()); - } + requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile); - // iterate through the flowfile attributes, adding any attribute that - // matches the attributes-to-send pattern. if the pattern is not set - // (it's an optional property), ignore that attribute entirely - if (attributesToSend != null) { - Map<String, String> attributes = request.getAttributes(); - Matcher m = attributesToSend.matcher(""); - for (Map.Entry<String, String> entry : attributes.entrySet()) { - String key = trimToEmpty(entry.getKey()); - String val = trimToEmpty(entry.getValue()); - - // don't include any of the ignored attributes - if (IGNORED_ATTRIBUTES.contains(key)) { - continue; - } + return requestBuilder.build(); + } - // check if our attribute key matches the pattern - // if so, include in the request as a header - m.reset(key); - if (m.matches()) { - conn.setRequestProperty(key, val); - } - } + private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) { + return new RequestBody() { + @Override + public MediaType contentType() { + final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + String contentType = attributeValue == null ? 
DEFAULT_CONTENT_TYPE : attributeValue; + return MediaType.parse(contentType); } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + session.exportTo(requestFlowFile, sink.outputStream()); + } + }; + } + + private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) { + // check if we should send the a Date header with the request + if (context.getProperty(PROP_DATE_HEADER).asBoolean()) { + requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis())); + } + + for (String headerKey : dynamicPropertyNames) { + String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue(); + requestBuilder = requestBuilder.addHeader(headerKey, headerValue); } - /** - * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. - */ - private Map<String, String> convertAttributesFromHeaders() throws IOException { - // create a new hashmap to store the values from the connection - Map<String, String> map = new HashMap<>(); - for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) { - String key = entry.getKey(); - if (key == null) { + // iterate through the flowfile attributes, adding any attribute that + // matches the attributes-to-send pattern. 
if the pattern is not set + // (it's an optional property), ignore that attribute entirely + if (regexAttributesToSend != null) { + Map<String, String> attributes = requestFlowFile.getAttributes(); + Matcher m = regexAttributesToSend.matcher(""); + for (Map.Entry<String, String> entry : attributes.entrySet()) { + String headerKey = trimToEmpty(entry.getKey()); + + // don't include any of the ignored attributes + if (IGNORED_ATTRIBUTES.contains(headerKey)) { continue; } - List<String> values = entry.getValue(); - - // we ignore any headers with no actual values (rare) - if (values == null || values.isEmpty()) { - continue; + // check if our attribute key matches the pattern + // if so, include in the request as a header + m.reset(headerKey); + if (m.matches()) { + String headerVal = trimToEmpty(entry.getValue()); + requestBuilder = requestBuilder.addHeader(headerKey, headerVal); } + } + } + return requestBuilder; + } - // create a comma separated string from the values, this is stored in the map - String value = csv(values); - // put the csv into the map - map.put(key, value); + private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){ + // check if we should penalize the request + if (!isSuccess(statusCode)) { + if (request == null) { + context.yield(); + } else { + request = session.penalize(request); } + } - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection sconn = (HttpsURLConnection) conn; - // this should seemingly not be required, but somehow the state of the jdk client is messed up - // when retrieving SSL certificate related information if connect() has not been called previously. 
- sconn.connect(); - map.put(REMOTE_DN, sconn.getPeerPrincipal().getName()); + // If the property to output the response flowfile regardless of status code is set then transfer it + boolean responseSent = false; + if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) { + session.transfer(response, REL_RESPONSE); + responseSent = true; + } + + // transfer to the correct relationship + // 2xx -> SUCCESS + if (isSuccess(statusCode)) { + // we have two flowfiles to transfer + if (request != null) { + session.transfer(request, REL_SUCCESS_REQ); + } + if (response != null && !responseSent) { + session.transfer(response, REL_RESPONSE); } - return map; - } + // 5xx -> RETRY + } else if (statusCode / 100 == 5) { + if (request != null) { + session.transfer(request, REL_RETRY); + } - private boolean isSuccess() throws IOException { - if (statusCode == 0) { - throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); + // 1xx, 3xx, 4xx -> NO RETRY + } else { + if (request != null) { + session.transfer(request, REL_NO_RETRY); } - return statusCode / 100 == 2; } - private void logRequest() { - logger.debug("\nRequest to remote service:\n\t{}\n{}", - new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())}); - } + } - private void logResponse() { - logger.debug("\nResponse from remote service:\n\t{}\n{}", - new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())}); - } + private boolean isSuccess(int statusCode) { + return statusCode / 100 == 2; + } - private String getLogString(Map<String, List<String>> map) { - StringBuilder sb = new StringBuilder(); - for (Map.Entry<String, List<String>> entry : map.entrySet()) { - List<String> list = entry.getValue(); - if (list.isEmpty()) { - continue; - } - sb.append("\t"); - sb.append(entry.getKey()); - sb.append(": "); - if (list.size() == 1) { - sb.append(list.get(0)); - } else { - sb.append(list.toString()); - } - sb.append("\n"); 
- } - return sb.toString(); - } + private void logRequest(ProcessorLog logger, Request request) { + logger.debug("\nRequest to remote service:\n\t{}\n{}", + new Object[]{request.url().toExternalForm(), getLogString(request.headers().toMultimap())}); + } - /** - * Convert a collection of string values into a overly simple comma separated string. - * - * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style. - */ - private String csv(Collection<String> values) { - if (values == null || values.isEmpty()) { - return ""; - } - if (values.size() == 1) { - return values.iterator().next(); - } + private void logResponse(ProcessorLog logger, URL url, Response response) { + logger.debug("\nResponse from remote service:\n\t{}\n{}", + new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())}); + } - StringBuilder sb = new StringBuilder(); - for (String value : values) { - value = value.trim(); - if (value.isEmpty()) { - continue; - } - if (sb.length() > 0) { - sb.append(", "); - } - sb.append(value); + private String getLogString(Map<String, List<String>> map) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, List<String>> entry : map.entrySet()) { + List<String> list = entry.getValue(); + if (list.isEmpty()) { + continue; + } + sb.append("\t"); + sb.append(entry.getKey()); + sb.append(": "); + if (list.size() == 1) { + sb.append(list.get(0)); + } else { + sb.append(list.toString()); } - return sb.toString().trim(); + sb.append("\n"); } + return sb.toString(); + } - /** - * Return the current datetime as an RFC 1123 formatted string in the GMT tz. - */ - private String getDateValue() { - return dateFormat.print(System.currentTimeMillis()); + /** + * Convert a collection of string values into a overly simple comma separated string. 
+ * <p/> + * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style. + */ + private String csv(Collection<String> values) { + if (values == null || values.isEmpty()) { + return ""; + } + if (values.size() == 1) { + return values.iterator().next(); } - /** - * Returns a string from the input stream using the specified character encoding. - */ - private String toString(InputStream is, Charset charset) throws IOException { - if (is == null) { - return ""; + StringBuilder sb = new StringBuilder(); + for (String value : values) { + value = value.trim(); + if (value.isEmpty()) { + continue; } - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buf = new byte[4096]; - int len; - while ((len = is.read(buf)) != -1) { - out.write(buf, 0, len); + if (sb.length() > 0) { + sb.append(", "); } - return new String(out.toByteArray(), charset); + sb.append(value); } + return sb.toString().trim(); + } - /** - * Returns the input stream to use for reading from the remote server. We're either going to want the inputstream or errorstream, effectively depending on the status code. - * <p> - * This method can return null if there is no inputstream to read from. For example, if the remote server did not send a message body. eg. 204 No Content or 304 Not Modified - */ - private InputStream getResponseStream() { - try { - InputStream is = conn.getErrorStream(); - if (is == null) { - is = conn.getInputStream(); - } - return new BufferedInputStream(is); + /** + * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. 
+ */ + private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp){ + // create a new hashmap to store the values from the connection + Map<String, String> map = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : responseHttp.headers().toMultimap().entrySet()) { + String key = entry.getKey(); + if (key == null) { + continue; + } - } catch (IOException e) { - logger.warn("Response stream threw an exception: {}", new Object[] {e}, e); - return null; + List<String> values = entry.getValue(); + + // we ignore any headers with no actual values (rare) + if (values == null || values.isEmpty()) { + continue; } + + // create a comma separated string from the values, this is stored in the map + String value = csv(values); + + // put the csv into the map + map.put(key, value); } - /** - * Writes the status attributes onto the flowfile, returning the flowfile that was updated. - */ - private FlowFile writeStatusAttributes(FlowFile flowfile) { - flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode)); - flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, statusMessage); - flowfile = session.putAttribute(flowfile, REQUEST_URL, conn.getURL().toExternalForm()); - flowfile = session.putAttribute(flowfile, TRANSACTION_ID, Long.toString(txId)); - return flowfile; + if ("HTTPS".equals(url.getProtocol().toUpperCase())) { + map.put(REMOTE_DN, responseHttp.handshake().peerPrincipal().getName()); } - /** - * - */ - private static class OverrideHostnameVerifier implements HostnameVerifier { + return map; + } - private final String trustedHostname; - private final HostnameVerifier delegate; + private Charset getCharsetFromMediaType(MediaType contentType) { + return contentType != null ? 
contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8; + } - private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) { - this.trustedHostname = trustedHostname; - this.delegate = delegate; - } + private static class OverrideHostnameVerifier implements HostnameVerifier { - @Override - public boolean verify(String hostname, SSLSession session) { - if (trustedHostname.equalsIgnoreCase(hostname)) { - return true; - } - return delegate.verify(hostname, session); + private final String trustedHostname; + private final HostnameVerifier delegate; + + private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) { + this.trustedHostname = trustedHostname; + this.delegate = delegate; + } + + @Override + public boolean verify(String hostname, SSLSession session) { + if (trustedHostname.equalsIgnoreCase(hostname)) { + return true; } + return delegate.verify(hostname, session); } } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index a26b2ed..a82bc5a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@ -19,22 +19,27 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.io.PrintWriter; import java.net.URL; +import java.util.HashMap; +import java.util.Map; 
import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon; +import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; + public class TestInvokeHTTP extends TestInvokeHttpCommon { @BeforeClass @@ -72,41 +77,99 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon { return new TestServer(); } + @Test + public void testSslSetHttpRequest() throws Exception { + + final Map<String, String> sslProperties = new HashMap<>(); + sslProperties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProperties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProperties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProperties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProperties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProperties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + + runner = TestRunners.newTestRunner(InvokeHTTP.class); + final StandardSSLContextService sslService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslService, sslProperties); + runner.enableControllerService(sslService); + runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 
1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + bundle1.assertAttributeEquals("OkHttp-Selected-Protocol", "http/1.1"); + } + // Currently InvokeHttp does not support Proxy via Https @Test public void testProxy() throws Exception { addHandler(new MyProxyHandler()); URL proxyURL = new URL(url); - runner.setProperty(InvokeHTTP.Config.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out - runner.setProperty(InvokeHTTP.Config.PROP_PROXY_HOST, proxyURL.getHost()); - runner.setProperty(InvokeHTTP.Config.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort())); + runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out + runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, proxyURL.getHost()); + + try{ + runner.run(); + 
Assert.fail(); + } catch (AssertionError e){ + // Expect assertion error when proxy port isn't set but host is. + } + runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort())); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); //expected in request status.code and status.message //original flow file (+attributes) - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); //expected in response //status code, status message, all headers from server response --> ff attributes //server response message body into payload of ff - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("http://nifi.apache.org/".getBytes("UTF-8")); - 
bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java index d155b74..b3cd9dc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard; -import org.apache.nifi.processors.standard.InvokeHTTP.Config; import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon; import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.TestRunners; @@ -63,7 +62,7 @@ public class TestInvokeHttpSSL extends TestInvokeHttpCommon { final StandardSSLContextService sslService = new StandardSSLContextService(); runner.addControllerService("ssl-context", sslService, sslProperties); runner.enableControllerService(sslService); - runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); server.clearHandlers(); }