This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 89f23372bd NIFI-15544: If a Connector requires processors or
controller services that are unavailable, make Connector invalid but not
ghosted (#10851)
89f23372bd is described below
commit 89f23372bdfd31255351d2fb67ab4e92455a71b1
Author: Mark Payne <[email protected]>
AuthorDate: Thu Feb 5 12:56:29 2026 -0500
NIFI-15544: If a Connector requires processors or controller services that
are unavailable, make Connector invalid but not ghosted (#10851)
---
.../server/StandardConnectorMockServer.java | 11 ++
.../mock/connectors/tests/CreateConnectorIT.java | 17 +++
.../mock/connectors/MissingBundleConnector.java | 108 +++++++++++++++
.../org.apache.nifi.components.connector.Connector | 1 +
.../nifi/components/connector/GhostConnector.java | 8 +-
.../connector/StandardConnectorNode.java | 146 ++++++++++++++++++---
.../apache/nifi/controller/ExtensionBuilder.java | 10 +-
.../connector/MissingBundleConnector.java | 111 ++++++++++++++++
.../connector/StandardConnectorNodeIT.java | 17 +++
.../services/org.apache.nifi.processor.Processor | 2 +
10 files changed, 406 insertions(+), 25 deletions(-)
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
index 685e84748a..d23e6400f4 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
@@ -28,11 +28,13 @@ import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.AssetReference;
+import org.apache.nifi.components.connector.Connector;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.components.connector.ConnectorValueReference;
import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.GhostConnector;
import org.apache.nifi.components.connector.SecretReference;
import
org.apache.nifi.components.connector.StandaloneConnectorRequestReplicator;
import org.apache.nifi.components.connector.StepConfiguration;
@@ -188,6 +190,15 @@ public class StandardConnectorMockServer implements
ConnectorMockServer {
final BundleCoordinate bundleCoordinate =
bundles.getFirst().getBundleDetails().getCoordinate();
connectorNode =
flowController.getFlowManager().createConnector(connectorClassName,
CONNECTOR_ID, bundleCoordinate, true, true);
+
+ if (connectorNode.isExtensionMissing()) {
+ final Connector connector = connectorNode.getConnector();
+ if (connector instanceof final GhostConnector ghostConnector) {
+ throw new IllegalStateException("Failed to create Connector of
type " + connectorClassName, ghostConnector.getCauseOfGhost());
+ } else {
+ throw new IllegalStateException("Failed to create Connector of
type " + connectorClassName);
+ }
+ }
}
@Override
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-integration-tests/src/test/java/org/apache/nifi/mock/connectors/tests/CreateConnectorIT.java
b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-integration-tests/src/test/java/org/apache/nifi/mock/connectors/tests/CreateConnectorIT.java
index dccbaaa705..950b945003 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-integration-tests/src/test/java/org/apache/nifi/mock/connectors/tests/CreateConnectorIT.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-integration-tests/src/test/java/org/apache/nifi/mock/connectors/tests/CreateConnectorIT.java
@@ -24,6 +24,9 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class CreateConnectorIT {
@Test
@@ -37,4 +40,18 @@ public class CreateConnectorIT {
testRunner.stopConnector();
}
}
+
+ /**
+ * Verifies that building a test runner for MissingBundleConnector fails with an
+ * IllegalStateException, and that the exception message names both the missing
+ * bundle coordinates and the missing processor type.
+ */
+ @Test
+ public void testConnectorWithMissingBundleThrowsException() {
+ // build() is expected to throw; the runner instance itself is never used
+ final IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> {
+ new StandardConnectorTestRunner.Builder()
+
.connectorClassName("org.apache.nifi.mock.connectors.MissingBundleConnector")
+ .narLibraryDirectory(new File("target/libDir"))
+ .build();
+ });
+
+ final String message = exception.getMessage();
+ // Bundle coordinates are expected in group:artifact:version form
+
assertTrue(message.contains("com.example.nonexistent:missing-nar:1.0.0"),
"Expected exception message to contain missing bundle coordinates but was: " +
message);
+ // The fully qualified type of the unavailable processor must be reported as well
+
assertTrue(message.contains("com.example.nonexistent.MissingProcessor"),
"Expected exception message to contain missing processor type but was: " +
message);
+ }
}
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/java/org/apache/nifi/mock/connectors/MissingBundleConnector.java
b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/java/org/apache/nifi/mock/connectors/MissingBundleConnector.java
new file mode 100644
index 0000000000..d49674ffd0
--- /dev/null
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/java/org/apache/nifi/mock/connectors/MissingBundleConnector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.mock.connectors;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * A test connector that returns an initial flow containing a processor with a
bundle that does not exist.
+ * This is used to test the behavior when a connector's initial flow
references unavailable components.
+ */
+public class MissingBundleConnector extends AbstractConnector {
+
+ /**
+ * Builds a flow whose root group holds exactly one processor, of type
+ * com.example.nonexistent.MissingProcessor, assigned to the bundle
+ * com.example.nonexistent:missing-nar:1.0.0.
+ *
+ * @return an external flow wrapping that root group
+ */
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ // Root group: identifiers are random, and every component collection other than
+ // the processors (set further down) is an empty set
+ final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+ rootGroup.setIdentifier(UUID.randomUUID().toString());
+ rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
+ rootGroup.setName("Missing Bundle Connector Flow");
+ rootGroup.setPosition(new Position(0.0, 0.0));
+ rootGroup.setProcessGroups(new HashSet<>());
+ rootGroup.setConnections(new HashSet<>());
+ rootGroup.setInputPorts(new HashSet<>());
+ rootGroup.setOutputPorts(new HashSet<>());
+ rootGroup.setControllerServices(new HashSet<>());
+ rootGroup.setFunnels(new HashSet<>());
+ rootGroup.setLabels(new HashSet<>());
+
+ // The single processor of the flow; it has no properties and no
+ // auto-terminated relationships
+ final VersionedProcessor missingProcessor = new VersionedProcessor();
+ missingProcessor.setIdentifier(UUID.randomUUID().toString());
+ missingProcessor.setInstanceIdentifier(UUID.randomUUID().toString());
+ missingProcessor.setName("Missing Processor");
+ missingProcessor.setType("com.example.nonexistent.MissingProcessor");
+ missingProcessor.setPosition(new Position(100.0, 100.0));
+ missingProcessor.setScheduledState(ScheduledState.ENABLED);
+ missingProcessor.setSchedulingPeriod("0 sec");
+ missingProcessor.setSchedulingStrategy("TIMER_DRIVEN");
+ missingProcessor.setExecutionNode("ALL");
+ missingProcessor.setPenaltyDuration("30 sec");
+ missingProcessor.setYieldDuration("1 sec");
+ missingProcessor.setBulletinLevel("WARN");
+ missingProcessor.setRunDurationMillis(0L);
+ missingProcessor.setConcurrentlySchedulableTaskCount(1);
+ missingProcessor.setAutoTerminatedRelationships(new HashSet<>());
+ missingProcessor.setProperties(Map.of());
+ missingProcessor.setPropertyDescriptors(Map.of());
+ missingProcessor.setGroupIdentifier(rootGroup.getIdentifier());
+
+ // Bundle coordinates the class Javadoc describes as not existing
+ final Bundle missingBundle = new Bundle();
+ missingBundle.setGroup("com.example.nonexistent");
+ missingBundle.setArtifact("missing-nar");
+ missingBundle.setVersion("1.0.0");
+ missingProcessor.setBundle(missingBundle);
+
+ rootGroup.setProcessors(new HashSet<>(List.of(missingProcessor)));
+
+ final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
+ externalFlow.setFlowContents(rootGroup);
+ return externalFlow;
+ }
+
+ // No-op: this connector declares no configuration steps (see getConfigurationSteps)
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
flowContext) {
+ }
+
+ // Always returns an empty, immutable list
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of();
+ }
+
+ // No-op: neither context is read or modified
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final
FlowContext activeContext) throws FlowUpdateException {
+ }
+
+ // Performs no verification and reports no results
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> overrides, final FlowContext
workingContext) {
+ return List.of();
+ }
+}
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index c6cbc30195..7f2959091e 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.mock.connectors.GenerateAndLog
+org.apache.nifi.mock.connectors.MissingBundleConnector
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
index cd8a614b1b..0508821e9d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
@@ -33,10 +33,12 @@ public class GhostConnector implements Connector {
private final String canonicalClassName;
private final List<ValidationResult> validationResults;
private final List<ConfigVerificationResult> configVerificationResults;
+ private final Exception causeOfGhost;
- public GhostConnector(final String identifier, final String
canonicalClassName) {
+ public GhostConnector(final String identifier, final String
canonicalClassName, final Exception causeOfGhost) {
this.identifier = identifier;
this.canonicalClassName = canonicalClassName;
+ this.causeOfGhost = causeOfGhost;
validationResults = List.of(new ValidationResult.Builder()
.subject("Missing Connector")
@@ -51,6 +53,10 @@ public class GhostConnector implements Connector {
.build());
}
+ /**
+ * Returns the exception that was handed to the constructor as the cause of this
+ * Ghost Connector.
+ *
+ * @return the exception supplied at construction time. NOTE(review): no null check is
+ * visible in the constructor, so this is presumably nullable - confirm against callers
+ */
+ public Exception getCauseOfGhost() {
+ return causeOfGhost;
+ }
+
@Override
public void initialize(final ConnectorInitializationContext initContext) {
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 77a577d86a..038ce10c36 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -100,6 +100,7 @@ public class StandardConnectorNode implements ConnectorNode
{
private final boolean extensionMissing;
private volatile boolean triggerValidation = true;
private final AtomicReference<CompletableFuture<Void>> drainFutureRef =
new AtomicReference<>();
+ private volatile ValidationResult unresolvedBundleValidationResult = null;
private volatile FrameworkFlowContext workingFlowContext;
@@ -709,11 +710,18 @@ public class StandardConnectorNode implements
ConnectorNode {
if (initialFlow == null) {
logger.info("{} has no initial flow to load", this);
} else {
- logger.info("Loading initial flow for {}", this);
- // Update all RUNNING components to ENABLED before applying the
initial flow so that components
- // are not started before being configured.
- stopComponents(initialFlow.getFlowContents());
- initializationContext.updateFlow(activeFlowContext, initialFlow,
BundleCompatibility.RESOLVE_BUNDLE);
+ final ValidationResult unresolvedBundleResult =
validateBundlesCanBeResolved(initialFlow.getFlowContents(),
initializationContext.getComponentBundleLookup());
+
+ if (unresolvedBundleResult != null) {
+ logger.error("Cannot load initial flow for {} because some
component bundles cannot be resolved: {}", this,
unresolvedBundleResult.getExplanation());
+ unresolvedBundleValidationResult = unresolvedBundleResult;
+ } else {
+ logger.info("Loading initial flow for {}", this);
+ // Update all RUNNING components to ENABLED before applying
the initial flow so that components
+ // are not started before being configured.
+ stopComponents(initialFlow.getFlowContents());
+ initializationContext.updateFlow(activeFlowContext,
initialFlow, BundleCompatibility.RESOLVE_BUNDLE);
+ }
}
resetValidationState();
@@ -738,6 +746,101 @@ public class StandardConnectorNode implements
ConnectorNode {
}
}
+ /**
+ * Ensures that all bundles required by the given Process Group can be resolved. We do this in order to make the Connector
+ * invalid if any Processor or Controller Service cannot be properly instantiated due to missing bundles. We intentionally
+ * differentiate between making the Connector invalid versus Ghosting the Connector for a few reasons:
+ * <ul>
+ * <li>
+ * Ghosting the Connector would prevent us from even getting the Configuration Steps, and it results in all Properties
+ * becoming sensitive. This can lead to confusion.
+ * </li>
+ * <li>
+ * The flow may change dynamically, so it is possible for a Connector to be valid given its initial flow and then become
+ * invalid because a new configuration requires a component that is unavailable. In that case the Connector would not
+ * suddenly change from valid to Ghosted; it could only become invalid. We do not want a missing component in the
+ * Initial Flow to be treated differently than a missing component from a subsequent flow update.
+ * </li>
+ * <li>
+ * Ghosting should be reserved for situations where the extension itself is missing.
+ * </li>
+ * </ul>
+ *
+ * <p>The bundles and component types named in the explanation are listed in sorted order so that the same flow
+ * always produces the same message.</p>
+ *
+ * @param group the process group to validate
+ * @param bundleLookup the bundle lookup
+ * @return a ValidationResult describing the missing bundles if any are missing; null if all bundles can be resolved
+ */
+ private ValidationResult validateBundlesCanBeResolved(final VersionedProcessGroup group, final ComponentBundleLookup bundleLookup) {
+     final Set<String> missingBundles = new HashSet<>();
+     final Set<String> missingProcessorTypes = new HashSet<>();
+     final Set<String> missingControllerServiceTypes = new HashSet<>();
+
+     collectUnresolvedBundles(group, bundleLookup, missingBundles, missingProcessorTypes, missingControllerServiceTypes);
+
+     if (missingBundles.isEmpty()) {
+         return null;
+     }
+
+     // HashSet iteration order is unspecified; sort so that the explanation shown to users and written
+     // to the log is stable for a given flow. A List renders as "[a, b]", the same shape as a Set.
+     final List<String> sortedBundles = missingBundles.stream().sorted().toList();
+     final List<String> sortedProcessorTypes = missingProcessorTypes.stream().sorted().toList();
+     final List<String> sortedControllerServiceTypes = missingControllerServiceTypes.stream().sorted().toList();
+
+     final StringBuilder explanation = new StringBuilder();
+     explanation.append("%d Processors and %d Controller Services unavailable from %d missing bundles".formatted(
+         sortedProcessorTypes.size(), sortedControllerServiceTypes.size(), sortedBundles.size()));
+     explanation.append("\nMissing Bundles: %s".formatted(sortedBundles));
+     if (!sortedProcessorTypes.isEmpty()) {
+         explanation.append("\nMissing Processors: %s".formatted(sortedProcessorTypes));
+     }
+     if (!sortedControllerServiceTypes.isEmpty()) {
+         explanation.append("\nMissing Controller Services: %s".formatted(sortedControllerServiceTypes));
+     }
+
+     return new ValidationResult.Builder()
+         .subject("Missing Bundles")
+         .valid(false)
+         .explanation(explanation.toString())
+         .build();
+ }
+
+ /**
+ * Recursively visits the given group and all of its child groups, recording every Processor and
+ * Controller Service whose bundle cannot be resolved.
+ *
+ * @param group the process group to inspect
+ * @param bundleLookup the lookup used to decide whether a component's bundle is resolvable
+ * @param missingBundles receives the formatted coordinates of each unresolvable bundle
+ * @param missingProcessorTypes receives the type of each Processor whose bundle is unresolvable
+ * @param missingControllerServiceTypes receives the type of each Controller Service whose bundle is unresolvable
+ */
+ private void collectUnresolvedBundles(final VersionedProcessGroup group, final ComponentBundleLookup bundleLookup,
+                                       final Set<String> missingBundles, final Set<String> missingProcessorTypes,
+                                       final Set<String> missingControllerServiceTypes) {
+     if (group.getProcessors() != null) {
+         for (final VersionedProcessor versionedProcessor : group.getProcessors()) {
+             if (isBundleResolvable(versionedProcessor.getType(), versionedProcessor.getBundle(), bundleLookup)) {
+                 continue;
+             }
+
+             missingBundles.add(formatBundle(versionedProcessor.getBundle()));
+             missingProcessorTypes.add(versionedProcessor.getType());
+         }
+     }
+
+     if (group.getControllerServices() != null) {
+         for (final VersionedControllerService versionedService : group.getControllerServices()) {
+             if (isBundleResolvable(versionedService.getType(), versionedService.getBundle(), bundleLookup)) {
+                 continue;
+             }
+
+             missingBundles.add(formatBundle(versionedService.getBundle()));
+             missingControllerServiceTypes.add(versionedService.getType());
+         }
+     }
+
+     if (group.getProcessGroups() == null) {
+         return;
+     }
+
+     // Descend into nested groups; all levels accumulate into the same three sets
+     for (final VersionedProcessGroup nestedGroup : group.getProcessGroups()) {
+         collectUnresolvedBundles(nestedGroup, bundleLookup, missingBundles, missingProcessorTypes, missingControllerServiceTypes);
+     }
+ }
+
+ /**
+ * Renders bundle coordinates as {@code group:artifact:version} for use in validation messages.
+ *
+ * @param bundle the bundle to render; may be null when a component declares no bundle
+ * @return the formatted coordinates, or a placeholder when no bundle was declared
+ */
+ private String formatBundle(final Bundle bundle) {
+     if (bundle == null) {
+         // An exception here would escape loadInitialFlow and cause the Connector to be ghosted,
+         // whereas a component with an unusable bundle should only make the Connector invalid.
+         return "<no bundle specified>";
+     }
+
+     return "%s:%s:%s".formatted(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+ }
+
+ /**
+ * Determines whether a component of the given type can be instantiated, either from the bundle it
+ * declares or from an unambiguous alternative.
+ *
+ * @param componentType the fully qualified type of the component
+ * @param currentBundle the bundle declared by the component; may be null
+ * @param bundleLookup the lookup that reports which bundles provide the component type
+ * @return true if the declared bundle is available or exactly one bundle provides the type
+ */
+ private boolean isBundleResolvable(final String componentType, final Bundle currentBundle, final ComponentBundleLookup bundleLookup) {
+     final List<Bundle> availableBundles = bundleLookup.getAvailableBundles(componentType);
+     if (availableBundles == null || availableBundles.isEmpty()) {
+         return false;
+     }
+
+     // Immutable List implementations throw NullPointerException from contains(null), so the exact
+     // match is attempted only when the component actually declares a bundle.
+     if (currentBundle != null && availableBundles.contains(currentBundle)) {
+         return true;
+     }
+
+     // With RESOLVE_BUNDLE, a bundle can be resolved only if exactly one alternative bundle is available
+     return availableBundles.size() == 1;
+ }
+
private void recreateWorkingFlowContext() {
destroyWorkingContext();
workingFlowContext =
flowContextFactory.createWorkingFlowContext(identifier,
@@ -1233,22 +1336,27 @@ public class StandardConnectorNode implements
ConnectorNode {
logger.debug("Performing validation for {}", this);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
getConnector().getClass(), getIdentifier())) {
- final ConnectorValidationContext validationContext =
createValidationContext(activeFlowContext);
-
final List<ValidationResult> allResults = new ArrayList<>();
- validateManagedFlowComponents(allResults);
- validatePropertyReferences(allResults);
- if (allResults.isEmpty()) {
- try {
- final List<ValidationResult> implValidationResults =
getConnector().validate(activeFlowContext, validationContext);
- allResults.addAll(implValidationResults);
- } catch (final Exception e) {
- allResults.add(new ValidationResult.Builder()
- .subject("Validation Failure")
- .valid(false)
- .explanation("Encountered a failure while attempting
to perform validation: " + e.getMessage())
- .build());
+ if (unresolvedBundleValidationResult != null) {
+ allResults.add(unresolvedBundleValidationResult);
+ } else {
+ final ConnectorValidationContext validationContext =
createValidationContext(activeFlowContext);
+
+ validateManagedFlowComponents(allResults);
+ validatePropertyReferences(allResults);
+
+ if (allResults.isEmpty()) {
+ try {
+ final List<ValidationResult> implValidationResults =
getConnector().validate(activeFlowContext, validationContext);
+ allResults.addAll(implValidationResults);
+ } catch (final Exception e) {
+ allResults.add(new ValidationResult.Builder()
+ .subject("Validation Failure")
+ .valid(false)
+ .explanation("Encountered a failure while
attempting to perform validation: " + e.getMessage())
+ .build());
+ }
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 64f06ded7c..cdc2aa57f1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -508,7 +508,7 @@ public class ExtensionBuilder {
connector = createConnector();
} catch (final Exception e) {
logger.error("Could not create Connector of type {} from {} for ID
{} due to: {}; creating \"Ghost\" implementation", type, bundleCoordinate,
identifier, e.getMessage(), e);
- return createGhostConnectorNode(connectorsAuthorizable);
+ return createGhostConnectorNode(connectorsAuthorizable, e);
}
final String componentType = connector.getClass().getSimpleName();
@@ -537,7 +537,7 @@ public class ExtensionBuilder {
connectorNode.initializeConnector(initContext);
} catch (final Exception e) {
logger.error("Could not initialize Connector of type {} from {}
for ID {}; creating \"Ghost\" implementation", type, bundleCoordinate,
identifier, e);
- return createGhostConnectorNode(connectorsAuthorizable);
+ return createGhostConnectorNode(connectorsAuthorizable, e);
}
if (loadInitialFlow) {
@@ -545,15 +545,15 @@ public class ExtensionBuilder {
connectorNode.loadInitialFlow();
} catch (final Exception e) {
logger.error("Failed to load initial flow for {}; creating
\"Ghost\" implementation", connectorNode, e);
- return createGhostConnectorNode(connectorsAuthorizable);
+ return createGhostConnectorNode(connectorsAuthorizable, e);
}
}
return connectorNode;
}
- private ConnectorNode createGhostConnectorNode(final Authorizable
connectorsAuthorizable) {
- final GhostConnector ghostConnector = new GhostConnector(identifier,
type);
+ private ConnectorNode createGhostConnectorNode(final Authorizable
connectorsAuthorizable, final Exception cause) {
+ final GhostConnector ghostConnector = new GhostConnector(identifier,
type, cause);
final String simpleClassName = type.contains(".") ?
StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
final ComponentLog componentLog = new SimpleProcessLogger(identifier,
ghostConnector, new StandardLoggingContext());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/MissingBundleConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/MissingBundleConnector.java
new file mode 100644
index 0000000000..c21b55dfbb
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/MissingBundleConnector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * A test connector that returns an initial flow containing a processor with a
bundle that does not exist.
+ * This is used to test the behavior when a connector's initial flow
references unavailable components.
+ */
+public class MissingBundleConnector extends AbstractConnector {
+
+ // This connector exposes no configuration steps
+ private static final List<ConfigurationStep> CONFIGURATION_STEPS =
List.of();
+
+ // Returns the shared empty list of steps
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return CONFIGURATION_STEPS;
+ }
+
+ // No-op: nothing is set up at initialization
+ @Override
+ protected void init() {
+ }
+
+ /**
+ * Builds a flow whose root group holds exactly one processor, of type
+ * com.example.nonexistent.MissingProcessor, assigned to the bundle
+ * com.example.nonexistent:missing-nar:1.0.0.
+ *
+ * @return an external flow wrapping that root group
+ */
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ // Root group: identifiers are random, and every component collection other than
+ // the processors (set further down) is an empty set
+ final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+ rootGroup.setIdentifier(UUID.randomUUID().toString());
+ rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
+ rootGroup.setName("Missing Bundle Connector Flow");
+ rootGroup.setPosition(new Position(0.0, 0.0));
+ rootGroup.setProcessGroups(new HashSet<>());
+ rootGroup.setConnections(new HashSet<>());
+ rootGroup.setInputPorts(new HashSet<>());
+ rootGroup.setOutputPorts(new HashSet<>());
+ rootGroup.setControllerServices(new HashSet<>());
+ rootGroup.setFunnels(new HashSet<>());
+ rootGroup.setLabels(new HashSet<>());
+
+ // The single processor of the flow; it has no properties and no
+ // auto-terminated relationships
+ final VersionedProcessor missingProcessor = new VersionedProcessor();
+ missingProcessor.setIdentifier(UUID.randomUUID().toString());
+ missingProcessor.setInstanceIdentifier(UUID.randomUUID().toString());
+ missingProcessor.setName("Missing Processor");
+ missingProcessor.setType("com.example.nonexistent.MissingProcessor");
+ missingProcessor.setPosition(new Position(100.0, 100.0));
+ missingProcessor.setScheduledState(ScheduledState.ENABLED);
+ missingProcessor.setSchedulingPeriod("0 sec");
+ missingProcessor.setSchedulingStrategy("TIMER_DRIVEN");
+ missingProcessor.setExecutionNode("ALL");
+ missingProcessor.setPenaltyDuration("30 sec");
+ missingProcessor.setYieldDuration("1 sec");
+ missingProcessor.setBulletinLevel("WARN");
+ missingProcessor.setRunDurationMillis(0L);
+ missingProcessor.setConcurrentlySchedulableTaskCount(1);
+ missingProcessor.setAutoTerminatedRelationships(new HashSet<>());
+ missingProcessor.setProperties(Map.of());
+ missingProcessor.setPropertyDescriptors(Map.of());
+ missingProcessor.setGroupIdentifier(rootGroup.getIdentifier());
+
+ // Bundle coordinates the class Javadoc describes as not existing
+ final Bundle missingBundle = new Bundle();
+ missingBundle.setGroup("com.example.nonexistent");
+ missingBundle.setArtifact("missing-nar");
+ missingBundle.setVersion("1.0.0");
+ missingProcessor.setBundle(missingBundle);
+
+ rootGroup.setProcessors(new HashSet<>(List.of(missingProcessor)));
+
+ final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
+ externalFlow.setFlowContents(rootGroup);
+ return externalFlow;
+ }
+
+ // No-op: neither context is read or modified
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final
FlowContext activeContext) throws FlowUpdateException {
+ }
+
+ // Performs no verification and reports no results
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> overrides, final FlowContext flowContext) {
+ return List.of();
+ }
+
+ // No-op: there are no configuration steps to react to
+ @Override
+ public void onStepConfigured(final String stepName, final FlowContext
workingContext) throws FlowUpdateException {
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index afdf4f7062..3ad6c99053 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -545,6 +545,23 @@ public class StandardConnectorNodeIT {
connectorNode.stop(componentLifecycleThreadPool);
}
+ /**
+ * Verifies that a Connector whose initial flow references an unavailable bundle is created
+ * as a regular (non-ghost) Connector that validates as INVALID with a single error naming
+ * the missing bundle coordinates and the missing processor type.
+ */
+ @Test
+ public void testInitialFlowWithMissingBundleResultsInInvalidConnector() {
+ final ConnectorNode connectorNode =
flowManager.createConnector(MissingBundleConnector.class.getName(),
"missing-bundle-connector", SystemBundle.SYSTEM_BUNDLE_COORDINATE, true, true);
+ assertNotNull(connectorNode);
+
+ // The Connector class itself is available, so the node must not be reported as missing
+ assertFalse(connectorNode.isExtensionMissing());
+
+ final ValidationState validationState =
connectorNode.performValidation();
+ assertNotNull(validationState);
+ assertEquals(ValidationStatus.INVALID, validationState.getStatus());
+ // Exactly one validation error is expected
+ assertEquals(1, validationState.getValidationErrors().size());
+
+ final String explanation =
validationState.getValidationErrors().iterator().next().getExplanation();
+
assertTrue(explanation.contains("com.example.nonexistent:missing-nar:1.0.0"),
"Expected explanation to mention the missing bundle coordinates but was: " +
explanation);
+
assertTrue(explanation.contains("com.example.nonexistent.MissingProcessor"),
"Expected explanation to mention the missing processor type but was: " +
explanation);
+ }
+
private List<String> getConfigurationStepNames(final ConnectorNode
connectorNode) {
return connectorNode.getConfigurationSteps().stream()
.map(ConfigurationStep::getName)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/META-INF/services/org.apache.nifi.processor.Processor
index af0bf1e11e..ff061c3200 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -22,3 +22,5 @@
org.apache.nifi.components.connector.processors.OverwriteFlowFile
org.apache.nifi.components.connector.processors.TerminateFlowFile
org.apache.nifi.components.connector.processors.LogFlowFileContents
org.apache.nifi.components.connector.processors.ExposeFileValues
+org.apache.nifi.components.connector.processors.Sleep
+org.apache.nifi.components.connector.processors.OnPropertyModifiedTracker
\ No newline at end of file