Repository: oozie
Updated Branches:
  refs/heads/master 42cebf6e2 -> 403e49a70


OOZIE-2160 Support attachment in email action (ryota)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/403e49a7
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/403e49a7
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/403e49a7

Branch: refs/heads/master
Commit: 403e49a709221cb25c09cb4f5cc175703584aff9
Parents: 42cebf6
Author: egashira <[email protected]>
Authored: Wed Mar 11 14:14:29 2015 -0700
Committer: egashira <[email protected]>
Committed: Wed Mar 11 14:14:29 2015 -0700

----------------------------------------------------------------------
 client/src/main/resources/email-action-0.2.xsd  |   1 +
 .../oozie/action/email/EmailActionExecutor.java | 104 +++++++++++++++++--
 .../service/AbandonedCoordCheckerService.java   |   2 +-
 .../action/email/TestEmailActionExecutor.java   |  84 +++++++++++++++
 .../site/twiki/DG_EmailActionExtension.twiki    |   3 +
 5 files changed, 186 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/client/src/main/resources/email-action-0.2.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/email-action-0.2.xsd 
b/client/src/main/resources/email-action-0.2.xsd
index 60ce545..0096dc7 100644
--- a/client/src/main/resources/email-action-0.2.xsd
+++ b/client/src/main/resources/email-action-0.2.xsd
@@ -29,6 +29,7 @@
             <xs:element name="subject" type="xs:string" minOccurs="1" 
maxOccurs="1"/>
             <xs:element name="body" type="xs:string" minOccurs="1" 
maxOccurs="1"/>
             <xs:element name="content_type" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
+            <xs:element name="attachment" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
         </xs:sequence>
     </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
index 21c6313..04d49c3 100644
--- a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java
@@ -18,26 +18,44 @@
 
 package org.apache.oozie.action.email;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
 import javax.mail.Authenticator;
 import javax.mail.Message;
 import javax.mail.Message.RecipientType;
 import javax.mail.MessagingException;
+import javax.mail.Multipart;
 import javax.mail.NoSuchProviderException;
 import javax.mail.PasswordAuthentication;
 import javax.mail.Session;
 import javax.mail.Transport;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeBodyPart;
 import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.ActionExecutorException.ErrorType;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.Namespace;
