http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 6434e5e..6b04910 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -1,131 +1,131 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.hadoop; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.AccessControlException; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StopWatch; - -@SupportsBatching -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) -@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. 
" - + "The file in HDFS is left intact without any changes being made to it.") -@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " - + "not be fetched from HDFS") -@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class}) -public class FetchHDFS extends AbstractHadoopProcessor { - static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() - .name("HDFS Filename") - .description("The name of the HDFS file to retrieve") - .required(true) - .expressionLanguageSupported(true) - .defaultValue("${path}/${filename}") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. " - + "This would occur, for instance, if the file is not found or if there is a permissions issue") - .build(); - static final Relationship REL_COMMS_FAILURE = new Relationship.Builder() - .name("comms.failure") - .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. " - + "This generally indicates that the Fetch should be tried again.") - .build(); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(FILENAME); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_KEYTAB); - properties.add(KERBEROS_RELOGIN_PERIOD); - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_COMMS_FAILURE); - return relationships; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - final FileSystem hdfs = getFileSystem(); - final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue()); - final URI uri = path.toUri(); - - final StopWatch stopWatch = new StopWatch(true); - try (final FSDataInputStream inStream = hdfs.open(path, 16384)) { - flowFile = session.importFrom(inStream, flowFile); - stopWatch.stop(); - getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); - session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } catch (final FileNotFoundException | AccessControlException e) { - getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); - flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } catch (final IOException e) { - getLogger().error("Failed to 
retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_COMMS_FAILURE); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) +@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. 
" + + "The file in HDFS is left intact without any changes being made to it.") +@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " + + "not be fetched from HDFS") +@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class}) +public class FetchHDFS extends AbstractHadoopProcessor { + static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() + .name("HDFS Filename") + .description("The name of the HDFS file to retrieve") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${path}/${filename}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. " + + "This would occur, for instance, if the file is not found or if there is a permissions issue") + .build(); + static final Relationship REL_COMMS_FAILURE = new Relationship.Builder() + .name("comms.failure") + .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. " + + "This generally indicates that the Fetch should be tried again.") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HADOOP_CONFIGURATION_RESOURCES); + properties.add(FILENAME); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_KEYTAB); + properties.add(KERBEROS_RELOGIN_PERIOD); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_COMMS_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final FileSystem hdfs = getFileSystem(); + final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue()); + final URI uri = path.toUri(); + + final StopWatch stopWatch = new StopWatch(true); + try (final FSDataInputStream inStream = hdfs.open(path, 16384)) { + flowFile = session.importFrom(inStream, flowFile); + stopWatch.stop(); + getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); + session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final FileNotFoundException | AccessControlException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); + flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } catch (final IOException e) { + getLogger().error("Failed to 
retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_COMMS_FAILURE); + } + } + +}
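
For reference, a minimal sketch of exercising the FetchHDFS processor above with NiFi's mock framework (nifi-mock). It is not part of the commit: the class name and temp-file setup are illustrative only, the "HDFS Filename" property and "success" relationship names are taken from the descriptors in the diff, and it assumes that leaving "Hadoop Configuration Resources" unset lets the processor fall back to Hadoop's default configuration so that the ${path}/${filename} expression resolves against the local file system.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.processors.hadoop.FetchHDFS;
    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class FetchHDFSSketch {
        public static void main(final String[] args) throws IOException {
            // A local file stands in for the HDFS file to be fetched.
            final Path sourceFile = Files.createTempFile("fetch-hdfs-demo", ".txt");
            Files.write(sourceFile, "hello from hdfs".getBytes());

            // No "Hadoop Configuration Resources" are supplied, so the processor is assumed to
            // fall back to the default Hadoop configuration and resolve paths locally.
            final TestRunner runner = TestRunners.newTestRunner(FetchHDFS.class);
            runner.setProperty("HDFS Filename", "${path}/${filename}");

            // The incoming FlowFile only needs the attributes referenced by the default expression.
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("path", sourceFile.getParent().toString());
            attributes.put("filename", sourceFile.getFileName().toString());
            runner.enqueue(new byte[0], attributes);

            runner.run();

            // On success, the FlowFile content has been replaced with the fetched file's content.
            runner.assertAllFlowFilesTransferred("success", 1);
            final MockFlowFile result = runner.getFlowFilesForRelationship("success").get(0);
            result.assertContentEquals("hello from hdfs");
        }
    }

If a real cluster is available, pointing "Hadoop Configuration Resources" at core-site.xml and hdfs-site.xml and supplying an hdfs:// path should exercise the same success, failure, and comms.failure routing shown in the diff.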
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 0fae4ca..ea2d397 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -1,487 +1,487 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.hadoop; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; -import org.apache.nifi.annotation.notification.PrimaryNodeState; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import 
org.apache.nifi.processors.hadoop.util.HDFSListing; -import org.apache.nifi.processors.hadoop.util.StringSerDe; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; - - -@TriggerSerially -@TriggerWhenEmpty -@InputRequirement(Requirement.INPUT_FORBIDDEN) -@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) -@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents " - + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only " - + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " - + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.") -@WritesAttributes({ - @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), - @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, " - + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up " - + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), - @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), - @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), - @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), - @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), - @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"), - @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, " - + "3 for the group, and 3 for other users. 
For example rw-rw-r--") -}) -@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class}) -public class ListHDFS extends AbstractHadoopProcessor { - public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() - .name("Distributed Cache Service") - .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node " - + "begins pulling data, it won't duplicate all of the work that has been done.") - .required(true) - .identifiesControllerService(DistributedMapCacheClient.class) - .build(); - - public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name(DIRECTORY_PROP_NAME) - .description("The HDFS directory from which files should be read") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() - .name("Recurse Subdirectories") - .description("Indicates whether to list files from subdirectories of the HDFS directory") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles are transferred to this relationship") - .build(); - - private volatile Long lastListingTime = null; - private volatile Set<Path> latestPathsListed = new HashSet<>(); - private volatile boolean electedPrimaryNode = false; - - @Override - protected void init(final ProcessorInitializationContext context) { - super.init(context); - } - - protected File getPersistenceFile() { - return new File("conf/state/" + getIdentifier()); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(DISTRIBUTED_CACHE_SERVICE); - properties.add(DIRECTORY); - properties.add(RECURSE_SUBDIRS); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_KEYTAB); - properties.add(KERBEROS_RELOGIN_PERIOD); - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - return relationships; - } - - protected String getKey(final String directory) { - return getIdentifier() + ".lastListingTime." + directory; - } - - @OnPrimaryNodeStateChange - public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { - electedPrimaryNode = true; - } - } - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - if ( descriptor.equals(DIRECTORY) ) { - lastListingTime = null; // clear lastListingTime so that we have to fetch new time - latestPathsListed = new HashSet<>(); - } - } - - private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { - final ObjectMapper mapper = new ObjectMapper(); - final JsonNode jsonNode = mapper.readTree(serializedState); - return mapper.readValue(jsonNode, HDFSListing.class); - } - - - private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException { - // Determine the timestamp for the last file that we've listed. 
- Long minTimestamp = lastListingTime; - if ( minTimestamp == null || electedPrimaryNode ) { - // We haven't yet restored any state from local or distributed state - or it's been at least a minute since - // we have performed a listing. In this case, - // First, attempt to get timestamp from distributed cache service. - try { - final StringSerDe serde = new StringSerDe(); - final String serializedState = client.get(getKey(directory), serde, serde); - if ( serializedState == null || serializedState.isEmpty() ) { - minTimestamp = null; - this.latestPathsListed = Collections.emptySet(); - } else { - final HDFSListing listing = deserialize(serializedState); - this.lastListingTime = listing.getLatestTimestamp().getTime(); - minTimestamp = listing.getLatestTimestamp().getTime(); - this.latestPathsListed = listing.toPaths(); - } - - this.lastListingTime = minTimestamp; - electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. - } catch (final IOException ioe) { - throw ioe; - } - - // Check the persistence file. We want to use the latest timestamp that we have so that - // we don't duplicate data. - try { - final File persistenceFile = getPersistenceFile(); - if ( persistenceFile.exists() ) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - final Properties props = new Properties(); - props.load(fis); - - // get the local timestamp for this directory, if it exists. - final String locallyPersistedValue = props.getProperty(directory); - if ( locallyPersistedValue != null ) { - final HDFSListing listing = deserialize(locallyPersistedValue); - final long localTimestamp = listing.getLatestTimestamp().getTime(); - - // If distributed state doesn't have an entry or the local entry is later than the distributed state, - // update the distributed state so that we are in sync. - if (minTimestamp == null || localTimestamp > minTimestamp) { - minTimestamp = localTimestamp; - - // Our local persistence file shows a later time than the Distributed service. - // Update the distributed service to match our local state. - try { - final StringSerDe serde = new StringSerDe(); - client.put(getKey(directory), locallyPersistedValue, serde, serde); - } catch (final IOException ioe) { - getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " - + "state due to {}. If a new node performs HDFS Listing, data duplication may occur", - new Object[] {directory, locallyPersistedValue, ioe}); - } - } - } - } - } - } catch (final IOException ioe) { - getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); - } - } - - return minTimestamp; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final String directory = context.getProperty(DIRECTORY).getValue(); - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - - final Long minTimestamp; - try { - minTimestamp = getMinTimestamp(directory, client); - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); - context.yield(); - return; - } - - // Pull in any file that is newer than the timestamp that we have. 
- final FileSystem hdfs = getFileSystem(); - final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); - final Path rootPath = new Path(directory); - - int listCount = 0; - Long latestListingModTime = null; - final Set<FileStatus> statuses; - try { - statuses = getStatuses(rootPath, recursive, hdfs); - for ( final FileStatus status : statuses ) { - // don't get anything where the last modified timestamp is equal to our current timestamp. - // if we do, then we run the risk of multiple files having the same last mod date but us only - // seeing a portion of them. - // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe - // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a - // modified date not before the current time. - final long fileModTime = status.getModificationTime(); - - // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time. - // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those. - boolean fetch = !status.getPath().getName().endsWith("_COPYING_") - && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath()))); - - // Create the FlowFile for this path. - if ( fetch ) { - final Map<String, String> attributes = createAttributes(status); - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - listCount++; - - if ( latestListingModTime == null || fileModTime > latestListingModTime ) { - latestListingModTime = fileModTime; - } - } - } - } catch (final IOException ioe) { - getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe}); - return; - } - - if ( listCount > 0 ) { - getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount}); - session.commit(); - - // We have performed a listing and pushed the FlowFiles out. - // Now, we need to persist state about the Last Modified timestamp of the newest file - // that we pulled in. We do this in order to avoid pulling in the same file twice. - // However, we want to save the state both locally and remotely. - // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the - // previously Primary Node left off. - // We also store the state locally so that if the node is restarted, and the node cannot contact - // the distributed state cache, the node can continue to run (if it is primary node). - String serializedState = null; - try { - serializedState = serializeState(latestListingModTime, statuses); - } catch (final Exception e) { - getLogger().error("Failed to serialize state due to {}", new Object[] {e}); - } - - if ( serializedState != null ) { - // Save our state locally. - try { - persistLocalState(directory, serializedState); - } catch (final IOException ioe) { - getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); - } - - // Attempt to save state to remote server. - try { - client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); - } catch (final IOException ioe) { - getLogger().warn("Unable to communicate with distributed cache server due to {}. 
Persisting state locally instead.", ioe); - } - } - - lastListingTime = latestListingModTime; - } else { - getLogger().debug("There is no data to list. Yielding."); - context.yield(); - - // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system - if ( lastListingTime == null ) { - lastListingTime = 0L; - } - - return; - } - } - - private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { - final Set<FileStatus> statusSet = new HashSet<>(); - - final FileStatus[] statuses = hdfs.listStatus(path); - - for ( final FileStatus status : statuses ) { - if ( status.isDirectory() ) { - if ( recursive ) { - try { - statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); - } - } - } else { - statusSet.add(status); - } - } - - return statusSet; - } - - - private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. - if ( statuses.isEmpty() ) { - return null; - } else { - final List<FileStatus> sortedStatuses = new ArrayList<>(statuses); - Collections.sort(sortedStatuses, new Comparator<FileStatus>() { - @Override - public int compare(final FileStatus o1, final FileStatus o2) { - return Long.compare(o1.getModificationTime(), o2.getModificationTime()); - } - }); - - final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); - final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>(); - for (int i=sortedStatuses.size() - 1; i >= 0; i--) { - final FileStatus status = sortedStatuses.get(i); - if (status.getModificationTime() == latestListingModTime) { - pathsWithModTimeEqualToListingModTime.add(status.getPath()); - } - } - - this.latestPathsListed = pathsWithModTimeEqualToListingModTime; - - final HDFSListing listing = new HDFSListing(); - listing.setLatestTimestamp(new Date(latestListingModTime)); - final Set<String> paths = new HashSet<>(); - for ( final Path path : pathsWithModTimeEqualToListingModTime ) { - paths.add(path.toUri().toString()); - } - listing.setMatchingPaths(paths); - - final ObjectMapper mapper = new ObjectMapper(); - final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); - return serializedState; - } - } - - protected void persistLocalState(final String directory, final String serializedState) throws IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. 
- final File persistenceFile = getPersistenceFile(); - final File dir = persistenceFile.getParentFile(); - if ( !dir.exists() && !dir.mkdirs() ) { - throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); - } - - final Properties props = new Properties(); - if ( persistenceFile.exists() ) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - props.load(fis); - } - } - - props.setProperty(directory, serializedState); - - try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { - props.store(fos, null); - } - } - - private String getAbsolutePath(final Path path) { - final Path parent = path.getParent(); - final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent); - return prefix + "/" + path.getName(); - } - - private Map<String, String> createAttributes(final FileStatus status) { - final Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); - attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); - - attributes.put("hdfs.owner", status.getOwner()); - attributes.put("hdfs.group", status.getGroup()); - attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); - attributes.put("hdfs.length", String.valueOf(status.getLen())); - attributes.put("hdfs.replication", String.valueOf(status.getReplication())); - - final FsPermission permission = status.getPermission(); - final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); - attributes.put("hdfs.permissions", perms); - return attributes; - } - - private String getPerms(final FsAction action) { - final StringBuilder sb = new StringBuilder(); - if ( action.implies(FsAction.READ) ) { - sb.append("r"); - } else { - sb.append("-"); - } - - if ( action.implies(FsAction.WRITE) ) { - sb.append("w"); - } else { - sb.append("-"); - } - - if ( action.implies(FsAction.EXECUTE) ) { - sb.append("x"); - } else { - sb.append("-"); - } - - return sb.toString(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.util.HDFSListing; +import org.apache.nifi.processors.hadoop.util.StringSerDe; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents " + + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only " + + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " + + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.") +@WritesAttributes({ + @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), + @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, " + + "then files picked up from /tmp will have the path attribute set to \"./\". 
If the Recurse Subdirectories property is set to true and a file is picked up " + + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), + @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), + @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), + @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), + @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"), + @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, " + + "3 for the group, and 3 for other users. For example rw-rw-r--") +}) +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class ListHDFS extends AbstractHadoopProcessor { + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node " + + "begins pulling data, it won't duplicate all of the work that has been done.") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name(DIRECTORY_PROP_NAME) + .description("The HDFS directory from which files should be read") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .name("Recurse Subdirectories") + .description("Indicates whether to list files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are transferred to this relationship") + .build(); + + private volatile Long lastListingTime = null; + private volatile Set<Path> latestPathsListed = new HashSet<>(); + private volatile boolean electedPrimaryNode = false; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + } + + protected File getPersistenceFile() { + return new File("conf/state/" + getIdentifier()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HADOOP_CONFIGURATION_RESOURCES); + properties.add(DISTRIBUTED_CACHE_SERVICE); + properties.add(DIRECTORY); + properties.add(RECURSE_SUBDIRS); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_KEYTAB); + properties.add(KERBEROS_RELOGIN_PERIOD); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + return relationships; + } + + protected String getKey(final String directory) { + return getIdentifier() + ".lastListingTime." 
+ directory; + } + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange(final PrimaryNodeState newState) { + if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { + electedPrimaryNode = true; + } + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if ( descriptor.equals(DIRECTORY) ) { + lastListingTime = null; // clear lastListingTime so that we have to fetch new time + latestPathsListed = new HashSet<>(); + } + } + + private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, HDFSListing.class); + } + + + private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException { + // Determine the timestamp for the last file that we've listed. + Long minTimestamp = lastListingTime; + if ( minTimestamp == null || electedPrimaryNode ) { + // We haven't yet restored any state from local or distributed state - or it's been at least a minute since + // we have performed a listing. In this case, + // First, attempt to get timestamp from distributed cache service. + try { + final StringSerDe serde = new StringSerDe(); + final String serializedState = client.get(getKey(directory), serde, serde); + if ( serializedState == null || serializedState.isEmpty() ) { + minTimestamp = null; + this.latestPathsListed = Collections.emptySet(); + } else { + final HDFSListing listing = deserialize(serializedState); + this.lastListingTime = listing.getLatestTimestamp().getTime(); + minTimestamp = listing.getLatestTimestamp().getTime(); + this.latestPathsListed = listing.toPaths(); + } + + this.lastListingTime = minTimestamp; + electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. + } catch (final IOException ioe) { + throw ioe; + } + + // Check the persistence file. We want to use the latest timestamp that we have so that + // we don't duplicate data. + try { + final File persistenceFile = getPersistenceFile(); + if ( persistenceFile.exists() ) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + final Properties props = new Properties(); + props.load(fis); + + // get the local timestamp for this directory, if it exists. + final String locallyPersistedValue = props.getProperty(directory); + if ( locallyPersistedValue != null ) { + final HDFSListing listing = deserialize(locallyPersistedValue); + final long localTimestamp = listing.getLatestTimestamp().getTime(); + + // If distributed state doesn't have an entry or the local entry is later than the distributed state, + // update the distributed state so that we are in sync. + if (minTimestamp == null || localTimestamp > minTimestamp) { + minTimestamp = localTimestamp; + + // Our local persistence file shows a later time than the Distributed service. + // Update the distributed service to match our local state. + try { + final StringSerDe serde = new StringSerDe(); + client.put(getKey(directory), locallyPersistedValue, serde, serde); + } catch (final IOException ioe) { + getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " + + "state due to {}. 
If a new node performs HDFS Listing, data duplication may occur", + new Object[] {directory, locallyPersistedValue, ioe}); + } + } + } + } + } + } catch (final IOException ioe) { + getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); + } + } + + return minTimestamp; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String directory = context.getProperty(DIRECTORY).getValue(); + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + final Long minTimestamp; + try { + minTimestamp = getMinTimestamp(directory, client); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); + context.yield(); + return; + } + + // Pull in any file that is newer than the timestamp that we have. + final FileSystem hdfs = getFileSystem(); + final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); + final Path rootPath = new Path(directory); + + int listCount = 0; + Long latestListingModTime = null; + final Set<FileStatus> statuses; + try { + statuses = getStatuses(rootPath, recursive, hdfs); + for ( final FileStatus status : statuses ) { + // don't get anything where the last modified timestamp is equal to our current timestamp. + // if we do, then we run the risk of multiple files having the same last mod date but us only + // seeing a portion of them. + // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe + // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a + // modified date not before the current time. + final long fileModTime = status.getModificationTime(); + + // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time. + // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those. + boolean fetch = !status.getPath().getName().endsWith("_COPYING_") + && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath()))); + + // Create the FlowFile for this path. + if ( fetch ) { + final Map<String, String> attributes = createAttributes(status); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + listCount++; + + if ( latestListingModTime == null || fileModTime > latestListingModTime ) { + latestListingModTime = fileModTime; + } + } + } + } catch (final IOException ioe) { + getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe}); + return; + } + + if ( listCount > 0 ) { + getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount}); + session.commit(); + + // We have performed a listing and pushed the FlowFiles out. + // Now, we need to persist state about the Last Modified timestamp of the newest file + // that we pulled in. We do this in order to avoid pulling in the same file twice. + // However, we want to save the state both locally and remotely. + // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the + // previously Primary Node left off. 
+ // We also store the state locally so that if the node is restarted, and the node cannot contact + // the distributed state cache, the node can continue to run (if it is primary node). + String serializedState = null; + try { + serializedState = serializeState(latestListingModTime, statuses); + } catch (final Exception e) { + getLogger().error("Failed to serialize state due to {}", new Object[] {e}); + } + + if ( serializedState != null ) { + // Save our state locally. + try { + persistLocalState(directory, serializedState); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); + } + + // Attempt to save state to remote server. + try { + client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); + } + } + + lastListingTime = latestListingModTime; + } else { + getLogger().debug("There is no data to list. Yielding."); + context.yield(); + + // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system + if ( lastListingTime == null ) { + lastListingTime = 0L; + } + + return; + } + } + + private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { + final Set<FileStatus> statusSet = new HashSet<>(); + + final FileStatus[] statuses = hdfs.listStatus(path); + + for ( final FileStatus status : statuses ) { + if ( status.isDirectory() ) { + if ( recursive ) { + try { + statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); + } + } + } else { + statusSet.add(status); + } + } + + return statusSet; + } + + + private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond. 
+ if ( statuses.isEmpty() ) { + return null; + } else { + final List<FileStatus> sortedStatuses = new ArrayList<>(statuses); + Collections.sort(sortedStatuses, new Comparator<FileStatus>() { + @Override + public int compare(final FileStatus o1, final FileStatus o2) { + return Long.compare(o1.getModificationTime(), o2.getModificationTime()); + } + }); + + final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); + final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>(); + for (int i=sortedStatuses.size() - 1; i >= 0; i--) { + final FileStatus status = sortedStatuses.get(i); + if (status.getModificationTime() == latestListingModTime) { + pathsWithModTimeEqualToListingModTime.add(status.getPath()); + } + } + + this.latestPathsListed = pathsWithModTimeEqualToListingModTime; + + final HDFSListing listing = new HDFSListing(); + listing.setLatestTimestamp(new Date(latestListingModTime)); + final Set<String> paths = new HashSet<>(); + for ( final Path path : pathsWithModTimeEqualToListingModTime ) { + paths.add(path.toUri().toString()); + } + listing.setMatchingPaths(paths); + + final ObjectMapper mapper = new ObjectMapper(); + final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); + return serializedState; + } + } + + protected void persistLocalState(final String directory, final String serializedState) throws IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond. + final File persistenceFile = getPersistenceFile(); + final File dir = persistenceFile.getParentFile(); + if ( !dir.exists() && !dir.mkdirs() ) { + throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); + } + + final Properties props = new Properties(); + if ( persistenceFile.exists() ) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } + } + + props.setProperty(directory, serializedState); + + try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { + props.store(fos, null); + } + } + + private String getAbsolutePath(final Path path) { + final Path parent = path.getParent(); + final String prefix = (parent == null || parent.getName().equals("")) ? 
"" : getAbsolutePath(parent); + return prefix + "/" + path.getName(); + } + + private Map<String, String> createAttributes(final FileStatus status) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); + attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); + + attributes.put("hdfs.owner", status.getOwner()); + attributes.put("hdfs.group", status.getGroup()); + attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); + attributes.put("hdfs.length", String.valueOf(status.getLen())); + attributes.put("hdfs.replication", String.valueOf(status.getReplication())); + + final FsPermission permission = status.getPermission(); + final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); + attributes.put("hdfs.permissions", perms); + return attributes; + } + + private String getPerms(final FsAction action) { + final StringBuilder sb = new StringBuilder(); + if ( action.implies(FsAction.READ) ) { + sb.append("r"); + } else { + sb.append("-"); + } + + if ( action.implies(FsAction.WRITE) ) { + sb.append("w"); + } else { + sb.append("-"); + } + + if ( action.implies(FsAction.EXECUTE) ) { + sb.append("x"); + } else { + sb.append("-"); + } + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java index 49957f5..a4d957a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@ -1,83 +1,83 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.hadoop.util; - -import java.util.Collection; -import java.util.Date; -import java.util.HashSet; -import java.util.Set; - -import javax.xml.bind.annotation.XmlTransient; -import javax.xml.bind.annotation.XmlType; - -import org.apache.hadoop.fs.Path; - -/** - * A simple POJO for maintaining state about the last HDFS Listing that was performed so that - * we can avoid pulling the same file multiple times - */ -@XmlType(name = "listing") -public class HDFSListing { - private Date latestTimestamp; - private Collection<String> matchingPaths; - - /** - * @return the modification date of the newest file that was contained in the HDFS Listing - */ - public Date getLatestTimestamp() { - return latestTimestamp; - } - - /** - * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing - * - * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing - */ - public void setLatestTimestamp(Date latestTimestamp) { - this.latestTimestamp = latestTimestamp; - } - - /** - * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date - * was equal to {@link #getLatestTimestamp()} - */ - @XmlTransient - public Collection<String> getMatchingPaths() { - return matchingPaths; - } - - /** - * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()} - */ - public Set<Path> toPaths() { - final Set<Path> paths = new HashSet<>(matchingPaths.size()); - for ( final String pathname : matchingPaths ) { - paths.add(new Path(pathname)); - } - return paths; - } - - /** - * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was - * equal to {@link #getLatestTimestamp()} - * @param matchingPaths the paths that have last modified date matching the latest timestamp - */ - public void setMatchingPaths(Collection<String> matchingPaths) { - this.matchingPaths = matchingPaths; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; + +import org.apache.hadoop.fs.Path; + +/** + * A simple POJO for maintaining state about the last HDFS Listing that was performed so that + * we can avoid pulling the same file multiple times + */ +@XmlType(name = "listing") +public class HDFSListing { + private Date latestTimestamp; + private Collection<String> matchingPaths; + + /** + * @return the modification date of the newest file that was contained in the HDFS Listing + */ + public Date getLatestTimestamp() { + return latestTimestamp; + } + + /** + * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing + * + * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing + */ + public void setLatestTimestamp(Date latestTimestamp) { + this.latestTimestamp = latestTimestamp; + } + + /** + * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date + * was equal to {@link #getLatestTimestamp()} + */ + @XmlTransient + public Collection<String> getMatchingPaths() { + return matchingPaths; + } + + /** + * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()} + */ + public Set<Path> toPaths() { + final Set<Path> paths = new HashSet<>(matchingPaths.size()); + for ( final String pathname : matchingPaths ) { + paths.add(new Path(pathname)); + } + return paths; + } + + /** + * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was + * equal to {@link #getLatestTimestamp()} + * @param matchingPaths the paths that have last modified date matching the latest timestamp + */ + public void setMatchingPaths(Collection<String> matchingPaths) { + this.matchingPaths = matchingPaths; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java index 229f26c..17cacd9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java @@ -1,48 +1,48 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.hadoop.util; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; - -public class LongSerDe implements Serializer<Long>, Deserializer<Long> { - - @Override - public Long deserialize(final byte[] input) throws DeserializationException, IOException { - if ( input == null || input.length == 0 ) { - return null; - } - - final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input)); - return dis.readLong(); - } - - @Override - public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException { - final DataOutputStream dos = new DataOutputStream(out); - dos.writeLong(value); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class LongSerDe implements Serializer<Long>, Deserializer<Long> { + + @Override + public Long deserialize(final byte[] input) throws DeserializationException, IOException { + if ( input == null || input.length == 0 ) { + return null; + } + + final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input)); + return dis.readLong(); + } + + @Override + public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException { + final DataOutputStream dos = new DataOutputStream(out); + dos.writeLong(value); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java index ca1c548..2a52c4d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java @@ -1,44 +1,44 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.hadoop.util; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; - -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; - -public class StringSerDe implements Serializer<String>, Deserializer<String> { - - @Override - public String deserialize(final byte[] value) throws DeserializationException, IOException { - if ( value == null ) { - return null; - } - - return new String(value, StandardCharsets.UTF_8); - } - - @Override - public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { - out.write(value.getBytes(StandardCharsets.UTF_8)); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class StringSerDe implements Serializer<String>, Deserializer<String> { + + @Override + public String deserialize(final byte[] value) throws DeserializationException, IOException { + if ( value == null ) { + return null; + } + + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + +}
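The StringSerDe and LongSerDe added above are callback-style codecs: the distributed map cache client hands them an OutputStream to write into and a byte[] to decode, which is why neither class carries any state of its own. The stand-alone sketch below (not part of the commit) drives both through a simple byte-array round trip; invoking them outside the cache client is purely illustrative, and the class name SerDeRoundTripSketch is invented for this example.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.nifi.processors.hadoop.util.LongSerDe;
import org.apache.nifi.processors.hadoop.util.StringSerDe;

public class SerDeRoundTripSketch {
    public static void main(final String[] args) throws IOException {
        // String round trip: serialize() writes raw UTF-8 bytes, deserialize() decodes them back
        final StringSerDe stringSerDe = new StringSerDe();
        final ByteArrayOutputStream stringBytes = new ByteArrayOutputStream();
        stringSerDe.serialize("/data/incoming/part-00000", stringBytes);
        System.out.println(stringSerDe.deserialize(stringBytes.toByteArray()));

        // Long round trip: one 8-byte big-endian value via DataOutputStream / DataInputStream
        final LongSerDe longSerDe = new LongSerDe();
        final ByteArrayOutputStream longBytes = new ByteArrayOutputStream();
        longSerDe.serialize(1431000000000L, longBytes);
        System.out.println(longSerDe.deserialize(longBytes.toByteArray()));

        // both deserializers return null for missing input instead of throwing
        System.out.println(stringSerDe.deserialize(null));
        System.out.println(longSerDe.deserialize(new byte[0]));
    }
}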

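Taken together, the pieces in this commit persist one JSON document per listed directory inside a java.util.Properties file: persistLocalState() writes the value, and HDFSListing is the POJO that the JSON binds to. The following is a hedged sketch of the reverse direction only; restoreLocalState and LocalStateRestoreSketch are invented names, and the Jackson 2 ObjectMapper import is an assumption (the processor may bind the JSON with a different Jackson package), so read it as an illustration of the data layout rather than the processor's own recovery path.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.fs.Path;
import org.apache.nifi.processors.hadoop.util.HDFSListing;

import com.fasterxml.jackson.databind.ObjectMapper;

public class LocalStateRestoreSketch {

    static Set<Path> restoreLocalState(final File persistenceFile, final String directory) throws IOException {
        if (!persistenceFile.exists()) {
            return null; // nothing has been listed (and persisted) yet
        }

        // the persistence file is a java.util.Properties file keyed by directory,
        // exactly as persistLocalState() writes it
        final Properties props = new Properties();
        try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
            props.load(fis);
        }

        final String serializedState = props.getProperty(directory);
        if (serializedState == null) {
            return null; // this particular directory has never been listed
        }

        // the stored value is the JSON form of HDFSListing, so it can be bound
        // straight back to the POJO and expanded into Path objects via toPaths()
        final ObjectMapper mapper = new ObjectMapper();
        final HDFSListing listing = mapper.readValue(serializedState, HDFSListing.class);
        return listing.toPaths();
    }
}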