This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit fae768c537fd31a0e623da262e443c2437cbe283 Author: Mark Payne <[email protected]> AuthorDate: Fri Jan 9 16:21:18 2026 -0500 NIFI-15433: If connector validation throws an Exception keep trying u… (#10736) * NIFI-15433: If connector validation throws an Exception keep trying until it completes (with a timeout between retries); some bug fixes * NIFI-15433: Addressed review feedback * NIFI-15433: Fixed failing unit tests --- .../StandardConnectorValidationTrigger.java | 46 +++++++++++++++++++--- .../nifi/components/connector/ConnectorNode.java | 6 +++ .../connector/ConnectorValidationTrigger.java | 7 +++- .../connector/StandardConnectorNode.java | 12 +++--- .../controller/flow/TestStandardFlowManager.java | 7 ++++ 5 files changed, 66 insertions(+), 12 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java index a88cb0aeb4..2e92671d2d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java @@ -20,37 +20,73 @@ package org.apache.nifi.components.connector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; /** * Standard implementation of ConnectorValidationTrigger that submits validation - * tasks to an ExecutorService for asynchronous execution. + * tasks to an ScheduledExecutorService for asynchronous execution. */ public class StandardConnectorValidationTrigger implements ConnectorValidationTrigger { private static final Logger logger = LoggerFactory.getLogger(StandardConnectorValidationTrigger.class); - private final ExecutorService threadPool; + private final ScheduledExecutorService threadPool; private final BooleanSupplier flowInitialized; + private final Set<ConnectorNode> activelyValidating = Collections.synchronizedSet(new HashSet<>()); - public StandardConnectorValidationTrigger(final ExecutorService threadPool, final BooleanSupplier flowInitialized) { + public StandardConnectorValidationTrigger(final ScheduledExecutorService threadPool, final BooleanSupplier flowInitialized) { this.threadPool = threadPool; this.flowInitialized = flowInitialized; } @Override public void triggerAsync(final ConnectorNode connector) { + // Avoid adding multiple validation tasks for the same connector concurrently. This is not 100% thread safe because when a task + // is rescheduled, there's a small window where a second thread could be scheduled after the Connector is removed from 'activelyValidating' and + // before the task is rescheduled. However, this is acceptable because having multiple threads validating concurrently is safe, it's just inefficient. + final boolean added = activelyValidating.add(connector); + if (!added) { + logger.debug("Connector {} is already undergoing validation; will not trigger another validation concurrently", connector); + return; + } + if (!flowInitialized.getAsBoolean()) { logger.debug("Triggered to perform validation on {} asynchronously but flow is not yet initialized so will ignore validation", connector); + reschedule(connector, Duration.ofSeconds(1)); return; } - threadPool.submit(() -> trigger(connector)); + threadPool.submit(() -> { + try { + if (connector.isValidationPaused()) { + logger.debug("Connector {} is currently marked as having validation paused; will retry in 1 second", connector); + reschedule(connector, Duration.ofSeconds(1)); + return; + } + + trigger(connector); + + activelyValidating.remove(connector); + } catch (final Exception e) { + logger.error("Validation for connector {} failed; will retry in 5 seconds", connector, e); + reschedule(connector, Duration.ofSeconds(5)); + } + }); } @Override public void trigger(final ConnectorNode connector) { connector.performValidation(); } + + private void reschedule(final ConnectorNode connector, final Duration delay) { + activelyValidating.remove(connector); + threadPool.schedule(() -> triggerAsync(connector), delay.toMillis(), TimeUnit.MILLISECONDS); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java index 2bd111c752..d4121b2db9 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java @@ -113,6 +113,12 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent */ void resumeValidationTrigger(); + /** + * Indicates whether validation triggering is currently paused. + * @return true if validation triggering is paused, false otherwise + */ + boolean isValidationPaused(); + List<ConfigVerificationResult> verifyConfigurationStep(String configurationStepName, StepConfiguration configurationOverrides); List<ConfigVerificationResult> verify(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java index 9cd4a13d7d..f1e617be32 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java @@ -24,13 +24,18 @@ public interface ConnectorValidationTrigger { /** * Triggers validation of the given connector to occur asynchronously. + * If the Connector's validation is already in progress by another thread, this method will + * return without triggering another validation. If the Connector's validation is paused, this + * will schedule the validation to occur once unpaused. * * @param connector the connector to validate */ void triggerAsync(ConnectorNode connector); /** - * Triggers validation of the given connector immediately in the current thread. + * Triggers validation of the given connector immediately in the current thread. This will + * trigger validation even if other validation is currently in progress or if the Connector's + * validation is paused. * * @param connector the connector to validate */ diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index a35e191c83..671e88efea 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -664,6 +664,11 @@ public class StandardConnectorNode implements ConnectorNode { resetValidationState(); } + @Override + public boolean isValidationPaused() { + return !triggerValidation; + } + @Override public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final StepConfiguration configurationOverrides) { final List<SecretReference> invalidSecretRefs = new ArrayList<>(); @@ -924,12 +929,7 @@ public class StandardConnectorNode implements ConnectorNode { private void resetValidationState() { validationState.set(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList())); - - if (triggerValidation && validationTrigger != null) { - validationTrigger.triggerAsync(this); - } else { - logger.debug("Reset validation state of {} but will not trigger async validation because trigger has been paused or is null", this); - } + validationTrigger.triggerAsync(this); } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java index 33ec45aac0..473962890f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java @@ -23,7 +23,9 @@ import org.apache.nifi.components.connector.Connector; import org.apache.nifi.components.connector.ConnectorInitializationContext; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorValidationTrigger; import org.apache.nifi.components.connector.StandardConnectorInitializationContext; +import org.apache.nifi.components.connector.StandardConnectorStateTransition; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.GarbageCollectionLog; import org.apache.nifi.controller.MockStateManagerProvider; @@ -54,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -139,10 +142,14 @@ public class TestStandardFlowManager { when(flowController.getFlowFileEventRepository()).thenReturn(mock(FlowFileEventRepository.class)); when(flowController.getReloadComponent()).thenReturn(mock(ReloadComponent.class)); + final ConnectorValidationTrigger validationTrigger = mock(ConnectorValidationTrigger.class); + when(flowController.getConnectorValidationTrigger()).thenReturn(validationTrigger); + final ConnectorRepository connectorRepository = mock(ConnectorRepository.class); when(connectorRepository.createInitializationContextBuilder()).thenAnswer( invocation -> new StandardConnectorInitializationContext.Builder()); when(flowController.getConnectorRepository()).thenReturn(connectorRepository); + when(connectorRepository.createStateTransition(anyString(), anyString())).thenReturn(new StandardConnectorStateTransition("test component")); // Create the connector final String type = NopConnector.class.getName();
