This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5e869aa238d NIFI-15782 Fixed Connection Exception handling for Email
Processors (#11099)
5e869aa238d is described below
commit 5e869aa238dd5e6b92539aa1b4557b2caebec64d
Author: David Handermann <[email protected]>
AuthorDate: Thu Apr 2 14:12:12 2026 -0500
NIFI-15782 Fixed Connection Exception handling for Email Processors (#11099)
---
.../processors/email/AbstractEmailProcessor.java | 32 +++--
.../nifi/processors/email/TestConsumeEmail.java | 150 ++++++++++++++-------
2 files changed, 122 insertions(+), 60 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
index d7c0c5d84e8..9c309734ab7 100644
---
a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
+++
b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
@@ -17,8 +17,10 @@
package org.apache.nifi.processors.email;
import jakarta.mail.Address;
+import jakarta.mail.FolderClosedException;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
+import jakarta.mail.StoreClosedException;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
@@ -305,9 +307,7 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
int passwordEndIndex = urlBuilder.indexOf("@");
urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]");
this.displayUrl = protocol + "://" + urlBuilder;
- if (this.logger.isInfoEnabled()) {
- this.logger.info("Connecting to Email server at the following URL:
{}", this.displayUrl);
- }
+ this.logger.info("Connecting to server [{}]", this.displayUrl);
return finalUrl;
}
@@ -384,11 +384,13 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
Object[] messages;
try {
messages = this.messageReceiver.receive();
- } catch (MessagingException e) {
- String errorMsg = "Failed to receive messages from Email
server: [" + e.getClass().getName()
- + " - " + e.getMessage();
- this.getLogger().error(errorMsg);
- throw new ProcessException(errorMsg, e);
+ } catch (final MessagingException e) {
+ if (isClosedException(e)) {
+ // Destroy Receiver to force reinitialization on
subsequent Processor.onTrigger()
+ messageReceiver.destroy();
+ messageReceiver = null;
+ }
+ throw new ProcessException("Failed to receive messages from
server [%s]".formatted(displayUrl), e);
}
if (messages != null) {
@@ -399,6 +401,20 @@ abstract class AbstractEmailProcessor<T extends
AbstractMailReceiver> extends Ab
}
}
+ private boolean isClosedException(final MessagingException exception) {
+ final boolean closedException;
+
+ final Exception nextException = exception.getNextException();
+ if (exception instanceof FolderClosedException || exception instanceof
StoreClosedException) {
+ closedException = true;
+ } else {
+ // Handle IOException and subclasses as closed exceptions
+ closedException = nextException instanceof IOException;
+ }
+
+ return closedException;
+ }
+
/**
* Disposes the message by converting it to a {@link FlowFile} transferring
* it to the REL_SUCCESS relationship.
diff --git
a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
index 9911f3f8fc1..32d8b1767a4 100644
---
a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
+++
b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
@@ -25,6 +25,7 @@ import jakarta.mail.MessagingException;
import jakarta.mail.Session;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
@@ -37,54 +38,51 @@ import
org.springframework.integration.mail.inbound.AbstractMailReceiver;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestConsumeEmail {
+class TestConsumeEmail {
- private GreenMail mockIMAP4Server;
- private GreenMail mockPOP3Server;
+ private static final String SENDER_ADDRESS = "[email protected]";
+ private static final String RECIPIENT_ADDRESS = "[email protected]";
+ private static final String RECIPIENT_USER = "recipient-user";
+ private static final String RECIPIENT_PASSWORD =
UUID.randomUUID().toString();
+ private static final String INBOX_FOLDER = "INBOX";
+
+ private GreenMail imapServer;
+ private GreenMail popServer;
private GreenMailUser imapUser;
private GreenMailUser popUser;
@BeforeEach
- public void setUp() {
- mockIMAP4Server = new GreenMail(ServerSetupTest.IMAP);
- mockIMAP4Server.start();
- mockPOP3Server = new GreenMail(ServerSetupTest.POP3);
- mockPOP3Server.start();
-
- imapUser = mockIMAP4Server.setUser("[email protected]", "nifiUserImap",
"nifiPassword");
- popUser = mockPOP3Server.setUser("[email protected]", "nifiUserPop",
"nifiPassword");
- }
+ void setUp() {
+ imapServer = new GreenMail(ServerSetupTest.IMAP);
+ imapServer.start();
+ popServer = new GreenMail(ServerSetupTest.POP3);
+ popServer.start();
- @AfterEach
- public void cleanUp() {
- mockIMAP4Server.stop();
- mockPOP3Server.stop();
+ setUsers();
}
- public void addMessage(String testName, GreenMailUser user) throws
MessagingException {
- Properties prop = new Properties();
- Session session = Session.getDefaultInstance(prop);
- MimeMessage message = new MimeMessage(session);
- message.setFrom(new InternetAddress("[email protected]"));
- message.addRecipient(Message.RecipientType.TO, new
InternetAddress("[email protected]"));
- message.setSubject("Test email" + testName);
- message.setText("test test test chocolate");
- user.deliver(message);
+ @AfterEach
+ void cleanUp() {
+ imapServer.stop();
+ popServer.stop();
}
@Test
- public void testConsumeIMAP4() throws Exception {
+ void testConsumeIMAP4() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ConsumeIMAP());
runner.setProperty(ConsumeIMAP.HOST,
ServerSetupTest.IMAP.getBindAddress());
runner.setProperty(ConsumeIMAP.PORT,
String.valueOf(ServerSetupTest.IMAP.getPort()));
- runner.setProperty(ConsumeIMAP.USER, "nifiUserImap");
- runner.setProperty(ConsumeIMAP.PASSWORD, "nifiPassword");
- runner.setProperty(ConsumeIMAP.FOLDER, "INBOX");
- runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+ runner.setProperty(ConsumeIMAP.USER, RECIPIENT_USER);
+ runner.setProperty(ConsumeIMAP.PASSWORD, RECIPIENT_PASSWORD);
+ runner.setProperty(ConsumeIMAP.FOLDER, INBOX_FOLDER);
+ runner.setProperty(ConsumeIMAP.USE_SSL, Boolean.FALSE.toString());
addMessage("testConsumeImap1", imapUser);
addMessage("testConsumeImap2", imapUser);
@@ -95,26 +93,19 @@ public class TestConsumeEmail {
final List<MockFlowFile> messages =
runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
String result = new
String(runner.getContentAsByteArray(messages.getFirst()));
- // Verify body
assertTrue(result.contains("test test test chocolate"));
-
- // Verify sender
- assertTrue(result.contains("[email protected]"));
-
- // Verify subject
+ assertTrue(result.contains(SENDER_ADDRESS));
assertTrue(result.contains("testConsumeImap1"));
-
}
@Test
- public void testConsumePOP3() throws Exception {
+ void testConsumePOP3() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ConsumePOP3());
- runner.setProperty(ConsumeIMAP.HOST,
ServerSetupTest.POP3.getBindAddress());
- runner.setProperty(ConsumeIMAP.PORT,
String.valueOf(ServerSetupTest.POP3.getPort()));
- runner.setProperty(ConsumeIMAP.USER, "nifiUserPop");
- runner.setProperty(ConsumeIMAP.PASSWORD, "nifiPassword");
- runner.setProperty(ConsumeIMAP.FOLDER, "INBOX");
- runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+ runner.setProperty(ConsumePOP3.HOST,
ServerSetupTest.POP3.getBindAddress());
+ runner.setProperty(ConsumePOP3.PORT,
String.valueOf(ServerSetupTest.POP3.getPort()));
+ runner.setProperty(ConsumePOP3.USER, RECIPIENT_USER);
+ runner.setProperty(ConsumePOP3.PASSWORD, RECIPIENT_PASSWORD);
+ runner.setProperty(ConsumePOP3.FOLDER, INBOX_FOLDER);
addMessage("testConsumePop1", popUser);
addMessage("testConsumePop2", popUser);
@@ -125,19 +116,13 @@ public class TestConsumeEmail {
final List<MockFlowFile> messages =
runner.getFlowFilesForRelationship(ConsumePOP3.REL_SUCCESS);
String result = new
String(runner.getContentAsByteArray(messages.getFirst()));
- // Verify body
assertTrue(result.contains("test test test chocolate"));
-
- // Verify sender
- assertTrue(result.contains("[email protected]"));
-
- // Verify subject
+ assertTrue(result.contains(SENDER_ADDRESS));
assertTrue(result.contains("Pop1"));
-
}
@Test
- public void validateProtocol() {
+ void testValidProtocols() {
AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new
ConsumeIMAP();
TestRunner runner = TestRunners.newTestRunner(consume);
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
@@ -154,6 +139,42 @@ public class TestConsumeEmail {
assertEquals("pop3", consume.getProtocol(runner.getProcessContext()));
}
+ @Test
+ void testServerReconnected() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(new ConsumeIMAP());
+ setImapServerProperties(runner);
+
+ addMessage("testServerReconnected-1", imapUser);
+
+ runner.run(1, false, true);
+ runner.assertTransferCount(ConsumeIMAP.REL_SUCCESS, 1);
+
+ imapServer.stop();
+
+ final AssertionError assertionError =
assertThrows(AssertionError.class, () -> runner.run(1, false, false));
+ final Throwable cause = assertionError.getCause();
+ assertInstanceOf(ProcessException.class, cause);
+ final Throwable processExceptionCause = cause.getCause();
+ assertInstanceOf(MessagingException.class, processExceptionCause);
+
+ // Configure replacement server on different port for verified
reconnection
+ final GreenMail replacementServer = new
GreenMail(ServerSetupTest.IMAP.port(0));
+ try {
+ replacementServer.start();
+ final GreenMailUser replacementUser =
replacementServer.setUser(RECIPIENT_ADDRESS, RECIPIENT_USER,
RECIPIENT_PASSWORD);
+ final int replacementPort = replacementServer.getImap().getPort();
+ runner.setProperty(ConsumeIMAP.PORT,
Integer.toString(replacementPort));
+
+ addMessage("testServerReconnected-2", replacementUser);
+
+ runner.clearTransferState();
+ runner.run(1, false, false);
+ runner.assertTransferCount(ConsumeIMAP.REL_SUCCESS, 1);
+ } finally {
+ replacementServer.stop();
+ }
+ }
+
@Test
void testMigrateProperties() {
final TestRunner runner = TestRunners.newTestRunner(ConsumeIMAP.class);
@@ -173,4 +194,29 @@ public class TestConsumeEmail {
final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
assertEquals(expectedRenamed,
propertyMigrationResult.getPropertiesRenamed());
}
+
+ private void setImapServerProperties(final TestRunner runner) {
+ runner.setProperty(ConsumeIMAP.HOST,
ServerSetupTest.IMAP.getBindAddress());
+ runner.setProperty(ConsumeIMAP.PORT,
String.valueOf(ServerSetupTest.IMAP.getPort()));
+ runner.setProperty(ConsumeIMAP.USER, RECIPIENT_USER);
+ runner.setProperty(ConsumeIMAP.PASSWORD, RECIPIENT_PASSWORD);
+ runner.setProperty(ConsumeIMAP.FOLDER, INBOX_FOLDER);
+ runner.setProperty(ConsumeIMAP.USE_SSL, Boolean.FALSE.toString());
+ }
+
+ private void setUsers() {
+ imapUser = imapServer.setUser(RECIPIENT_ADDRESS, RECIPIENT_USER,
RECIPIENT_PASSWORD);
+ popUser = popServer.setUser(RECIPIENT_ADDRESS, RECIPIENT_USER,
RECIPIENT_PASSWORD);
+ }
+
+ void addMessage(final String testName, final GreenMailUser user) throws
MessagingException {
+ Properties prop = new Properties();
+ Session session = Session.getDefaultInstance(prop);
+ MimeMessage message = new MimeMessage(session);
+ message.setFrom(new InternetAddress(SENDER_ADDRESS));
+ message.addRecipient(Message.RecipientType.TO, new
InternetAddress(RECIPIENT_ADDRESS));
+ message.setSubject("Test email" + testName);
+ message.setText("test test test chocolate");
+ user.deliver(message);
+ }
}