[ 
https://issues.apache.org/jira/browse/NIFI-1037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316935#comment-15316935
 ] 

ASF GitHub Bot commented on NIFI-1037:
--------------------------------------

Github user jjmeyer0 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/493#discussion_r65942750
  
    --- Diff: 
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/inotify/GetHDFSEvents.java
 ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop.inotify;
    +
    +
    +import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    +import org.apache.hadoop.hdfs.client.HdfsAdmin;
    +import org.apache.hadoop.hdfs.inotify.Event;
    +import org.apache.hadoop.hdfs.inotify.EventBatch;
    +import org.apache.hadoop.hdfs.inotify.MissingEventsException;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
    +import org.apache.nifi.processors.hadoop.FetchHDFS;
    +import org.apache.nifi.processors.hadoop.GetHDFS;
    +import org.apache.nifi.processors.hadoop.ListHDFS;
    +import org.apache.nifi.processors.hadoop.PutHDFS;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +@TriggerSerially
    +@TriggerWhenEmpty
    +@Tags({"hadoop", "events", "inotify", "notifications", "filesystem"})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = EventAttributes.MIME_TYPE, 
description = "This is always application/json."),
    +        @WritesAttribute(attribute = EventAttributes.EVENT_TYPE, 
description = "This will specify the specific HDFS notification event type. 
Currently there are six types of events " +
    +                "(append, close, create, metadata, rename, and unlink)."),
    +        @WritesAttribute(attribute = EventAttributes.EVENT_PATH, 
description = "The specific path that the event is tied to.")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("This processor polls the notification events 
provided by the HdfsAdmin API. Since this uses the HdfsAdmin APIs it is 
required to run as an HDFS super user. Currently there " +
    +        "are six types of events (append, close, create, metadata, rename, 
and unlink). Please see org.apache.hadoop.hdfs.inotify.Event documentation for 
full explanations of each event. " +
    +        "This processor will poll for new events based on a defined 
duration. For each event received a new flow file will be created with the 
expected attributes and the event itself serialized " +
    +        "to JSON and written to the flow file's content. For example, if 
event.type is APPEND then the content of the flow file will contain a JSON file 
containing the information about the " +
    +        "append event. If successful the flow files are sent to the 
'success' relationship. Be careful of where the generated flow files are 
stored. If the flow files are stored in one of " +
    +        "processor's watch directories there will be a never ending flow 
of events. It is also important to be aware that this processor must consume 
all events. The filtering must happen within " +
    +        "the processor. This is because the HDFS admin's event 
notifications API does not have filtering.")
    +@Stateful(scopes = Scope.CLUSTER, description = "The last used transaction 
id is stored. This is used ")
    +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class, ListHDFS.class})
    +public class GetHDFSEvents extends AbstractHadoopProcessor {
    +    static final PropertyDescriptor POLL_DURATION = new 
PropertyDescriptor.Builder()
    +            .name("Poll Duration")
    --- End diff --
    
    Thanks for the link. I'll look into this and update accordingly.


> Hdfs Inotify processor
> ----------------------
>
>                 Key: NIFI-1037
>                 URL: https://issues.apache.org/jira/browse/NIFI-1037
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: nicolas maillard
>            Assignee: Josh Meyer
>            Priority: Minor
>
> HDFS has an inotify interface that provides access to the HDFS edit stream.
> https://issues.apache.org/jira/browse/HDFS-6634
> A processor that listens for notifications, on selected directories or for 
> selected actions, would have many applications:
> - Stream HDFS activity to a search engine
> - Wait for specific actions or files to trigger workflows, like duplication 
> to other clusters
> - Validate ingestion processes
> etc.,
> and probably more that I haven't thought of.
> I have a first working beta version that still needs to evolve.
> It reuses the Hadoop-nar-bundle.
> It needs an HDFS 2.7 dependency, currently provided by editing the Hadoop-lib 
> bundle.
> Let me know if this idea makes sense and would be of interest to the community;
> I would love to contribute it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to