Repository: nifi
Updated Branches:
  refs/heads/develop e59ee5dda -> 4b9ee460a


NIFI-825: Use the new method of accessing controller services and, rather than 
caching an SSLContext, obtain one from the service each time


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4b9ee460
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4b9ee460
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4b9ee460

Branch: refs/heads/develop
Commit: 4b9ee460a8ea67ceff89b6dc9ecbb7d06f596c21
Parents: e59ee5d
Author: Mark Payne <[email protected]>
Authored: Thu Aug 6 11:55:08 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Aug 6 11:55:08 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/InvokeHTTP.java    | 98 +++++++-------------
 1 file changed, 34 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4b9ee460/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index c8a354b..82e0573 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -39,7 +39,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -52,10 +51,10 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
@@ -84,10 +83,8 @@ import org.joda.time.format.DateTimeFormatter;
         + "Enabling this property has MITM security implications, use wisely. 
Only valid with SSL (HTTPS) connections.")
 public final class InvokeHTTP extends AbstractProcessor {
 
-    //-- properties --//
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-
         return Config.PROPERTIES;
     }
 
@@ -99,70 +96,47 @@ public final class InvokeHTTP extends AbstractProcessor {
         return 
super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
     }
 
-    //-- relationships --//
     @Override
     public Set<Relationship> getRelationships() {
         return Config.RELATIONSHIPS;
     }
 
-    //-- class properties --//
-    final AtomicReference<SSLContext> sslContextRef = new AtomicReference<>();
-    final AtomicReference<Pattern> attributesToSendRef = new 
AtomicReference<>();
+    private volatile Pattern attributesToSend = null;
 
     @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-        newValue = StringUtils.trimToEmpty(newValue);
-
-        // listen for the SSL Context Service property and retrieve the 
SSLContext from the controller service.
-        if 
(Config.PROP_SSL_CONTEXT_SERVICE.getName().equalsIgnoreCase(descriptor.getName()))
 {
-            if (newValue.isEmpty()) {
-                sslContextRef.set(null);
-            } else {
-                SSLContextService svc = (SSLContextService) 
getControllerServiceLookup().getControllerService(newValue);
-                sslContextRef.set(svc.createSSLContext(ClientAuth.NONE));  // 
ClientAuth is only useful for servers, not clients.
-                getLogger().info("Loading SSL configuration from keystore={} 
and truststore={}",
-                        new Object[]{svc.getKeyStoreFile(), 
svc.getTrustStoreFile()});
-            }
-        }
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        final String trimmedValue = StringUtils.trimToEmpty(newValue);
 
         // compile the attributes-to-send filter pattern
         if 
(Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName()))
 {
             if (newValue.isEmpty()) {
-                attributesToSendRef.set(null);
+                attributesToSend = null;
             } else {
-                attributesToSendRef.set(Pattern.compile(newValue));
+                attributesToSend = Pattern.compile(trimmedValue);
             }
         }
 
     }
 
-    //-- processing --//
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        List<FlowFile> flowfiles = session.get(Config.MAX_RESULTS_PER_THREAD);
-        if (flowfiles.isEmpty()) {
-            context.yield();
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
 
-        for (FlowFile flowfile : flowfiles) {
-            Transaction transaction = new Transaction(getLogger(), 
sslContextRef, attributesToSendRef, context, session, flowfile);
-            transaction.process();
-        }
+        final SSLContextService sslService = 
context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext = sslService == null ? null : 
sslService.createSSLContext(ClientAuth.NONE);
+
+        Transaction transaction = new Transaction(getLogger(), sslContext, 
attributesToSend, context, session, flowFile);
+        transaction.process();
     }
 
     /**
-     *
      * Stores properties, relationships, configuration values, hard coded 
strings, magic numbers, etc.
-     *
-     *
      */
     public interface Config {
-
-        //-- magic numbers --//
-        int MAX_RESULTS_PER_THREAD = 50;
-
-        //-- flowfile attribute keys returned after reading the response --//
+        // flowfile attribute keys returned after reading the response
         String STATUS_CODE = "invokehttp.status.code";
         String STATUS_MESSAGE = "invokehttp.status.message";
         String RESPONSE_BODY = "invokehttp.response.body";
@@ -179,7 +153,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 "uuid", "filename", "path"
         )));
 
