[
https://issues.apache.org/jira/browse/NIFI-3003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667443#comment-15667443
]
ASF GitHub Bot commented on NIFI-3003:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1219#discussion_r88040745
--- Diff:
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
---
@@ -205,177 +207,186 @@ public void onScheduled(ProcessContext context)
throws Exception {
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- FlowFile flowFile = session.get();
+ final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
- final Configuration configuration = getConfiguration();
final FileSystem hdfs = getFileSystem();
- if (configuration == null || hdfs == null) {
+ final Configuration configuration = getConfiguration();
+ final UserGroupInformation ugi = getUserGroupInformation();
+
+ if (configuration == null || hdfs == null || ugi == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
- Path tempDotCopyFile = null;
- try {
- final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
-
- final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
-
- final Double blockSizeProp =
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
- final long blockSize = blockSizeProp != null ?
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+ ugi.doAs(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ Path tempDotCopyFile = null;
+ FlowFile putFlowFile = flowFile;
+ try {
+ final String dirValue =
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+ final Path configuredRootDirPath = new Path(dirValue);
- final Double bufferSizeProp =
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
- final int bufferSize = bufferSizeProp != null ?
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
+ final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
- final Integer replicationProp =
context.getProperty(REPLICATION_FACTOR).asInteger();
- final short replication = replicationProp != null ?
replicationProp.shortValue() : hdfs
- .getDefaultReplication(configuredRootDirPath);
+ final Double blockSizeProp =
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+ final long blockSize = blockSizeProp != null ?
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
- final CompressionCodec codec = getCompressionCodec(context,
configuration);
+ final Double bufferSizeProp =
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+ final int bufferSize = bufferSizeProp != null ?
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
- final String filename = codec != null
- ? flowFile.getAttribute(CoreAttributes.FILENAME.key())
+ codec.getDefaultExtension()
- : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final Integer replicationProp =
context.getProperty(REPLICATION_FACTOR).asInteger();
+ final short replication = replicationProp != null ?
replicationProp.shortValue() : hdfs
+ .getDefaultReplication(configuredRootDirPath);
- final Path tempCopyFile = new Path(configuredRootDirPath, "."
+ filename);
- final Path copyFile = new Path(configuredRootDirPath,
filename);
+ final CompressionCodec codec =
getCompressionCodec(context, configuration);
- // Create destination directory if it does not exist
- try {
- if
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
- throw new IOException(configuredRootDirPath.toString()
+ " already exists and is not a directory");
- }
- } catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(configuredRootDirPath)) {
- throw new IOException(configuredRootDirPath.toString()
+ " could not be created");
- }
- changeOwner(context, hdfs, configuredRootDirPath);
- }
+ final String filename = codec != null
+ ?
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) +
codec.getDefaultExtension()
+ :
putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
- final boolean destinationExists = hdfs.exists(copyFile);
+ final Path tempCopyFile = new
Path(configuredRootDirPath, "." + filename);
+ final Path copyFile = new Path(configuredRootDirPath,
filename);
- // If destination file already exists, resolve that based on
processor configuration
- if (destinationExists) {
- switch (conflictResponse) {
- case REPLACE_RESOLUTION:
- if (hdfs.delete(copyFile, false)) {
- getLogger().info("deleted {} in order to
replace with the contents of {}",
- new Object[]{copyFile, flowFile});
+ // Create destination directory if it does not exist
+ try {
+ if
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
+ throw new
IOException(configuredRootDirPath.toString() + " already exists and is not a
directory");
}
- break;
- case IGNORE_RESOLUTION:
- session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("transferring {} to success
because file with same name already exists",
- new Object[]{flowFile});
- return;
- case FAIL_RESOLUTION:
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- getLogger().warn("penalizing {} and routing to
failure because file with same name already exists",
- new Object[]{flowFile});
- return;
- default:
- break;
- }
- }
+ } catch (FileNotFoundException fe) {
+ if (!hdfs.mkdirs(configuredRootDirPath)) {
+ throw new
IOException(configuredRootDirPath.toString() + " could not be created");
+ }
+ changeOwner(context, hdfs, configuredRootDirPath);
+ }
- // Write FlowFile to temp file on HDFS
- final StopWatch stopWatch = new StopWatch(true);
- session.read(flowFile, new InputStreamCallback() {
+ final boolean destinationExists =
hdfs.exists(copyFile);
- @Override
- public void process(InputStream in) throws IOException {
- OutputStream fos = null;
- Path createdFile = null;
- try {
- if
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists)
{
- fos = hdfs.append(copyFile, bufferSize);
- } else {
- fos = hdfs.create(tempCopyFile, true,
bufferSize, replication, blockSize);
- }
- if (codec != null) {
- fos = codec.createOutputStream(fos);
+ // If destination file already exists, resolve that
based on processor configuration
+ if (destinationExists) {
+ switch (conflictResponse) {
+ case REPLACE_RESOLUTION:
+ if (hdfs.delete(copyFile, false)) {
+ getLogger().info("deleted {} in order
to replace with the contents of {}",
+ new Object[]{copyFile,
putFlowFile});
+ }
+ break;
+ case IGNORE_RESOLUTION:
+ session.transfer(putFlowFile, REL_SUCCESS);
+ getLogger().info("transferring {} to
success because file with same name already exists",
+ new Object[]{putFlowFile});
+ return null;
+ case FAIL_RESOLUTION:
+
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
+ getLogger().warn("penalizing {} and
routing to failure because file with same name already exists",
+ new Object[]{putFlowFile});
+ return null;
+ default:
+ break;
}
- createdFile = tempCopyFile;
- BufferedInputStream bis = new
BufferedInputStream(in);
- StreamUtils.copy(bis, fos);
- bis = null;
- fos.flush();
- } finally {
- try {
- if (fos != null) {
- fos.close();
- }
- } catch (RemoteException re) {
- // when talking to remote HDFS clusters, we
don't notice problems until fos.close()
- if (createdFile != null) {
+ }
+
+ // Write FlowFile to temp file on HDFS
+ final StopWatch stopWatch = new StopWatch(true);
+ session.read(putFlowFile, new InputStreamCallback() {
+
+ @Override
+ public void process(InputStream in) throws
IOException {
+ OutputStream fos = null;
+ Path createdFile = null;
+ try {
+ if
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists)
{
+ fos = hdfs.append(copyFile,
bufferSize);
+ } else {
+ fos = hdfs.create(tempCopyFile, true,
bufferSize, replication, blockSize);
+ }
+ if (codec != null) {
+ fos = codec.createOutputStream(fos);
+ }
+ createdFile = tempCopyFile;
+ BufferedInputStream bis = new
BufferedInputStream(in);
+ StreamUtils.copy(bis, fos);
+ bis = null;
+ fos.flush();
+ } finally {
try {
- hdfs.delete(createdFile, false);
+ if (fos != null) {
+ fos.close();
+ }
+ } catch (RemoteException re) {
+ // when talking to remote HDFS
clusters, we don't notice problems until fos.close()
+ if (createdFile != null) {
+ try {
+ hdfs.delete(createdFile,
false);
+ } catch (Throwable ignore) {
+ }
--- End diff --
Up to you... it's just that, personally, I am not a fan of leaving empty catch blocks
(without even a comment explaining why the exception is ignored).
> Move to Hadoop 2.7.x client libs
> --------------------------------
>
> Key: NIFI-3003
> URL: https://issues.apache.org/jira/browse/NIFI-3003
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 1.1.0
>
>
> We should upgrade the Hadoop libraries NAR to use the Hadoop 2.7.x client
> libs.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)