Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1219#discussion_r88030680
--- Diff:
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
---
@@ -205,177 +207,186 @@ public void onScheduled(ProcessContext context)
throws Exception {
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- FlowFile flowFile = session.get();
+ final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
- final Configuration configuration = getConfiguration();
final FileSystem hdfs = getFileSystem();
- if (configuration == null || hdfs == null) {
+ final Configuration configuration = getConfiguration();
+ final UserGroupInformation ugi = getUserGroupInformation();
+
+ if (configuration == null || hdfs == null || ugi == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
- Path tempDotCopyFile = null;
- try {
- final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
-
- final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
-
- final Double blockSizeProp =
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
- final long blockSize = blockSizeProp != null ?
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+ ugi.doAs(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ Path tempDotCopyFile = null;
+ FlowFile putFlowFile = flowFile;
+ try {
+ final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+ final Path configuredRootDirPath = new Path(dirValue);
- final Double bufferSizeProp =
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
- final int bufferSize = bufferSizeProp != null ?
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
+ final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
- final Integer replicationProp =
context.getProperty(REPLICATION_FACTOR).asInteger();
- final short replication = replicationProp != null ?
replicationProp.shortValue() : hdfs
- .getDefaultReplication(configuredRootDirPath);
+ final Double blockSizeProp =
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+ final long blockSize = blockSizeProp != null ?
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
- final CompressionCodec codec = getCompressionCodec(context,
configuration);
+ final Double bufferSizeProp =
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+ final int bufferSize = bufferSizeProp != null ?
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
- final String filename = codec != null
- ? flowFile.getAttribute(CoreAttributes.FILENAME.key())
+ codec.getDefaultExtension()
- : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final Integer replicationProp =
context.getProperty(REPLICATION_FACTOR).asInteger();
+ final short replication = replicationProp != null ?
replicationProp.shortValue() : hdfs
+ .getDefaultReplication(configuredRootDirPath);
- final Path tempCopyFile = new Path(configuredRootDirPath, "."
+ filename);
- final Path copyFile = new Path(configuredRootDirPath,
filename);
+ final CompressionCodec codec =
getCompressionCodec(context, configuration);
- // Create destination directory if it does not exist
- try {
- if
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
- throw new IOException(configuredRootDirPath.toString()
+ " already exists and is not a directory");
- }
- } catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(configuredRootDirPath)) {
- throw new IOException(configuredRootDirPath.toString()
+ " could not be created");
- }
- changeOwner(context, hdfs, configuredRootDirPath);
- }
+ final String filename = codec != null
+ ?
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) +
codec.getDefaultExtension()
+ :
putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
- final boolean destinationExists = hdfs.exists(copyFile);
+ final Path tempCopyFile = new
Path(configuredRootDirPath, "." + filename);
+ final Path copyFile = new Path(configuredRootDirPath,
filename);
- // If destination file already exists, resolve that based on
processor configuration
- if (destinationExists) {
- switch (conflictResponse) {
- case REPLACE_RESOLUTION:
- if (hdfs.delete(copyFile, false)) {
- getLogger().info("deleted {} in order to
replace with the contents of {}",
- new Object[]{copyFile, flowFile});
+ // Create destination directory if it does not exist
+ try {
+ if
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
+ throw new
IOException(configuredRootDirPath.toString() + " already exists and is not a
directory");
}
- break;
- case IGNORE_RESOLUTION:
- session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("transferring {} to success
because file with same name already exists",
- new Object[]{flowFile});
- return;
- case FAIL_RESOLUTION:
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- getLogger().warn("penalizing {} and routing to
failure because file with same name already exists",
- new Object[]{flowFile});
- return;
- default:
- break;
- }
- }
+ } catch (FileNotFoundException fe) {
+ if (!hdfs.mkdirs(configuredRootDirPath)) {
+ throw new
IOException(configuredRootDirPath.toString() + " could not be created");
+ }
+ changeOwner(context, hdfs, configuredRootDirPath);
+ }
- // Write FlowFile to temp file on HDFS
- final StopWatch stopWatch = new StopWatch(true);
- session.read(flowFile, new InputStreamCallback() {
+ final boolean destinationExists =
hdfs.exists(copyFile);
- @Override
- public void process(InputStream in) throws IOException {
- OutputStream fos = null;
- Path createdFile = null;
- try {
- if
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists)
{
- fos = hdfs.append(copyFile, bufferSize);
- } else {
- fos = hdfs.create(tempCopyFile, true,
bufferSize, replication, blockSize);
- }
- if (codec != null) {
- fos = codec.createOutputStream(fos);
+ // If destination file already exists, resolve that
based on processor configuration
+ if (destinationExists) {
+ switch (conflictResponse) {
+ case REPLACE_RESOLUTION:
+ if (hdfs.delete(copyFile, false)) {
+ getLogger().info("deleted {} in order
to replace with the contents of {}",
+ new Object[]{copyFile,
putFlowFile});
+ }
+ break;
+ case IGNORE_RESOLUTION:
+ session.transfer(putFlowFile, REL_SUCCESS);
+ getLogger().info("transferring {} to
success because file with same name already exists",
+ new Object[]{putFlowFile});
+ return null;
+ case FAIL_RESOLUTION:
+
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
+ getLogger().warn("penalizing {} and
routing to failure because file with same name already exists",
+ new Object[]{putFlowFile});
+ return null;
+ default:
+ break;
}
- createdFile = tempCopyFile;
- BufferedInputStream bis = new
BufferedInputStream(in);
- StreamUtils.copy(bis, fos);
- bis = null;
- fos.flush();
- } finally {
- try {
- if (fos != null) {
- fos.close();
- }
- } catch (RemoteException re) {
- // when talking to remote HDFS clusters, we
don't notice problems until fos.close()
- if (createdFile != null) {
+ }
+
+ // Write FlowFile to temp file on HDFS
+ final StopWatch stopWatch = new StopWatch(true);
+ session.read(putFlowFile, new InputStreamCallback() {
+
+ @Override
+ public void process(InputStream in) throws
IOException {
+ OutputStream fos = null;
+ Path createdFile = null;
+ try {
+ if
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists)
{
+ fos = hdfs.append(copyFile,
bufferSize);
+ } else {
+ fos = hdfs.create(tempCopyFile, true,
bufferSize, replication, blockSize);
+ }
+ if (codec != null) {
+ fos = codec.createOutputStream(fos);
+ }
+ createdFile = tempCopyFile;
+ BufferedInputStream bis = new
BufferedInputStream(in);
+ StreamUtils.copy(bis, fos);
+ bis = null;
+ fos.flush();
+ } finally {
try {
- hdfs.delete(createdFile, false);
+ if (fos != null) {
+ fos.close();
+ }
+ } catch (RemoteException re) {
+ // when talking to remote HDFS
clusters, we don't notice problems until fos.close()
+ if (createdFile != null) {
+ try {
+ hdfs.delete(createdFile,
false);
+ } catch (Throwable ignore) {
+ }
--- End diff --
I can add a logging statement; this part wasn't really changed, though — it was
existing code that I simply wrapped in doAs(...).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---