This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 39ffa4a NIFI-8737: Fixed incorrect provenance data in HDFS processors when Directory property is inconsistent with core-site.xml
39ffa4a is described below
commit 39ffa4a8ac21b83f0b269828d4b162131e393903
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Jun 23 17:04:08 2021 +0200
NIFI-8737: Fixed incorrect provenance data in HDFS processors when Directory property is inconsistent with core-site.xml
---
.../processors/hadoop/AbstractHadoopProcessor.java | 23 +++++++++
.../nifi/processors/hadoop/AbstractPutHDFS.java | 25 +++++----
.../processors/hadoop/AbstractFetchHDFSRecord.java | 2 +-
.../processors/hadoop/AbstractPutHDFSRecord.java | 3 +-
.../apache/nifi/processors/hadoop/DeleteHDFS.java | 2 +-
.../apache/nifi/processors/hadoop/FetchHDFS.java | 2 +-
.../org/apache/nifi/processors/hadoop/GetHDFS.java | 4 +-
.../nifi/processors/hadoop/GetHDFSFileInfo.java | 3 +-
.../apache/nifi/processors/hadoop/ListHDFS.java | 4 +-
.../apache/nifi/processors/hadoop/MoveHDFS.java | 21 ++++----
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 12 ++---
.../nifi/processors/hadoop/AbstractHadoopTest.java | 59 ++++++++++++++++++++++
12 files changed, 117 insertions(+), 43 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 1cce996..2838a38 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -34,6 +34,7 @@ import
org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
@@ -606,4 +607,26 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
}
}
+    /**
+     * Evaluates a path-valued property without FlowFile attributes and normalizes it.
+     *
+     * @param context  process context used to read the property
+     * @param property descriptor of the path-valued property
+     * @return the normalized path, as described for the three-argument overload
+     */
+    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property) {
+        return getNormalizedPath(context, property, null);
+    }
+
+    /**
+     * Evaluates a path-valued property and drops any scheme/authority it carries, so that callers
+     * use only the path component against the processor's own FileSystem. A warning is logged when
+     * the configured scheme or authority differs from {@code getFileSystem().getUri()}.
+     *
+     * @param context  process context used to read the property
+     * @param property descriptor of the path-valued property
+     * @param flowFile FlowFile used for Expression Language evaluation; may be null
+     * @return {@code new Path(uri.getPath())} when the value has a scheme, otherwise the value as a Path
+     */
+    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) {
+        final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
+        final Path path = new Path(propertyValue);
+        final URI uri = path.toUri();
+
+        final URI fileSystemUri = getFileSystem().getUri();
+
+        if (uri.getScheme() != null) {
+            // A scheme does not imply an authority (e.g. "file:/tmp/dir" has none), so the authorities
+            // are compared null-safely instead of dereferencing uri.getAuthority(), which threw an NPE.
+            // URI schemes are case-insensitive (RFC 3986), hence equalsIgnoreCase.
+            final boolean schemeMatches = uri.getScheme().equalsIgnoreCase(fileSystemUri.getScheme());
+            final boolean authorityMatches = java.util.Objects.equals(uri.getAuthority(), fileSystemUri.getAuthority());
+            if (!schemeMatches || !authorityMatches) {
+                getLogger().warn("The filesystem component of the URI configured in the '{}' property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
+                        "and will be ignored.", property.getDisplayName(), uri, fileSystemUri);
+            }
+
+            return new Path(uri.getPath());
+        } else {
+            return path;
+        }
+    }
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
index 44fae65..b86b7dc 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
@@ -100,13 +100,12 @@ public abstract class AbstractPutHDFS extends
AbstractHadoopProcessor {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
- final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
+ final Path dirPath = getNormalizedPath(context, DIRECTORY,
putFlowFile);
final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
- final long blockSize = getBlockSize(context, session,
putFlowFile);
+ final long blockSize = getBlockSize(context, session,
putFlowFile, dirPath);
final int bufferSize = getBufferSize(context, session,
putFlowFile);
- final short replication = getReplication(context, session,
putFlowFile);
+ final short replication = getReplication(context, session,
putFlowFile, dirPath);
final CompressionCodec codec =
getCompressionCodec(context, configuration);
@@ -114,19 +113,19 @@ public abstract class AbstractPutHDFS extends
AbstractHadoopProcessor {
?
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) +
codec.getDefaultExtension()
:
putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
- final Path tempCopyFile = new Path(configuredRootDirPath,
"." + filename);
- final Path copyFile = new Path(configuredRootDirPath,
filename);
+ final Path tempCopyFile = new Path(dirPath, "." +
filename);
+ final Path copyFile = new Path(dirPath, filename);
// Create destination directory if it does not exist
try {
- if
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
- throw new
IOException(configuredRootDirPath.toString() + " already exists and is not a
directory");
+ if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+ throw new IOException(dirPath.toString() + "
already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(configuredRootDirPath)) {
- throw new
IOException(configuredRootDirPath.toString() + " could not be created");
+ if (!hdfs.mkdirs(dirPath)) {
+ throw new IOException(dirPath.toString() + " could
not be created");
}
- changeOwner(context, hdfs, configuredRootDirPath,
flowFile);
+ changeOwner(context, hdfs, dirPath, flowFile);
}
final boolean destinationExists = hdfs.exists(copyFile);
@@ -274,7 +273,7 @@ public abstract class AbstractPutHDFS extends
AbstractHadoopProcessor {
/**
* Returns with the expected block size.
*/
- protected abstract long getBlockSize(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile);
+ protected abstract long getBlockSize(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile, final Path dirPath);
/**
* Returns with the expected buffer size.
@@ -284,7 +283,7 @@ public abstract class AbstractPutHDFS extends
AbstractHadoopProcessor {
/**
* Returns with the expected replication factor.
*/
- protected abstract short getReplication(final ProcessContext context,
final ProcessSession session, final FlowFile flowFile);
+ protected abstract short getReplication(final ProcessContext context,
final ProcessSession session, final FlowFile flowFile, final Path dirPath);
/**
* Returns if file system should ignore locality.
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 7248e8f..33e762f 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -179,7 +179,7 @@ public abstract class AbstractFetchHDFSRecord extends
AbstractHadoopProcessor {
FlowFile child = null;
final String filenameValue =
context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
try {
- final Path path = new Path(filenameValue);
+ final Path path = getNormalizedPath(context, FILENAME,
originalFlowFile);
final AtomicReference<Throwable> exceptionHolder = new
AtomicReference<>(null);
final AtomicReference<WriteResult> writeResult = new
AtomicReference<>();
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 5ee54e3..a595128 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -273,10 +273,9 @@ public abstract class AbstractPutHDFSRecord extends
AbstractHadoopProcessor {
FlowFile putFlowFile = flowFile;
try {
final String filenameValue =
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
- final String directoryValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
// create the directory if it doesn't exist
- final Path directoryPath = new Path(directoryValue);
+ final Path directoryPath = getNormalizedPath(context,
DIRECTORY, putFlowFile);
createDirectory(fileSystem, directoryPath, remoteOwner,
remoteGroup);
// write to tempFile first and on success rename to destFile
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 8788085..9296507 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -145,7 +145,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
// We need a FlowFile to report provenance correctly.
final FlowFile finalFlowFile = originalFlowFile != null ?
originalFlowFile : session.create();
- final String fileOrDirectoryName =
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(finalFlowFile).getValue();
+ final String fileOrDirectoryName = getNormalizedPath(context,
FILE_OR_DIRECTORY, finalFlowFile).toString();
final FileSystem fileSystem = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 26fd382..b60ee5b 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -125,7 +125,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
final Path path;
try {
- path = new Path(filenameValue);
+ path = getNormalizedPath(context, FILENAME, flowFile);
} catch (IllegalArgumentException e) {
getLogger().error("Failed to retrieve content from {} for {} due
to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason",
e.getMessage());
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index ba53377..3f08da0 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -345,7 +345,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
final Double bufferSizeProp =
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() :
conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
- final Path rootDir = new
Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+ final Path rootDir = getNormalizedPath(context, DIRECTORY);
final CompressionType compressionType =
CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
final boolean inferCompressionCodec = compressionType ==
CompressionType.AUTOMATIC;
@@ -427,7 +427,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
if (System.currentTimeMillis() >= nextPollTime &&
listingLock.tryLock()) {
try {
final FileSystem hdfs = getFileSystem();
- final Path directoryPath = new
Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+ final Path directoryPath = getNormalizedPath(context,
DIRECTORY);
if (!hdfs.exists(directoryPath)) {
context.yield();
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
index f2864f0..8383732 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
@@ -582,7 +582,8 @@ public class GetHDFSFileInfo extends
AbstractHadoopProcessor {
*/
protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context,
FlowFile ff) {
HDFSFileInfoRequest req = new HDFSFileInfoRequest();
-
req.setFullPath(context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue());
+ String fullPath = getNormalizedPath(context, FULL_PATH, ff).toString();
+ req.setFullPath(fullPath);
req.setRecursive(context.getProperty(RECURSE_SUBDIRS).asBoolean());
PropertyValue pv;
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 02209e3..583b0b8 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -402,8 +402,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
lastRunTimestamp = now;
- final String directory =
context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
-
// Ensure that we are using the latest listing information before we
try to perform a listing of HDFS files.
try {
final StateMap stateMap = session.getState(Scope.CLUSTER);
@@ -443,7 +441,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
final Set<FileStatus> statuses;
try {
- final Path rootPath = new Path(directory);
+ final Path rootPath = getNormalizedPath(context, DIRECTORY);
statuses = getStatuses(rootPath, recursive, hdfs,
createPathFilter(context), fileFilterMode);
getLogger().debug("Found a total of {} files in HDFS", new
Object[] {statuses.size()});
} catch (final IOException | IllegalArgumentException e) {
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 76a03d0..acb4c85 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.hadoop;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -245,7 +246,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
Path inputPath;
try {
- inputPath = new Path(filenameValue);
+ inputPath = getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE,
flowFile);
if (!hdfs.exists(inputPath)) {
throw new IOException("Input Directory or File does not exist
in HDFS");
}
@@ -348,9 +349,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
FlowFile flowFile = session.create(parentFlowFile);
try {
final String originalFilename = file.getName();
- final String outputDirValue =
context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue();
- final Path configuredRootOutputDirPath = new
Path(outputDirValue);
- final Path newFile = new
Path(configuredRootOutputDirPath, originalFilename);
+ final Path outputDirPath = getNormalizedPath(context,
OUTPUT_DIRECTORY, parentFlowFile);
+ final Path newFile = new Path(outputDirPath,
originalFilename);
final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that
// based on processor configuration
@@ -382,15 +382,15 @@ public class MoveHDFS extends AbstractHadoopProcessor {
// Create destination directory if it does not exist
try {
- if
(!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
- throw new
IOException(configuredRootOutputDirPath.toString()
+ if
(!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+ throw new IOException(outputDirPath.toString()
+ " already exists and is not a
directory");
}
} catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
- throw new
IOException(configuredRootOutputDirPath.toString() + " could not be created");
+ if (!hdfs.mkdirs(outputDirPath)) {
+ throw new IOException(outputDirPath.toString()
+ " could not be created");
}
- changeOwner(context, hdfs,
configuredRootOutputDirPath);
+ changeOwner(context, hdfs, outputDirPath);
}
boolean moved = false;
@@ -419,8 +419,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile,
CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile,
ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
- final String transitUri = (outputPath.startsWith("/"))
? "hdfs:/" + outputPath
- : "hdfs://" + outputPath;
+ final String transitUri = hdfs.getUri() +
StringUtils.prependIfMissing(outputPath, "/");
session.getProvenanceReporter().send(flowFile,
transitUri);
session.transfer(flowFile, REL_SUCCESS);
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index fef0805..00942e3 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -189,11 +189,9 @@ public class PutHDFS extends AbstractPutHDFS {
}
@Override
- protected long getBlockSize(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile) {
- final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
+ protected long getBlockSize(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Double blockSizeProp =
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
- return blockSizeProp != null ? blockSizeProp.longValue() :
getFileSystem().getDefaultBlockSize(configuredRootDirPath);
+ return blockSizeProp != null ? blockSizeProp.longValue() :
getFileSystem().getDefaultBlockSize(dirPath);
}
@Override
@@ -203,12 +201,10 @@ public class PutHDFS extends AbstractPutHDFS {
}
@Override
- protected short getReplication(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile) {
- final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
+ protected short getReplication(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Integer replicationProp =
context.getProperty(REPLICATION_FACTOR).asInteger();
return replicationProp != null ? replicationProp.shortValue() :
getFileSystem()
- .getDefaultReplication(configuredRootDirPath);
+ .getDefaultReplication(dirPath);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index d60ba7f..0cd069e 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.hadoop;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.FileResourceReference;
@@ -39,11 +41,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -207,4 +212,58 @@ public class AbstractHadoopTest {
runner.setProperty(kerberosProperties.getKerberosKeytab(),
temporaryFile.getAbsolutePath());
runner.assertValid();
}
+
+ // A directory value without a scheme is returned unchanged and no warning is logged.
+ @Test
+ public void testGetNormalizedPathWithoutFileSystem() throws
URISyntaxException {
+ AbstractHadoopProcessor processor =
initProcessorForTestGetNormalizedPath("abfs://container1@storageaccount1");
+ TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor,
"/dir1");
+
+ Path path = processor.getNormalizedPath(runner.getProcessContext(),
AbstractHadoopProcessor.DIRECTORY);
+
+ assertEquals("/dir1", path.toString());
+ assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ // Scheme and authority identical to the mocked FileSystem URI: they are stripped
+ // down to the path component and no warning is logged.
+ @Test
+ public void testGetNormalizedPathWithCorrectFileSystem() throws
URISyntaxException {
+ AbstractHadoopProcessor processor =
initProcessorForTestGetNormalizedPath("abfs://container2@storageaccount2");
+ TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor,
"abfs://container2@storageaccount2/dir2");
+
+ Path path = processor.getNormalizedPath(runner.getProcessContext(),
AbstractHadoopProcessor.DIRECTORY);
+
+ assertEquals("/dir2", path.toString());
+ assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ // Authority differing from the mocked FileSystem URI: the path component is still
+ // returned, and at least one warning is logged.
+ @Test
+ public void testGetNormalizedPathWithIncorrectFileSystem() throws
URISyntaxException {
+ AbstractHadoopProcessor processor =
initProcessorForTestGetNormalizedPath("abfs://container3@storageaccount3");
+ TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor,
"abfs://container*@storageaccount*/dir3");
+
+ Path path = processor.getNormalizedPath(runner.getProcessContext(),
AbstractHadoopProcessor.DIRECTORY);
+
+ assertEquals("/dir3", path.toString());
+ assertFalse(runner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ // Builds a PutHDFS whose getFileSystem() returns a Mockito mock; only getUri() is
+ // stubbed, returning the given URI string.
+ private AbstractHadoopProcessor
initProcessorForTestGetNormalizedPath(String fileSystemUri) throws
URISyntaxException {
+ final FileSystem fileSystem = mock(FileSystem.class);
+ when(fileSystem.getUri()).thenReturn(new URI(fileSystemUri));
+
+ final PutHDFS processor = new PutHDFS() {
+ @Override
+ protected FileSystem getFileSystem() {
+ return fileSystem;
+ }
+ };
+
+ return processor;
+ }
+
+ // Wraps the processor in a TestRunner with the Directory property set to the given value.
+ // NOTE(review): the body does not appear to throw URISyntaxException; the throws clause looks unnecessary.
+ private TestRunner
initTestRunnerForTestGetNormalizedPath(AbstractHadoopProcessor processor,
String directory) throws URISyntaxException {
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(AbstractHadoopProcessor.DIRECTORY, directory);
+
+ return runner;
+ }
}