Repository: nifi Updated Branches: refs/heads/master 2f80cd504 -> 8ef337b65
NIFI-1434 Prevent array index exception in PutEmail Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8ef337b6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8ef337b6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8ef337b6 Branch: refs/heads/master Commit: 8ef337b65e479cb57bedd8023772e591c02dca74 Parents: 2f80cd5 Author: Richard Miskin <[email protected]> Authored: Sun Jan 24 09:07:57 2016 +0000 Committer: jpercivall <[email protected]> Committed: Sat Jan 30 10:56:31 2016 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/PutEmail.java | 60 +++++-- .../nifi/processors/standard/TestPutEmail.java | 167 ++++++++++++++++--- 2 files changed, 189 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8ef337b6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index 5605b8d..e71f5aa 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@ -298,16 +298,10 @@ public class PutEmail extends AbstractProcessor { final ProcessorLog logger = getLogger(); try { - message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions(flowFile).getValue())[0]); - - final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions(flowFile).getValue()); - message.setRecipients(RecipientType.TO, toAddresses); - - final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions(flowFile).getValue()); - message.setRecipients(RecipientType.CC, ccAddresses); - - final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue()); - message.setRecipients(RecipientType.BCC, bccAddresses); + message.addFrom(toInetAddresses(context, flowFile, FROM)); + message.setRecipients(RecipientType.TO, toInetAddresses(context, flowFile, TO)); + message.setRecipients(RecipientType.CC, toInetAddresses(context, flowFile, CC)); + message.setRecipients(RecipientType.BCC, toInetAddresses(context, flowFile, BCC)); message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue()); message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue()); @@ -344,14 +338,14 @@ public class PutEmail extends AbstractProcessor { message.setContent(multipart); } - Transport.send(message); + send(message); session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); session.transfer(flowFile, REL_SUCCESS); logger.info("Sent email as a result of receiving {}", new Object[]{flowFile}); } catch (final ProcessException | MessagingException | IOException e) { context.yield(); - logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); + logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e.getMessage()}, e); session.transfer(flowFile, REL_FAILURE); } } @@ -418,7 +412,7 @@ public class PutEmail extends AbstractProcessor { StringBuilder message = new StringBuilder(messagePrepend); message.append(BODY_SEPARATOR); message.append("\nStandard FlowFile Metadata:"); - message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getId())); + message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getAttribute(CoreAttributes.UUID.key()))); message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate()))); message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize())); message.append("\nFlowFile Attributes:"); @@ -429,11 +423,43 @@ public class PutEmail extends AbstractProcessor { return message.toString(); } - private static InternetAddress[] toInetAddresses(final String val) throws AddressException { - if (val == null) { - return new InternetAddress[0]; + /** + * @param context the current context + * @param flowFile the current flow file + * @param propertyDescriptor the property to evaluate + * @return an InternetAddress[] parsed from the supplied property + * @throws AddressException if the property cannot be parsed to a valid InternetAddress[] + */ + private InternetAddress[] toInetAddresses(final ProcessContext context, final FlowFile flowFile, + PropertyDescriptor propertyDescriptor) throws AddressException { + InternetAddress[] parse; + String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue(); + if (value == null || value.isEmpty()){ + if (propertyDescriptor.isRequired()) { + final String exceptionMsg = "Required property '" + propertyDescriptor.getDisplayName() + "' evaluates to an empty string."; + throw new AddressException(exceptionMsg); + } else { + parse = new InternetAddress[0]; + } + } else { + try { + parse = InternetAddress.parse(value); + } catch (AddressException e) { + final String exceptionMsg = "Unable to parse a valid address for property '" + propertyDescriptor.getDisplayName() + "' with value '"+ value +"'"; + throw new AddressException(exceptionMsg); + } } - return InternetAddress.parse(val); + return parse; + } + + /** + * Wrapper for static method {@link Transport#send(Message)} to add testability of this class. + * + * @param msg the message to send + * @throws MessagingException on error + */ + protected void send(final Message msg) throws MessagingException { + Transport.send(msg); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8ef337b6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java index 727e8e9..fb11d8f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java @@ -16,28 +16,79 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage.RecipientType; + +import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import static org.junit.Assert.assertEquals; +import org.junit.Before; import org.junit.Test; public class TestPutEmail { + /** + * Extension to PutEmail that stubs out the calls to + * Transport.sendMessage(). + * + * <p> + * All sent messages are records in a list available via the + * {@link #getMessages()} method.</p> + * <p> Calling + * {@link #setException(MessagingException)} will cause the supplied exception to be + * thrown when sendMessage is invoked. + * </p> + */ + private static final class PutEmailExtension extends PutEmail { + private MessagingException e; + private ArrayList<Message> messages = new ArrayList<>(); + + @Override + protected void send(Message msg) throws MessagingException { + messages.add(msg); + if (this.e != null) { + throw e; + } + } + + void setException(final MessagingException e) { + this.e = e; + } + + List<Message> getMessages() { + return messages; + } + } + + PutEmailExtension processor; + TestRunner runner; + + @Before + public void setup() { + processor = new PutEmailExtension(); + runner = TestRunners.newTestRunner(processor); + } + @Test - public void testHostNotFound() { - // verifies that files are routed to failure when the SMTP host doesn't exist - final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); + public void testExceptionWhenSending() { + // verifies that files are routed to failure when Transport.send() throws a MessagingException runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123"); runner.setProperty(PutEmail.FROM, "[email protected]"); runner.setProperty(PutEmail.TO, "[email protected]"); runner.setProperty(PutEmail.MESSAGE, "Message Body"); + processor.setException(new MessagingException("Forced failure from send()")); + final Map<String, String> attributes = new HashMap<>(); runner.enqueue("Some Text".getBytes(), attributes); @@ -45,38 +96,112 @@ public class TestPutEmail { runner.assertQueueEmpty(); runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + assertEquals("Expected an attempt to send a single message", 1, processor.getMessages().size()); } @Test - public void testEmailPropertyFormatters() { - // verifies that files are routed to failure when the SMTP host doesn't exist - final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); - runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + public void testOutgoingMessage() throws Exception { + // verifies that are set on the outgoing Message correctly runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); - runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, "${dynamicSocketFactory}"); runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); runner.setProperty(PutEmail.FROM, "[email protected]"); runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.TO, "[email protected]"); - ProcessSession session = runner.getProcessSessionFactory().createSession(); - FlowFile ff = session.create(); - ff = session.putAttribute(ff, "dynamicSocketFactory", "testingSocketFactory"); - ProcessContext context = runner.getProcessContext(); + runner.enqueue("Some Text".getBytes()); + + runner.run(); - String xmailer = context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue(); - assertEquals("X-Mailer Header", "TestingNiFi", xmailer); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS); - String socketFactory = context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue(); - assertEquals("Socket Factory", "testingSocketFactory", socketFactory); + // Verify that the Message was populated correctly + assertEquals("Expected a single message to be sent", 1, processor.getMessages().size()); + Message message = processor.getMessages().get(0); + assertEquals("[email protected]", message.getFrom()[0].toString()); + assertEquals("X-Mailer Header", "TestingNiFi", message.getHeader("X-Mailer")[0]); + assertEquals("Message Body", message.getContent()); + assertEquals("[email protected]", message.getRecipients(RecipientType.TO)[0].toString()); + assertNull(message.getRecipients(RecipientType.BCC)); + assertNull(message.getRecipients(RecipientType.CC)); + } - final Map<String, String> attributes = new HashMap<>(); + @Test + public void testOutgoingMessageWithOptionalProperties() throws Exception { + // verifies that optional attributes are set on the outgoing Message correctly + runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.FROM, "${from}"); + runner.setProperty(PutEmail.MESSAGE, "${message}"); + runner.setProperty(PutEmail.TO, "${to}"); + runner.setProperty(PutEmail.BCC, "${bcc}"); + runner.setProperty(PutEmail.CC, "${cc}"); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("from", "[email protected] <NiFi>"); + attributes.put("message", "the message body"); + attributes.put("to", "[email protected]"); + attributes.put("bcc", "[email protected]"); + attributes.put("cc", "[email protected]"); runner.enqueue("Some Text".getBytes(), attributes); runner.run(); runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS); + + // Verify that the Message was populated correctly + assertEquals("Expected a single message to be sent", 1, processor.getMessages().size()); + Message message = processor.getMessages().get(0); + assertEquals("\"[email protected]\" <NiFi>", message.getFrom()[0].toString()); + assertEquals("X-Mailer Header", "TestingNiFi", message.getHeader("X-Mailer")[0]); + assertEquals("the message body", message.getContent()); + assertEquals(1, message.getRecipients(RecipientType.TO).length); + assertEquals("[email protected]", message.getRecipients(RecipientType.TO)[0].toString()); + assertEquals(1, message.getRecipients(RecipientType.BCC).length); + assertEquals("[email protected]", message.getRecipients(RecipientType.BCC)[0].toString()); + assertEquals(1, message.getRecipients(RecipientType.CC).length); + assertEquals("[email protected]",message.getRecipients(RecipientType.CC)[0].toString()); + } + + @Test + public void testInvalidAddress() throws Exception { + // verifies that unparsable addresses lead to the flow file being routed to failure + runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.FROM, "[email protected] <invalid"); + runner.setProperty(PutEmail.MESSAGE, "Message Body"); + runner.setProperty(PutEmail.TO, "[email protected]"); + + runner.enqueue("Some Text".getBytes()); + + runner.run(); + + runner.assertQueueEmpty(); runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + + assertEquals("Expected no messages to be sent", 0, processor.getMessages().size()); + } + @Test + public void testEmptyFrom() throws Exception { + // verifies that if the FROM property evaluates to an empty string at + // runtime the flow file is transferred to failure. + runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.FROM, "${MISSING_PROPERTY}"); + runner.setProperty(PutEmail.MESSAGE, "Message Body"); + runner.setProperty(PutEmail.TO, "[email protected]"); + + runner.enqueue("Some Text".getBytes()); + + runner.run(); + + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + + assertEquals("Expected no messages to be sent", 0, processor.getMessages().size()); + final LogMessage logMessage = runner.getLogger().getErrorMessages().get(0); + assertTrue(((String)logMessage.getArgs()[2]).contains("Required property 'From' evaluates to an empty string")); } }
