Repository: oozie Updated Branches: refs/heads/master 42cebf6e2 -> 403e49a70
OOZIE-2160 Support attachment in email action (ryota) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/403e49a7 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/403e49a7 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/403e49a7 Branch: refs/heads/master Commit: 403e49a709221cb25c09cb4f5cc175703584aff9 Parents: 42cebf6 Author: egashira <[email protected]> Authored: Wed Mar 11 14:14:29 2015 -0700 Committer: egashira <[email protected]> Committed: Wed Mar 11 14:14:29 2015 -0700 ---------------------------------------------------------------------- client/src/main/resources/email-action-0.2.xsd | 1 + .../oozie/action/email/EmailActionExecutor.java | 104 +++++++++++++++++-- .../service/AbandonedCoordCheckerService.java | 2 +- .../action/email/TestEmailActionExecutor.java | 84 +++++++++++++++ .../site/twiki/DG_EmailActionExtension.twiki | 3 + 5 files changed, 186 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/client/src/main/resources/email-action-0.2.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/email-action-0.2.xsd b/client/src/main/resources/email-action-0.2.xsd index 60ce545..0096dc7 100644 --- a/client/src/main/resources/email-action-0.2.xsd +++ b/client/src/main/resources/email-action-0.2.xsd @@ -29,6 +29,7 @@ <xs:element name="subject" type="xs:string" minOccurs="1" maxOccurs="1"/> <xs:element name="body" type="xs:string" minOccurs="1" maxOccurs="1"/> <xs:element name="content_type" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="attachment" type="xs:string" minOccurs="0" maxOccurs="1"/> </xs:sequence> </xs:complexType> </xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java index 21c6313..04d49c3 100644 --- a/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/email/EmailActionExecutor.java @@ -18,26 +18,44 @@ package org.apache.oozie.action.email; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import javax.activation.DataHandler; +import javax.activation.DataSource; import javax.mail.Authenticator; import javax.mail.Message; import javax.mail.Message.RecipientType; import javax.mail.MessagingException; +import javax.mail.Multipart; import javax.mail.NoSuchProviderException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.AddressException; import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeBodyPart; import javax.mail.internet.MimeMessage; +import javax.mail.internet.MimeMultipart; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.ActionExecutorException.ErrorType; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace; @@ -60,10 +78,13 @@ public class 
EmailActionExecutor extends ActionExecutor { private final static String CC = "cc"; private final static String SUB = "subject"; private final static String BOD = "body"; + private final static String ATTACHMENT = "attachment"; private final static String COMMA = ","; private final static String CONTENT_TYPE = "content_type"; private final static String DEFAULT_CONTENT_TYPE = "text/plain"; + private XLog LOG = XLog.getLog(getClass()); + public EmailActionExecutor() { super("email"); } @@ -94,6 +115,7 @@ public class EmailActionExecutor extends ActionExecutor { String ccs[] = new String[0]; String subject = ""; String body = ""; + String attachments[] = new String[0]; String contentType; Element child = null; @@ -118,18 +140,23 @@ public class EmailActionExecutor extends ActionExecutor { // <body> - One ought to exist. body = element.getChildTextTrim(BOD, ns); + // <attachment> - Optional + String attachment = element.getChildTextTrim(ATTACHMENT, ns); + if(attachment != null) { + attachments = attachment.split(COMMA); + } + contentType = element.getChildTextTrim(CONTENT_TYPE, ns); if (contentType == null || contentType.isEmpty()) { contentType = DEFAULT_CONTENT_TYPE; } - // All good - lets try to mail! - email(tos, ccs, subject, body, contentType); + email(tos, ccs, subject, body, attachments, contentType, context.getWorkflow().getUser()); } - public void email(String[] to, String[] cc, String subject, String body, String contentType) - throws ActionExecutorException { + public void email(String[] to, String[] cc, String subject, String body, String[] attachments, String contentType, + String user) throws ActionExecutorException { // Get mailing server details. 
String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost"); String smtpPort = getOozieConf().get(EMAIL_SMTP_PORT, "25"); @@ -181,12 +208,47 @@ public class EmailActionExecutor extends ActionExecutor { // Set subject message.setSubject(subject); - message.setContent(body, contentType); - } catch (AddressException e) { + + // when there is attachment + if (attachments != null && attachments.length > 0) { + Multipart multipart = new MimeMultipart(); + + // Set body text + MimeBodyPart bodyTextPart = new MimeBodyPart(); + bodyTextPart.setText(body); + multipart.addBodyPart(bodyTextPart); + + for (String attachment : attachments) { + URI attachUri = new URI(attachment); + if (attachUri.getScheme() != null && attachUri.getScheme().equals("file")) { + throw new ActionExecutorException(ErrorType.ERROR, "EM008", + "Encountered an error when attaching a file. A local file cannot be attached:" + + attachment); + } + MimeBodyPart messageBodyPart = new MimeBodyPart(); + DataSource source = new URIDataSource(attachUri, user); + messageBodyPart.setDataHandler(new DataHandler(source)); + messageBodyPart.setFileName(new File(attachment).getName()); + multipart.addBodyPart(messageBodyPart); + } + message.setContent(multipart); + } + else { + message.setContent(body, contentType); + } + } + catch (AddressException e) { throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad address format in <to> or <cc>.", e); - } catch (MessagingException e) { + } + catch (MessagingException e) { throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An error occured while adding recipients.", e); } + catch (URISyntaxException e) { + throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); + } + catch (HadoopAccessorException e) { + throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); + } try { // Send over SMTP Transport @@ -239,4 +301,32 @@ public class 
EmailActionExecutor extends ActionExecutor { return new PasswordAuthentication(user, password); } } + + class URIDataSource implements DataSource{ + + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + FileSystem fs; + URI uri; + public URIDataSource(URI uri, String user) throws HadoopAccessorException { + this.uri = uri; + Configuration fsConf = has.createJobConf(uri.getAuthority()); + fs = has.createFileSystem(user, uri, fsConf); + } + + public InputStream getInputStream() throws IOException { + return fs.open(new Path(uri)); + } + + public OutputStream getOutputStream() throws IOException { + return fs.create(new Path(uri)); + } + + public String getContentType() { + return "application/octet-stream"; + } + + public String getName() { + return uri.getPath(); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java index ec8cf71..99cf76b 100644 --- a/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java +++ b/core/src/main/java/org/apache/oozie/service/AbandonedCoordCheckerService.java @@ -164,7 +164,7 @@ public class AbandonedCoordCheckerService implements Service { } EmailActionExecutor email = new EmailActionExecutor(); String subject = SUBJECT + " for " + serverURL + " at " + DateUtils.formatDateOozieTZ(new Date()); - email.email(to, new String[0], subject, body, CONTENT_TYPE); + email.email(to, new String[0], subject, body, null, CONTENT_TYPE, null); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java b/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java index e1f314e..1ccd22d 100644 --- a/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/email/TestEmailActionExecutor.java @@ -18,8 +18,22 @@ package org.apache.oozie.action.email; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import javax.mail.BodyPart; +import javax.mail.Multipart; +import javax.mail.internet.MimeMessage; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.hadoop.ActionExecutorTestCase; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; @@ -175,6 +189,76 @@ public class TestEmailActionExecutor extends ActionExecutorTestCase { assertTrue(server.getReceivedMessages()[0].getContentType().contains("text/html")); } + public void testLocalFileAttachmentError() throws Exception { + File attachFile1 = new File(getTestCaseDir() + File.separator + "attachment1.txt"); + String content1 = "this is attachment content in file1"; + File attachFile2 = new File(getTestCaseDir() + File.separator + "attachment2.txt"); + String content2 = "this is attachment content in file2"; + BufferedWriter output = new BufferedWriter(new FileWriter(attachFile1)); + output.write(content1); + output.close(); + output = new BufferedWriter(new FileWriter(attachFile2)); + output.write(content2); + output.close(); + StringBuilder tag = new StringBuilder(); + tag.append("file://").append(attachFile1.getAbsolutePath()).append(",file://") + .append(attachFile2.getAbsolutePath()); + // 
local file not attached to email (for security reason) + try{ + assertAttachment(tag.toString(), 0, content1, content2); + fail(); + }catch (ActionExecutorException e){ + assertEquals("EM008", e.getErrorCode()); + } + } + + public void testHDFSFileAttachment() throws Exception { + String file1 = "file1"; + Path path1 = new Path(getFsTestCaseDir(), file1); + String content1 = "this is attachment content in file1"; + String file2 = "file2"; + Path path2 = new Path(getFsTestCaseDir(), file2); + String content2 = "this is attachment content in file2"; + FileSystem fs = getFileSystem(); + Writer writer = new OutputStreamWriter(fs.create(path1, true)); + writer.write(content1); + writer.close(); + writer = new OutputStreamWriter(fs.create(path2, true)); + writer.write(content2); + writer.close(); + StringBuilder tag = new StringBuilder(); + tag.append(path1.toString()).append(",").append(path2.toString()); + assertAttachment(tag.toString(), 2, content1, content2); + } + + private void assertAttachment(String attachtag, int attachCount, String content1, String content2) throws Exception { + StringBuilder elem = new StringBuilder(); + elem.append("<email xmlns=\"uri:oozie:email-action:0.2\">"); + elem.append("<to>[email protected]</to>"); + elem.append("<subject>sub</subject>"); + elem.append("<body><body> This is a test mail </body></body>"); + elem.append("<attachment>").append(attachtag).append("</attachment>"); + elem.append("</email>"); + EmailActionExecutor emailContnetType = new EmailActionExecutor(); + emailContnetType.validateAndMail(createAuthContext("email-action"), XmlUtils.parseXml(elem.toString())); + MimeMessage retMeg = server.getReceivedMessages()[0]; + Multipart retParts = (Multipart) (retMeg.getContent()); + int numAttach = 0; + for (int i = 0; i < retParts.getCount(); i++) { + BodyPart bp = retParts.getBodyPart(i); + String disp = bp.getDisposition(); + String retValue = IOUtils.toString(bp.getInputStream()); + if (disp != null && 
(disp.equals(BodyPart.ATTACHMENT))) { + assertTrue(retValue.equals(content1) || retValue.equals(content2)); + numAttach++; + } + else { + assertEquals("<body> This is a test mail </body>", retValue); + } + } + assertEquals(attachCount, numAttach); + } + @Override protected void tearDown() throws Exception { super.tearDown(); http://git-wip-us.apache.org/repos/asf/oozie/blob/403e49a7/docs/src/site/twiki/DG_EmailActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_EmailActionExtension.twiki b/docs/src/site/twiki/DG_EmailActionExtension.twiki index 695e16b..fee16b0 100644 --- a/docs/src/site/twiki/DG_EmailActionExtension.twiki +++ b/docs/src/site/twiki/DG_EmailActionExtension.twiki @@ -32,6 +32,7 @@ All values specified in the =email= action can be parameterized (templatized) us <subject>[SUBJECT]</subject> <body>[BODY]</body> <content_type>[CONTENT-TYPE]</content_type> <!-- content_type is optional --> + <attachment>[COMMA-SEPARATED-HDFS-FILE-PATHS]</attachment> <!-- attachment is optional --> </email> <ok to="[NODE-NAME]"/> <error to="[NODE-NAME]"/> @@ -47,6 +48,7 @@ The =subject= and =body= commands are used to specify subject and body of the ma From uri:oozie:email-action:0.2 one can also specify mail content type as <content_type>text/html</content_type>. "text/plain" is default. +The =attachment= element is used to attach one or more HDFS files to the mail. Multiple attachments can be provided as comma-separated values. A path that is not fully qualified is treated as a file on the default HDFS. A local file cannot be attached. *Configuration* @@ -98,6 +100,7 @@ with the subject and body both containing the workflow ID after substitution. 
<xs:element name="subject" type="xs:string" minOccurs="1" maxOccurs="1"/> <xs:element name="body" type="xs:string" minOccurs="1" maxOccurs="1"/> <xs:element name="content_type" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="attachment" type="xs:string" minOccurs="0" maxOccurs="1"/> </xs:sequence> </xs:complexType> </xs:schema>