@@ -60,10 +78,13 @@ public class EmailActionExecutor extends ActionExecutor {
     private final static String CC = "cc";
     private final static String SUB = "subject";
     private final static String BOD = "body";
+    private final static String ATTACHMENT = "attachment";
     private final static String COMMA = ",";
     private final static String CONTENT_TYPE = "content_type";
 
     private final static String DEFAULT_CONTENT_TYPE = "text/plain";
+    private XLog LOG = XLog.getLog(getClass());
+
     public EmailActionExecutor() {
         super("email");
     }
@@ -94,6 +115,7 @@ public class EmailActionExecutor extends ActionExecutor {
         String ccs[] = new String[0];
         String subject = "";
         String body = "";
+        String attachments[] = new String[0];
         String contentType;
         Element child = null;
 
@@ -118,18 +140,23 @@ public class EmailActionExecutor extends ActionExecutor {
         // <body> - One ought to exist.
         body = element.getChildTextTrim(BOD, ns);
 
+        // <attachment> - Optional
+        String attachment = element.getChildTextTrim(ATTACHMENT, ns);
+        if(attachment != null) {
+            attachments = attachment.split(COMMA);
+        }
+
         contentType = element.getChildTextTrim(CONTENT_TYPE, ns);
         if (contentType == null || contentType.isEmpty()) {
             contentType = DEFAULT_CONTENT_TYPE;
         }
 
-
         // All good - lets try to mail!
-        email(tos, ccs, subject, body, contentType);
+        email(tos, ccs, subject, body, attachments, contentType, 
context.getWorkflow().getUser());
     }
 
-    public void email(String[] to, String[] cc, String subject, String body, 
String contentType)
-            throws ActionExecutorException {
+    public void email(String[] to, String[] cc, String subject, String body, 
String[] attachments, String contentType,
+            String user) throws ActionExecutorException {
         // Get mailing server details.
         String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost");
         String smtpPort = getOozieConf().get(EMAIL_SMTP_PORT, "25");
@@ -181,12 +208,47 @@ public class EmailActionExecutor extends ActionExecutor {
 
             // Set subject
             message.setSubject(subject);
-            message.setContent(body, contentType);
-        } catch (AddressException e) {
+
+            // when there is attachment
+            if (attachments != null && attachments.length > 0) {
+                Multipart multipart = new MimeMultipart();
+
+                // Set body text
+                MimeBodyPart bodyTextPart = new MimeBodyPart();
+                bodyTextPart.setText(body);
+                multipart.addBodyPart(bodyTextPart);
+
+                for (String attachment : attachments) {
+                    URI attachUri = new URI(attachment);
+                    if (attachUri.getScheme() != null && 
attachUri.getScheme().equals("file")) {
+                        throw new ActionExecutorException(ErrorType.ERROR, 
"EM008",
+                                "Encountered an error when attaching a file. A 
local file cannot be attached:"
+                                        + attachment);
+                    }
+                    MimeBodyPart messageBodyPart = new MimeBodyPart();
+                    DataSource source = new URIDataSource(attachUri, user);
+                    messageBodyPart.setDataHandler(new DataHandler(source));
+                    messageBodyPart.setFileName(new 
File(attachment).getName());
+                    multipart.addBodyPart(messageBodyPart);
+                }
+                message.setContent(multipart);
+            }
+            else {
+                message.setContent(body, contentType);
+            }
+        }
+        catch (AddressException e) {
             throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad 
address format in <to> or <cc>.", e);
-        } catch (MessagingException e) {
+        }
+        catch (MessagingException e) {
             throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An 
error occured while adding recipients.", e);
         }
+        catch (URISyntaxException e) {
+            throw new ActionExecutorException(ErrorType.ERROR, "EM008", 
"Encountered an error when attaching a file", e);
+        }
+        catch (HadoopAccessorException e) {
+            throw new ActionExecutorException(ErrorType.ERROR, "EM008", 
"Encountered an error when attaching a file", e);
+        }
 
         try {
             // Send over SMTP Transport
@@ -239,4 +301,32 @@ public class EmailActionExecutor extends ActionExecutor {
            return new PasswordAuthentication(user, password);
         }
     }
+
+    class URIDataSource implements DataSource{
+
+        HadoopAccessorService has = 
Services.get().get(HadoopAccessorService.class);
+        FileSystem fs;
+        URI uri;
+        public URIDataSource(URI uri, String user) throws 
HadoopAccessorException {
+            this.uri = uri;
+            Configuration fsConf = has.createJobConf(uri.getAuthority());
+            fs = has.createFileSystem(user, uri, fsConf);
+        }
+
+        public InputStream getInputStream() throws IOException {
+            return fs.open(new Path(uri));
+        }
+
+        public OutputStream getOutputStream() throws IOException {
+            return fs.create(new Path(uri));
+        }
+
+        public String getContentType() {
+            return "application/octet-stream";
+        }
+
+        public String getName() {
+            return uri.getPath();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java 
b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
index ec8cf71..99cf76b 100644
--- 
a/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
+++ 
b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java
@@ -164,7 +164,7 @@ public class AbandonedCoordCheckerService implements 
Service {
             }
             EmailActionExecutor email = new EmailActionExecutor();
             String subject = SUBJECT + " for " + serverURL + " at " + 
DateUtils.formatDateOozieTZ(new Date());
-            email.email(to, new String[0], subject, body, CONTENT_TYPE);
+            email.email(to, new String[0], subject, body, null, CONTENT_TYPE, 
null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java
index e1f314e..1ccd22d 100644
--- 
a/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java
@@ -18,8 +18,22 @@
 
 package org.apache.oozie.action.email;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import javax.mail.BodyPart;
+import javax.mail.Multipart;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.hadoop.ActionExecutorTestCase;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
@@ -175,6 +189,76 @@ public class TestEmailActionExecutor extends 
ActionExecutorTestCase {
         
assertTrue(server.getReceivedMessages()[0].getContentType().contains("text/html"));
     }
 
+    public void testLocalFileAttachmentError() throws Exception {
+        File attachFile1 = new File(getTestCaseDir() + File.separator + 
"attachment1.txt");
+        String content1 = "this is attachment content in file1";
+        File attachFile2 = new File(getTestCaseDir() + File.separator + 
"attachment2.txt");
+        String content2 = "this is attachment content in file2";
+        BufferedWriter output = new BufferedWriter(new 
FileWriter(attachFile1));
+        output.write(content1);
+        output.close();
+        output = new BufferedWriter(new FileWriter(attachFile2));
+        output.write(content2);
+        output.close();
+        StringBuilder tag = new StringBuilder();
+        
tag.append("file://").append(attachFile1.getAbsolutePath()).append(",file://")
+                .append(attachFile2.getAbsolutePath());
+        // local file not attached to email (for security reason)
+        try{
+            assertAttachment(tag.toString(), 0, content1, content2);
+            fail();
+        }catch (ActionExecutorException e){
+            assertEquals("EM008", e.getErrorCode());
+        }
+    }
+
+    public void testHDFSFileAttachment() throws Exception {
+        String file1 = "file1";
+        Path path1 = new Path(getFsTestCaseDir(), file1);
+        String content1 = "this is attachment content in file1";
+        String file2 = "file2";
+        Path path2 = new Path(getFsTestCaseDir(), file2);
+        String content2 = "this is attachment content in file2";
+        FileSystem fs = getFileSystem();
+        Writer writer = new OutputStreamWriter(fs.create(path1, true));
+        writer.write(content1);
+        writer.close();
+        writer = new OutputStreamWriter(fs.create(path2, true));
+        writer.write(content2);
+        writer.close();
+        StringBuilder tag = new StringBuilder();
+        tag.append(path1.toString()).append(",").append(path2.toString());
+        assertAttachment(tag.toString(), 2, content1, content2);
+    }
+
+    private void assertAttachment(String attachtag, int attachCount, String 
content1, String content2) throws Exception {
+        StringBuilder elem = new StringBuilder();
+        elem.append("<email xmlns=\"uri:oozie:email-action:0.2\">");
+        elem.append("<to>[email protected]</to>");
+        elem.append("<subject>sub</subject>");
+        elem.append("<body>&lt;body&gt; This is a test mail 
&lt;/body&gt;</body>");
+        elem.append("<attachment>").append(attachtag).append("</attachment>");
+        elem.append("</email>");
+        EmailActionExecutor emailContnetType = new EmailActionExecutor();
+        emailContnetType.validateAndMail(createAuthContext("email-action"), 
XmlUtils.parseXml(elem.toString()));
+        MimeMessage retMeg = server.getReceivedMessages()[0];
+        Multipart retParts = (Multipart) (retMeg.getContent());
+        int numAttach = 0;
+        for (int i = 0; i < retParts.getCount(); i++) {
+            BodyPart bp = retParts.getBodyPart(i);
+            String disp = bp.getDisposition();
+            String retValue = IOUtils.toString(bp.getInputStream());
+            if (disp != null && (disp.equals(BodyPart.ATTACHMENT))) {
+                assertTrue(retValue.equals(content1) || 
retValue.equals(content2));
+                numAttach++;
+            }
+            else {
+                assertEquals("<body> This is a test mail </body>", retValue);
+            }
+        }
+        assertEquals(attachCount, numAttach);
+    }
+
     @Override
     protected void tearDown() throws Exception {
         super.tearDown();

http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/docs/src/site/twiki/DG_EmailActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_EmailActionExtension.twiki 
b/docs/src/site/twiki/DG_EmailActionExtension.twiki
index 695e16b..fee16b0 100644
--- a/docs/src/site/twiki/DG_EmailActionExtension.twiki
+++ b/docs/src/site/twiki/DG_EmailActionExtension.twiki
@@ -32,6 +32,7 @@ All values specified in the =email= action can be 
parameterized (templatized) us
             <subject>[SUBJECT]</subject>
             <body>[BODY]</body>
             <content_type>[CONTENT-TYPE]</content_type> <!-- content_type is 
optional -->
+            <attachment>[COMMA-SEPARATED-HDFS-FILE-PATHS]</attachment> <!-- 
attachment is optional -->
         </email>
         <ok to="[NODE-NAME]"/>
         <error to="[NODE-NAME]"/>
@@ -47,6 +48,7 @@ The =subject= and =body= commands are used to specify subject 
and body of the ma
 From uri:oozie:email-action:0.2 one can also specify mail content type as 
<content_type>text/html</content_type>.
 "text/plain" is default.
 
+The =attachment= is used to attach a file(s) on HDFS to the mail. Multiple 
attachment can be provided using comma-separated values. Non fully qualified 
path is considered as a file on default HDFS. A local file cannot be attached.
 
 *Configuration*
 
@@ -98,6 +100,7 @@ with the subject and body both containing the workflow ID 
after substitution.
             <xs:element name="subject" type="xs:string" minOccurs="1" 
maxOccurs="1"/>
             <xs:element name="body" type="xs:string" minOccurs="1" 
maxOccurs="1"/>
             <xs:element name="content_type" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
+            <xs:element name="attachment" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
         </xs:sequence>
     </xs:complexType>
 </xs:schema>

Reply via email to