This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit fae768c537fd31a0e623da262e443c2437cbe283
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jan 9 16:21:18 2026 -0500

    NIFI-15433: If connector validation throws an Exception keep trying u… 
(#10736)
    
    * NIFI-15433: If connector validation throws an Exception keep trying until 
it completes (with a timeout between retries); some bug fixes
    
    * NIFI-15433: Addressed review feedback
    
    * NIFI-15433: Fixed failing unit tests
---
 .../StandardConnectorValidationTrigger.java        | 46 +++++++++++++++++++---
 .../nifi/components/connector/ConnectorNode.java   |  6 +++
 .../connector/ConnectorValidationTrigger.java      |  7 +++-
 .../connector/StandardConnectorNode.java           | 12 +++---
 .../controller/flow/TestStandardFlowManager.java   |  7 ++++
 5 files changed, 66 insertions(+), 12 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
index a88cb0aeb4..2e92671d2d 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
@@ -20,37 +20,73 @@ package org.apache.nifi.components.connector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 
 /**
  * Standard implementation of ConnectorValidationTrigger that submits 
validation
- * tasks to an ExecutorService for asynchronous execution.
+ * tasks to an ScheduledExecutorService for asynchronous execution.
  */
 public class StandardConnectorValidationTrigger implements 
ConnectorValidationTrigger {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardConnectorValidationTrigger.class);
 
-    private final ExecutorService threadPool;
+    private final ScheduledExecutorService threadPool;
     private final BooleanSupplier flowInitialized;
+    private final Set<ConnectorNode> activelyValidating = 
Collections.synchronizedSet(new HashSet<>());
 
-    public StandardConnectorValidationTrigger(final ExecutorService 
threadPool, final BooleanSupplier flowInitialized) {
+    public StandardConnectorValidationTrigger(final ScheduledExecutorService 
threadPool, final BooleanSupplier flowInitialized) {
         this.threadPool = threadPool;
         this.flowInitialized = flowInitialized;
     }
 
     @Override
     public void triggerAsync(final ConnectorNode connector) {
+        // Avoid adding multiple validation tasks for the same connector 
concurrently. This is not 100% thread safe because when a task
+        // is rescheduled, there's a small window where a second thread could 
be scheduled after the Connector is removed from 'activelyValidating' and
+        // before the task is rescheduled. However, this is acceptable because 
having multiple threads validating concurrently is safe, it's just inefficient.
+        final boolean added = activelyValidating.add(connector);
+        if (!added) {
+            logger.debug("Connector {} is already undergoing validation; will 
not trigger another validation concurrently", connector);
+            return;
+        }
+
         if (!flowInitialized.getAsBoolean()) {
             logger.debug("Triggered to perform validation on {} asynchronously 
but flow is not yet initialized so will ignore validation", connector);
+            reschedule(connector, Duration.ofSeconds(1));
             return;
         }
 
-        threadPool.submit(() -> trigger(connector));
+        threadPool.submit(() -> {
+            try {
+                if (connector.isValidationPaused()) {
+                    logger.debug("Connector {} is currently marked as having 
validation paused; will retry in 1 second", connector);
+                    reschedule(connector, Duration.ofSeconds(1));
+                    return;
+                }
+
+                trigger(connector);
+
+                activelyValidating.remove(connector);
+            } catch (final Exception e) {
+                logger.error("Validation for connector {} failed; will retry 
in 5 seconds", connector, e);
+                reschedule(connector, Duration.ofSeconds(5));
+            }
+        });
     }
 
     @Override
     public void trigger(final ConnectorNode connector) {
         connector.performValidation();
     }
+
+    private void reschedule(final ConnectorNode connector, final Duration 
delay) {
+        activelyValidating.remove(connector);
+        threadPool.schedule(() -> triggerAsync(connector), delay.toMillis(), 
TimeUnit.MILLISECONDS);
+    }
 }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 2bd111c752..d4121b2db9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -113,6 +113,12 @@ public interface ConnectorNode extends 
ComponentAuthorizable, VersionedComponent
      */
     void resumeValidationTrigger();
 
+    /**
+     * Indicates whether validation triggering is currently paused.
+     * @return true if validation triggering is paused, false otherwise
+     */
+    boolean isValidationPaused();
+
     List<ConfigVerificationResult> verifyConfigurationStep(String 
configurationStepName, StepConfiguration configurationOverrides);
 
     List<ConfigVerificationResult> verify();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
index 9cd4a13d7d..f1e617be32 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
@@ -24,13 +24,18 @@ public interface ConnectorValidationTrigger {
 
     /**
      * Triggers validation of the given connector to occur asynchronously.
+     * If the Connector's validation is already in progress by another thread, 
this method will
+     * return without triggering another validation. If the Connector's 
validation is paused, this
+     * will schedule the validation to occur once unpaused.
      *
      * @param connector the connector to validate
      */
     void triggerAsync(ConnectorNode connector);
 
     /**
-     * Triggers validation of the given connector immediately in the current 
thread.
+     * Triggers validation of the given connector immediately in the current 
thread. This will
+     * trigger validation even if other validation is currently in progress or 
if the Connector's
+     * validation is paused.
      *
      * @param connector the connector to validate
      */
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index a35e191c83..671e88efea 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -664,6 +664,11 @@ public class StandardConnectorNode implements 
ConnectorNode {
         resetValidationState();
     }
 
+    @Override
+    public boolean isValidationPaused() {
+        return !triggerValidation;
+    }
+
     @Override
     public List<ConfigVerificationResult> verifyConfigurationStep(final String 
stepName, final StepConfiguration configurationOverrides) {
         final List<SecretReference> invalidSecretRefs = new ArrayList<>();
@@ -924,12 +929,7 @@ public class StandardConnectorNode implements 
ConnectorNode {
 
     private void resetValidationState() {
         validationState.set(new ValidationState(ValidationStatus.VALIDATING, 
Collections.emptyList()));
-
-        if (triggerValidation && validationTrigger != null) {
-            validationTrigger.triggerAsync(this);
-        } else {
-            logger.debug("Reset validation state of {} but will not trigger 
async validation because trigger has been paused or is null", this);
-        }
+        validationTrigger.triggerAsync(this);
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
index 33ec45aac0..473962890f 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
@@ -23,7 +23,9 @@ import org.apache.nifi.components.connector.Connector;
 import org.apache.nifi.components.connector.ConnectorInitializationContext;
 import org.apache.nifi.components.connector.ConnectorNode;
 import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorValidationTrigger;
 import 
org.apache.nifi.components.connector.StandardConnectorInitializationContext;
+import org.apache.nifi.components.connector.StandardConnectorStateTransition;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.GarbageCollectionLog;
 import org.apache.nifi.controller.MockStateManagerProvider;
@@ -54,6 +56,7 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -139,10 +142,14 @@ public class TestStandardFlowManager {
         
when(flowController.getFlowFileEventRepository()).thenReturn(mock(FlowFileEventRepository.class));
         
when(flowController.getReloadComponent()).thenReturn(mock(ReloadComponent.class));
 
+        final ConnectorValidationTrigger validationTrigger = 
mock(ConnectorValidationTrigger.class);
+        
when(flowController.getConnectorValidationTrigger()).thenReturn(validationTrigger);
+
         final ConnectorRepository connectorRepository = 
mock(ConnectorRepository.class);
         
when(connectorRepository.createInitializationContextBuilder()).thenAnswer(
             invocation -> new 
StandardConnectorInitializationContext.Builder());
         
when(flowController.getConnectorRepository()).thenReturn(connectorRepository);
+        when(connectorRepository.createStateTransition(anyString(), 
anyString())).thenReturn(new StandardConnectorStateTransition("test 
component"));
 
         // Create the connector
         final String type = NopConnector.class.getName();

Reply via email to