This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a21c2544ad NIFI-12801 Add local file upload option in PutHDFS processor
a21c2544ad is described below
commit a21c2544ad6c961b7ba8330481d462981e5e0d7d
Author: shubhamsharma <[email protected]>
AuthorDate: Thu Feb 15 10:31:09 2024 -0800
NIFI-12801 Add local file upload option in PutHDFS processor
This closes #8415.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-hdfs-processors/pom.xml | 15 +++++
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 14 ++++-
.../apache/nifi/processors/hadoop/PutHDFSTest.java | 64 ++++++++++++++++++++++
3 files changed, 91 insertions(+), 2 deletions(-)
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index a405c449a2..22eede1958 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -112,6 +112,21 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-resource-transfer</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-file-resource-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-file-resource-service</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index f2d1ed111e..8acce88fd2 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -48,6 +48,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
@@ -80,6 +81,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.io.InputStream;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
/**
* This processor copies FlowFiles to HDFS.
@@ -260,6 +266,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
props.add(IGNORE_LOCALITY);
+ props.add(RESOURCE_TRANSFER_SOURCE);
+ props.add(FILE_RESOURCE_SERVICE);
return props;
}
@@ -402,7 +410,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
- session.read(putFlowFile, in -> {
+ final ResourceTransferSource resourceTransferSource =
context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+ try (final InputStream in =
getFileResource(resourceTransferSource, context, flowFile.getAttributes())
+ .map(FileResource::getInputStream).orElseGet(() ->
session.read(flowFile))) {
OutputStream fos = null;
Path createdFile = null;
try {
@@ -463,7 +473,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
}
fos = null;
}
- });
+ }
stopWatch.stop();
final String dataRate =
stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis =
stopWatch.getDuration(TimeUnit.MILLISECONDS);
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 7219817297..e64a6ae7ee 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.fileresource.service.StandardFileResourceService;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.transfer.ResourceTransferProperties;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@@ -46,12 +50,17 @@ import javax.security.sasl.SaslException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.Collections;
import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
@@ -703,6 +712,61 @@ public class PutHDFSTest {
mockFileSystem.delete(p, true);
}
+ /**
+  * Runs PutHDFS with the File Resource Service as transfer source: the FlowFile is enqueued empty and names a local file via "file.path".
+  */
+ @Test
+ public void testPutFileFromLocalFile() throws Exception {
+     final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+     final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+     final TestRunner runner = TestRunners.newTestRunner(proc);
+     runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+     runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+     runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);
+     // Register a StandardFileResourceService whose file path is taken from the "file.path" FlowFile attribute
+     final String attributeName = "file.path";
+     final String serviceId = FileResourceService.class.getSimpleName();
+     final FileResourceService service = new StandardFileResourceService();
+     runner.addControllerService(serviceId, service);
+     runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
+     runner.enableControllerService(service);
+     runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+     runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId);
+
+     final byte[] fileData = "0123456789".getBytes(StandardCharsets.UTF_8);
+     final byte[] emptyContent = new byte[0];
+     final java.nio.file.Path tempFilePath = Files.createTempFile("PutHDFS_testPutFileFromLocalFile_", "");
+     try {
+         Files.write(tempFilePath, fileData);
+         final Map<String, String> attributes = new HashMap<>();
+         attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
+         attributes.put(attributeName, tempFilePath.toString());
+         runner.enqueue(emptyContent, attributes);
+         runner.run();
+     } finally {
+         // Remove the local fixture even when the run throws, so repeated builds do not litter the temp directory
+         Files.deleteIfExists(tempFilePath);
+     }
+
+     runner.assertAllFlowFilesTransferred(PutHDFS.REL_SUCCESS, 1);
+     final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).get(0);
+     flowFile.assertContentEquals(emptyContent);
+     //assert HDFS File and Directory structures
+     assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+     assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+     assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+     assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+     // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+     assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
+     verify(spyFileSystem, Mockito.never()).rename(any(Path.class), any(Path.class));
+
+     //assert Provenance events
+     final Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.SEND);
+     final Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+         .map(ProvenanceEventRecord::getEventType).collect(Collectors.toSet());
+     assertEquals(expectedEventTypes, actualEventTypes);
+ }
+
@Test
public void testPutFileWithCreateException() throws IOException {
mockFileSystem = new MockFileSystem();