bbende commented on code in PR #10909:
URL: https://github.com/apache/nifi/pull/10909#discussion_r2819179165


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -367,17 +404,158 @@ public FrameworkConnectorInitializationContextBuilder 
createInitializationContex
         return new StandardConnectorInitializationContext.Builder();
     }
 
+    // ConnectorAssetRepository is an internal implementation detail;
+    // all external callers should use the asset methods on 
ConnectorRepository directly.
+
+    @Override
+    public Asset storeAsset(final String connectorId, final String assetId, 
final String assetName, final InputStream content) throws IOException {
+        if (configurationProvider == null) {
+            return assetRepository.storeAsset(connectorId, assetId, assetName, 
content);
+        }
+
+        // Buffer content so we can send it to both the local store and the 
provider
+        final byte[] contentBytes = content.readAllBytes();
+
+        final Asset localAsset = assetRepository.storeAsset(connectorId, 
assetId, assetName, new java.io.ByteArrayInputStream(contentBytes));
+
+        try {
+            final String externalId = 
configurationProvider.storeAsset(connectorId, assetName, new 
java.io.ByteArrayInputStream(contentBytes));
+            recordIdMapping(connectorId, assetId, externalId);
+            logger.debug("Stored asset [nifiId={}, externalId={}] for 
connector [{}]", assetId, externalId, connectorId);
+        } catch (final Exception e) {
+            logger.error("Failed to store asset [{}] to provider for connector 
[{}]; rolling back local asset", assetName, connectorId, e);
+            assetRepository.deleteAsset(assetId);
+            throw new IOException("Failed to store asset to provider", e);
+        }
+
+        return localAsset;
+    }
+
+    @Override
+    public Optional<Asset> getAsset(final String assetId) {
+        return assetRepository.getAsset(assetId);
+    }
+
     @Override
-    public ConnectorAssetRepository getAssetRepository() {
-        return assetRepository;
+    public List<Asset> getAssets(final String connectorId) {
+        return assetRepository.getAssets(connectorId);
+    }
+
+    @Override
+    public void deleteAssets(final String connectorId) {
+        if (configurationProvider != null) {
+            final Map<String, String> nifiToExt = 
nifiToExternalId.getOrDefault(connectorId, Map.of());
+            for (final Map.Entry<String, String> entry : nifiToExt.entrySet()) 
{
+                try {
+                    configurationProvider.deleteAsset(connectorId, 
entry.getValue());
+                } catch (final Exception e) {
+                    logger.warn("Failed to delete asset [externalId={}] from 
provider for connector [{}]", entry.getValue(), connectorId, e);
+                }
+            }
+            nifiToExternalId.remove(connectorId);
+            externalToNifiId.remove(connectorId);
+        }
+        cachedAssetMetadata.remove(connectorId);
+        assetRepository.deleteAssets(connectorId);
+    }
+
+    // --- ID Mapping Helpers ---
+
+    private void recordIdMapping(final String connectorId, final String 
nifiUuid, final String externalId) {
+        nifiToExternalId.computeIfAbsent(connectorId, k -> new 
ConcurrentHashMap<>()).put(nifiUuid, externalId);
+        externalToNifiId.computeIfAbsent(connectorId, k -> new 
ConcurrentHashMap<>()).put(externalId, nifiUuid);
+    }
+
+    private void removeIdMapping(final String connectorId, final String 
nifiUuid) {
+        final Map<String, String> nifiToExt = 
nifiToExternalId.get(connectorId);
+        if (nifiToExt != null) {
+            final String externalId = nifiToExt.remove(nifiUuid);
+            if (externalId != null) {
+                final Map<String, String> extToNifi = 
externalToNifiId.get(connectorId);
+                if (extToNifi != null) {
+                    extToNifi.remove(externalId);
+                }
+            }
+        }
+    }
+
+    private String lookupNifiUuid(final String connectorId, final String 
externalId) {
+        final Map<String, String> extToNifi = 
externalToNifiId.get(connectorId);
+        return extToNifi != null ? extToNifi.get(externalId) : null;
+    }
+
+    private String lookupExternalId(final String connectorId, final String 
nifiUuid) {
+        final Map<String, String> nifiToExt = 
nifiToExternalId.get(connectorId);
+        return nifiToExt != null ? nifiToExt.get(nifiUuid) : null;
+    }
+
+    private ConnectorAssetMetadata findAssetMetadata(final 
List<ConnectorAssetMetadata> metadataList, final String externalId) {
+        for (final ConnectorAssetMetadata metadata : metadataList) {
+            if (externalId.equals(metadata.getIdentifier())) {
+                return metadata;
+            }
+        }
+        return null;
+    }
+
+    // --- Asset Sync from Provider ---
+
+    @Override
+    public void syncAssetsFromProvider(final ConnectorNode connector) {
+        if (configurationProvider == null) {
+            return;
+        }
+
+        final String connectorId = connector.getIdentifier();
+        final List<ConnectorAssetMetadata> metadataList = 
cachedAssetMetadata.getOrDefault(connectorId, List.of());
+
+        for (final ConnectorAssetMetadata metadata : metadataList) {
+            final String externalId = metadata.getIdentifier();
+            final String nifiUuid = lookupNifiUuid(connectorId, externalId);
+            if (nifiUuid == null) {
+                logger.warn("No NiFi UUID mapping found for external asset 
[{}] in connector [{}]; skipping sync", externalId, connectorId);
+                continue;
+            }
+
+            final Optional<Asset> localAssetOpt = 
assetRepository.getAsset(nifiUuid);
+            final boolean localFileMissing = localAssetOpt.isEmpty() || 
!localAssetOpt.get().getFile().exists();
+
+            final String providerDigest = metadata.getDigest();
+            final String lastDigest = 
lastDownloadedProviderDigest.get(nifiUuid);
+            final boolean digestChanged = providerDigest != null && 
!providerDigest.equals(lastDigest);
+
+            if (localFileMissing || digestChanged || lastDigest == null) {

Review Comment:
   Not totally sure if this is true, but on a restart `lastDigest` seems like 
it won't exist so we'd always retrieve all assets from the provider? 



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -367,17 +404,158 @@ public FrameworkConnectorInitializationContextBuilder 
createInitializationContex
         return new StandardConnectorInitializationContext.Builder();
     }
 
+    // ConnectorAssetRepository is an internal implementation detail;
+    // all external callers should use the asset methods on 
ConnectorRepository directly.
+
+    @Override
+    public Asset storeAsset(final String connectorId, final String assetId, 
final String assetName, final InputStream content) throws IOException {
+        if (configurationProvider == null) {
+            return assetRepository.storeAsset(connectorId, assetId, assetName, 
content);
+        }
+
+        // Buffer content so we can send it to both the local store and the 
provider
+        final byte[] contentBytes = content.readAllBytes();

Review Comment:
   Not sure if we are concerned about memory here, wondering if we should store 
it in the local `AssetRepository` first, and add a new method there like 
`InputStream getAssetContent` that we can then pass to the external provider?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to