markap14 commented on a change in pull request #5175:
URL: https://github.com/apache/nifi/pull/5175#discussion_r656283356



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
+
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
+    // Hadoop configuration key read by getBufferSize(); BUFFER_SIZE_DEFAULT is the fallback handed to getInt().
+    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+    protected static final int BUFFER_SIZE_DEFAULT = 4096;
+
+    // Raw values of the "Conflict Resolution Strategy" property; onTrigger switches on these strings.
+    protected static final String REPLACE_RESOLUTION = "replace";
+    protected static final String IGNORE_RESOLUTION = "ignore";
+    protected static final String FAIL_RESOLUTION = "fail";
+    protected static final String APPEND_RESOLUTION = "append";
+
+    // Allowable values wrapping the strings above (value, display name, description).
+    protected static final AllowableValue REPLACE_RESOLUTION_AV = new 
AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    protected static final AllowableValue IGNORE_RESOLUTION_AV = new 
AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Ignores the flow file and routes it to success.");
+    protected static final AllowableValue FAIL_RESOLUTION_AV = new 
AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Penalizes the flow file and routes it to failure.");
+    protected static final AllowableValue APPEND_RESOLUTION_AV = new 
AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+            "Appends to the existing file if any, creates a new file 
otherwise.");
+
+    // Required property selecting one of the four strategies; defaults to "fail".
+    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+            .name("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the 
same name already exists in the output directory")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, 
FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
+            .build();
+
+    /**
+     * Writes the content of one incoming FlowFile to a file in the configured HDFS directory.
+     * All HDFS work runs inside ugi.doAs(...). Unless appending to an existing file, the content is first
+     * written to a dot-prefixed temporary file that is then renamed to the final name (up to 10 attempts,
+     * sleeping 200 ms after each failed attempt). An already existing destination is handled according to
+     * the CONFLICT_RESOLUTION property.
+     * On success the filename and absolute-HDFS-path attributes are updated, a provenance send event is
+     * reported and the FlowFile is routed to the success relationship. On an IOException the FlowFile is
+     * routed to failure, except when findCause(...) reports a GSSException with major code NO_CRED, in which
+     * case the session is rolled back via rollback(true). Any other Throwable removes the temporary file,
+     * penalizes the FlowFile, routes it to failure and yields.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        // Without a file system, configuration and user nothing can be written: fail the FlowFile and yield.
+        if (configuration == null || hdfs == null || ugi == null) {
+            getLogger().error("HDFS not configured properly");
+            session.transfer(flowFile, getFailureRelationship());
+            context.yield();
+            return;
+        }
+
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                // Only assigned after the write has finished; the catch (Throwable) block uses it for cleanup.
+                Path tempDotCopyFile = null;
+                FlowFile putFlowFile = flowFile;
+                try {
+                    final String dirValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+                    final Path configuredRootDirPath = new Path(dirValue);
+
+                    final String conflictResponse = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+                    final long blockSize = getBlockSize(context, session, 
putFlowFile);
+                    final int bufferSize = getBufferSize(context, session, 
putFlowFile);
+                    final short replication = getReplication(context, session, 
putFlowFile);
+
+                    final CompressionCodec codec = 
getCompressionCodec(context, configuration);
+
+                    // When a codec is configured its default extension is appended to the target file name.
+                    final String filename = codec != null
+                            ? 
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + 
codec.getDefaultExtension()
+                            : 
putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                    final Path tempCopyFile = new Path(configuredRootDirPath, 
"." + filename);
+                    final Path copyFile = new Path(configuredRootDirPath, 
filename);
+
+                    // Create destination directory if it does not exist
+                    try {
+                        if 
(!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
+                            throw new 
IOException(configuredRootDirPath.toString() + " already exists and is not a 
directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(configuredRootDirPath)) {
+                            throw new 
IOException(configuredRootDirPath.toString() + " could not be created");
+                        }
+                        changeOwner(context, hdfs, configuredRootDirPath, 
flowFile);
+                    }
+
+                    final boolean destinationExists = hdfs.exists(copyFile);
+
+                    // If destination file already exists, resolve that based 
on processor configuration
+                    if (destinationExists) {
+                        switch (conflictResponse) {
+                            case REPLACE_RESOLUTION:
+                                // NOTE(review): the existing file is deleted before the new content is written,
+                                // so if the write below fails the previous file is already gone.
+                                if (hdfs.delete(copyFile, false)) {
+                                    getLogger().info("deleted {} in order to 
replace with the contents of {}",
+                                            new Object[]{copyFile, 
putFlowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(putFlowFile, 
getSuccessRelationship());
+                                getLogger().info("transferring {} to success 
because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                                getLogger().warn("penalizing {} and routing to 
failure because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            default:
+                                // APPEND_RESOLUTION ends up here; it is handled when the output stream is opened.
+                                break;
+                        }
+                    }
+
+                    // Write FlowFile to temp file on HDFS
+                    final StopWatch stopWatch = new StopWatch(true);
+                    session.read(putFlowFile, new InputStreamCallback() {
+
+                        @Override
+                        public void process(InputStream in) throws IOException 
{
+                            OutputStream fos = null;
+                            Path createdFile = null;
+                            try {
+                                // Append goes straight to the final file; every other case writes the dot file.
+                                if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
+                                    fos = hdfs.append(copyFile, bufferSize);
+                                } else {
+                                    final EnumSet<CreateFlag> cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                                    if (shouldIgnoreLocality(context, 
session)) {
+                                        
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                                    }
+
+                                    fos = hdfs.create(tempCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                            
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+                                            null, null);
+                                }
+
+                                if (codec != null) {
+                                    fos = codec.createOutputStream(fos);
+                                }
+                                // NOTE(review): assigned on the append path as well, where tempCopyFile was not created.
+                                createdFile = tempCopyFile;
+                                BufferedInputStream bis = new 
BufferedInputStream(in);
+                                StreamUtils.copy(bis, fos);
+                                bis = null;
+                                fos.flush();
+                            } finally {
+                                try {
+                                    if (fos != null) {
+                                        fos.close();
+                                    }
+                                } catch (Throwable t) {
+                                    // when talking to remote HDFS clusters, 
we don't notice problems until fos.close()
+                                    if (createdFile != null) {
+                                        try {
+                                            hdfs.delete(createdFile, false);
+                                        } catch (Throwable ignore) {
+                                        }
+                                    }
+                                    throw t;
+                                }
+                                fos = null;
+                            }
+                        }
+
+                    });
+                    stopWatch.stop();
+                    final String dataRate = 
stopWatch.calculateDataRate(putFlowFile.getSize());
+                    final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                    tempDotCopyFile = tempCopyFile;
+
+                    // The rename step is skipped only when content was appended to an existing file.
+                    if (!conflictResponse.equals(APPEND_RESOLUTION)
+                            || (conflictResponse.equals(APPEND_RESOLUTION) && 
!destinationExists)) {
+                        boolean renamed = false;
+                        for (int i = 0; i < 10; i++) { // try to rename 
multiple times.
+                            if (hdfs.rename(tempCopyFile, copyFile)) {
+                                renamed = true;
+                                break;// rename was successful
+                            }
+                            // NOTE(review): an InterruptedException thrown by sleep is handled by the
+                            // catch (Throwable) block below; no re-interrupt is visible in this method.
+                            Thread.sleep(200L);// try waiting to let whatever 
might cause rename failure to resolve
+                        }
+                        if (!renamed) {
+                            hdfs.delete(tempCopyFile, false);
+                            throw new ProcessException("Copied file to HDFS 
but could not rename dot file " + tempCopyFile
+                                    + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, copyFile, flowFile);
+                    }
+
+                    getLogger().info("copied {} to HDFS at {} in {} 
milliseconds at a rate of {}",
+                            new Object[]{putFlowFile, copyFile, millis, 
dataRate});
+
+                    final String newFilename = copyFile.getName();
+                    final String hdfsPath = copyFile.getParent().toString();
+                    putFlowFile = session.putAttribute(putFlowFile, 
CoreAttributes.FILENAME.key(), newFilename);
+                    putFlowFile = session.putAttribute(putFlowFile, 
ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = 
copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    session.getProvenanceReporter().send(putFlowFile, 
qualifiedPath.toString());
+
+                    session.transfer(putFlowFile, getSuccessRelationship());
+
+                } catch (final IOException e) {
+                    // findCause is defined outside this hunk; presumably it searches the cause chain of e
+                    // for a GSSException matching the predicate - TODO confirm.
+                    Optional<GSSException> causeOptional = findCause(e, 
GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        getLogger().warn("An error occurred while connecting 
to HDFS. "
+                                        + "Rolling back session, and 
penalizing flow file {}",
+                                new Object[] 
{putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+                        session.rollback(true);
+                    } else {
+                        getLogger().error("Failed to access HDFS due to {}", 
new Object[]{e});
+                        session.transfer(putFlowFile, 
getFailureRelationship());
+                    }
+                } catch (final Throwable t) {
+                    // Best-effort removal of the temporary file; a failure here is only logged.
+                    if (tempDotCopyFile != null) {
+                        try {
+                            hdfs.delete(tempDotCopyFile, false);
+                        } catch (Exception e) {
+                            getLogger().error("Unable to remove temporary file 
{} due to {}", new Object[]{tempDotCopyFile, e});
+                        }
+                    }
+                    getLogger().error("Failed to write to HDFS due to {}", new 
Object[]{t});
+                    session.transfer(session.penalize(putFlowFile), 
getFailureRelationship());
+                    context.yield();
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Returns the block size that onTrigger passes to hdfs.create(...). This implementation evaluates the
+     * DIRECTORY property against the FlowFile and returns the block size reported by that path's file status.
+     * If reading the status throws an IOException, a warning is logged and the file system's default block
+     * size for the path is returned instead. Note: this might be overridden by implementations.
+     *
+     * @param context  used to read the DIRECTORY property
+     * @param session  not used by this implementation
+     * @param flowFile supplies the attributes for evaluating the DIRECTORY expression
+     * @return the block size to use for the new file
+     */
+    protected long getBlockSize(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
+        final String dirValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final Path configuredRootDirPath = new Path(dirValue);
+
+        try {
+            return 
getFileSystem().getFileStatus(configuredRootDirPath).getBlockSize();
+        } catch (final IOException e) {
+            getLogger().warn("Error happened during acquiring status for {}. 
Determining the block size using default value.", configuredRootDirPath, e);
+            return getFileSystem().getDefaultBlockSize(configuredRootDirPath);
+        }
+    }
+
+    /**
+     * Returns the buffer size used when opening the HDFS output stream. This implementation reads the integer
+     * stored under BUFFER_SIZE_KEY in the Hadoop configuration, passing BUFFER_SIZE_DEFAULT as the default.
+     * None of the parameters are used here. Note: this might be overridden by implementations.
+     *
+     * @return the buffer size to use
+     */
+    protected int getBufferSize(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
+        return getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+    }
+
+    /**
+     * Returns the replication factor that onTrigger passes to hdfs.create(...). This implementation evaluates
+     * the DIRECTORY property against the FlowFile and returns the file system's default replication for that
+     * path. Note: this might be overridden by implementations.
+     *
+     * @param context  used to read the DIRECTORY property
+     * @param session  not used by this implementation
+     * @param flowFile supplies the attributes for evaluating the DIRECTORY expression
+     * @return the replication factor to use for the new file
+     */
+    protected short getReplication(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
+        final String dirValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final Path configuredRootDirPath = new Path(dirValue);
+        return getFileSystem().getDefaultReplication(configuredRootDirPath);
+    }
+
+    /**
+     * Returns whether onTrigger should add CreateFlag.IGNORE_CLIENT_LOCALITY when creating the file.
+     * This implementation always returns false and uses neither parameter.
+     * Note: this might be overridden by implementations.
+     *
+     * @return true if the IGNORE_CLIENT_LOCALITY create flag should be set
+     */
+    protected boolean shouldIgnoreLocality(final ProcessContext context, final 
ProcessSession session) {
+        return false;
+    }
+
+    /**
+     * If this returns a non-null value, the owner of the uploaded file is changed to that value after it is
+     * written. This only works if NiFi is running as a user that has privilege to change owner.
+     * This implementation always returns null and uses neither parameter.
+     * Note: this might be overridden by implementations.
+     *
+     * @return the owner to set, or null to leave the owner unchanged
+     */
+    protected String getOwner(final ProcessContext context, final FlowFile 
flowFile) {
+        return null;
+    }
+
+    /**
+     * If this returns a non-null value, the group of the uploaded file is changed to 
that value after it is written. This only
+     * works if NiFi is running as a user that has privilege to change group. 
Note: this might be overridden by implementations.
+     */
+    protected String getGroup(final ProcessContext context, final FlowFile 
flowFile) {

Review comment:
       These methods all have 'default' implementations but are then overridden 
in the existing concrete implementation. Probably makes more sense to just make 
them abstract?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to