-        //-- properties --//
+        // properties
         public static final PropertyDescriptor PROP_METHOD = new 
PropertyDescriptor.Builder()
                 .name("HTTP Method")
                 .description("HTTP request method (GET, POST, PUT, DELETE, 
HEAD, OPTIONS).")
@@ -297,7 +271,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .dynamic(true)
                 .build();
 
-        //-- relationships --//
+        // relationships
         public static final Relationship REL_SUCCESS_REQ = new 
Relationship.Builder()
                 .name("Original")
                 .description("Original FlowFile will be routed upon success 
(2xx status codes).")
@@ -351,8 +325,8 @@ public final class InvokeHTTP extends AbstractProcessor {
         private static final Charset utf8 = Charset.forName("UTF-8");
 
         private final ProcessorLog logger;
-        private final AtomicReference<SSLContext> sslContextRef;
-        private final AtomicReference<Pattern> attributesToSendRef;
+        private final SSLContext sslContext;
+        private final Pattern attributesToSend;
         private final ProcessContext context;
         private final ProcessSession session;
 
@@ -366,32 +340,31 @@ public final class InvokeHTTP extends AbstractProcessor {
         private String statusMessage;
 
         public Transaction(
-                ProcessorLog logger,
-                AtomicReference<SSLContext> sslContextRef,
-                AtomicReference<Pattern> attributesToSendRef,
-                ProcessContext context,
-                ProcessSession session,
-                FlowFile request) {
+            final ProcessorLog logger,
+            final SSLContext sslContext,
+            final Pattern attributesToSend,
+            final ProcessContext context,
+            final ProcessSession session,
+            final FlowFile request) {
 
             this.logger = logger;
-            this.sslContextRef = sslContextRef;
-            this.attributesToSendRef = attributesToSendRef;
+            this.sslContext = sslContext;
+            this.attributesToSend = attributesToSend;
             this.context = context;
             this.session = session;
             this.request = request;
         }
 
-        public void process() {
 
+        public void process() {
             try {
                 openConnection();
                 sendRequest();
                 readResponse();
                 transfer();
-
-            } catch (Throwable t) {
+            } catch (final Exception e) {
                 // log exception
-                logger.error("Routing to {} due to exception: {}", new 
Object[]{REL_FAILURE.getName(), t}, t);
+                logger.error("Routing to {} due to exception: {}", new 
Object[] { REL_FAILURE.getName(), e }, e);
 
                 // penalize
                 request = session.penalize(request);
@@ -404,10 +377,9 @@ public final class InvokeHTTP extends AbstractProcessor {
                     if (response != null) {
                         session.remove(response);
                     }
-                } catch (Throwable t1) {
-                    logger.error("Could not cleanup response flowfile due to 
exception: {}", new Object[]{t1}, t1);
+                } catch (final Exception e1) {
+                    logger.error("Could not cleanup response flowfile due to 
exception: {}", new Object[] { e1 }, e1);
                 }
-
             }
         }
 
@@ -447,7 +419,6 @@ public final class InvokeHTTP extends AbstractProcessor {
                 HttpsURLConnection sconn = (HttpsURLConnection) conn;
 
                 // check if the ssl context is set
-                SSLContext sslContext = sslContextRef.get();
                 if (sslContext != null) {
                     sconn.setSSLSocketFactory(sslContext.getSocketFactory());
                 }
@@ -573,10 +544,9 @@ public final class InvokeHTTP extends AbstractProcessor {
             // iterate through the flowfile attributes, adding any attribute 
that
             // matches the attributes-to-send pattern. if the pattern is not 
set
             // (it's an optional property), ignore that attribute entirely
-            Pattern p = attributesToSendRef.get();
-            if (p != null) {
+            if (attributesToSend != null) {
                 Map<String, String> attributes = request.getAttributes();
-                Matcher m = p.matcher("");
+                Matcher m = attributesToSend.matcher("");
                 for (Map.Entry<String, String> entry : attributes.entrySet()) {
                     String key = trimToEmpty(entry.getKey());
                     String val = trimToEmpty(entry.getValue());

Reply via email to