[
https://issues.apache.org/jira/browse/NIFI-2547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423638#comment-15423638
]
ASF GitHub Bot commented on NIFI-2547:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/850#discussion_r75044228
--- Diff:
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem" })
+@CapabilityDescription("Deletes a file from HDFS. The file can be provided
as an attribute from an incoming FlowFile, "
+ + "or a statically set file that is periodically removed. If this
processor has an incoming connection, it "
+ + "will ignore running on a periodic basis and instead rely on
incoming FlowFiles to trigger a delete. "
+ + "Optionally, you may use a wildcard character to match
multiple files or directories.")
+public class DeleteHDFS extends AbstractHadoopProcessor {
+ /** FlowFiles are routed here after the HDFS delete succeeds. */
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles will be routed here if the delete
command was successful")
+ .build();
+
+ /** FlowFiles are routed here when the HDFS delete fails. */
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles will be routed here if the delete
command was unsuccessful")
+ .build();
+
+ /**
+ * The HDFS path to delete. Supports Expression Language (evaluated
+ * against the incoming FlowFile when one is present — see onTrigger).
+ * A '*' in the value triggers pattern matching in onTrigger; other
+ * glob syntaxes are presumably handled there too — confirm in onTrigger.
+ */
+ public static final PropertyDescriptor FILE_OR_DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("File or Directory")
+ .description("The HDFS file or directory to delete. A wildcard
expression may be used to only delete certain files")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ /**
+ * Whether a directory delete removes non-empty directory contents.
+ * Defaults to "true".
+ * NOTE(review): NON_EMPTY_VALIDATOR is imprecise for a true/false
+ * property — a boolean validator would be stricter, though
+ * allowableValues already constrains UI input. Confirm before changing.
+ */
+ public static final PropertyDescriptor RECURSIVE = new
PropertyDescriptor.Builder()
+ .name("Recursive")
+ .description("Remove contents of a non-empty directory
recursively")
+ .allowableValues("true", "false")
+ .required(true)
+ .defaultValue("true")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
/** Immutable set of the relationships this processor supports. */
private static final Set<Relationship> relationships;

static {
    // Build once at class-load time and freeze; getRelationships()
    // hands out this same unmodifiable view on every call.
    final Set<Relationship> rels = new HashSet<>();
    Collections.addAll(rels, REL_SUCCESS, REL_FAILURE);
    relationships = Collections.unmodifiableSet(rels);
}
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ List<PropertyDescriptor> props = new ArrayList<>(properties);
+ props.add(FILE_OR_DIRECTORY);
+ props.add(RECURSIVE);
+ return props;
+ }
+
+ /** @return the unmodifiable set of relationships (success, failure). */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ String fileOrDirectoryName = null;
+ FlowFile flowFile = session.get();
+
+ // If this processor has an incoming connection, then do not run
unless a
+ // FlowFile is actually sent through
+ if (flowFile == null && context.hasIncomingConnection()) {
+ context.yield();
+ return;
+ }
+
+ if (flowFile != null) {
+ fileOrDirectoryName =
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ } else {
+ fileOrDirectoryName =
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
+ }
+
+ final FileSystem fileSystem = getFileSystem();
+ try {
+ // Check if the user has supplied a file or directory pattern
+ List<Path> pathList = Lists.newArrayList();
+ if (fileOrDirectoryName.contains("*")) {
--- End diff --
Are there other types of glob patterns that could/should be recognized?
> Add DeleteHDFS Processor
> -------------------------
>
> Key: NIFI-2547
> URL: https://issues.apache.org/jira/browse/NIFI-2547
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Ricky Saltzer
> Assignee: Ricky Saltzer
>
> There are times where a user may want to remove a file or directory from
> HDFS. The reasons for this vary, but to provide some context, I currently
> have a pipeline where I need to periodically delete files that my NiFi
> pipeline is producing. In my case, it's a "Delete files after they are 7 days
> old".
> Currently, I have to use the {{ExecuteStreamCommand}} processor and manually
> call {{hdfs dfs -rm}}, which is awful when dealing with a large amount of
> files. For one, an entire JVM is spun up for each delete, and two, when
> deleting directories with thousands of files, it can sometimes cause the
> command to hang indefinitely.
> With that being said, I am proposing we add a {{DeleteHDFS}} processor which
> meets the following criteria.
> * Can delete both directories and files
> * Can delete directories recursively
> * Supports the dynamic expression language
> * Supports using glob paths (e.g. /data/for/2017/08/*)
> * Capable of being a downstream processor as well as a standalone processor
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)