This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e869aa238d NIFI-15782 Fixed Connection Exception handling for Email Processors (#11099)
5e869aa238d is described below

commit 5e869aa238dd5e6b92539aa1b4557b2caebec64d
Author: David Handermann <[email protected]>
AuthorDate: Thu Apr 2 14:12:12 2026 -0500

    NIFI-15782 Fixed Connection Exception handling for Email Processors (#11099)
---
 .../processors/email/AbstractEmailProcessor.java   |  32 +++--
 .../nifi/processors/email/TestConsumeEmail.java    | 150 ++++++++++++++-------
 2 files changed, 122 insertions(+), 60 deletions(-)

diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
index d7c0c5d84e8..9c309734ab7 100644
--- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
+++ b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.processors.email;
 
 import jakarta.mail.Address;
+import jakarta.mail.FolderClosedException;
 import jakarta.mail.Message;
 import jakarta.mail.MessagingException;
+import jakarta.mail.StoreClosedException;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
@@ -305,9 +307,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
         int passwordEndIndex = urlBuilder.indexOf("@");
         urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]");
         this.displayUrl = protocol + "://" + urlBuilder;
-        if (this.logger.isInfoEnabled()) {
-            this.logger.info("Connecting to Email server at the following URL: {}", this.displayUrl);
-        }
+        this.logger.info("Connecting to server [{}]", this.displayUrl);
 
         return finalUrl;
     }
@@ -384,11 +384,13 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
             Object[] messages;
             try {
                 messages = this.messageReceiver.receive();
-            } catch (MessagingException e) {
-                String errorMsg = "Failed to receive messages from Email server: [" + e.getClass().getName()
-                        + " - " + e.getMessage();
-                this.getLogger().error(errorMsg);
-                throw new ProcessException(errorMsg, e);
+            } catch (final MessagingException e) {
+                if (isClosedException(e)) {
+                    // Destroy Receiver to force reinitialization on subsequent Processor.onTrigger()
+                    messageReceiver.destroy();
+                    messageReceiver = null;
+                }
+                throw new ProcessException("Failed to receive messages from server [%s]".formatted(displayUrl), e);
             }
 
             if (messages != null) {
@@ -399,6 +401,20 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
         }
     }
 
