unsubscribe ________________________________ From: nsabo...@apache.org <nsabo...@apache.org> Sent: Friday, July 14, 2023 4:34 PM To: comm...@nifi.apache.org <comm...@nifi.apache.org> Subject: [nifi] branch support/nifi-1.x updated: NIFI-11758: Added FileResourceService and used it in PutAzure*Storage processors for local file upload - Renamed classes from DataUpload to ResourceTransfer and updated references - Disabled testNonReadableFile()...
This is an automated email from the ASF dual-hosted git repository. nsabonyi pushed a commit to branch support/nifi-1.x in repository https://urldefense.com/v3/__https://gitbox.apache.org/repos/asf/nifi.git__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nqp1Jki1Y$ The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 2f9bb2095c NIFI-11758: Added FileResourceService and used it in PutAzure*Storage processors for local file upload - Renamed classes from DataUpload to ResourceTransfer and updated references - Disabled testNonReadableFile() on Windows due to Posix permissions - Replaced utility methods with functional handling of FileResource - Corrected FlowFile InputStream access using Optional.orElseGet() 2f9bb2095c is described below commit 2f9bb2095c18b8f1e84e3effcba7e60d93f63aa5 Author: Peter Turcsanyi <turcsa...@apache.org> AuthorDate: Mon Jul 3 13:58:50 2023 +0200 NIFI-11758: Added FileResourceService and used it in PutAzure*Storage processors for local file upload - Renamed classes from DataUpload to ResourceTransfer and updated references - Disabled testNonReadableFile() on Windows due to Posix permissions - Replaced utility methods with functional handling of FileResource - Corrected FlowFile InputStream access using Optional.orElseGet() Backported - Updated 2.0.0-SNAPSHOT references to 1.23.0-SNAPSHOT - Replaced InputStream.readAllBytes() with IOUtils.toByteArray(inputStream) to address Java8 incompatibility - Replaced Optional.isEmpty() with Optional.isPresent() to address Java8 incompatibility This closes: #7458 Co-authored-by: David Handermann <exceptionfact...@apache.org> Signed-off-by: Nandor Soma Abonyi <nsabo...@apache.org> (cherry picked from commit 437995b75a4237b7bf9d304f7693cf3b53371a9f) --- nifi-assembly/pom.xml | 6 + .../nifi-azure-processors/pom.xml | 15 ++ .../azure/storage/PutAzureBlobStorage_v12.java | 22 +- .../azure/storage/PutAzureDataLakeStorage.java | 46 +++-- .../storage/AbstractAzureBlobStorage_v12IT.java | 2 - .../azure/storage/AbstractAzureStorageIT.java | 2 + .../azure/storage/ITPutAzureBlobStorage_v12.java | 37 ++++ .../azure/storage/ITPutAzureDataLakeStorage.java | 47 ++++- .../azure/storage/TestPutAzureDataLakeStorage.java | 20 -- .../nifi-resource-transfer/pom.xml | 44 ++++ .../transfer/ResourceTransferProperties.java | 45 +++++ .../transfer/ResourceTransferSource.java | 48 +++++ .../processors/transfer/ResourceTransferUtils.java | 60 ++++++ .../transfer/ResourceTransferUtilsTest.java | 105 ++++++++++ nifi-nar-bundles/nifi-extension-utils/pom.xml | 1 + .../nifi-file-resource-service-api/pom.xml | 34 ++++ .../fileresource/service/api/FileResource.java | 49 +++++ .../service/api/FileResourceService.java | 35 ++++ .../nifi-file-resource-service-nar/pom.xml | 42 ++++ .../src/main/resources/META-INF/LICENSE | 224 +++++++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 12 ++ .../nifi-file-resource-service/pom.xml | 53 +++++ .../service/StandardFileResourceService.java | 113 +++++++++++ .../org.apache.nifi.controller.ControllerService | 15 ++ .../service/StandardFileResourceServiceTest.java | 171 ++++++++++++++++ .../nifi-file-resource-service-bundle/pom.xml | 33 +++ .../nifi-standard-services-api-nar/pom.xml | 5 + nifi-nar-bundles/nifi-standard-services/pom.xml | 2 + nifi-nar-bundles/pom.xml | 6 + 29 files changed, 1245 insertions(+), 49 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 44a9e5374a..871a3c7d2b 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -1001,6 +1001,12 @@ language governing permissions and limitations under the License. --> <version>1.23.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-nar</artifactId> + <version>1.23.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) --> <dependency> <groupId>org.aspectj</groupId> diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index a65269254f..dbbcf8093d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -37,6 +37,15 @@ <artifactId>nifi-service-utils</artifactId> <version>1.23.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-resource-transfer</artifactId> + <version>1.23.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-api</artifactId> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-record-serialization-service-api</artifactId> @@ -180,6 +189,12 @@ <version>1.23.0-SNAPSHOT</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service</artifactId> + <version>1.23.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index a7dfca488f..2e7e8bd80d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -38,6 +38,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.fileresource.service.api.FileResource; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -46,6 +47,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.ClientSideEncryptionSupport; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; import java.io.InputStream; @@ -56,6 +58,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM; @@ -82,6 +85,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_MIME_TYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; @Tags({"azure", "microsoft", "cloud", "storage", "blob"}) @SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class}) @@ -129,6 +135,8 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl CREATE_CONTAINER, CONFLICT_RESOLUTION, BLOB_NAME, + RESOURCE_TRANSFER_SOURCE, + FILE_RESOURCE_SERVICE, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, CSE_KEY_TYPE, CSE_KEY_ID, @@ -157,6 +165,8 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl final boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean(); final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue()); + final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); + final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); long startNanos = System.nanoTime(); try { @@ -183,13 +193,17 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl blobRequestConditions.setIfNoneMatch("*"); } - try (InputStream rawIn = session.read(flowFile)) { - final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn)); + final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize()); + final FlowFile sourceFlowFile = flowFile; + try (InputStream sourceInputStream = fileResourceFound + .map(FileResource::getInputStream) + .orElseGet(() -> session.read(sourceFlowFile)) + ) { + final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(sourceInputStream)); blobParallelUploadOptions.setRequestConditions(blobRequestConditions); Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE); BlockBlobItem blob = response.getValue(); - long length = flowFile.getSize(); - applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length); + applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, transferSize); applyBlobMetadata(attributes, blobClient); if (ignore) { attributes.put(ATTR_NAME_IGNORED, "false"); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 284bb031e2..ab6014abfd 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -32,22 +32,24 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.fileresource.service.api.FileResource; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.util.StringUtils; import java.io.BufferedInputStream; -import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -61,6 +63,9 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @@ -106,6 +111,8 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess FILE, BASE_TEMPORARY_PATH, CONFLICT_RESOLUTION, + RESOURCE_TRANSFER_SOURCE, + FILE_RESOURCE_SERVICE, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @@ -135,14 +142,30 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess final String tempFilePrefix = UUID.randomUUID().toString(); final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory); final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); + final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); + final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); + final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize()); final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); - appendContent(flowFile, tempFileClient, session); + if (transferSize > 0) { + final FlowFile sourceFlowFile = flowFile; + try ( + final InputStream inputStream = new BufferedInputStream( + fileResourceFound.map(FileResource::getInputStream) + .orElseGet(() -> session.read(sourceFlowFile)) + ) + ) { + uploadContent(tempFileClient, inputStream, transferSize); + } catch (final Exception e) { + removeTempFile(tempFileClient); + throw e; + } + } createDirectoryIfNotExists(directoryClient); final String fileUrl = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution); if (fileUrl != null) { - final Map<String, String> attributes = createAttributeMap(flowFile, fileSystem, originalDirectory, fileName, fileUrl); + final Map<String, String> attributes = createAttributeMap(fileSystem, originalDirectory, fileName, fileUrl, transferSize); flowFile = session.putAllAttributes(flowFile, attributes); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); @@ -162,13 +185,13 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess return storageClient.getFileSystemClient(fileSystem); } - private Map<String, String> createAttributeMap(FlowFile flowFile, String fileSystem, String originalDirectory, String fileName, String fileUrl) { + private Map<String, String> createAttributeMap(String fileSystem, String originalDirectory, String fileName, String fileUrl, long length) { final Map<String, String> attributes = new HashMap<>(); attributes.put(ATTR_NAME_FILESYSTEM, fileSystem); attributes.put(ATTR_NAME_DIRECTORY, originalDirectory); attributes.put(ATTR_NAME_FILENAME, fileName); attributes.put(ATTR_NAME_PRIMARY_URI, fileUrl); - attributes.put(ATTR_NAME_LENGTH, String.valueOf(flowFile.getSize())); + attributes.put(ATTR_NAME_LENGTH, String.valueOf(length)); return attributes; } @@ -178,19 +201,6 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess } } - //Visible for testing - void appendContent(FlowFile flowFile, DataLakeFileClient fileClient, ProcessSession session) throws IOException { - final long length = flowFile.getSize(); - if (length > 0) { - try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) { - uploadContent(fileClient, bufferedIn, length); - } catch (Exception e) { - removeTempFile(fileClient); - throw e; - } - } - } - //Visible for testing static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) { long chunkStart = 0; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java index c9716e80d3..cc77fa2c1d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java @@ -67,8 +67,6 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag protected static final String EL_CONTAINER_NAME = "az.containername"; protected static final String EL_BLOB_NAME = "az.blobname"; - protected static final byte[] EMPTY_CONTENT = new byte[0]; - private static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container"; private BlobServiceClient storageClient; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java index ab50166c63..3197d9a475 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java @@ -37,6 +37,8 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public abstract class AbstractAzureStorageIT { + protected static final byte[] EMPTY_CONTENT = new byte[0]; + private static final Properties CREDENTIALS_CONFIG; private static final Properties PROXY_CONFIG; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java index 78e2a65719..713d2f4c8e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java @@ -19,10 +19,14 @@ package org.apache.nifi.processors.azure.storage; import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.models.BlobErrorCode; +import org.apache.nifi.fileresource.service.StandardFileResourceService; +import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.azure.ClientSideEncryptionSupport; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod; +import org.apache.nifi.processors.transfer.ResourceTransferProperties; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; @@ -30,7 +34,10 @@ import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.Set; @@ -247,6 +254,36 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA); } + @Test + public void testPutBlobFromLocalFile() throws Exception { + String attributeName = "file.path"; + + String serviceId = FileResourceService.class.getSimpleName(); + FileResourceService service = new StandardFileResourceService(); + runner.addControllerService(serviceId, service); + runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName)); + runner.enableControllerService(service); + + runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId); + + Path tempFilePath = Files.createTempFile("ITPutAzureBlobStorage_v12_testPutBlobFromLocalFile_", ""); + Files.write(tempFilePath, BLOB_DATA); + + Map<String, String> attributes = new HashMap<>(); + attributes.put(attributeName, tempFilePath.toString()); + + runProcessor(EMPTY_CONTENT, attributes); + + runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureBlobStorage_v12.REL_SUCCESS).get(0); + assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), BLOB_NAME); + assertFlowFileResultBlobAttributes(flowFile, BLOB_DATA.length); + + assertAzureBlob(getContainerName(), BLOB_NAME, BLOB_DATA); + assertProvenanceEvents(); + } + private void runProcessor(byte[] data) { runProcessor(data, Collections.emptyMap()); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 711b9f6d5e..171e3a5e75 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -19,7 +19,11 @@ package org.apache.nifi.processors.azure.storage; import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.fileresource.service.StandardFileResourceService; +import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.transfer.ResourceTransferProperties; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; @@ -27,6 +31,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -246,6 +252,33 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } + @Test + public void testPutFileFromLocalFile() throws Exception { + String attributeName = "file.path"; + + String serviceId = FileResourceService.class.getSimpleName(); + FileResourceService service = new StandardFileResourceService(); + runner.addControllerService(serviceId, service); + runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName)); + runner.enableControllerService(service); + + runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId); + + Path tempFilePath = Files.createTempFile("ITPutAzureDataLakeStorage_testPutFileFromLocalFile_", ""); + Files.write(tempFilePath, FILE_DATA); + + Map<String, String> attributes = new HashMap<>(); + attributes.put(attributeName, tempFilePath.toString()); + + runProcessor(EMPTY_CONTENT, attributes); + + MockFlowFile flowFile = assertFlowFile(EMPTY_CONTENT); + assertFlowFileAttributes(flowFile, DIRECTORY, FILE_NAME, FILE_DATA.length); + assertAzureFile(DIRECTORY, FILE_NAME, FILE_DATA); + assertProvenanceEvents(); + } + private Map<String, String> createAttributesMap() { Map<String, String> attributes = new HashMap<>(); @@ -286,11 +319,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception { MockFlowFile flowFile = assertFlowFile(fileData); - flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName); - flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory); - flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName); - - flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length)); + assertFlowFileAttributes(flowFile, directory, fileName, fileData.length); } private MockFlowFile assertFlowFile(byte[] fileData) throws Exception { @@ -303,6 +332,14 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { return flowFile; } + private void assertFlowFileAttributes(MockFlowFile flowFile, String directory, String fileName, int fileLength) { + flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName); + flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory); + flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName); + + flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileLength)); + } + private void assertAzureFile(String directory, String fileName, byte[] fileData) { DataLakeFileClient fileClient; if (StringUtils.isNotEmpty(directory)) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java index e7a22802e7..5ab571a5e1 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java @@ -20,24 +20,18 @@ import com.azure.core.http.rest.Response; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.models.DataLakeRequestConditions; import com.azure.storage.file.datalake.models.DataLakeStorageException; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockComponentLog; import org.junit.jupiter.api.Test; -import java.io.InputStream; - import static org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.FAIL_RESOLUTION; import static org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.IGNORE_RESOLUTION; import static org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.REPLACE_RESOLUTION; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -47,20 +41,6 @@ public class TestPutAzureDataLakeStorage { private static final String FILE_NAME = "file1"; - @Test - public void testPutFileButFailedToAppend() { - final PutAzureDataLakeStorage processor = new PutAzureDataLakeStorage(); - final DataLakeFileClient fileClient = mock(DataLakeFileClient.class); - final ProcessSession session = mock(ProcessSession.class); - final FlowFile flowFile = mock(FlowFile.class); - - when(flowFile.getSize()).thenReturn(1L); - doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class), anyLong(), anyLong()); - - assertThrows(IllegalArgumentException.class, () -> processor.appendContent(flowFile, fileClient, session)); - verify(fileClient).delete(); - } - @Test public void testPutFileButFailedToRenameWithUnrecoverableError() { final PutAzureDataLakeStorage processor = new PutAzureDataLakeStorage(); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/pom.xml new file mode 100644 index 0000000000..172dbdb9fa --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="https://urldefense.com/v3/__http://maven.apache.org/POM/4.0.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nq8EenJP4$ " + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://urldefense.com/v3/__http://maven.apache.org/xsd/maven-4.0.0.xsd__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nqxhz_2iU$ "> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-extension-utils</artifactId> + <version>1.23.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-resource-transfer</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>1.23.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-api</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferProperties.java new file mode 100644 index 0000000000..bebb586171 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferProperties.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.transfer; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.fileresource.service.api.FileResourceService; + +import static org.apache.nifi.processors.transfer.ResourceTransferSource.FLOWFILE_CONTENT; + +public class ResourceTransferProperties { + + public static final PropertyDescriptor RESOURCE_TRANSFER_SOURCE = new PropertyDescriptor.Builder() + .name("Resource Transfer Source") + .displayName("Resource Transfer Source") + .description("The source of the content to be transferred") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .allowableValues(ResourceTransferSource.class) + .defaultValue(FLOWFILE_CONTENT.getValue()) + .build(); + + public static final PropertyDescriptor FILE_RESOURCE_SERVICE = new PropertyDescriptor.Builder() + .name("File Resource Service") + .displayName("File Resource Service") + .description("File Resource Service providing access to the local resource to be transferred") + .identifiesControllerService(FileResourceService.class) + .required(true) + .dependsOn(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE) + .build(); +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferSource.java b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferSource.java new file mode 100644 index 0000000000..9ec8a4fdec --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferSource.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.transfer; + +import org.apache.nifi.components.DescribedValue; + +public enum ResourceTransferSource implements DescribedValue { + + FLOWFILE_CONTENT("FlowFile Content", "The content of the incoming FlowFile provides the source for transfer"), + FILE_RESOURCE_SERVICE("File Resource Service", "The File Resource Service provides the source for transfer"); + + private final String displayName; + private final String description; + + ResourceTransferSource(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferUtils.java new file mode 100644 index 0000000000..b4b4ad2080 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/main/java/org/apache/nifi/processors/transfer/ResourceTransferUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.transfer; + +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; + +public final class ResourceTransferUtils { + + private ResourceTransferUtils() {} + + /** + * Get File Resource from File Resource Service based on provided Source otherwise return empty + * + * @param resourceTransferSource type of the data upload + * @param context process context with properties + * @param attributes Map of attributes passed to File Resource Service + * @return Optional FileResource retrieved from FileResourceService if Source is File Resource Service, otherwise empty + * @throws ProcessException Thrown if Source is File Resource but FileResourceService is not provided in the context + */ + public static Optional<FileResource> getFileResource(final ResourceTransferSource resourceTransferSource, final ProcessContext context, final Map<String, String> attributes) { + final Optional<FileResource> resource; + + if (resourceTransferSource == ResourceTransferSource.FILE_RESOURCE_SERVICE) { + final PropertyValue property = context.getProperty(FILE_RESOURCE_SERVICE); + if (property == null || !property.isSet()) { + throw new ProcessException("File Resource Service required but not configured"); + } + final FileResourceService fileResourceService = property.asControllerService(FileResourceService.class); + final FileResource fileResource = fileResourceService.getFileResource(attributes); + resource = Optional.ofNullable(fileResource); + } else { + resource = Optional.empty(); + } + + return resource; + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/test/java/org/apache/nifi/processors/transfer/ResourceTransferUtilsTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/test/java/org/apache/nifi/processors/transfer/ResourceTransferUtilsTest.java new file mode 100644 index 0000000000..858f1fe89e --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-resource-transfer/src/test/java/org/apache/nifi/processors/transfer/ResourceTransferUtilsTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.transfer; + +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.InputStream; +import java.util.Collections; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mock.Strictness.LENIENT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ResourceTransferUtilsTest { + + private static final long FLOW_FILE_SIZE = 2; + private static final long FILE_RESOURCE_SIZE = 5; + + @Mock + private InputStream flowFileInputstream; + + @Mock + private InputStream fileRessourceInputstream; + + @Mock + private ProcessContext context; + + @Mock(strictness = LENIENT) + private ProcessSession session; + + @Mock(strictness = LENIENT) + private FlowFile flowFile; + + private FileResource fileResource; + + @BeforeEach + void setUp() { + when(session.read(flowFile)).thenReturn(flowFileInputstream); + + when(flowFile.getSize()).thenReturn(FLOW_FILE_SIZE); + + fileResource = new FileResource(fileRessourceInputstream, FILE_RESOURCE_SIZE); + } + + @Test + void testGetFileResourceWhenDataUploadSourceIsLocalFile() { + final FileResourceService service = mock(FileResourceService.class); + when(service.getFileResource(any())).thenReturn(fileResource); + + final PropertyValue property = mock(PropertyValue.class); + when(property.isSet()).thenReturn(true); + when(property.asControllerService(FileResourceService.class)).thenReturn(service); + + when(context.getProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE)).thenReturn(property); + + final Optional<FileResource> fileResourceFound = ResourceTransferUtils.getFileResource(ResourceTransferSource.FILE_RESOURCE_SERVICE, context, Collections.emptyMap()); + + assertTrue(fileResourceFound.isPresent()); + assertSame(fileResource, fileResourceFound.get()); + } + + @Test + void testGetFileResourceWhenDataUploadSourceIsLocalFileButNoServiceConfigured() { + assertThrows(ProcessException.class, () -> ResourceTransferUtils.getFileResource(ResourceTransferSource.FILE_RESOURCE_SERVICE, context, Collections.emptyMap())); + } + + @Test + void testGetFileResourceWhenDataUploadSourceIsFlowFileContent() { + final Optional<FileResource> fileResourceFound = ResourceTransferUtils.getFileResource(ResourceTransferSource.FLOWFILE_CONTENT, context, Collections.emptyMap()); + + assertFalse(fileResourceFound.isPresent()); + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml index 04a7bb88aa..0408d486dc 100644 --- a/nifi-nar-bundles/nifi-extension-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml @@ -41,6 +41,7 @@ <module>nifi-put-pattern</module> <module>nifi-record-utils</module> <module>nifi-reporting-utils</module> + <module>nifi-resource-transfer</module> <module>nifi-service-utils</module> <module>nifi-syslog-utils</module> <module>nifi-conflict-resolution</module> diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/pom.xml new file mode 100644 index 0000000000..8d043f9fd7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/pom.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="https://urldefense.com/v3/__http://maven.apache.org/POM/4.0.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nq8EenJP4$ " + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://urldefense.com/v3/__http://maven.apache.org/xsd/maven-4.0.0.xsd__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nqxhz_2iU$ "> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services</artifactId> + <version>1.23.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-file-resource-service-api</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/src/main/java/org/apache/nifi/fileresource/service/api/FileResource.java b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/src/main/java/org/apache/nifi/fileresource/service/api/FileResource.java new file mode 100644 index 0000000000..47e5d48259 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/src/main/java/org/apache/nifi/fileresource/service/api/FileResource.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.fileresource.service.api; + +import java.io.InputStream; +import java.util.Objects; + +/** + * File Resource abstraction wraps an InputStream provides associated size in bytes + */ +public class FileResource { + + private final InputStream inputStream; + + private final long size; + + /** + * File Resource constructor with required Input Stream and associated size in bytes + * + * @param inputStream Input Stream required + * @param size Size of stream in bytes + */ + public FileResource(final InputStream inputStream, final long size) { + this.inputStream = Objects.requireNonNull(inputStream, "Input Stream required"); + this.size = size; + } + + public InputStream getInputStream() { + return inputStream; + } + + public long getSize() { + return size; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/src/main/java/org/apache/nifi/fileresource/service/api/FileResourceService.java b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/src/main/java/org/apache/nifi/fileresource/service/api/FileResourceService.java new file mode 100644 index 0000000000..fbd3603bb2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-api/src/main/java/org/apache/nifi/fileresource/service/api/FileResourceService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.fileresource.service.api; + +import org.apache.nifi.controller.ControllerService; + +import java.util.Map; + +/** + * Controller Service providing a file resource (locally available file). + */ +public interface FileResourceService extends ControllerService { + + /** + * Returns FileResource object representing the local file. + * + * @param attributes used to resolve expression language, typically FlowFile attributes + * @return FileResource object + */ + FileResource getFileResource(Map<String, String> attributes); +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/pom.xml new file mode 100644 index 0000000000..b4c365a274 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="https://urldefense.com/v3/__http://maven.apache.org/POM/4.0.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nq8EenJP4$ " + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://urldefense.com/v3/__http://maven.apache.org/xsd/maven-4.0.0.xsd__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nqxhz_2iU$ "> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-bundle</artifactId> + <version>1.23.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-file-resource-service-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <version>1.23.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service</artifactId> + <version>1.23.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..15fc9ab15d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,224 @@ + + Apache License + Version 2.0, January 2004 + https://urldefense.com/v3/__http://www.apache.org/licenses/__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqOL-twHM$ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + + The binary distribution of this product bundles 'Jakarta Activation API 1.2.1' and 'Jakarta XML Binding API 2.3.3' modules under an EDL v1.0 license + + Copyright (c) 2007, Eclipse Foundation, Inc. and its licensors. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + Neither the name of the Eclipse Foundation, Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..0bf9a71fc3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,12 @@ +nifi-file-resource-service-nar +Copyright 2023 The Apache Software Foundation + + +************************ +Eclipse Distribution License 1.0 +************************ + +The following binary components are provided under the Eclipse Distribution License 1.0. + + (EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:jar:1.2.1) + (EDL 1.0) Jakarta XML Binding API (jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/pom.xml new file mode 100644 index 0000000000..70de40ff9a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/pom.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="https://urldefense.com/v3/__http://maven.apache.org/POM/4.0.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nq8EenJP4$ " + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://urldefense.com/v3/__http://maven.apache.org/xsd/maven-4.0.0.xsd__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nqxhz_2iU$ "> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-bundle</artifactId> + <version>1.23.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-file-resource-service</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>1.23.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/main/java/org/apache/nifi/fileresource/service/StandardFileResourceService.java b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/main/java/org/apache/nifi/fileresource/service/StandardFileResourceService.java new file mode 100644 index 0000000000..24e543d090 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/main/java/org/apache/nifi/fileresource/service/StandardFileResourceService.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.fileresource.service; + +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Restriction; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.RequiredPermission; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceReference; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +@Tags({"file", "resource"}) +@CapabilityDescription("Provides a file resource for other components. The file needs to be available locally by Nifi (e.g. local disk or mounted storage). " + + "NiFi needs to have read permission to the file.") +@Restricted( + restrictions = { + @Restriction( + requiredPermission = RequiredPermission.READ_FILESYSTEM, + explanation = "Provides operator the ability to read from any file that NiFi has access to.") + } +) +public class StandardFileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor FILE_PATH = new PropertyDescriptor.Builder() + .name("file-path") + .displayName("File Path") + .description("Path to a file that can be accessed locally.") + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .defaultValue("${absolute.path}/${filename}") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList( + FILE_PATH + ); + + private volatile PropertyContext context; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.context = context; + } + + @OnDisabled + public void onDisabled() { + this.context = null; + } + + @Override + public FileResource getFileResource(Map<String, String> attributes) { + final ResourceReference resourceReference = context.getProperty(FILE_PATH).evaluateAttributeExpressions(attributes).asResource(); + + if (resourceReference == null) { + throw new ProcessException("Evaluated path is empty. Path expression: " + context.getProperty(FILE_PATH).getValue()); + } + + final File file = resourceReference.asFile(); + + if (!file.exists() || !file.isFile()) { + throw new ProcessException("Path does not exist or it is not a file: " + file.getAbsolutePath()); + } + + try { + final InputStream inputStream = resourceReference.read(); + final long size = file.length(); + + return new FileResource(inputStream, size); + } catch (IOException e) { + throw new ProcessException("File cannot be read: " + file.getAbsolutePath(), e); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..120ef490dd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.fileresource.service.StandardFileResourceService diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/test/java/org/apache/nifi/fileresource/service/StandardFileResourceServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/test/java/org/apache/nifi/fileresource/service/StandardFileResourceServiceTest.java new file mode 100644 index 0000000000..452cda2e07 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/nifi-file-resource-service/src/test/java/org/apache/nifi/fileresource/service/StandardFileResourceServiceTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.fileresource.service; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class StandardFileResourceServiceTest { + + private static final String TEST_NAME = StandardFileResourceServiceTest.class.getSimpleName(); + + private static final byte[] TEST_DATA = "nifi".getBytes(); + + private static Path directoryPath; + + private TestRunner runner; + + private StandardFileResourceService service; + + @BeforeAll + static void createTestDirectory() throws IOException { + directoryPath = Files.createTempDirectory(TEST_NAME); + } + + @AfterAll + static void removeTestDirectory() throws IOException { + FileUtils.deleteDirectory(directoryPath.toFile()); + } + + @BeforeEach + void setUpRunner() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + service = new StandardFileResourceService(); + runner.addControllerService(TEST_NAME, service); + } + + @Test + void testValidFile() throws IOException { + final Path filePath = createTestFile("validFile"); + + setUpService(filePath); + + final FileResource fileResource = service.getFileResource(Collections.emptyMap()); + + assertFileResource(fileResource); + } + + @Test + void testValidFileUsingEL() throws IOException { + final Path filePath = createTestFile("validFileUsingEL"); + + final Map<String, String> attributes = setUpServiceWithEL(filePath); + + final FileResource fileResource = service.getFileResource(attributes); + + assertFileResource(fileResource); + } + + @Test + void testValidFileUsingELButMissingAttribute() throws IOException { + final Path filePath = createTestFile("testValidFileUsingELButMissingAttribute"); + + runner.setValidateExpressionUsage(false); + + setUpServiceWithEL(filePath); + + assertThrows(ProcessException.class, () -> service.getFileResource(Collections.emptyMap())); + } + + @Test + void testNonExistingFile() { + final Path filePath = directoryPath.resolve("nonExistingFile"); + + final Map<String, String> attributes = setUpServiceWithEL(filePath); + + assertThrows(ProcessException.class, () -> service.getFileResource(attributes)); + } + + @Test + void testNonRegularFile() { + final Path filePath = directoryPath; + + final Map<String, String> attributes = setUpServiceWithEL(filePath); + + assertThrows(ProcessException.class, () -> service.getFileResource(attributes)); + } + + @DisabledOnOs(OS.WINDOWS) + @Test + void testNonReadableFile() throws IOException { + final Path filePath = createTestFile("nonReadableFile"); + + Files.setPosixFilePermissions(filePath, EnumSet.noneOf(PosixFilePermission.class)); + + final Map<String, String> attributes = setUpServiceWithEL(filePath); + + assertThrows(ProcessException.class, () -> service.getFileResource(attributes)); + } + + private Path createTestFile(final String filenamePrefix) throws IOException { + final Path filePath = Files.createTempFile(directoryPath, filenamePrefix, ""); + Files.write(filePath, TEST_DATA); + return filePath; + } + + private void setUpService(final Path filePath) { + setUpService(filePath.toString()); + } + + private void setUpService(final String filePath) { + runner.setProperty(service, StandardFileResourceService.FILE_PATH, filePath); + runner.enableControllerService(service); + } + + private Map<String, String> setUpServiceWithEL(final Path filePath) { + final String attributeName = "file.path"; + Map<String, String> attributes = Collections.singletonMap(attributeName, filePath.toString()); + + setUpService(String.format("${%s}", attributeName)); + + return attributes; + } + + private void assertFileResource(final FileResource fileResource) throws IOException { + assertNotNull(fileResource); + assertEquals(TEST_DATA.length, fileResource.getSize()); + try (final InputStream inputStream = fileResource.getInputStream()) { + assertArrayEquals(TEST_DATA, IOUtils.toByteArray(inputStream)); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/pom.xml new file mode 100644 index 0000000000..fbf6f707e5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-file-resource-service-bundle/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + https://urldefense.com/v3/__http://www.apache.org/licenses/LICENSE-2.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2NqYz8afYY$ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="https://urldefense.com/v3/__http://maven.apache.org/POM/4.0.0__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nq8EenJP4$ " + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://urldefense.com/v3/__http://maven.apache.org/xsd/maven-4.0.0.xsd__;!!Mih3wA!H97dhLRTtoZ_STvCS5bqv8Fh7HKiBC9qvQhQ3qtROoWdwmdlzZKBOBuvJa-ViUnRhgj1nEvkP2Nqxhz_2iU$ "> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services</artifactId> + <version>1.23.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-file-resource-service-bundle</artifactId> + <packaging>pom</packaging> + + <modules> + <module>nifi-file-resource-service</module> + <module>nifi-file-resource-service-nar</module> + </modules> +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml index 95da2897da..6c7f0c5b98 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml @@ -139,5 +139,10 @@ <version>1.23.0-SNAPSHOT</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-api</artifactId> + <scope>compile</scope> + </dependency> </dependencies> </project> diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index 1020210797..cb7b82bf2b 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -57,5 +57,7 @@ <module>nifi-kerberos-user-service-api</module> <module>nifi-kerberos-user-service-bundle</module> <module>nifi-web-client-provider-bundle</module> + <module>nifi-file-resource-service-api</module> + <module>nifi-file-resource-service-bundle</module> </modules> </project> diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 384881fa99..e7c0f37e8b 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -378,6 +378,12 @@ <version>1.23.0-SNAPSHOT</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-resource-service-api</artifactId> + <version>1.23.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> <!-- Managed dependency versions applicable to all modules --> <dependency> <groupId>org.apache.nifi</groupId>