Repository: nifi Updated Branches: refs/heads/develop e59ee5dda -> 4b9ee460a
NIFI-825: Use new method of accessing controller services and rather than caching an SSLContext, obtain one from the service each time Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4b9ee460 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4b9ee460 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4b9ee460 Branch: refs/heads/develop Commit: 4b9ee460a8ea67ceff89b6dc9ecbb7d06f596c21 Parents: e59ee5d Author: Mark Payne <[email protected]> Authored: Thu Aug 6 11:55:08 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Aug 6 11:55:08 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/InvokeHTTP.java | 98 +++++++------------- 1 file changed, 34 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4b9ee460/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index c8a354b..82e0573 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -39,7 +39,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -52,10 +51,10 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; @@ -84,10 +83,8 @@ import org.joda.time.format.DateTimeFormatter; + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") public final class InvokeHTTP extends AbstractProcessor { - //-- properties --// @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return Config.PROPERTIES; } @@ -99,70 +96,47 @@ public final class InvokeHTTP extends AbstractProcessor { return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName); } - //-- relationships --// @Override public Set<Relationship> getRelationships() { return Config.RELATIONSHIPS; } - //-- class properties --// - final AtomicReference<SSLContext> sslContextRef = new AtomicReference<>(); - final AtomicReference<Pattern> attributesToSendRef = new AtomicReference<>(); + private volatile Pattern attributesToSend = null; @Override - public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - newValue = StringUtils.trimToEmpty(newValue); - - // listen for the SSL Context Service property and retrieve the SSLContext from the controller service. - if (Config.PROP_SSL_CONTEXT_SERVICE.getName().equalsIgnoreCase(descriptor.getName())) { - if (newValue.isEmpty()) { - sslContextRef.set(null); - } else { - SSLContextService svc = (SSLContextService) getControllerServiceLookup().getControllerService(newValue); - sslContextRef.set(svc.createSSLContext(ClientAuth.NONE)); // ClientAuth is only useful for servers, not clients. - getLogger().info("Loading SSL configuration from keystore={} and truststore={}", - new Object[]{svc.getKeyStoreFile(), svc.getTrustStoreFile()}); - } - } + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + final String trimmedValue = StringUtils.trimToEmpty(newValue); // compile the attributes-to-send filter pattern if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) { if (newValue.isEmpty()) { - attributesToSendRef.set(null); + attributesToSend = null; } else { - attributesToSendRef.set(Pattern.compile(newValue)); + attributesToSend = Pattern.compile(trimmedValue); } } } - //-- processing --// @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - List<FlowFile> flowfiles = session.get(Config.MAX_RESULTS_PER_THREAD); - if (flowfiles.isEmpty()) { - context.yield(); + final FlowFile flowFile = session.get(); + if (flowFile == null) { return; } - for (FlowFile flowfile : flowfiles) { - Transaction transaction = new Transaction(getLogger(), sslContextRef, attributesToSendRef, context, session, flowfile); - transaction.process(); - } + final SSLContextService sslService = context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE); + + Transaction transaction = new Transaction(getLogger(), sslContext, attributesToSend, context, session, flowFile); + transaction.process(); } /** - * * Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc. - * - * */ public interface Config { - - //-- magic numbers --// - int MAX_RESULTS_PER_THREAD = 50; - - //-- flowfile attribute keys returned after reading the response --// + // flowfile attribute keys returned after reading the response String STATUS_CODE = "invokehttp.status.code"; String STATUS_MESSAGE = "invokehttp.status.message"; String RESPONSE_BODY = "invokehttp.response.body"; @@ -179,7 +153,7 @@ public final class InvokeHTTP extends AbstractProcessor { "uuid", "filename", "path" ))); - //-- properties --// + // properties public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() .name("HTTP Method") .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).") @@ -297,7 +271,7 @@ public final class InvokeHTTP extends AbstractProcessor { .dynamic(true) .build(); - //-- relationships --// + // relationships public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() .name("Original") .description("Original FlowFile will be routed upon success (2xx status codes).") @@ -351,8 +325,8 @@ public final class InvokeHTTP extends AbstractProcessor { private static final Charset utf8 = Charset.forName("UTF-8"); private final ProcessorLog logger; - private final AtomicReference<SSLContext> sslContextRef; - private final AtomicReference<Pattern> attributesToSendRef; + private final SSLContext sslContext; + private final Pattern attributesToSend; private final ProcessContext context; private final ProcessSession session; @@ -366,32 +340,31 @@ public final class InvokeHTTP extends AbstractProcessor { private String statusMessage; public Transaction( - ProcessorLog logger, - AtomicReference<SSLContext> sslContextRef, - AtomicReference<Pattern> attributesToSendRef, - ProcessContext context, - ProcessSession session, - FlowFile request) { + final ProcessorLog logger, + final SSLContext sslContext, + final Pattern attributesToSend, + final ProcessContext context, + final ProcessSession session, + final FlowFile request) { this.logger = logger; - this.sslContextRef = sslContextRef; - this.attributesToSendRef = attributesToSendRef; + this.sslContext = sslContext; + this.attributesToSend = attributesToSend; this.context = context; this.session = session; this.request = request; } - public void process() { + public void process() { try { openConnection(); sendRequest(); readResponse(); transfer(); - - } catch (Throwable t) { + } catch (final Exception e) { // log exception - logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), t}, t); + logger.error("Routing to {} due to exception: {}", new Object[] { REL_FAILURE.getName(), e }, e); // penalize request = session.penalize(request); @@ -404,10 +377,9 @@ public final class InvokeHTTP extends AbstractProcessor { if (response != null) { session.remove(response); } - } catch (Throwable t1) { - logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{t1}, t1); + } catch (final Exception e1) { + logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] { e1 }, e1); } - } } @@ -447,7 +419,6 @@ public final class InvokeHTTP extends AbstractProcessor { HttpsURLConnection sconn = (HttpsURLConnection) conn; // check if the ssl context is set - SSLContext sslContext = sslContextRef.get(); if (sslContext != null) { sconn.setSSLSocketFactory(sslContext.getSocketFactory()); } @@ -573,10 +544,9 @@ public final class InvokeHTTP extends AbstractProcessor { // iterate through the flowfile attributes, adding any attribute that // matches the attributes-to-send pattern. if the pattern is not set // (it's an optional property), ignore that attribute entirely - Pattern p = attributesToSendRef.get(); - if (p != null) { + if (attributesToSend != null) { Map<String, String> attributes = request.getAttributes(); - Matcher m = p.matcher(""); + Matcher m = attributesToSend.matcher(""); for (Map.Entry<String, String> entry : attributes.entrySet()) { String key = trimToEmpty(entry.getKey()); String val = trimToEmpty(entry.getValue());
