This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b0481062fabe3e2d8a744d7225615a896ae578e4 Author: Mark Payne <[email protected]> AuthorDate: Fri Jan 16 18:29:54 2026 -0500 NIFI-15472: Added addAsset methods to ConnectorTestRunner (#10776) --- .../mock/connector/server/ConnectorTestRunner.java | 7 + .../server/MockConnectorAssetManager.java | 179 +++++++++++++++ .../server/StandardConnectorMockServer.java | 29 +++ .../server/MockConnectorAssetManagerTest.java | 245 +++++++++++++++++++++ .../connector/StandardConnectorTestRunner.java | 11 + .../src/main/resources/nifi.properties | 3 + 6 files changed, 474 insertions(+) diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java index 5fe172fa78..25a5215db3 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java @@ -18,12 +18,15 @@ package org.apache.nifi.mock.connector.server; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.AssetReference; import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.SecretReference; import org.apache.nifi.components.connector.StepConfiguration; import java.io.Closeable; +import java.io.File; +import java.io.InputStream; import java.time.Duration; import java.util.List; import java.util.Map; @@ -50,6 +53,10 @@ public interface ConnectorTestRunner extends Closeable { void addSecret(String name, String value); + AssetReference addAsset(File file); + + AssetReference addAsset(String assetName, InputStream contents); + void startConnector(); void stopConnector(); diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java new file mode 100644 index 0000000000..e8272be173 --- /dev/null +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mock.connector.server; + +import org.apache.nifi.asset.Asset; +import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.asset.AssetManagerInitializationContext; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A Mock implementation of AssetManager for use in ConnectorTestRunner tests. + * This implementation stores assets in a temporary directory structure. + */ +public class MockConnectorAssetManager implements AssetManager { + + private static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory"; + private static final String DEFAULT_ASSET_STORAGE_LOCATION = "target/mock-connector-assets"; + + private final Map<String, Asset> assets = new ConcurrentHashMap<>(); + private volatile File assetStorageLocation; + + @Override + public void initialize(final AssetManagerInitializationContext context) { + final String storageLocation = context.getProperties().getOrDefault(ASSET_STORAGE_LOCATION_PROPERTY, DEFAULT_ASSET_STORAGE_LOCATION); + assetStorageLocation = new File(storageLocation); + + if (!assetStorageLocation.exists()) { + try { + Files.createDirectories(assetStorageLocation.toPath()); + } catch (final IOException e) { + throw new RuntimeException("Failed to create asset storage directory: " + storageLocation, e); + } + } + } + + @Override + public Asset createAsset(final String ownerId, final String assetName, final InputStream contents) throws IOException { + final String assetId = UUID.randomUUID().toString(); + return saveAsset(ownerId, assetId, assetName, contents); + } + + @Override + public Asset saveAsset(final String ownerId, final String assetId, final String assetName, final InputStream contents) throws IOException { + final File assetFile = getFile(ownerId, assetId, assetName); + final File parentDir = assetFile.getParentFile(); + + if (!parentDir.exists()) { + Files.createDirectories(parentDir.toPath()); + } + + Files.copy(contents, assetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + + final Asset asset = new MockAsset(assetId, ownerId, assetName, assetFile, null); + assets.put(assetId, asset); + return asset; + } + + @Override + public Optional<Asset> getAsset(final String id) { + return Optional.ofNullable(assets.get(id)); + } + + @Override + public List<Asset> getAssets(final String ownerId) { + final List<Asset> ownerAssets = new ArrayList<>(); + for (final Asset asset : assets.values()) { + if (asset.getOwnerIdentifier().equals(ownerId)) { + ownerAssets.add(asset); + } + } + return ownerAssets; + } + + @Override + public Asset createMissingAsset(final String ownerId, final String assetName) { + final String assetId = UUID.randomUUID().toString(); + final File file = getFile(ownerId, assetId, assetName); + final Asset asset = new MockAsset(assetId, ownerId, assetName, file, null); + assets.put(assetId, asset); + return asset; + } + + @Override + public Optional<Asset> deleteAsset(final String id) { + final Asset removed = assets.remove(id); + if (removed != null && removed.getFile().exists()) { + try { + Files.delete(removed.getFile().toPath()); + } catch (final IOException e) { + throw new RuntimeException("Failed to delete asset " + id + " from storage file " + removed.getFile().getAbsolutePath(), e); + } + } + + return Optional.ofNullable(removed); + } + + private File getFile(final String ownerId, final String assetId, final String assetName) { + final Path parentPath = assetStorageLocation.toPath().normalize(); + final Path assetPath = Path.of(ownerId, assetId, assetName).normalize(); + return parentPath.resolve(assetPath).toFile(); + } + + /** + * A simple Asset implementation for the mock. + */ + private static class MockAsset implements Asset { + private final String identifier; + private final String ownerIdentifier; + private final String name; + private final File file; + private final String digest; + + MockAsset(final String identifier, final String ownerIdentifier, final String name, final File file, final String digest) { + this.identifier = identifier; + this.ownerIdentifier = ownerIdentifier; + this.name = name; + this.file = file; + this.digest = digest; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + @Deprecated + public String getParameterContextIdentifier() { + return ownerIdentifier; + } + + @Override + public String getOwnerIdentifier() { + return ownerIdentifier; + } + + @Override + public String getName() { + return name; + } + + @Override + public File getFile() { + return file; + } + + @Override + public Optional<String> getDigest() { + return Optional.ofNullable(digest); + } + } +} diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java index 38a61ae9bd..685e84748a 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java @@ -18,6 +18,8 @@ package org.apache.nifi.mock.connector.server; import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.asset.Asset; +import org.apache.nifi.asset.AssetManager; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; @@ -25,6 +27,7 @@ import org.apache.nifi.bundle.BundleDetails; import org.apache.nifi.cluster.ClusterDetailsFactory; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.AssetReference; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; import org.apache.nifi.components.connector.ConnectorState; @@ -58,7 +61,9 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.validation.RuleViolationsManager; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -265,6 +270,30 @@ public class StandardConnectorMockServer implements ConnectorMockServer { return new SecretReference(ConnectorTestRunner.SECRET_PROVIDER_ID, ConnectorTestRunner.SECRET_PROVIDER_NAME, secretName, secretName); } + @Override + public AssetReference addAsset(final File file) { + final AssetManager assetManager = flowController.getConnectorAssetManager(); + + try (final InputStream inputStream = new FileInputStream(file)) { + final Asset asset = assetManager.createAsset(CONNECTOR_ID, file.getName(), inputStream); + return new AssetReference(Set.of(asset.getIdentifier())); + } catch (final IOException e) { + throw new RuntimeException("Failed to add asset from file: " + file.getAbsolutePath(), e); + } + } + + @Override + public AssetReference addAsset(final String assetName, final InputStream contents) { + final AssetManager assetManager = flowController.getConnectorAssetManager(); + + try { + final Asset asset = assetManager.createAsset(CONNECTOR_ID, assetName, contents); + return new AssetReference(Set.of(asset.getIdentifier())); + } catch (final IOException e) { + throw new RuntimeException("Failed to add asset: " + assetName, e); + } + } + @Override public void startConnector() { initialFlowFileTransferCounts = connectorNode.getFlowFileTransferCounts(); diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManagerTest.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManagerTest.java new file mode 100644 index 0000000000..2952178764 --- /dev/null +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManagerTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mock.connector.server; + +import org.apache.nifi.asset.Asset; +import org.apache.nifi.asset.AssetManagerInitializationContext; +import org.apache.nifi.asset.AssetReferenceLookup; +import org.apache.nifi.controller.NodeTypeProvider; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class MockConnectorAssetManagerTest { + + private static final String CONNECTOR_ID = "test-connector"; + private static final String ASSET_NAME = "test-asset.txt"; + private static final String ASSET_CONTENTS = "test-contents"; + + @TempDir + private Path tempDir; + + private MockConnectorAssetManager assetManager; + + @BeforeEach + void setUp() { + assetManager = new MockConnectorAssetManager(); + final AssetManagerInitializationContext context = createInitializationContext(tempDir); + assetManager.initialize(context); + } + + @Test + void testCreateAsset() throws IOException { + final InputStream contents = new ByteArrayInputStream(ASSET_CONTENTS.getBytes(StandardCharsets.UTF_8)); + + final Asset asset = assetManager.createAsset(CONNECTOR_ID, ASSET_NAME, contents); + + assertNotNull(asset); + assertNotNull(asset.getIdentifier()); + assertEquals(CONNECTOR_ID, asset.getOwnerIdentifier()); + assertEquals(ASSET_NAME, asset.getName()); + assertTrue(asset.getFile().exists()); + assertEquals(ASSET_CONTENTS, Files.readString(asset.getFile().toPath(), StandardCharsets.UTF_8)); + } + + @Test + void testCreateAssetGeneratesUniqueIdentifiers() throws IOException { + final InputStream contents1 = new ByteArrayInputStream(ASSET_CONTENTS.getBytes(StandardCharsets.UTF_8)); + final InputStream contents2 = new ByteArrayInputStream(ASSET_CONTENTS.getBytes(StandardCharsets.UTF_8)); + + final Asset asset1 = assetManager.createAsset(CONNECTOR_ID, ASSET_NAME, contents1); + final Asset asset2 = assetManager.createAsset(CONNECTOR_ID, ASSET_NAME, contents2); + + assertNotEquals(asset1.getIdentifier(), asset2.getIdentifier()); + } + + @Test + void testSaveAsset() throws IOException { + final String assetId = "specific-asset-id"; + final InputStream contents = new ByteArrayInputStream(ASSET_CONTENTS.getBytes(StandardCharsets.UTF_8)); + + final Asset asset = assetManager.saveAsset(CONNECTOR_ID, assetId, ASSET_NAME, contents); + + assertNotNull(asset); + assertEquals(assetId, asset.getIdentifier()); + assertEquals(CONNECTOR_ID, asset.getOwnerIdentifier()); + assertEquals(ASSET_NAME, asset.getName()); + assertTrue(asset.getFile().exists()); + assertEquals(ASSET_CONTENTS, Files.readString(asset.getFile().toPath(), StandardCharsets.UTF_8)); + } + + @Test + void testSaveAssetOverwritesExisting() throws IOException { + final String assetId = "overwrite-asset-id"; + final String originalContents = "original"; + final String updatedContents = "updated"; + + final InputStream originalStream = new ByteArrayInputStream(originalContents.getBytes(StandardCharsets.UTF_8)); + final Asset originalAsset = assetManager.saveAsset(CONNECTOR_ID, assetId, ASSET_NAME, originalStream); + assertEquals(originalContents, Files.readString(originalAsset.getFile().toPath(), StandardCharsets.UTF_8)); + + final InputStream updatedStream = new ByteArrayInputStream(updatedContents.getBytes(StandardCharsets.UTF_8)); + final Asset updatedAsset = assetManager.saveAsset(CONNECTOR_ID, assetId, ASSET_NAME, updatedStream); + assertEquals(updatedContents, Files.readString(updatedAsset.getFile().toPath(), StandardCharsets.UTF_8)); + } + + @Test + void testGetAsset() throws IOException { + final InputStream contents = new ByteArrayInputStream(ASSET_CONTENTS.getBytes(StandardCharsets.UTF_8)); + final Asset createdAsset = assetManager.createAsset(CONNECTOR_ID, ASSET_NAME, contents); + + final Optional<Asset> retrieved = assetManager.getAsset(createdAsset.getIdentifier()); + + assertTrue(retrieved.isPresent()); + assertEquals(createdAsset.getIdentifier(), retrieved.get().getIdentifier()); + assertEquals(createdAsset.getName(), retrieved.get().getName()); + assertEquals(createdAsset.getOwnerIdentifier(), retrieved.get().getOwnerIdentifier()); + } + + @Test + void testGetAssetReturnsEmptyForNonExistent() { + final Optional<Asset> retrieved = assetManager.getAsset("non-existent-id"); + + assertTrue(retrieved.isEmpty()); + } + + @Test + void testGetAssets() throws IOException { + final InputStream contents1 = new ByteArrayInputStream("content1".getBytes(StandardCharsets.UTF_8)); + final InputStream contents2 = new ByteArrayInputStream("content2".getBytes(StandardCharsets.UTF_8)); + final InputStream contents3 = new ByteArrayInputStream("content3".getBytes(StandardCharsets.UTF_8)); + + assetManager.createAsset(CONNECTOR_ID, "asset1.txt", contents1); + assetManager.createAsset(CONNECTOR_ID, "asset2.txt", contents2); + assetManager.createAsset("other-connector", "asset3.txt", contents3); + + final List<Asset> connectorAssets = assetManager.getAssets(CONNECTOR_ID); + assertEquals(2, connectorAssets.size()); + assertTrue(connectorAssets.stream().allMatch(a -> a.getOwnerIdentifier().equals(CONNECTOR_ID))); + + final List<Asset> otherAssets = assetManager.getAssets("other-connector"); + assertEquals(1, otherAssets.size()); + assertEquals("other-connector", otherAssets.get(0).getOwnerIdentifier()); + } + + @Test + void testGetAssetsReturnsEmptyListForUnknownOwner() { + final List<Asset> assets = assetManager.getAssets("unknown-owner"); + + assertNotNull(assets); + assertTrue(assets.isEmpty()); + } + + @Test + void testCreateMissingAsset() { + final Asset missingAsset = assetManager.createMissingAsset(CONNECTOR_ID, ASSET_NAME); + + assertNotNull(missingAsset); + assertNotNull(missingAsset.getIdentifier()); + assertEquals(CONNECTOR_ID, missingAsset.getOwnerIdentifier()); + assertEquals(ASSET_NAME, missingAsset.getName()); + assertFalse(missingAsset.getFile().exists()); + assertTrue(missingAsset.getDigest().isEmpty()); + } + + @Test + void testCreateMissingAssetCanBeRetrieved() { + final Asset missingAsset = assetManager.createMissingAsset(CONNECTOR_ID, ASSET_NAME); + + final Optional<Asset> retrieved = assetManager.getAsset(missingAsset.getIdentifier()); + assertTrue(retrieved.isPresent()); + assertEquals(missingAsset.getIdentifier(), retrieved.get().getIdentifier()); + + final List<Asset> ownerAssets = assetManager.getAssets(CONNECTOR_ID); + assertEquals(1, ownerAssets.size()); + } + + @Test + void testDeleteAsset() throws IOException { + final InputStream contents = new ByteArrayInputStream(ASSET_CONTENTS.getBytes(StandardCharsets.UTF_8)); + final Asset createdAsset = assetManager.createAsset(CONNECTOR_ID, ASSET_NAME, contents); + final Path assetFilePath = createdAsset.getFile().toPath(); + assertTrue(Files.exists(assetFilePath)); + + final Optional<Asset> deleted = assetManager.deleteAsset(createdAsset.getIdentifier()); + + assertTrue(deleted.isPresent()); + assertEquals(createdAsset.getIdentifier(), deleted.get().getIdentifier()); + assertFalse(Files.exists(assetFilePath)); + assertTrue(assetManager.getAsset(createdAsset.getIdentifier()).isEmpty()); + } + + @Test + void testDeleteAssetReturnsEmptyForNonExistent() { + final Optional<Asset> deleted = assetManager.deleteAsset("non-existent-id"); + + assertTrue(deleted.isEmpty()); + } + + @Test + void testInitializeCreatesStorageDirectory() { + final Path newStorageDir = tempDir.resolve("new-storage"); + assertFalse(Files.exists(newStorageDir)); + + final MockConnectorAssetManager newManager = new MockConnectorAssetManager(); + final AssetManagerInitializationContext context = createInitializationContext(newStorageDir); + newManager.initialize(context); + + assertTrue(Files.exists(newStorageDir)); + assertTrue(Files.isDirectory(newStorageDir)); + } + + private AssetManagerInitializationContext createInitializationContext(final Path storageDirectory) { + final Map<String, String> properties = new HashMap<>(); + properties.put("directory", storageDirectory.toAbsolutePath().toString()); + + return new AssetManagerInitializationContext() { + @Override + public AssetReferenceLookup getAssetReferenceLookup() { + return null; + } + + @Override + public Map<String, String> getProperties() { + return properties; + } + + @Override + public NodeTypeProvider getNodeTypeProvider() { + return null; + } + }; + } +} diff --git a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java index 71636a5af1..d2b7e79f0b 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java @@ -20,6 +20,7 @@ package org.apache.nifi.mock.connector; import org.apache.nifi.NiFiServer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.AssetReference; import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.SecretReference; @@ -165,6 +166,16 @@ public class StandardConnectorTestRunner implements ConnectorTestRunner, Closeab mockServer.addSecret(name, value); } + @Override + public AssetReference addAsset(final File file) { + return mockServer.addAsset(file); + } + + @Override + public AssetReference addAsset(final String assetName, final InputStream contents) { + return mockServer.addAsset(assetName, contents); + } + @Override public void startConnector() { mockServer.startConnector(); diff --git a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties index 071f1cca3c..44deabc71a 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties +++ b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties @@ -94,6 +94,9 @@ nifi.nar.persistence.provider.properties.directory=target/nifi-storage/nar_repos # Asset Management nifi.asset.manager.properties.directory=target/nifi-storage/assets +# Connector Asset Manager +nifi.connector.asset.manager.implementation=org.apache.nifi.mock.connector.server.MockConnectorAssetManager + # Secrets Manager nifi.secrets.manager.implementation=org.apache.nifi.mock.connector.server.secrets.ConnectorTestRunnerSecretsManager
