Adding TLS support to PutEmail, along with several other PropertyDescriptors 
that can be driven by the Expression Language (EL) for full control of outgoing 
emails. Also switched from Sun's SMTPTransport to javax.mail.Transport for sending the email message.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b97396c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b97396c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b97396c0

Branch: refs/heads/develop
Commit: b97396c0fabe5a13043e6243c2e578e527b4f0c5
Parents: e48ea72
Author: Brian Ghigiarelli <[email protected]>
Authored: Fri Mar 20 16:37:23 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Mar 23 10:10:49 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/PutEmail.java      | 149 ++++++++++++++++---
 .../nifi/processors/standard/TestPutEmail.java  |  40 ++++-
 2 files changed, 167 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b97396c0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
index d83b100..f632000 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
@@ -22,18 +22,22 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
 import javax.activation.DataHandler;
+import javax.mail.Authenticator;
 import javax.mail.Message;
 import javax.mail.Message.RecipientType;
 import javax.mail.MessagingException;
+import javax.mail.PasswordAuthentication;
 import javax.mail.Session;
-import javax.mail.URLName;
+import javax.mail.Transport;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeBodyPart;
@@ -61,8 +65,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import com.sun.mail.smtp.SMTPTransport;
-
 @SupportsBatching
 @Tags({"email", "put", "notify", "smtp"})
 @CapabilityDescription("Sends an e-mail to configured recipients for each 
incoming FlowFile")
@@ -72,6 +74,7 @@ public class PutEmail extends AbstractProcessor {
             .name("SMTP Hostname")
             .description("The hostname of the SMTP host")
             .required(true)
+                   .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
     public static final PropertyDescriptor SMTP_PORT = new 
PropertyDescriptor.Builder()
@@ -79,21 +82,56 @@ public class PutEmail extends AbstractProcessor {
             .description("The Port used for SMTP communications")
             .required(true)
             .defaultValue("25")
+                   .expressionLanguageSupported(true)
             .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
     public static final PropertyDescriptor SMTP_USERNAME = new 
PropertyDescriptor.Builder()
             .name("SMTP Username")
             .description("Username for the SMTP account")
+                   .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
             .build();
     public static final PropertyDescriptor SMTP_PASSWORD = new 
PropertyDescriptor.Builder()
             .name("SMTP Password")
             .description("Password for the SMTP account")
+                   .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
             .sensitive(true)
             .build();
+    public static final PropertyDescriptor SMTP_AUTH = new 
PropertyDescriptor.Builder()
+                   .name("SMTP Auth")
+                   .description("Flag indicating whether authentication should 
be used")
+                   .required(true)
+                   .expressionLanguageSupported(true)
+                   .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("true")
+            .build();
+    public static final PropertyDescriptor SMTP_TLS = new 
PropertyDescriptor.Builder()
+                   .name("SMTP TLS")
+                   .description("Flag indicating whether TLS should be 
enabled")
+                   .required(true)
+                   .expressionLanguageSupported(true)
+                   .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                   .defaultValue("false")
+                   .build();
+    public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new 
PropertyDescriptor.Builder()
+                   .name("SMTP Socket Factory")
+                   .description("Socket Factory to use for SMTP Connection")
+                   .required(true)
+                   .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                   .defaultValue("javax.net.ssl.SSLSocketFactory")
+                   .build();
+    public static final PropertyDescriptor HEADER_XMAILER = new 
PropertyDescriptor.Builder()
+                   .name("SMTP X-Mailer Header")
+                   .description("X-Mailer used in the header of the outgoing 
email")
+                   .required(true)
+                   .expressionLanguageSupported(true)
+                   .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                   .defaultValue("NiFi")
+                   .build();
     public static final PropertyDescriptor FROM = new 
PropertyDescriptor.Builder()
             .name("From")
             .description("Specifies the Email address to use as the sender")
@@ -152,12 +190,27 @@ public class PutEmail extends AbstractProcessor {
             .allowableValues("true", "false")
             .defaultValue("false")
             .build();
-
+    
     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("FlowFiles that are 
successfully sent will be routed to this relationship").build();
     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("FlowFiles that fail to send 
will be routed to this relationship").build();
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
+    
+    /**
+     * Mapping of the mail properties to the NiFi PropertyDescriptors that 
will be evaluated at runtime
+     */
+    private static Map<String, PropertyDescriptor> propertyToContext = new 
HashMap<String, PropertyDescriptor>();
+    static {
+       propertyToContext.put("mail.smtp.host", SMTP_HOSTNAME);
+           propertyToContext.put("mail.smtp.port", SMTP_PORT);
+           propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT);
+           propertyToContext.put("mail.smtp.socketFactory.class", 
SMTP_SOCKET_FACTORY);
+           propertyToContext.put("mail.smtp.auth", SMTP_AUTH);
+           propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS);
+           propertyToContext.put("mail.smtp.user", SMTP_USERNAME);
+           propertyToContext.put("mail.smtp.password", SMTP_PASSWORD);
+    }
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -166,6 +219,10 @@ public class PutEmail extends AbstractProcessor {
         properties.add(SMTP_PORT);
         properties.add(SMTP_USERNAME);
         properties.add(SMTP_PASSWORD);
+        properties.add(SMTP_AUTH);
+        properties.add(SMTP_TLS);
+        properties.add(SMTP_SOCKET_FACTORY);
+        properties.add(HEADER_XMAILER);
         properties.add(FROM);
         properties.add(TO);
         properties.add(CC);
@@ -214,9 +271,10 @@ public class PutEmail extends AbstractProcessor {
             return;
         }
 
-        final Properties properties = new Properties();
-        properties.setProperty("smtp.mail.host", 
context.getProperty(SMTP_HOSTNAME).getValue());
-        final Session mailSession = Session.getInstance(properties);
+        final Properties properties = 
this.getMailPropertiesFromFlowFile(context, flowFile);
+        
+        final Session mailSession = this.createMailSession(properties);
+        
         final Message message = new MimeMessage(mailSession);
         final ProcessorLog logger = getLogger();
 
@@ -232,7 +290,7 @@ public class PutEmail extends AbstractProcessor {
             final InternetAddress[] bccAddresses = 
toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue());
             message.setRecipients(RecipientType.BCC, bccAddresses);
 
-            message.setHeader("X-Mailer", "NiFi");
+            message.setHeader("X-Mailer", 
context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue());
             
message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue());
             String messageText = 
context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
 
@@ -264,18 +322,8 @@ public class PutEmail extends AbstractProcessor {
                 multipart.addBodyPart(mimeFile);
                 message.setContent(multipart);
             }
-
-            final String smtpHost = 
context.getProperty(SMTP_HOSTNAME).getValue();
-            final SMTPTransport transport = new SMTPTransport(mailSession, new 
URLName(smtpHost));
-            try {
-                final int smtpPort = 
context.getProperty(SMTP_PORT).asInteger();
-                final String smtpUsername = 
context.getProperty(SMTP_USERNAME).getValue();
-                final String smtpPassword = 
context.getProperty(SMTP_PASSWORD).getValue();
-                transport.connect(smtpHost, smtpPort, smtpUsername, 
smtpPassword);
-                transport.sendMessage(message, message.getAllRecipients());
-            } finally {
-                transport.close();
-            }
+            
+            Transport.send(message);
 
             session.getProvenanceReporter().send(flowFile, "mailto:" + 
message.getAllRecipients()[0].toString());
             session.transfer(flowFile, REL_SUCCESS);
@@ -287,7 +335,66 @@ public class PutEmail extends AbstractProcessor {
         }
     }
 
-    public static final String BODY_SEPARATOR = 
"\n\n--------------------------------------------------\n";
+    /**
+     * Based on the input properties, determine whether an authenticated or 
unauthenticated session
+     * should be used. If authenticated, creates a Password Authenticator for 
use in sending the email.
+     * 
+     * @param properties
+     * @return
+     */
+       private Session createMailSession(final Properties properties) {
+               String authValue = properties.getProperty("mail.smtp.auth");
+        Boolean auth = Boolean.valueOf(authValue);
+        
+        /*
+         * Conditionally create a password authenticator if the 'auth' 
parameter is set.
+         */
+        final Session mailSession = auth ? Session.getInstance(properties, new 
Authenticator() {
+               @Override
+               public PasswordAuthentication getPasswordAuthentication() {
+                       String username = 
properties.getProperty("mail.smtp.user"),
+                                       password = 
properties.getProperty("mail.smtp.password");
+                       return new PasswordAuthentication(username, password);
+               }
+        }) : Session.getInstance(properties); // without auth
+               return mailSession;
+       }
+
+    /**
+     * Uses the mapping of javax.mail properties to NiFi PropertyDescriptors 
to build
+     * the required Properties object to be used for sending this email
+     * 
+     * @param context
+     * @param flowFile
+     * @return
+     */
+    private Properties getMailPropertiesFromFlowFile(final ProcessContext 
context, final FlowFile flowFile) {
+
+        final Properties properties = new Properties();
+        
+        final ProcessorLog logger = this.getLogger();
+        
+        for(Entry<String, PropertyDescriptor> entry : 
propertyToContext.entrySet()) {
+               
+               // Evaluate the property descriptor against the flow file
+               String flowFileValue = 
context.getProperty(entry.getValue()).evaluateAttributeExpressions(flowFile).getValue();
+               
+               String property = entry.getKey();
+               
+               logger.debug("Evaluated Mail Property: {} with Value: {}", new 
Object[]{property, flowFileValue});
+               
+               // Nullable values are not allowed, so filter out
+               if(null != flowFileValue) {
+                       properties.setProperty(property, flowFileValue);
+               }
+               
+        }
+        
+        return properties;
+        
+       }
+
+       public static final String BODY_SEPARATOR = 
"\n\n--------------------------------------------------\n";
 
     private static String formatAttributes(final FlowFile flowFile, final 
String messagePrepend) {
         StringBuilder message = new StringBuilder(messagePrepend);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b97396c0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
index b737ed6..b0ce477 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
@@ -19,14 +19,19 @@ package org.apache.nifi.processors.standard;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestPutEmail {
 
     @Test
-    public void testHotNotFound() {
+    public void testHostNotFound() {
         // verifies that files are routed to failure when the SMTP host 
doesn't exist
         final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
         runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123");
@@ -42,4 +47,37 @@ public class TestPutEmail {
         runner.assertQueueEmpty();
         runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
     }
+    
+    @Test
+    public void testEmailPropertyFormatters() {
+        // verifies that files are routed to failure when the SMTP host 
doesn't exist
+        final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
+        runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
+        runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
+        runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, 
"${dynamicSocketFactory}");
+        runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
+        runner.setProperty(PutEmail.FROM, "[email protected]");
+        runner.setProperty(PutEmail.MESSAGE, "Message Body");
+        runner.setProperty(PutEmail.TO, "[email protected]");
+        
+        ProcessSession session = 
runner.getProcessSessionFactory().createSession();
+        FlowFile ff = session.create();
+        ff = session.putAttribute(ff, "dynamicSocketFactory", 
"testingSocketFactory");
+        ProcessContext context = runner.getProcessContext();
+        
+        String xmailer = 
context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue();
+        assertEquals("X-Mailer Header", "TestingNiFi", xmailer);
+        
+        String socketFactory = 
context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue();
+        assertEquals("Socket Factory", "testingSocketFactory", socketFactory);
+        
+        final Map<String, String> attributes = new HashMap<>();
+        runner.enqueue("Some Text".getBytes(), attributes);
+
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
+    }
+    
 }

Reply via email to