This is an automated email from the ASF dual-hosted git repository. kdoran pushed a commit to branch NIFI-15610-connector-external-assets in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5c2fa7b0be390d942493e8f4cb3e6a1ed8ac6930 Author: Kevin Doran <[email protected]> AuthorDate: Tue Feb 17 11:16:26 2026 -0500 NIFI-15610 Add asset management to ConnectorConfigurationProvider --- .../connector/ConnectorAssetMetadata.java | 86 +++++ .../connector/ConnectorConfigurationProvider.java | 31 ++ .../connector/ConnectorWorkingConfiguration.java | 16 + .../components/connector/ConnectorRepository.java | 52 ++- .../connector/StandardConnectorRepository.java | 370 ++++++++++++++++++++- .../connector/TestStandardConnectorRepository.java | 346 +++++++++++++++++++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 10 +- .../configuration/WebApplicationConfiguration.java | 2 +- .../nifi/web/dao/impl/StandardConnectorDAO.java | 17 +- .../web/dao/impl/StandardConnectorDAOTest.java | 40 +-- 10 files changed, 919 insertions(+), 51 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorAssetMetadata.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorAssetMetadata.java new file mode 100644 index 0000000000..567212c643 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorAssetMetadata.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +/** + * Metadata describing an asset managed by a {@link ConnectorConfigurationProvider}. + * + * <p>All identifiers and names in this class use the <em>external</em> identifier space + * of the provider (e.g., a stage file path), not NiFi's internal asset UUID space. + * The framework is responsible for translating between external and NiFi identifiers.</p> + * + * <p>This is a mutable POJO following the same style as {@link ConnectorWorkingConfiguration}.</p> + */ +public class ConnectorAssetMetadata { + + private String identifier; + private String name; + private String digest; + + public ConnectorAssetMetadata() { + } + + public ConnectorAssetMetadata(final String identifier, final String name, final String digest) { + this.identifier = identifier; + this.name = name; + this.digest = digest; + } + + /** + * Returns the external identifier assigned by the provider for this asset. + * For example, a Snowflake stage path such as {@code assets/postgresql-42.6.0.jar}. + * + * @return the external asset identifier + */ + public String getIdentifier() { + return identifier; + } + + public void setIdentifier(final String identifier) { + this.identifier = identifier; + } + + /** + * Returns the filename of the asset. This serves as the join key for matching + * external assets to NiFi-managed assets within a given property reference. + * + * @return the asset filename + */ + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + /** + * Returns the provider-supplied content hash for change detection. + * The digest algorithm is provider-specific (e.g., MD5 from a Snowflake stage LIST command). + * May be {@code null} if the provider does not support digest computation. + * + * @return the content digest, or {@code null} + */ + public String getDigest() { + return digest; + } + + public void setDigest(final String digest) { + this.digest = digest; + } +} diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java index fba4912e16..5286cc7600 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java @@ -17,6 +17,7 @@ package org.apache.nifi.components.connector; +import java.io.InputStream; import java.util.Optional; /** @@ -90,4 +91,34 @@ public interface ConnectorConfigurationProvider { * @param connectorId the identifier of the connector to be created */ void verifyCreate(String connectorId); + + /** + * Stores an asset in the external store. The provider assigns an external identifier + * to the asset and returns it. The framework uses this identifier for subsequent + * {@link #loadAsset} and {@link #deleteAsset} calls. + * + * @param connectorId the identifier of the connector that owns the asset + * @param assetName the filename of the asset (e.g., "postgresql-42.6.0.jar") + * @param content the binary content of the asset + * @return the external identifier assigned by the provider for this asset + */ + String storeAsset(String connectorId, String assetName, InputStream content); + + /** + * Loads an asset's binary content from the external store. + * + * @param connectorId the identifier of the connector that owns the asset + * @param externalAssetId the external identifier as returned by {@link #storeAsset} + * or from {@link ConnectorAssetMetadata#getIdentifier()} + * @return an Optional containing the asset content if found, or empty if the asset does not exist + */ + Optional<InputStream> loadAsset(String connectorId, String externalAssetId); + + /** + * Deletes an asset from the external store. + * + * @param connectorId the identifier of the connector that owns the asset + * @param externalAssetId the external identifier of the asset to delete + */ + void deleteAsset(String connectorId, String externalAssetId); } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java index fbe3a3af21..01bed450ed 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java @@ -19,6 +19,7 @@ package org.apache.nifi.components.connector; import org.apache.nifi.flow.VersionedConfigurationStep; +import java.util.ArrayList; import java.util.List; /** @@ -32,6 +33,7 @@ import java.util.List; public class ConnectorWorkingConfiguration { private String name; private List<VersionedConfigurationStep> workingFlowConfiguration; + private List<ConnectorAssetMetadata> assets = new ArrayList<>(); public String getName() { return name; @@ -48,4 +50,18 @@ public class ConnectorWorkingConfiguration { public void setWorkingFlowConfiguration(final List<VersionedConfigurationStep> workingFlowConfiguration) { this.workingFlowConfiguration = workingFlowConfiguration; } + + /** + * Returns the asset metadata for assets managed by the external provider. + * All identifiers use the provider's external identifier space. + * + * @return the list of asset metadata, never {@code null} + */ + public List<ConnectorAssetMetadata> getAssets() { + return assets; + } + + public void setAssets(final List<ConnectorAssetMetadata> assets) { + this.assets = assets != null ? assets : new ArrayList<>(); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java index ea4c65579c..cfa5ae5f9a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java @@ -17,11 +17,15 @@ package org.apache.nifi.components.connector; +import org.apache.nifi.asset.Asset; import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConfigurationStep; +import java.io.IOException; +import java.io.InputStream; import java.util.List; +import java.util.Optional; import java.util.concurrent.Future; public interface ConnectorRepository { @@ -127,5 +131,51 @@ public interface ConnectorRepository { FrameworkConnectorInitializationContextBuilder createInitializationContextBuilder(); - ConnectorAssetRepository getAssetRepository(); + /** + * Stores an asset for the given connector. If a {@link ConnectorConfigurationProvider} is configured, + * the asset is also uploaded to the external store and a mapping between the NiFi asset ID and the + * external ID is recorded. If the provider upload fails, the local asset is rolled back. + * + * @param connectorId the identifier of the connector that owns the asset + * @param assetId the NiFi-assigned asset identifier (UUID) + * @param assetName the filename of the asset + * @param content the binary content of the asset + * @return the stored Asset + * @throws IOException if an I/O error occurs during storage + */ + Asset storeAsset(String connectorId, String assetId, String assetName, InputStream content) throws IOException; + + /** + * Retrieves an asset by its NiFi-assigned identifier. + * + * @param assetId the NiFi-assigned asset identifier (UUID) + * @return an Optional containing the asset if found, or empty if no asset exists with the given ID + */ + Optional<Asset> getAsset(String assetId); + + /** + * Retrieves all assets belonging to the given connector. + * + * @param connectorId the identifier of the connector + * @return the list of assets for the connector + */ + List<Asset> getAssets(String connectorId); + + /** + * Deletes all assets belonging to the given connector from the local store and, + * if a {@link ConnectorConfigurationProvider} is configured, from the external store as well. + * + * @param connectorId the identifier of the connector whose assets should be deleted + */ + void deleteAssets(String connectorId); + + /** + * Ensures that asset binaries for the given connector are available locally by downloading + * any missing or changed assets from the external {@link ConnectorConfigurationProvider}. + * This is a no-op if no provider is configured. This method should be called before operations + * that need local asset files to be present, such as configuration verification. + * + * @param connector the connector whose assets should be synced + */ + void syncAssetsFromProvider(ConnectorNode connector); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java index cf7cb877c4..15487e0ff7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java @@ -19,6 +19,7 @@ package org.apache.nifi.components.connector; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.asset.Asset; +import org.apache.nifi.asset.AssetManager; import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.flow.Bundle; @@ -31,10 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -50,10 +53,22 @@ public class StandardConnectorRepository implements ConnectorRepository { private final Map<String, ConnectorNode> connectors = new ConcurrentHashMap<>(); private final FlowEngine lifecycleExecutor = new FlowEngine(8, "NiFi Connector Lifecycle"); + // Bidirectional mapping per connector: connectorId -> (nifiUuid <-> externalId) + // Outer map: connectorId -> inner map. Inner map: nifiUuid -> externalId (and reverse lookups via helper methods). + private final Map<String, Map<String, String>> nifiToExternalId = new ConcurrentHashMap<>(); + private final Map<String, Map<String, String>> externalToNifiId = new ConcurrentHashMap<>(); + + // Provider digest at the time each asset was last downloaded: nifiUuid -> providerDigest + private final Map<String, String> lastDownloadedProviderDigest = new ConcurrentHashMap<>(); + + // Cached asset metadata from the most recent syncFromProvider, per connector: connectorId -> list of metadata + private final Map<String, List<ConnectorAssetMetadata>> cachedAssetMetadata = new ConcurrentHashMap<>(); + private volatile ExtensionManager extensionManager; private volatile ConnectorRequestReplicator requestReplicator; private volatile SecretsManager secretsManager; private volatile ConnectorAssetRepository assetRepository; + private volatile AssetManager assetManager; private volatile ConnectorConfigurationProvider configurationProvider; @Override @@ -62,7 +77,8 @@ public class StandardConnectorRepository implements ConnectorRepository { this.extensionManager = context.getExtensionManager(); this.requestReplicator = context.getRequestReplicator(); this.secretsManager = context.getSecretsManager(); - this.assetRepository = new StandardConnectorAssetRepository(context.getAssetManager()); + this.assetManager = context.getAssetManager(); + this.assetRepository = new StandardConnectorAssetRepository(assetManager); this.configurationProvider = context.getConnectorConfigurationProvider(); logger.debug("Successfully initialized ConnectorRepository with configurationProvider={}", configurationProvider != null ? configurationProvider.getClass().getSimpleName() : "null"); } @@ -101,6 +117,9 @@ public class StandardConnectorRepository implements ConnectorRepository { if (configurationProvider != null) { configurationProvider.delete(connectorId); } + nifiToExternalId.remove(connectorId); + externalToNifiId.remove(connectorId); + cachedAssetMetadata.remove(connectorId); connectors.remove(connectorId); final Class<?> taskClass = connectorNode.getConnector().getClass(); @@ -176,6 +195,9 @@ public class StandardConnectorRepository implements ConnectorRepository { // so that the update is applied using the latest externally managed configuration. syncFromProvider(connector); + // Download any missing or changed asset binaries from the provider (fail-hard). + syncAssetsFromProvider(connector); + // Transition the connector's state to PREPARING_FOR_UPDATE before starting the background process. // This allows us to ensure that if we poll and see the state in the same state it was in before that // we know the update has already completed (successfully or otherwise). @@ -262,22 +284,37 @@ public class StandardConnectorRepository implements ConnectorRepository { } private void cleanUpAssets(final ConnectorNode connector) { + final String connectorId = connector.getIdentifier(); final Set<String> referencedAssetIds = new HashSet<>(); collectReferencedAssetIds(connector.getActiveFlowContext(), referencedAssetIds); collectReferencedAssetIds(connector.getWorkingFlowContext(), referencedAssetIds); - logger.debug("Found {} assets referenced for Connector [{}]", referencedAssetIds.size(), connector.getIdentifier()); + logger.debug("Found {} assets referenced for Connector [{}]", referencedAssetIds.size(), connectorId); - final ConnectorAssetRepository assetRepository = getAssetRepository(); - final List<Asset> allConnectorAssets = assetRepository.getAssets(connector.getIdentifier()); + final List<Asset> allConnectorAssets = assetRepository.getAssets(connectorId); for (final Asset asset : allConnectorAssets) { final String assetId = asset.getIdentifier(); if (!referencedAssetIds.contains(assetId)) { try { - logger.info("Deleting unreferenced asset [id={},name={}] for connector [{}]", assetId, asset.getName(), connector.getIdentifier()); + logger.info("Deleting unreferenced asset [id={},name={}] for connector [{}]", assetId, asset.getName(), connectorId); + + // Delete from external provider if mapped + if (configurationProvider != null) { + final String externalId = lookupExternalId(connectorId, assetId); + if (externalId != null) { + try { + configurationProvider.deleteAsset(connectorId, externalId); + } catch (final Exception providerEx) { + logger.warn("Failed to delete asset [externalId={}] from provider for connector [{}]", externalId, connectorId, providerEx); + } + } + } + assetRepository.deleteAsset(assetId); + removeIdMapping(connectorId, assetId); + lastDownloadedProviderDigest.remove(assetId); } catch (final Exception e) { - logger.warn("Unable to delete unreferenced asset [id={},name={}] for connector [{}]", assetId, asset.getName(), connector.getIdentifier(), e); + logger.warn("Unable to delete unreferenced asset [id={},name={}] for connector [{}]", assetId, asset.getName(), connectorId, e); } } } @@ -367,9 +404,149 @@ public class StandardConnectorRepository implements ConnectorRepository { return new StandardConnectorInitializationContext.Builder(); } + // ConnectorAssetRepository is an internal implementation detail; + // all external callers should use the asset methods on ConnectorRepository directly. + + @Override + public Asset storeAsset(final String connectorId, final String assetId, final String assetName, final InputStream content) throws IOException { + if (configurationProvider == null) { + return assetRepository.storeAsset(connectorId, assetId, assetName, content); + } + + // Buffer content so we can send it to both the local store and the provider + final byte[] contentBytes = content.readAllBytes(); + + final Asset localAsset = assetRepository.storeAsset(connectorId, assetId, assetName, new java.io.ByteArrayInputStream(contentBytes)); + + try { + final String externalId = configurationProvider.storeAsset(connectorId, assetName, new java.io.ByteArrayInputStream(contentBytes)); + recordIdMapping(connectorId, assetId, externalId); + logger.debug("Stored asset [nifiId={}, externalId={}] for connector [{}]", assetId, externalId, connectorId); + } catch (final Exception e) { + logger.error("Failed to store asset [{}] to provider for connector [{}]; rolling back local asset", assetName, connectorId, e); + assetRepository.deleteAsset(assetId); + throw new IOException("Failed to store asset to provider", e); + } + + return localAsset; + } + + @Override + public Optional<Asset> getAsset(final String assetId) { + return assetRepository.getAsset(assetId); + } + @Override - public ConnectorAssetRepository getAssetRepository() { - return assetRepository; + public List<Asset> getAssets(final String connectorId) { + return assetRepository.getAssets(connectorId); + } + + @Override + public void deleteAssets(final String connectorId) { + if (configurationProvider != null) { + final Map<String, String> nifiToExt = nifiToExternalId.getOrDefault(connectorId, Map.of()); + for (final Map.Entry<String, String> entry : nifiToExt.entrySet()) { + try { + configurationProvider.deleteAsset(connectorId, entry.getValue()); + } catch (final Exception e) { + logger.warn("Failed to delete asset [externalId={}] from provider for connector [{}]", entry.getValue(), connectorId, e); + } + } + nifiToExternalId.remove(connectorId); + externalToNifiId.remove(connectorId); + } + cachedAssetMetadata.remove(connectorId); + assetRepository.deleteAssets(connectorId); + } + + // --- ID Mapping Helpers --- + + private void recordIdMapping(final String connectorId, final String nifiUuid, final String externalId) { + nifiToExternalId.computeIfAbsent(connectorId, k -> new ConcurrentHashMap<>()).put(nifiUuid, externalId); + externalToNifiId.computeIfAbsent(connectorId, k -> new ConcurrentHashMap<>()).put(externalId, nifiUuid); + } + + private void removeIdMapping(final String connectorId, final String nifiUuid) { + final Map<String, String> nifiToExt = nifiToExternalId.get(connectorId); + if (nifiToExt != null) { + final String externalId = nifiToExt.remove(nifiUuid); + if (externalId != null) { + final Map<String, String> extToNifi = externalToNifiId.get(connectorId); + if (extToNifi != null) { + extToNifi.remove(externalId); + } + } + } + } + + private String lookupNifiUuid(final String connectorId, final String externalId) { + final Map<String, String> extToNifi = externalToNifiId.get(connectorId); + return extToNifi != null ? extToNifi.get(externalId) : null; + } + + private String lookupExternalId(final String connectorId, final String nifiUuid) { + final Map<String, String> nifiToExt = nifiToExternalId.get(connectorId); + return nifiToExt != null ? nifiToExt.get(nifiUuid) : null; + } + + private ConnectorAssetMetadata findAssetMetadata(final List<ConnectorAssetMetadata> metadataList, final String externalId) { + for (final ConnectorAssetMetadata metadata : metadataList) { + if (externalId.equals(metadata.getIdentifier())) { + return metadata; + } + } + return null; + } + + // --- Asset Sync from Provider --- + + @Override + public void syncAssetsFromProvider(final ConnectorNode connector) { + if (configurationProvider == null) { + return; + } + + final String connectorId = connector.getIdentifier(); + final List<ConnectorAssetMetadata> metadataList = cachedAssetMetadata.getOrDefault(connectorId, List.of()); + + for (final ConnectorAssetMetadata metadata : metadataList) { + final String externalId = metadata.getIdentifier(); + final String nifiUuid = lookupNifiUuid(connectorId, externalId); + if (nifiUuid == null) { + logger.warn("No NiFi UUID mapping found for external asset [{}] in connector [{}]; skipping sync", externalId, connectorId); + continue; + } + + final Optional<Asset> localAssetOpt = assetRepository.getAsset(nifiUuid); + final boolean localFileMissing = localAssetOpt.isEmpty() || !localAssetOpt.get().getFile().exists(); + + final String providerDigest = metadata.getDigest(); + final String lastDigest = lastDownloadedProviderDigest.get(nifiUuid); + final boolean digestChanged = providerDigest != null && !providerDigest.equals(lastDigest); + + if (localFileMissing || digestChanged || lastDigest == null) { + logger.info("Downloading asset [externalId={}, name={}] for connector [{}] (localMissing={}, digestChanged={})", + externalId, metadata.getName(), connectorId, localFileMissing, digestChanged); + try { + final Optional<InputStream> contentOpt = configurationProvider.loadAsset(connectorId, externalId); + if (contentOpt.isPresent()) { + try (final InputStream content = contentOpt.get()) { + assetRepository.storeAsset(connectorId, nifiUuid, metadata.getName(), content); + } + if (providerDigest != null) { + lastDownloadedProviderDigest.put(nifiUuid, providerDigest); + } + logger.info("Successfully downloaded asset [externalId={}, nifiId={}] for connector [{}]", externalId, nifiUuid, connectorId); + } else { + logger.warn("Provider returned empty content for asset [externalId={}] in connector [{}]", externalId, connectorId); + } + } catch (final Exception e) { + throw new RuntimeException("Failed to download asset [externalId=%s] for connector [%s]".formatted(externalId, connectorId), e); + } + } else { + logger.debug("Asset [externalId={}, nifiId={}] is up to date for connector [{}]", externalId, nifiUuid, connectorId); + } + } } private void syncFromProvider(final ConnectorNode connector) { @@ -377,7 +554,8 @@ public class StandardConnectorRepository implements ConnectorRepository { return; } - final Optional<ConnectorWorkingConfiguration> externalConfig = configurationProvider.load(connector.getIdentifier()); + final String connectorId = connector.getIdentifier(); + final Optional<ConnectorWorkingConfiguration> externalConfig = configurationProvider.load(connectorId); if (externalConfig.isEmpty()) { return; } @@ -387,8 +565,17 @@ public class StandardConnectorRepository implements ConnectorRepository { connector.setName(config.getName()); } + // Cache the provider-supplied asset metadata (with external IDs and digests) for later use by syncAssetsFromProvider + final List<ConnectorAssetMetadata> assetMetadata = config.getAssets(); + if (assetMetadata != null && !assetMetadata.isEmpty()) { + cachedAssetMetadata.put(connectorId, List.copyOf(assetMetadata)); + } + final List<VersionedConfigurationStep> workingFlowConfiguration = config.getWorkingFlowConfiguration(); if (workingFlowConfiguration != null) { + // Translate external asset IDs to NiFi UUIDs before applying configuration + translateExternalToNifiIds(connector, workingFlowConfiguration, assetMetadata != null ? assetMetadata : List.of()); + final MutableConnectorConfigurationContext workingConfigContext = connector.getWorkingFlowContext().getConfigurationContext(); for (final VersionedConfigurationStep step : workingFlowConfiguration) { final StepConfiguration stepConfiguration = toStepConfiguration(step); @@ -397,13 +584,148 @@ public class StandardConnectorRepository implements ConnectorRepository { } } + /** + * Translates external asset IDs in the configuration steps to NiFi UUIDs using property-based + name-based matching. + * For externally-originated assets with no existing mapping, creates missing-asset placeholders. + */ + private void translateExternalToNifiIds(final ConnectorNode connector, final List<VersionedConfigurationStep> externalSteps, + final List<ConnectorAssetMetadata> assetMetadata) { + final FrameworkFlowContext workingContext = connector.getWorkingFlowContext(); + + for (final VersionedConfigurationStep step : externalSteps) { + final Map<String, VersionedConnectorValueReference> properties = step.getProperties(); + if (properties == null) { + continue; + } + + for (final Map.Entry<String, VersionedConnectorValueReference> entry : properties.entrySet()) { + final String propertyName = entry.getKey(); + final VersionedConnectorValueReference ref = entry.getValue(); + if (ref == null || ref.getAssetIds() == null || ref.getAssetIds().isEmpty()) { + continue; + } + if (!ConnectorValueType.ASSET_REFERENCE.name().equals(ref.getValueType())) { + continue; + } + + final Set<String> translatedIds = new LinkedHashSet<>(); + for (final String externalId : ref.getAssetIds()) { + final String nifiUuid = translateSingleExternalId(connector, step.getName(), propertyName, externalId, assetMetadata, workingContext); + translatedIds.add(nifiUuid); + } + ref.setAssetIds(translatedIds); + } + } + } + + /** + * Translates a single external asset ID to a NiFi UUID. + * First checks the existing mapping, then attempts property+name matching, then creates a missing-asset placeholder. + */ + private String translateSingleExternalId(final ConnectorNode connector, final String stepName, final String propertyName, + final String externalId, final List<ConnectorAssetMetadata> assetMetadata, + final FrameworkFlowContext workingContext) { + final String connectorId = connector.getIdentifier(); + + // 1. Check existing mapping + final String existingNifiUuid = lookupNifiUuid(connectorId, externalId); + if (existingNifiUuid != null) { + return existingNifiUuid; + } + + // 2. Look up asset name from metadata + final ConnectorAssetMetadata metadata = findAssetMetadata(assetMetadata, externalId); + final String assetName = metadata != null ? metadata.getName() : externalId; + + // 3. Try property+name matching against existing NiFi working context + if (workingContext != null) { + final ConnectorConfiguration nifiConfig = workingContext.getConfigurationContext().toConnectorConfiguration(); + final NamedStepConfiguration nifiStep = nifiConfig.getNamedStepConfiguration(stepName); + if (nifiStep != null) { + final Map<String, ConnectorValueReference> nifiProperties = nifiStep.configuration().getPropertyValues(); + final ConnectorValueReference nifiRef = nifiProperties != null ? nifiProperties.get(propertyName) : null; + if (nifiRef instanceof AssetReference nifiAssetRef) { + for (final String candidateNifiUuid : nifiAssetRef.getAssetIdentifiers()) { + final Optional<Asset> candidateAsset = assetRepository.getAsset(candidateNifiUuid); + if (candidateAsset.isPresent() && assetName.equals(candidateAsset.get().getName())) { + recordIdMapping(connectorId, candidateNifiUuid, externalId); + logger.debug("Matched external asset [{}] to existing NiFi asset [id={}, name={}] via property+name matching", + externalId, candidateNifiUuid, assetName); + return candidateNifiUuid; + } + } + } + } + } + + // 4. No match found: create a missing-asset placeholder + final Asset missingAsset = assetManager.createMissingAsset(connectorId, assetName); + recordIdMapping(connectorId, missingAsset.getIdentifier(), externalId); + logger.info("Created missing-asset placeholder [nifiId={}, name={}] for external asset [{}] in connector [{}]", + missingAsset.getIdentifier(), assetName, externalId, connectorId); + return missingAsset.getIdentifier(); + } + private ConnectorWorkingConfiguration buildWorkingConfiguration(final ConnectorNode connector) { + final String connectorId = connector.getIdentifier(); final ConnectorWorkingConfiguration config = new ConnectorWorkingConfiguration(); config.setName(connector.getName()); - config.setWorkingFlowConfiguration(buildVersionedConfigurationSteps(connector.getWorkingFlowContext())); + + final List<VersionedConfigurationStep> steps = buildVersionedConfigurationSteps(connector.getWorkingFlowContext()); + if (configurationProvider != null) { + translateNifiToExternalIds(connectorId, steps); + config.setAssets(buildAssetMetadataList(connectorId)); + } + config.setWorkingFlowConfiguration(steps); return config; } + /** + * Translates NiFi UUIDs in configuration steps to external IDs for the provider. + */ + private void translateNifiToExternalIds(final String connectorId, final List<VersionedConfigurationStep> steps) { + for (final VersionedConfigurationStep step : steps) { + final Map<String, VersionedConnectorValueReference> properties = step.getProperties(); + if (properties == null) { + continue; + } + + for (final VersionedConnectorValueReference ref : properties.values()) { + if (ref == null || ref.getAssetIds() == null || ref.getAssetIds().isEmpty()) { + continue; + } + if (!ConnectorValueType.ASSET_REFERENCE.name().equals(ref.getValueType())) { + continue; + } + + final Set<String> translatedIds = new LinkedHashSet<>(); + for (final String nifiUuid : ref.getAssetIds()) { + final String externalId = lookupExternalId(connectorId, nifiUuid); + translatedIds.add(externalId != null ? externalId : nifiUuid); + } + ref.setAssetIds(translatedIds); + } + } + } + + /** + * Builds a list of ConnectorAssetMetadata with external IDs and names for the provider. + */ + private List<ConnectorAssetMetadata> buildAssetMetadataList(final String connectorId) { + final Map<String, String> nifiToExt = nifiToExternalId.getOrDefault(connectorId, Map.of()); + final List<ConnectorAssetMetadata> metadataList = new ArrayList<>(); + + for (final Map.Entry<String, String> entry : nifiToExt.entrySet()) { + final String nifiUuid = entry.getKey(); + final String externalId = entry.getValue(); + final Optional<Asset> localAsset = assetRepository.getAsset(nifiUuid); + final String assetName = localAsset.map(Asset::getName).orElse(externalId); + metadataList.add(new ConnectorAssetMetadata(externalId, assetName, null)); + } + + return metadataList; + } + private List<VersionedConfigurationStep> buildVersionedConfigurationSteps(final FrameworkFlowContext flowContext) { if (flowContext == null) { return List.of(); @@ -423,9 +745,10 @@ public class StandardConnectorRepository implements ConnectorRepository { } private ConnectorWorkingConfiguration buildMergedWorkingConfiguration(final ConnectorNode connector, final String stepName, final StepConfiguration incomingConfiguration) { + final String connectorId = connector.getIdentifier(); final ConnectorWorkingConfiguration existingConfig; if (configurationProvider != null) { - final Optional<ConnectorWorkingConfiguration> externalConfig = configurationProvider.load(connector.getIdentifier()); + final Optional<ConnectorWorkingConfiguration> externalConfig = configurationProvider.load(connectorId); existingConfig = externalConfig.orElseGet(() -> buildWorkingConfiguration(connector)); } else { existingConfig = buildWorkingConfiguration(connector); @@ -456,15 +779,38 @@ public class StandardConnectorRepository implements ConnectorRepository { for (final Map.Entry<String, ConnectorValueReference> entry : incomingConfiguration.getPropertyValues().entrySet()) { if (entry.getValue() != null) { - mergedProperties.put(entry.getKey(), toVersionedValueReference(entry.getValue())); + final VersionedConnectorValueReference versionedRef = toVersionedValueReference(entry.getValue()); + // Translate NiFi UUIDs to external IDs for asset references before merging with external config + if (configurationProvider != null && entry.getValue() instanceof AssetReference) { + translateNifiToExternalIdsInRef(connectorId, versionedRef); + } + mergedProperties.put(entry.getKey(), versionedRef); } } targetStep.setProperties(mergedProperties); existingConfig.setWorkingFlowConfiguration(existingSteps); + + // Update asset metadata list + if (configurationProvider != null) { + existingConfig.setAssets(buildAssetMetadataList(connectorId)); + } + return existingConfig; } + private void translateNifiToExternalIdsInRef(final String connectorId, final VersionedConnectorValueReference ref) { + if (ref.getAssetIds() == null || ref.getAssetIds().isEmpty()) { + return; + } + final Set<String> translatedIds = new LinkedHashSet<>(); + for (final String nifiUuid : ref.getAssetIds()) { + final String externalId = lookupExternalId(connectorId, nifiUuid); + translatedIds.add(externalId != null ? externalId : nifiUuid); + } + ref.setAssetIds(translatedIds); + } + private Map<String, VersionedConnectorValueReference> toVersionedProperties(final StepConfiguration configuration) { final Map<String, VersionedConnectorValueReference> versionedProperties = new HashMap<>(); for (final Map.Entry<String, ConnectorValueReference> entry : configuration.getPropertyValues().entrySet()) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java index 2e209a722c..dcb439f640 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java @@ -25,6 +25,10 @@ import org.apache.nifi.nar.ExtensionManager; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -33,6 +37,7 @@ import java.util.Optional; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; @@ -530,6 +535,347 @@ public class TestStandardConnectorRepository { verify(provider, never()).verifyCreate(anyString()); } + // --- Asset Lifecycle Tests --- + + @Test + public void testStoreAssetDelegatesToProviderFirst() throws IOException { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + final Asset localAsset = mock(Asset.class); + when(localAsset.getIdentifier()).thenReturn("nifi-uuid-1"); + when(localAsset.getName()).thenReturn("driver.jar"); + + when(assetManager.saveAsset(eq("connector-1"), eq("nifi-uuid-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn(localAsset); + when(provider.storeAsset(eq("connector-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn("assets/driver.jar"); + + final InputStream content = new ByteArrayInputStream("test-content".getBytes()); + final Asset result = repository.storeAsset("connector-1", "nifi-uuid-1", "driver.jar", content); + + assertNotNull(result); + assertEquals("nifi-uuid-1", result.getIdentifier()); + verify(provider).storeAsset(eq("connector-1"), eq("driver.jar"), any(InputStream.class)); + } + + @Test + public void testStoreAssetRollsBackOnProviderFailure() throws IOException { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + final Asset localAsset = mock(Asset.class); + when(localAsset.getIdentifier()).thenReturn("nifi-uuid-1"); + when(localAsset.getName()).thenReturn("driver.jar"); + + when(assetManager.saveAsset(eq("connector-1"), eq("nifi-uuid-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn(localAsset); + when(provider.storeAsset(eq("connector-1"), eq("driver.jar"), any(InputStream.class))) + .thenThrow(new RuntimeException("Provider upload failed")); + + final InputStream content = new ByteArrayInputStream("test-content".getBytes()); + assertThrows(IOException.class, () -> repository.storeAsset("connector-1", "nifi-uuid-1", "driver.jar", content)); + + verify(assetManager).deleteAsset("nifi-uuid-1"); + } + + @Test + public void testGetAssetDelegatesToRepository() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + final Asset mockAsset = mock(Asset.class); + when(mockAsset.getIdentifier()).thenReturn("asset-1"); + when(assetManager.getAsset("asset-1")).thenReturn(Optional.of(mockAsset)); + + final Optional<Asset> result = repository.getAsset("asset-1"); + assertTrue(result.isPresent()); + assertEquals("asset-1", result.get().getIdentifier()); + } + + @Test + public void testGetAssetsForConnector() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + final Asset asset1 = mock(Asset.class); + final Asset asset2 = mock(Asset.class); + when(assetManager.getAssets("connector-1")).thenReturn(List.of(asset1, asset2)); + + final List<Asset> result = repository.getAssets("connector-1"); + assertEquals(2, result.size()); + } + + @Test + public void testDeleteAssetsDeletesFromProviderAndLocal() throws IOException { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + final Asset localAsset = mock(Asset.class); + when(localAsset.getIdentifier()).thenReturn("nifi-uuid-1"); + when(localAsset.getName()).thenReturn("driver.jar"); + + when(assetManager.saveAsset(eq("connector-1"), eq("nifi-uuid-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn(localAsset); + when(provider.storeAsset(eq("connector-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn("assets/driver.jar"); + repository.storeAsset("connector-1", "nifi-uuid-1", "driver.jar", new ByteArrayInputStream("content".getBytes())); + + when(assetManager.getAssets("connector-1")).thenReturn(List.of(localAsset)); + + repository.deleteAssets("connector-1"); + + verify(provider).deleteAsset("connector-1", "assets/driver.jar"); + verify(assetManager).deleteAsset("nifi-uuid-1"); + } + + // --- ID Translation Tests --- + + @Test + public void testSyncFromProviderTranslatesExternalIdsToNifiUuids() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + // Set up working config context with empty configuration + final MutableConnectorConfigurationContext workingConfigContext = mock(MutableConnectorConfigurationContext.class); + when(workingConfigContext.toConnectorConfiguration()).thenReturn(new ConnectorConfiguration(Set.of())); + + final FrameworkFlowContext workingFlowContext = mock(FrameworkFlowContext.class); + when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext); + + final ConnectorNode connector = mock(ConnectorNode.class); + when(connector.getIdentifier()).thenReturn("connector-1"); + when(connector.getName()).thenReturn("Test Connector"); + when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext); + repository.addConnector(connector); + + // Configure provider to return external asset IDs + final VersionedConnectorValueReference assetRef = new VersionedConnectorValueReference(); + assetRef.setValueType("ASSET_REFERENCE"); + assetRef.setAssetIds(Set.of("assets/driver.jar")); + final VersionedConfigurationStep step = createVersionedStep("step1", Map.of("driver", assetRef)); + + final ConnectorAssetMetadata metadata = new ConnectorAssetMetadata("assets/driver.jar", "driver.jar", "abc123md5"); + final ConnectorWorkingConfiguration externalConfig = new ConnectorWorkingConfiguration(); + externalConfig.setName("Test Connector"); + externalConfig.setWorkingFlowConfiguration(List.of(step)); + externalConfig.setAssets(List.of(metadata)); + when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig)); + + // Mock missing asset creation + final Asset missingAsset = mock(Asset.class); + when(missingAsset.getIdentifier()).thenReturn("generated-nifi-uuid"); + when(missingAsset.getName()).thenReturn("driver.jar"); + when(assetManager.createMissingAsset("connector-1", "driver.jar")).thenReturn(missingAsset); + + // Trigger sync via getConnector + repository.getConnector("connector-1"); + + // Verify that createMissingAsset was called because no existing mapping exists + verify(assetManager).createMissingAsset("connector-1", "driver.jar"); + + // Verify the step was applied with the translated NiFi UUID + final ArgumentCaptor<StepConfiguration> stepCaptor = ArgumentCaptor.forClass(StepConfiguration.class); + verify(workingConfigContext).replaceProperties(eq("step1"), stepCaptor.capture()); + } + + @Test + public void testSyncFromProviderMatchesExistingAssetByPropertyAndName() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + // Set up working context with an existing NiFi asset + final Asset existingAsset = mock(Asset.class); + when(existingAsset.getIdentifier()).thenReturn("existing-nifi-uuid"); + when(existingAsset.getName()).thenReturn("driver.jar"); + when(assetManager.getAsset("existing-nifi-uuid")).thenReturn(Optional.of(existingAsset)); + + final StepConfiguration existingStepConfig = new StepConfiguration( + Map.of("driver", new AssetReference(Set.of("existing-nifi-uuid"))) + ); + final ConnectorConfiguration existingConfig = new ConnectorConfiguration( + Set.of(new NamedStepConfiguration("step1", existingStepConfig)) + ); + final MutableConnectorConfigurationContext workingConfigContext = mock(MutableConnectorConfigurationContext.class); + when(workingConfigContext.toConnectorConfiguration()).thenReturn(existingConfig); + + final FrameworkFlowContext workingFlowContext = mock(FrameworkFlowContext.class); + when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext); + + final ConnectorNode connector = mock(ConnectorNode.class); + when(connector.getIdentifier()).thenReturn("connector-1"); + when(connector.getName()).thenReturn("Test Connector"); + when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext); + repository.addConnector(connector); + + // Configure provider to return external asset IDs + final VersionedConnectorValueReference assetRef = new VersionedConnectorValueReference(); + assetRef.setValueType("ASSET_REFERENCE"); + assetRef.setAssetIds(Set.of("assets/driver.jar")); + final VersionedConfigurationStep step = createVersionedStep("step1", Map.of("driver", assetRef)); + + final ConnectorAssetMetadata metadata = new ConnectorAssetMetadata("assets/driver.jar", "driver.jar", "abc123md5"); + final ConnectorWorkingConfiguration externalConfig = new ConnectorWorkingConfiguration(); + externalConfig.setName("Test Connector"); + externalConfig.setWorkingFlowConfiguration(List.of(step)); + externalConfig.setAssets(List.of(metadata)); + when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig)); + + // Trigger sync via getConnector + repository.getConnector("connector-1"); + + // Should NOT create a missing asset because it matched by property+name + verify(assetManager, never()).createMissingAsset(anyString(), anyString()); + } + + @Test + public void testBuildWorkingConfigurationTranslatesNifiToExternalIds() throws IOException, FlowUpdateException { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + // Store asset to establish mapping: nifi-uuid-1 <-> assets/driver.jar + final File assetFile = mock(File.class); + when(assetFile.getPath()).thenReturn("/tmp/fake/driver.jar"); + final Asset localAsset = mock(Asset.class); + when(localAsset.getIdentifier()).thenReturn("nifi-uuid-1"); + when(localAsset.getName()).thenReturn("driver.jar"); + when(localAsset.getFile()).thenReturn(assetFile); + when(assetManager.saveAsset(eq("connector-1"), eq("nifi-uuid-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn(localAsset); + when(assetManager.getAsset("nifi-uuid-1")).thenReturn(Optional.of(localAsset)); + when(provider.storeAsset(eq("connector-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn("assets/driver.jar"); + repository.storeAsset("connector-1", "nifi-uuid-1", "driver.jar", new ByteArrayInputStream("content".getBytes())); + + // Create connector with working config referencing the NiFi UUID + final StepConfiguration stepConfig = new StepConfiguration( + Map.of("driver", new AssetReference(Set.of("nifi-uuid-1"))) + ); + final ConnectorConfiguration nifiConfig = new ConnectorConfiguration( + Set.of(new NamedStepConfiguration("step1", stepConfig)) + ); + final MutableConnectorConfigurationContext workingConfigContext = mock(MutableConnectorConfigurationContext.class); + when(workingConfigContext.toConnectorConfiguration()).thenReturn(nifiConfig); + + final FrameworkFlowContext workingFlowContext = mock(FrameworkFlowContext.class); + when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext); + final FrameworkFlowContext activeFlowContext = mock(FrameworkFlowContext.class); + final MutableConnectorConfigurationContext activeConfigContext = mock(MutableConnectorConfigurationContext.class); + when(activeConfigContext.toConnectorConfiguration()).thenReturn(new ConnectorConfiguration(Set.of())); + when(activeFlowContext.getConfigurationContext()).thenReturn(activeConfigContext); + + final ConnectorNode connector = mock(ConnectorNode.class); + when(connector.getIdentifier()).thenReturn("connector-1"); + when(connector.getName()).thenReturn("Test Connector"); + when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext); + when(connector.getActiveFlowContext()).thenReturn(activeFlowContext); + repository.addConnector(connector); + + when(provider.load("connector-1")).thenReturn(Optional.empty()); + + // Call configureConnector which builds a working configuration with NiFi->External translation + final StepConfiguration incomingConfig = new StepConfiguration( + Map.of("driver", new AssetReference(Set.of("nifi-uuid-1"))) + ); + repository.configureConnector(connector, "step1", incomingConfig); + + final ArgumentCaptor<ConnectorWorkingConfiguration> configCaptor = ArgumentCaptor.forClass(ConnectorWorkingConfiguration.class); + verify(provider).save(eq("connector-1"), configCaptor.capture()); + + final ConnectorWorkingConfiguration savedConfig = configCaptor.getValue(); + final VersionedConfigurationStep savedStep = savedConfig.getWorkingFlowConfiguration().stream() + .filter(s -> "step1".equals(s.getName())).findFirst().orElse(null); + assertNotNull(savedStep); + final VersionedConnectorValueReference savedRef = savedStep.getProperties().get("driver"); + assertNotNull(savedRef); + assertTrue(savedRef.getAssetIds().contains("assets/driver.jar"), "Expected external ID in saved config"); + assertFalse(savedRef.getAssetIds().contains("nifi-uuid-1"), "NiFi UUID should be translated to external ID"); + } + + // --- Digest-based Sync Tests --- + + @Test + public void testSyncFromProviderCachesAssetMetadata() throws IOException { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final AssetManager assetManager = mock(AssetManager.class); + final StandardConnectorRepository repository = createRepositoryWithProviderAndAssetManager(provider, assetManager); + + // Store asset to establish mapping: nifi-uuid-1 <-> assets/driver.jar + final Asset localAsset = mock(Asset.class); + when(localAsset.getIdentifier()).thenReturn("nifi-uuid-1"); + when(localAsset.getName()).thenReturn("driver.jar"); + when(assetManager.saveAsset(eq("connector-1"), eq("nifi-uuid-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn(localAsset); + when(assetManager.getAsset("nifi-uuid-1")).thenReturn(Optional.of(localAsset)); + when(provider.storeAsset(eq("connector-1"), eq("driver.jar"), any(InputStream.class))) + .thenReturn("assets/driver.jar"); + repository.storeAsset("connector-1", "nifi-uuid-1", "driver.jar", new ByteArrayInputStream("content".getBytes())); + + // Set up connector with working context that references this asset + final StepConfiguration existingStepConfig = new StepConfiguration( + Map.of("driver", new AssetReference(Set.of("nifi-uuid-1"))) + ); + final ConnectorConfiguration existingNifiConfig = new ConnectorConfiguration( + Set.of(new NamedStepConfiguration("step1", existingStepConfig)) + ); + final MutableConnectorConfigurationContext workingConfigContext = mock(MutableConnectorConfigurationContext.class); + when(workingConfigContext.toConnectorConfiguration()).thenReturn(existingNifiConfig); + + final FrameworkFlowContext workingFlowContext = mock(FrameworkFlowContext.class); + when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext); + + final ConnectorNode connector = mock(ConnectorNode.class); + when(connector.getIdentifier()).thenReturn("connector-1"); + when(connector.getName()).thenReturn("Test Connector"); + when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext); + when(connector.getActiveFlowContext()).thenReturn(workingFlowContext); + repository.addConnector(connector); + + // Set up provider response with asset digest "abc123" + final VersionedConnectorValueReference assetRef = new VersionedConnectorValueReference(); + assetRef.setValueType("ASSET_REFERENCE"); + assetRef.setAssetIds(Set.of("assets/driver.jar")); + final VersionedConfigurationStep step = createVersionedStep("step1", Map.of("driver", assetRef)); + + final ConnectorAssetMetadata metadata = new ConnectorAssetMetadata("assets/driver.jar", "driver.jar", "abc123"); + final ConnectorWorkingConfiguration externalConfig = new ConnectorWorkingConfiguration(); + externalConfig.setName("Test Connector"); + externalConfig.setWorkingFlowConfiguration(List.of(step)); + externalConfig.setAssets(List.of(metadata)); + when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig)); + + // Trigger syncFromProvider (via getConnector) + repository.getConnector("connector-1"); + + // The mapping should have been found by property+name matching (since nifi-uuid-1 maps to assets/driver.jar) + // and no new missing asset should be created + verify(assetManager, never()).createMissingAsset(anyString(), anyString()); + + // The step should have been applied with the translated NiFi UUID + verify(workingConfigContext).replaceProperties(eq("step1"), any(StepConfiguration.class)); + } + + // --- Helper Methods --- + + private StandardConnectorRepository createRepositoryWithProviderAndAssetManager( + final ConnectorConfigurationProvider provider, final AssetManager assetManager) { + final StandardConnectorRepository repository = new StandardConnectorRepository(); + final ConnectorRepositoryInitializationContext initContext = mock(ConnectorRepositoryInitializationContext.class); + when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class)); + when(initContext.getAssetManager()).thenReturn(assetManager); + when(initContext.getConnectorConfigurationProvider()).thenReturn(provider); + repository.initialize(initContext); + return repository; + } + private StandardConnectorRepository createRepositoryWithProvider(final ConnectorConfigurationProvider provider) { final StandardConnectorRepository repository = new StandardConnectorRepository(); final ConnectorRepositoryInitializationContext initContext = mock(ConnectorRepositoryInitializationContext.class); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6d4ef631a3..53775117b3 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -71,11 +71,11 @@ import org.apache.nifi.components.connector.AssetReference; import org.apache.nifi.components.connector.ConfigurationStep; import org.apache.nifi.components.connector.ConfigurationStepDependency; import org.apache.nifi.components.connector.ConnectorAction; -import org.apache.nifi.components.connector.ConnectorAssetRepository; import org.apache.nifi.components.connector.ConnectorConfiguration; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorPropertyDescriptor; import org.apache.nifi.components.connector.ConnectorPropertyGroup; +import org.apache.nifi.components.connector.ConnectorRepository; import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.components.connector.NamedStepConfiguration; @@ -321,7 +321,7 @@ public final class DtoFactory { private EntityFactory entityFactory; private Authorizer authorizer; private ExtensionManager extensionManager; - private ConnectorAssetRepository connectorAssetRepository; + private ConnectorRepository connectorRepository; private RuntimeManifestService runtimeManifestService; public ControllerConfigurationDTO createControllerConfigurationDto(final ControllerFacade controllerFacade) { @@ -5220,8 +5220,8 @@ public final class DtoFactory { this.extensionManager = extensionManager; } - public void setConnectorAssetRepository(final ConnectorAssetRepository connectorAssetRepository) { - this.connectorAssetRepository = connectorAssetRepository; + public void setConnectorRepository(final ConnectorRepository connectorRepository) { + this.connectorRepository = connectorRepository; } public void setRuntimeManifestService(RuntimeManifestService runtimeManifestService) { @@ -5434,7 +5434,7 @@ public final class DtoFactory { } private AssetReferenceDTO createConnectorAssetReferenceDto(final String assetId) { - final String assetName = connectorAssetRepository.getAsset(assetId).map(Asset::getName).orElse(assetId); + final String assetName = connectorRepository.getAsset(assetId).map(Asset::getName).orElse(assetId); return new AssetReferenceDTO(assetId, assetName); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/configuration/WebApplicationConfiguration.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/configuration/WebApplicationConfiguration.java index 904fdcb9d1..7ed4627cfb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/configuration/WebApplicationConfiguration.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/configuration/WebApplicationConfiguration.java @@ -186,7 +186,7 @@ public class WebApplicationConfiguration { dtoFactory.setControllerServiceProvider(flowController.getControllerServiceProvider()); dtoFactory.setEntityFactory(entityFactory()); dtoFactory.setExtensionManager(extensionManager); - dtoFactory.setConnectorAssetRepository(flowController.getConnectorRepository().getAssetRepository()); + dtoFactory.setConnectorRepository(flowController.getConnectorRepository()); dtoFactory.setRuntimeManifestService(runtimeManifestService); return dtoFactory; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java index cb7680cb0b..697e8c5de6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java @@ -21,7 +21,6 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.DescribedValue; import org.apache.nifi.components.connector.AssetReference; -import org.apache.nifi.components.connector.ConnectorAssetRepository; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; import org.apache.nifi.components.connector.ConnectorUpdateContext; @@ -76,10 +75,6 @@ public class StandardConnectorDAO implements ConnectorDAO { return flowController.getConnectorRepository(); } - private ConnectorAssetRepository getConnectorAssetRepository() { - return flowController.getConnectorRepository().getAssetRepository(); - } - @Override public void verifyCreate(final ConnectorDTO connectorDTO) { final String id = connectorDTO.getId(); @@ -124,8 +119,8 @@ public class StandardConnectorDAO implements ConnectorDAO { @Override public void deleteConnector(final String id) { + getConnectorRepository().deleteAssets(id); getConnectorRepository().removeConnector(id); - getConnectorAssetRepository().deleteAssets(id); } @Override @@ -250,6 +245,7 @@ public class StandardConnectorDAO implements ConnectorDAO { @Override public List<ConfigVerificationResult> verifyConfigurationStep(final String id, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepDto) { final ConnectorNode connector = getConnector(id); + getConnectorRepository().syncAssetsFromProvider(connector); final StepConfiguration stepConfiguration = convertToStepConfiguration(configurationStepDto); return connector.verifyConfigurationStep(configurationStepName, stepConfiguration); } @@ -271,20 +267,17 @@ public class StandardConnectorDAO implements ConnectorDAO { @Override public Asset createAsset(final String id, final String assetId, final String assetName, final InputStream content) throws IOException { - final ConnectorAssetRepository assetRepository = getConnectorAssetRepository(); - return assetRepository.storeAsset(id, assetId, assetName, content); + return getConnectorRepository().storeAsset(id, assetId, assetName, content); } @Override public List<Asset> getAssets(final String id) { - final ConnectorAssetRepository assetRepository = getConnectorAssetRepository(); - return assetRepository.getAssets(id); + return getConnectorRepository().getAssets(id); } @Override public Optional<Asset> getAsset(final String assetId) { - final ConnectorAssetRepository assetRepository = getConnectorAssetRepository(); - return assetRepository.getAsset(assetId); + return getConnectorRepository().getAsset(assetId); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java index e642add1a8..d1c589fd22 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java @@ -18,7 +18,6 @@ package org.apache.nifi.web.dao.impl; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.DescribedValue; -import org.apache.nifi.components.connector.ConnectorAssetRepository; import org.apache.nifi.components.connector.ConnectorConfiguration; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; @@ -29,22 +28,24 @@ import org.apache.nifi.components.connector.MutableConnectorConfigurationContext import org.apache.nifi.controller.FlowController; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO; import org.apache.nifi.web.api.dto.ConnectorDTO; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.Collections; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -68,9 +69,6 @@ class StandardConnectorDAOTest { @Mock private ConnectorUpdateContext connectorUpdateContext; - @Mock - private ConnectorAssetRepository connectorAssetRepository; - @Mock private FrameworkFlowContext frameworkFlowContext; @@ -90,7 +88,6 @@ class StandardConnectorDAOTest { connectorDAO.setFlowController(flowController); when(flowController.getConnectorRepository()).thenReturn(connectorRepository); - when(connectorRepository.getAssetRepository()).thenReturn(connectorAssetRepository); final MutableConnectorConfigurationContext configContext = mock(MutableConnectorConfigurationContext.class); when(configContext.toConnectorConfiguration()).thenReturn(mock(ConnectorConfiguration.class)); @@ -102,13 +99,6 @@ class StandardConnectorDAOTest { @Test void testApplyConnectorUpdate() throws Exception { when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); - when(connectorRepository.getAssetRepository()).thenReturn(connectorAssetRepository); - when(connectorNode.getActiveFlowContext()).thenReturn(frameworkFlowContext); - when(frameworkFlowContext.getConfigurationContext()).thenReturn(configurationContext); - when(configurationContext.toConnectorConfiguration()).thenReturn(connectorConfiguration); - when(connectorConfiguration.getNamedStepConfigurations()).thenReturn(Collections.emptySet()); - when(connectorNode.getIdentifier()).thenReturn(CONNECTOR_ID); - when(connectorAssetRepository.getAssets(CONNECTOR_ID)).thenReturn(Collections.emptyList()); connectorDAO.applyConnectorUpdate(CONNECTOR_ID, connectorUpdateContext); @@ -251,25 +241,35 @@ class StandardConnectorDAOTest { } @Test - void testDeleteConnectorRemovesConnectorAndAssets() { - when(connectorRepository.getAssetRepository()).thenReturn(connectorAssetRepository); + void testVerifyConfigurationStepSyncsAssetsBeforeVerification() { + when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + final ConfigurationStepConfigurationDTO stepConfigDto = new ConfigurationStepConfigurationDTO(); + + connectorDAO.verifyConfigurationStep(CONNECTOR_ID, STEP_NAME, stepConfigDto); + final InOrder inOrder = inOrder(connectorRepository, connectorNode); + inOrder.verify(connectorRepository).syncAssetsFromProvider(connectorNode); + inOrder.verify(connectorNode).verifyConfigurationStep(any(), any()); + } + + @Test + void testDeleteConnectorRemovesAssetsAndConnector() { connectorDAO.deleteConnector(CONNECTOR_ID); + verify(connectorRepository).deleteAssets(CONNECTOR_ID); verify(connectorRepository).removeConnector(CONNECTOR_ID); - verify(connectorAssetRepository).deleteAssets(CONNECTOR_ID); } @Test - void testDeleteConnectorDoesNotDeleteAssetsWhenRemovalFails() { - doThrow(new RuntimeException("Removal failed")).when(connectorRepository).removeConnector(CONNECTOR_ID); + void testDeleteConnectorDoesNotRemoveConnectorWhenAssetDeletionFails() { + doThrow(new RuntimeException("Asset deletion failed")).when(connectorRepository).deleteAssets(CONNECTOR_ID); assertThrows(RuntimeException.class, () -> connectorDAO.deleteConnector(CONNECTOR_ID) ); - verify(connectorRepository).removeConnector(CONNECTOR_ID); - verify(connectorAssetRepository, never()).deleteAssets(any()); + verify(connectorRepository).deleteAssets(CONNECTOR_ID); + verify(connectorRepository, never()).removeConnector(any()); } @Test
