This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a21c2544ad NIFI-12801 Add local file upload option in PutHDFS processor
a21c2544ad is described below

commit a21c2544ad6c961b7ba8330481d462981e5e0d7d
Author: shubhamsharma <[email protected]>
AuthorDate: Thu Feb 15 10:31:09 2024 -0800

    NIFI-12801 Add local file upload option in PutHDFS processor
    
    This closes #8415.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi-hdfs-processors/pom.xml                   | 15 +++++
 .../org/apache/nifi/processors/hadoop/PutHDFS.java | 14 ++++-
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 64 ++++++++++++++++++++++
 3 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index a405c449a2..22eede1958 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -112,6 +112,21 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-user-service-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-resource-transfer</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-resource-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-resource-service</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.github.ben-manes.caffeine</groupId>
             <artifactId>caffeine</artifactId>
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index f2d1ed111e..8acce88fd2 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -48,6 +48,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.fileresource.service.api.FileResource;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.components.ValidationContext;
@@ -80,6 +81,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.io.InputStream;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -260,6 +266,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
         props.add(REMOTE_GROUP);
         props.add(COMPRESSION_CODEC);
         props.add(IGNORE_LOCALITY);
+        props.add(RESOURCE_TRANSFER_SOURCE);
+        props.add(FILE_RESOURCE_SERVICE);
         return props;
     }
 
@@ -402,7 +410,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
                     // Write FlowFile to temp file on HDFS
                     final StopWatch stopWatch = new StopWatch(true);
-                    session.read(putFlowFile, in -> {
+                    final ResourceTransferSource resourceTransferSource = 
context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+                    try (final InputStream in = 
getFileResource(resourceTransferSource, context, flowFile.getAttributes())
+                            .map(FileResource::getInputStream).orElseGet(() -> 
session.read(flowFile))) {
                         OutputStream fos = null;
                         Path createdFile = null;
                         try {
@@ -463,7 +473,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                             }
                             fos = null;
                         }
-                    });
+                    }
                     stopWatch.stop();
                     final String dataRate = 
stopWatch.calculateDataRate(putFlowFile.getSize());
                     final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 7219817297..e64a6ae7ee 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.fileresource.service.StandardFileResourceService;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.transfer.ResourceTransferProperties;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
@@ -46,12 +50,17 @@ import javax.security.sasl.SaslException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.Collections;
 
 import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
 import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
@@ -703,6 +712,61 @@ public class PutHDFSTest {
         mockFileSystem.delete(p, true);
     }
 
+    /**
+     * Verifies that PutHDFS can take its data from a local file exposed through a
+     * {@link FileResourceService} instead of from the FlowFile content: the FlowFile is
+     * enqueued empty and the local file path is supplied through a FlowFile attribute.
+     */
+    @Test
+    public void testPutFileFromLocalFile() throws Exception {
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+        runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);
+
+        // Register a StandardFileResourceService whose FILE_PATH is resolved from the "file.path" attribute
+        final String attributeName = "file.path";
+        final String serviceId = FileResourceService.class.getSimpleName();
+        final FileResourceService service = new StandardFileResourceService();
+        final byte[] fileData = "0123456789".getBytes(StandardCharsets.UTF_8);
+        final byte[] emptyContent = new byte[0];
+        runner.addControllerService(serviceId, service);
+        runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
+        runner.enableControllerService(service);
+
+        // Switch PutHDFS to read its input through the File Resource Service
+        runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE,
+                ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+        runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId);
+
+        final java.nio.file.Path tempFilePath = Files.createTempFile("PutHDFS_testPutFileFromLocalFile_", "");
+        try {
+            Files.write(tempFilePath, fileData);
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
+            attributes.put(attributeName, tempFilePath.toString());
+            // The FlowFile is deliberately empty so the data can only come from the local file
+            runner.enqueue(emptyContent, attributes);
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(PutHDFS.REL_SUCCESS, 1);
+            final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).get(0);
+            // The outgoing FlowFile content is expected to remain empty
+            flowFile.assertContentEquals(emptyContent);
+
+            // Assert HDFS file and directory structures
+            // NOTE(review): only the existence of the target file is checked; its bytes are not compared with fileData
+            assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+            assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+            assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+            assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+            // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+            assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
+
+            // With SIMPLE_WRITE the test expects no rename() call on the file system
+            verify(spyFileSystem, Mockito.never()).rename(any(Path.class), any(Path.class));
+
+            // Assert provenance events: exactly the SEND event type is expected
+            final Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.SEND);
+            final Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+                    .map(ProvenanceEventRecord::getEventType)
+                    .collect(Collectors.toSet());
+            assertEquals(expectedEventTypes, actualEventTypes);
+        } finally {
+            // Remove the local source file so repeated runs do not accumulate files in the temp directory
+            Files.deleteIfExists(tempFilePath);
+        }
+    }
+
     @Test
     public void testPutFileWithCreateException() throws IOException {
         mockFileSystem = new MockFileSystem();

Reply via email to