[ 
https://issues.apache.org/jira/browse/NIFI-3003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667088#comment-15667088
 ] 

ASF GitHub Bot commented on NIFI-3003:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1219#discussion_r88011234
  
    --- Diff: 
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
 ---
    @@ -205,177 +207,186 @@ public void onScheduled(ProcessContext context) 
throws Exception {
     
         @Override
         public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    -        FlowFile flowFile = session.get();
    +        final FlowFile flowFile = session.get();
             if (flowFile == null) {
                 return;
             }
     
    -        final Configuration configuration = getConfiguration();
             final FileSystem hdfs = getFileSystem();
    -        if (configuration == null || hdfs == null) {
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || hdfs == null || ugi == null) {
                 getLogger().error("HDFS not configured properly");
                 session.transfer(flowFile, REL_FAILURE);
                 context.yield();
                 return;
             }
     
    -        Path tempDotCopyFile = null;
    -        try {
    -            final String dirValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
    -            final Path configuredRootDirPath = new Path(dirValue);
    -
    -            final String conflictResponse = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
    -
    -            final Double blockSizeProp = 
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
    -            final long blockSize = blockSizeProp != null ? 
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
    +        ugi.doAs(new PrivilegedAction<Object>() {
    +            @Override
    +            public Object run() {
    +                Path tempDotCopyFile = null;
    +                FlowFile putFlowFile = flowFile;
    +                try {
    +                    final String dirValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +                    final Path configuredRootDirPath = new Path(dirValue);
     
    -            final Double bufferSizeProp = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
    -            final int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, 
BUFFER_SIZE_DEFAULT);
    +                    final String conflictResponse = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
     
    -            final Integer replicationProp = 
context.getProperty(REPLICATION_FACTOR).asInteger();
    -            final short replication = replicationProp != null ? 
replicationProp.shortValue() : hdfs
    -                    .getDefaultReplication(configuredRootDirPath);
    +                    final Double blockSizeProp = 
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
    +                    final long blockSize = blockSizeProp != null ? 
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
     
    -            final CompressionCodec codec = getCompressionCodec(context, 
configuration);
    +                    final Double bufferSizeProp = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
    +                    final int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, 
BUFFER_SIZE_DEFAULT);
     
    -            final String filename = codec != null
    -                    ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) 
+ codec.getDefaultExtension()
    -                    : flowFile.getAttribute(CoreAttributes.FILENAME.key());
    +                    final Integer replicationProp = 
context.getProperty(REPLICATION_FACTOR).asInteger();
    +                    final short replication = replicationProp != null ? 
replicationProp.shortValue() : hdfs
    +                            .getDefaultReplication(configuredRootDirPath);
     
    -            final Path tempCopyFile = new Path(configuredRootDirPath, "." 
+ filename);
    -            final Path copyFile = new Path(configuredRootDirPath, 
filename);
    +                    final CompressionCodec codec = 
getCompressionCodec(context, configuration);
     
    -            // Create destination directory if it does not exist
    -            try {
    -                if 
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
    -                    throw new IOException(configuredRootDirPath.toString() 
+ " already exists and is not a directory");
    -                }
    -            } catch (FileNotFoundException fe) {
    -                if (!hdfs.mkdirs(configuredRootDirPath)) {
    -                    throw new IOException(configuredRootDirPath.toString() 
+ " could not be created");
    -                }
    -                changeOwner(context, hdfs, configuredRootDirPath);
    -            }
    +                    final String filename = codec != null
    +                            ? 
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + 
codec.getDefaultExtension()
    +                            : 
putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
     
    -            final boolean destinationExists = hdfs.exists(copyFile);
    +                    final Path tempCopyFile = new 
Path(configuredRootDirPath, "." + filename);
    +                    final Path copyFile = new Path(configuredRootDirPath, 
filename);
     
    -            // If destination file already exists, resolve that based on 
processor configuration
    -            if (destinationExists) {
    -                switch (conflictResponse) {
    -                case REPLACE_RESOLUTION:
    -                        if (hdfs.delete(copyFile, false)) {
    -                            getLogger().info("deleted {} in order to 
replace with the contents of {}",
    -                                    new Object[]{copyFile, flowFile});
    +                    // Create destination directory if it does not exist
    +                    try {
    +                        if 
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
    +                            throw new 
IOException(configuredRootDirPath.toString() + " already exists and is not a 
directory");
                             }
    -                        break;
    -                case IGNORE_RESOLUTION:
    -                        session.transfer(flowFile, REL_SUCCESS);
    -                        getLogger().info("transferring {} to success 
because file with same name already exists",
    -                                new Object[]{flowFile});
    -                        return;
    -                case FAIL_RESOLUTION:
    -                        flowFile = session.penalize(flowFile);
    -                        session.transfer(flowFile, REL_FAILURE);
    -                        getLogger().warn("penalizing {} and routing to 
failure because file with same name already exists",
    -                                new Object[]{flowFile});
    -                        return;
    -                    default:
    -                        break;
    -                }
    -            }
    +                    } catch (FileNotFoundException fe) {
    +                        if (!hdfs.mkdirs(configuredRootDirPath)) {
    +                            throw new 
IOException(configuredRootDirPath.toString() + " could not be created");
    +                        }
    +                        changeOwner(context, hdfs, configuredRootDirPath);
    +                    }
     
    -            // Write FlowFile to temp file on HDFS
    -            final StopWatch stopWatch = new StopWatch(true);
    -            session.read(flowFile, new InputStreamCallback() {
    +                    final boolean destinationExists = 
hdfs.exists(copyFile);
     
    -                @Override
    -                public void process(InputStream in) throws IOException {
    -                    OutputStream fos = null;
    -                    Path createdFile = null;
    -                    try {
    -                        if 
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) 
{
    -                            fos = hdfs.append(copyFile, bufferSize);
    -                        } else {
    -                            fos = hdfs.create(tempCopyFile, true, 
bufferSize, replication, blockSize);
    -                        }
    -                        if (codec != null) {
    -                            fos = codec.createOutputStream(fos);
    +                    // If destination file already exists, resolve that 
based on processor configuration
    +                    if (destinationExists) {
    +                        switch (conflictResponse) {
    +                        case REPLACE_RESOLUTION:
    +                                if (hdfs.delete(copyFile, false)) {
    +                                    getLogger().info("deleted {} in order 
to replace with the contents of {}",
    +                                            new Object[]{copyFile, 
putFlowFile});
    +                                }
    +                                break;
    +                        case IGNORE_RESOLUTION:
    +                                session.transfer(putFlowFile, REL_SUCCESS);
    +                                getLogger().info("transferring {} to 
success because file with same name already exists",
    +                                        new Object[]{putFlowFile});
    +                                return null;
    +                        case FAIL_RESOLUTION:
    +                                
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
    +                                getLogger().warn("penalizing {} and 
routing to failure because file with same name already exists",
    +                                        new Object[]{putFlowFile});
    +                                return null;
    +                            default:
    +                                break;
                             }
    -                        createdFile = tempCopyFile;
    -                        BufferedInputStream bis = new 
BufferedInputStream(in);
    -                        StreamUtils.copy(bis, fos);
    -                        bis = null;
    -                        fos.flush();
    -                    } finally {
    -                        try {
    -                            if (fos != null) {
    -                                fos.close();
    -                            }
    -                        } catch (RemoteException re) {
    -                            // when talking to remote HDFS clusters, we 
don't notice problems until fos.close()
    -                            if (createdFile != null) {
    +                    }
    +
    +                    // Write FlowFile to temp file on HDFS
    +                    final StopWatch stopWatch = new StopWatch(true);
    +                    session.read(putFlowFile, new InputStreamCallback() {
    +
    +                        @Override
    +                        public void process(InputStream in) throws 
IOException {
    +                            OutputStream fos = null;
    +                            Path createdFile = null;
    +                            try {
    +                                if 
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) 
{
    +                                    fos = hdfs.append(copyFile, 
bufferSize);
    +                                } else {
    +                                    fos = hdfs.create(tempCopyFile, true, 
bufferSize, replication, blockSize);
    +                                }
    +                                if (codec != null) {
    +                                    fos = codec.createOutputStream(fos);
    +                                }
    +                                createdFile = tempCopyFile;
    +                                BufferedInputStream bis = new 
BufferedInputStream(in);
    +                                StreamUtils.copy(bis, fos);
    +                                bis = null;
    +                                fos.flush();
    +                            } finally {
                                     try {
    -                                    hdfs.delete(createdFile, false);
    +                                    if (fos != null) {
    +                                        fos.close();
    +                                    }
    +                                } catch (RemoteException re) {
    +                                    // when talking to remote HDFS 
clusters, we don't notice problems until fos.close()
    +                                    if (createdFile != null) {
    +                                        try {
    +                                            hdfs.delete(createdFile, 
false);
    +                                        } catch (Throwable ignore) {
    +                                        }
    --- End diff --
    
    Are you sure we don't even want to log here?


> Move to Hadoop 2.7.x client libs
> --------------------------------
>
>                 Key: NIFI-3003
>                 URL: https://issues.apache.org/jira/browse/NIFI-3003
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 1.1.0
>
>
> We should upgrade the Hadoop libraries NAR to use the Hadoop 2.7.x client 
> libs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to