This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 3288c0dbfd NIFI-15433: If connector validation throws an Exception
keep trying u… (#10736)
3288c0dbfd is described below
commit 3288c0dbfd1191041ee12cb1fffd343060d141cb
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jan 9 16:21:18 2026 -0500
NIFI-15433: If connector validation throws an Exception keep trying u…
(#10736)
* NIFI-15433: If connector validation throws an Exception keep trying until
it completes (with a timeout between retries); some bug fixes
* NIFI-15433: Addressed review feedback
* NIFI-15433: Fixed failing unit tests
---
.../StandardConnectorValidationTrigger.java | 46 +++++++++++++++++++---
.../nifi/components/connector/ConnectorNode.java | 6 +++
.../connector/ConnectorValidationTrigger.java | 7 +++-
.../connector/StandardConnectorNode.java | 12 +++---
.../controller/flow/TestStandardFlowManager.java | 7 ++++
5 files changed, 66 insertions(+), 12 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
index a88cb0aeb4..2e92671d2d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/StandardConnectorValidationTrigger.java
@@ -20,37 +20,73 @@ package org.apache.nifi.components.connector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
/**
* Standard implementation of ConnectorValidationTrigger that submits
validation
- * tasks to an ExecutorService for asynchronous execution.
+ * tasks to an ScheduledExecutorService for asynchronous execution.
*/
public class StandardConnectorValidationTrigger implements
ConnectorValidationTrigger {
private static final Logger logger =
LoggerFactory.getLogger(StandardConnectorValidationTrigger.class);
- private final ExecutorService threadPool;
+ private final ScheduledExecutorService threadPool;
private final BooleanSupplier flowInitialized;
+ private final Set<ConnectorNode> activelyValidating =
Collections.synchronizedSet(new HashSet<>());
- public StandardConnectorValidationTrigger(final ExecutorService
threadPool, final BooleanSupplier flowInitialized) {
+ public StandardConnectorValidationTrigger(final ScheduledExecutorService
threadPool, final BooleanSupplier flowInitialized) {
this.threadPool = threadPool;
this.flowInitialized = flowInitialized;
}
@Override
public void triggerAsync(final ConnectorNode connector) {
+ // Avoid adding multiple validation tasks for the same connector
concurrently. This is not 100% thread safe because when a task
+ // is rescheduled, there's a small window where a second thread could
be scheduled after the Connector is removed from 'activelyValidating' and
+ // before the task is rescheduled. However, this is acceptable because
having multiple threads validating concurrently is safe, it's just inefficient.
+ final boolean added = activelyValidating.add(connector);
+ if (!added) {
+ logger.debug("Connector {} is already undergoing validation; will
not trigger another validation concurrently", connector);
+ return;
+ }
+
if (!flowInitialized.getAsBoolean()) {
logger.debug("Triggered to perform validation on {} asynchronously
but flow is not yet initialized so will ignore validation", connector);
+ reschedule(connector, Duration.ofSeconds(1));
return;
}
- threadPool.submit(() -> trigger(connector));
+ threadPool.submit(() -> {
+ try {
+ if (connector.isValidationPaused()) {
+ logger.debug("Connector {} is currently marked as having
validation paused; will retry in 1 second", connector);
+ reschedule(connector, Duration.ofSeconds(1));
+ return;
+ }
+
+ trigger(connector);
+
+ activelyValidating.remove(connector);
+ } catch (final Exception e) {
+ logger.error("Validation for connector {} failed; will retry
in 5 seconds", connector, e);
+ reschedule(connector, Duration.ofSeconds(5));
+ }
+ });
}
@Override
public void trigger(final ConnectorNode connector) {
connector.performValidation();
}
+
+ private void reschedule(final ConnectorNode connector, final Duration
delay) {
+ activelyValidating.remove(connector);
+ threadPool.schedule(() -> triggerAsync(connector), delay.toMillis(),
TimeUnit.MILLISECONDS);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 2bd111c752..d4121b2db9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -113,6 +113,12 @@ public interface ConnectorNode extends
ComponentAuthorizable, VersionedComponent
*/
void resumeValidationTrigger();
+ /**
+ * Indicates whether validation triggering is currently paused.
+ * @return true if validation triggering is paused, false otherwise
+ */
+ boolean isValidationPaused();
+
List<ConfigVerificationResult> verifyConfigurationStep(String
configurationStepName, StepConfiguration configurationOverrides);
List<ConfigVerificationResult> verify();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
index 9cd4a13d7d..f1e617be32 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorValidationTrigger.java
@@ -24,13 +24,18 @@ public interface ConnectorValidationTrigger {
/**
* Triggers validation of the given connector to occur asynchronously.
+ * If the Connector's validation is already in progress by another thread,
this method will
+ * return without triggering another validation. If the Connector's
validation is paused, this
+ * will schedule the validation to occur once unpaused.
*
* @param connector the connector to validate
*/
void triggerAsync(ConnectorNode connector);
/**
- * Triggers validation of the given connector immediately in the current
thread.
+ * Triggers validation of the given connector immediately in the current
thread. This will
+ * trigger validation even if other validation is currently in progress or
if the Connector's
+ * validation is paused.
*
* @param connector the connector to validate
*/
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index a35e191c83..671e88efea 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -664,6 +664,11 @@ public class StandardConnectorNode implements
ConnectorNode {
resetValidationState();
}
+ @Override
+ public boolean isValidationPaused() {
+ return !triggerValidation;
+ }
+
@Override
public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final StepConfiguration configurationOverrides) {
final List<SecretReference> invalidSecretRefs = new ArrayList<>();
@@ -924,12 +929,7 @@ public class StandardConnectorNode implements
ConnectorNode {
private void resetValidationState() {
validationState.set(new ValidationState(ValidationStatus.VALIDATING,
Collections.emptyList()));
-
- if (triggerValidation && validationTrigger != null) {
- validationTrigger.triggerAsync(this);
- } else {
- logger.debug("Reset validation state of {} but will not trigger
async validation because trigger has been paused or is null", this);
- }
+ validationTrigger.triggerAsync(this);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
index 33ec45aac0..473962890f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/TestStandardFlowManager.java
@@ -23,7 +23,9 @@ import org.apache.nifi.components.connector.Connector;
import org.apache.nifi.components.connector.ConnectorInitializationContext;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorValidationTrigger;
import
org.apache.nifi.components.connector.StandardConnectorInitializationContext;
+import org.apache.nifi.components.connector.StandardConnectorStateTransition;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.GarbageCollectionLog;
import org.apache.nifi.controller.MockStateManagerProvider;
@@ -54,6 +56,7 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -139,10 +142,14 @@ public class TestStandardFlowManager {
when(flowController.getFlowFileEventRepository()).thenReturn(mock(FlowFileEventRepository.class));
when(flowController.getReloadComponent()).thenReturn(mock(ReloadComponent.class));
+ final ConnectorValidationTrigger validationTrigger =
mock(ConnectorValidationTrigger.class);
+
when(flowController.getConnectorValidationTrigger()).thenReturn(validationTrigger);
+
final ConnectorRepository connectorRepository =
mock(ConnectorRepository.class);
when(connectorRepository.createInitializationContextBuilder()).thenAnswer(
invocation -> new
StandardConnectorInitializationContext.Builder());
when(flowController.getConnectorRepository()).thenReturn(connectorRepository);
+ when(connectorRepository.createStateTransition(anyString(),
anyString())).thenReturn(new StandardConnectorStateTransition("test
component"));
// Create the connector
final String type = NopConnector.class.getName();