bbende commented on code in PR #10909:
URL: https://github.com/apache/nifi/pull/10909#discussion_r2819179165
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -367,17 +404,158 @@ public FrameworkConnectorInitializationContextBuilder
createInitializationContex
return new StandardConnectorInitializationContext.Builder();
}
+ // ConnectorAssetRepository is an internal implementation detail;
+ // all external callers should use the asset methods on
ConnectorRepository directly.
+
+ @Override
+ public Asset storeAsset(final String connectorId, final String assetId,
final String assetName, final InputStream content) throws IOException {
+ if (configurationProvider == null) {
+ return assetRepository.storeAsset(connectorId, assetId, assetName,
content);
+ }
+
+ // Buffer content so we can send it to both the local store and the
provider
+ final byte[] contentBytes = content.readAllBytes();
+
+ final Asset localAsset = assetRepository.storeAsset(connectorId,
assetId, assetName, new java.io.ByteArrayInputStream(contentBytes));
+
+ try {
+ final String externalId =
configurationProvider.storeAsset(connectorId, assetName, new
java.io.ByteArrayInputStream(contentBytes));
+ recordIdMapping(connectorId, assetId, externalId);
+ logger.debug("Stored asset [nifiId={}, externalId={}] for
connector [{}]", assetId, externalId, connectorId);
+ } catch (final Exception e) {
+ logger.error("Failed to store asset [{}] to provider for connector
[{}]; rolling back local asset", assetName, connectorId, e);
+ assetRepository.deleteAsset(assetId);
+ throw new IOException("Failed to store asset to provider", e);
+ }
+
+ return localAsset;
+ }
+
+ @Override
+ public Optional<Asset> getAsset(final String assetId) {
+ return assetRepository.getAsset(assetId);
+ }
+
@Override
- public ConnectorAssetRepository getAssetRepository() {
- return assetRepository;
+ public List<Asset> getAssets(final String connectorId) {
+ return assetRepository.getAssets(connectorId);
+ }
+
+ @Override
+ public void deleteAssets(final String connectorId) {
+ if (configurationProvider != null) {
+ final Map<String, String> nifiToExt =
nifiToExternalId.getOrDefault(connectorId, Map.of());
+ for (final Map.Entry<String, String> entry : nifiToExt.entrySet())
{
+ try {
+ configurationProvider.deleteAsset(connectorId,
entry.getValue());
+ } catch (final Exception e) {
+ logger.warn("Failed to delete asset [externalId={}] from
provider for connector [{}]", entry.getValue(), connectorId, e);
+ }
+ }
+ nifiToExternalId.remove(connectorId);
+ externalToNifiId.remove(connectorId);
+ }
+ cachedAssetMetadata.remove(connectorId);
+ assetRepository.deleteAssets(connectorId);
+ }
+
+ // --- ID Mapping Helpers ---
+
+ private void recordIdMapping(final String connectorId, final String
nifiUuid, final String externalId) {
+ nifiToExternalId.computeIfAbsent(connectorId, k -> new
ConcurrentHashMap<>()).put(nifiUuid, externalId);
+ externalToNifiId.computeIfAbsent(connectorId, k -> new
ConcurrentHashMap<>()).put(externalId, nifiUuid);
+ }
+
+ private void removeIdMapping(final String connectorId, final String
nifiUuid) {
+ final Map<String, String> nifiToExt =
nifiToExternalId.get(connectorId);
+ if (nifiToExt != null) {
+ final String externalId = nifiToExt.remove(nifiUuid);
+ if (externalId != null) {
+ final Map<String, String> extToNifi =
externalToNifiId.get(connectorId);
+ if (extToNifi != null) {
+ extToNifi.remove(externalId);
+ }
+ }
+ }
+ }
+
+ private String lookupNifiUuid(final String connectorId, final String
externalId) {
+ final Map<String, String> extToNifi =
externalToNifiId.get(connectorId);
+ return extToNifi != null ? extToNifi.get(externalId) : null;
+ }
+
+ private String lookupExternalId(final String connectorId, final String
nifiUuid) {
+ final Map<String, String> nifiToExt =
nifiToExternalId.get(connectorId);
+ return nifiToExt != null ? nifiToExt.get(nifiUuid) : null;
+ }
+
+ private ConnectorAssetMetadata findAssetMetadata(final
List<ConnectorAssetMetadata> metadataList, final String externalId) {
+ for (final ConnectorAssetMetadata metadata : metadataList) {
+ if (externalId.equals(metadata.getIdentifier())) {
+ return metadata;
+ }
+ }
+ return null;
+ }
+
+ // --- Asset Sync from Provider ---
+
+ @Override
+ public void syncAssetsFromProvider(final ConnectorNode connector) {
+ if (configurationProvider == null) {
+ return;
+ }
+
+ final String connectorId = connector.getIdentifier();
+ final List<ConnectorAssetMetadata> metadataList =
cachedAssetMetadata.getOrDefault(connectorId, List.of());
+
+ for (final ConnectorAssetMetadata metadata : metadataList) {
+ final String externalId = metadata.getIdentifier();
+ final String nifiUuid = lookupNifiUuid(connectorId, externalId);
+ if (nifiUuid == null) {
+ logger.warn("No NiFi UUID mapping found for external asset
[{}] in connector [{}]; skipping sync", externalId, connectorId);
+ continue;
+ }
+
+ final Optional<Asset> localAssetOpt =
assetRepository.getAsset(nifiUuid);
+ final boolean localFileMissing = localAssetOpt.isEmpty() ||
!localAssetOpt.get().getFile().exists();
+
+ final String providerDigest = metadata.getDigest();
+ final String lastDigest =
lastDownloadedProviderDigest.get(nifiUuid);
+ final boolean digestChanged = providerDigest != null &&
!providerDigest.equals(lastDigest);
+
+ if (localFileMissing || digestChanged || lastDigest == null) {
Review Comment:
Not totally sure if this is true, but on a restart `lastDigest` seems like
it won't exist so we'd always retrieve all assets from the provider?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -367,17 +404,158 @@ public FrameworkConnectorInitializationContextBuilder
createInitializationContex
return new StandardConnectorInitializationContext.Builder();
}
+ // ConnectorAssetRepository is an internal implementation detail;
+ // all external callers should use the asset methods on
ConnectorRepository directly.
+
+ @Override
+ public Asset storeAsset(final String connectorId, final String assetId,
final String assetName, final InputStream content) throws IOException {
+ if (configurationProvider == null) {
+ return assetRepository.storeAsset(connectorId, assetId, assetName,
content);
+ }
+
+ // Buffer content so we can send it to both the local store and the
provider
+ final byte[] contentBytes = content.readAllBytes();
Review Comment:
Not sure if we are concerned about memory here, wondering if we should store
it in the local `AssetRepository` first, and add a new method there like
`InputStream getAssetContent` that we can then pass to the external provider?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]