+    private boolean isClosedException(final MessagingException exception) {
+        final boolean closedException;
+
+        final Exception nextException = exception.getNextException();
+        if (exception instanceof FolderClosedException || exception instanceof StoreClosedException) {
+            closedException = true;
+        } else {
+            // Handle IOException and subclasses as closed exceptions
+            closedException = nextException instanceof IOException;
+        }
+
+        return closedException;
+    }
+
     /**
      * Disposes the message by converting it to a {@link FlowFile} transferring
      * it to the REL_SUCCESS relationship.
diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
index 9911f3f8fc1..32d8b1767a4 100644
--- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
+++ b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java
@@ -25,6 +25,7 @@ import jakarta.mail.MessagingException;
 import jakarta.mail.Session;
 import jakarta.mail.internet.InternetAddress;
 import jakarta.mail.internet.MimeMessage;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.PropertyMigrationResult;
 import org.apache.nifi.util.TestRunner;
@@ -37,54 +38,51 @@ import org.springframework.integration.mail.inbound.AbstractMailReceiver;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestConsumeEmail {
+class TestConsumeEmail {
 
-    private GreenMail mockIMAP4Server;
-    private GreenMail mockPOP3Server;
+    private static final String SENDER_ADDRESS = "[email protected]";
+    private static final String RECIPIENT_ADDRESS = "[email protected]";
+    private static final String RECIPIENT_USER = "recipient-user";
+    private static final String RECIPIENT_PASSWORD = UUID.randomUUID().toString();
+    private static final String INBOX_FOLDER = "INBOX";
+
+    private GreenMail imapServer;
+    private GreenMail popServer;
     private GreenMailUser imapUser;
     private GreenMailUser popUser;
 
     @BeforeEach
-    public void setUp() {
-        mockIMAP4Server = new GreenMail(ServerSetupTest.IMAP);
-        mockIMAP4Server.start();
-        mockPOP3Server = new GreenMail(ServerSetupTest.POP3);
-        mockPOP3Server.start();
-
-        imapUser = mockIMAP4Server.setUser("[email protected]", "nifiUserImap", "nifiPassword");
-        popUser = mockPOP3Server.setUser("[email protected]", "nifiUserPop", "nifiPassword");
-    }
+    void setUp() {
+        imapServer = new GreenMail(ServerSetupTest.IMAP);
+        imapServer.start();
+        popServer = new GreenMail(ServerSetupTest.POP3);
+        popServer.start();
 
-    @AfterEach
-    public void cleanUp() {
-        mockIMAP4Server.stop();
-        mockPOP3Server.stop();
+        setUsers();
     }
 
-    public void addMessage(String testName, GreenMailUser user) throws MessagingException {
-        Properties prop = new Properties();
-        Session session = Session.getDefaultInstance(prop);
-        MimeMessage message = new MimeMessage(session);
-        message.setFrom(new InternetAddress("[email protected]"));
-        message.addRecipient(Message.RecipientType.TO, new InternetAddress("[email protected]"));
-        message.setSubject("Test email" + testName);
-        message.setText("test test test chocolate");
-        user.deliver(message);
+    @AfterEach
+    void cleanUp() {
+        imapServer.stop();
+        popServer.stop();
     }
 
     @Test
-    public void testConsumeIMAP4() throws Exception {
+    void testConsumeIMAP4() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new ConsumeIMAP());
         runner.setProperty(ConsumeIMAP.HOST, ServerSetupTest.IMAP.getBindAddress());
         runner.setProperty(ConsumeIMAP.PORT, String.valueOf(ServerSetupTest.IMAP.getPort()));
-        runner.setProperty(ConsumeIMAP.USER, "nifiUserImap");
-        runner.setProperty(ConsumeIMAP.PASSWORD, "nifiPassword");
-        runner.setProperty(ConsumeIMAP.FOLDER, "INBOX");
-        runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+        runner.setProperty(ConsumeIMAP.USER, RECIPIENT_USER);
+        runner.setProperty(ConsumeIMAP.PASSWORD, RECIPIENT_PASSWORD);
+        runner.setProperty(ConsumeIMAP.FOLDER, INBOX_FOLDER);
+        runner.setProperty(ConsumeIMAP.USE_SSL, Boolean.FALSE.toString());
 
         addMessage("testConsumeImap1", imapUser);
         addMessage("testConsumeImap2", imapUser);
@@ -95,26 +93,19 @@ public class TestConsumeEmail {
         final List<MockFlowFile> messages = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
         String result = new String(runner.getContentAsByteArray(messages.getFirst()));
 
-        // Verify body
         assertTrue(result.contains("test test test chocolate"));
-
-        // Verify sender
-        assertTrue(result.contains("[email protected]"));
-
-        // Verify subject
+        assertTrue(result.contains(SENDER_ADDRESS));
         assertTrue(result.contains("testConsumeImap1"));
-
     }
 
     @Test
-    public void testConsumePOP3() throws Exception {
+    void testConsumePOP3() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new ConsumePOP3());
-        runner.setProperty(ConsumeIMAP.HOST, ServerSetupTest.POP3.getBindAddress());
-        runner.setProperty(ConsumeIMAP.PORT, String.valueOf(ServerSetupTest.POP3.getPort()));
-        runner.setProperty(ConsumeIMAP.USER, "nifiUserPop");
-        runner.setProperty(ConsumeIMAP.PASSWORD, "nifiPassword");
-        runner.setProperty(ConsumeIMAP.FOLDER, "INBOX");
-        runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+        runner.setProperty(ConsumePOP3.HOST, ServerSetupTest.POP3.getBindAddress());
+        runner.setProperty(ConsumePOP3.PORT, String.valueOf(ServerSetupTest.POP3.getPort()));
+        runner.setProperty(ConsumePOP3.USER, RECIPIENT_USER);
+        runner.setProperty(ConsumePOP3.PASSWORD, RECIPIENT_PASSWORD);
+        runner.setProperty(ConsumePOP3.FOLDER, INBOX_FOLDER);
 
         addMessage("testConsumePop1", popUser);
         addMessage("testConsumePop2", popUser);
@@ -125,19 +116,13 @@ public class TestConsumeEmail {
         final List<MockFlowFile> messages = runner.getFlowFilesForRelationship(ConsumePOP3.REL_SUCCESS);
         String result = new String(runner.getContentAsByteArray(messages.getFirst()));
 
-        // Verify body
         assertTrue(result.contains("test test test chocolate"));
-
-        // Verify sender
-        assertTrue(result.contains("[email protected]"));
-
-        // Verify subject
+        assertTrue(result.contains(SENDER_ADDRESS));
         assertTrue(result.contains("Pop1"));
-
     }
 
     @Test
-    public void validateProtocol() {
+    void testValidProtocols() {
         AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP();
         TestRunner runner = TestRunners.newTestRunner(consume);
         runner.setProperty(ConsumeIMAP.USE_SSL, "false");
@@ -154,6 +139,42 @@ public class TestConsumeEmail {
         assertEquals("pop3", consume.getProtocol(runner.getProcessContext()));
     }
 
+    @Test
+    void testServerReconnected() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new ConsumeIMAP());
+        setImapServerProperties(runner);
+
+        addMessage("testServerReconnected-1", imapUser);
+
+        runner.run(1, false, true);
+        runner.assertTransferCount(ConsumeIMAP.REL_SUCCESS, 1);
+
+        imapServer.stop();
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, () -> runner.run(1, false, false));
+        final Throwable cause = assertionError.getCause();
+        assertInstanceOf(ProcessException.class, cause);
+        final Throwable processExceptionCause = cause.getCause();
+        assertInstanceOf(MessagingException.class, processExceptionCause);
+
+        // Configure replacement server on different port for verified reconnection
+        final GreenMail replacementServer = new GreenMail(ServerSetupTest.IMAP.port(0));
+        try {
+            replacementServer.start();
+            final GreenMailUser replacementUser = replacementServer.setUser(RECIPIENT_ADDRESS, RECIPIENT_USER, RECIPIENT_PASSWORD);
+            final int replacementPort = replacementServer.getImap().getPort();
+            runner.setProperty(ConsumeIMAP.PORT, Integer.toString(replacementPort));
+
+            addMessage("testServerReconnected-2", replacementUser);
+
+            runner.clearTransferState();
+            runner.run(1, false, false);
+            runner.assertTransferCount(ConsumeIMAP.REL_SUCCESS, 1);
+        } finally {
+            replacementServer.stop();
+        }
+    }
+
     @Test
     void testMigrateProperties() {
         final TestRunner runner = TestRunners.newTestRunner(ConsumeIMAP.class);
@@ -173,4 +194,29 @@ public class TestConsumeEmail {
         final PropertyMigrationResult propertyMigrationResult = runner.migrateProperties();
         assertEquals(expectedRenamed, propertyMigrationResult.getPropertiesRenamed());
     }
+
+    private void setImapServerProperties(final TestRunner runner) {
+        runner.setProperty(ConsumeIMAP.HOST, ServerSetupTest.IMAP.getBindAddress());
+        runner.setProperty(ConsumeIMAP.PORT, String.valueOf(ServerSetupTest.IMAP.getPort()));
+        runner.setProperty(ConsumeIMAP.USER, RECIPIENT_USER);
+        runner.setProperty(ConsumeIMAP.PASSWORD, RECIPIENT_PASSWORD);
+        runner.setProperty(ConsumeIMAP.FOLDER, INBOX_FOLDER);
+        runner.setProperty(ConsumeIMAP.USE_SSL, Boolean.FALSE.toString());
+    }
+
+    private void setUsers() {
+        imapUser = imapServer.setUser(RECIPIENT_ADDRESS, RECIPIENT_USER, RECIPIENT_PASSWORD);
+        popUser = popServer.setUser(RECIPIENT_ADDRESS, RECIPIENT_USER, RECIPIENT_PASSWORD);
+    }
+
+    void addMessage(final String testName, final GreenMailUser user) throws MessagingException {
+        Properties prop = new Properties();
+        Session session = Session.getDefaultInstance(prop);
+        MimeMessage message = new MimeMessage(session);
+        message.setFrom(new InternetAddress(SENDER_ADDRESS));
+        message.addRecipient(Message.RecipientType.TO, new InternetAddress(RECIPIENT_ADDRESS));
+        message.setSubject("Test email" + testName);
+        message.setText("test test test chocolate");
+        user.deliver(message);
+    }
 }

Reply via email to