http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 361f1ed..7aa534f 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -58,18 +58,15 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; -/** - * This processor reads files from HDFS into NiFi FlowFiles. - */ @TriggerWhenEmpty @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) -@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles") +@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.") @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."), - @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if " - + "the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse " - + "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")}) -@SeeAlso(PutHDFS.class) + @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property " + + "is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". 
If the Recurse Subdirectories property is set to true and " + + "a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") }) +@SeeAlso({PutHDFS.class, ListHDFS.class}) public class GetHDFS extends AbstractHadoopProcessor { public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; @@ -78,101 +75,101 @@ public class GetHDFS extends AbstractHadoopProcessor { // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All files retrieved from HDFS are transferred to this relationship") - .build(); + .name("success") + .description("All files retrieved from HDFS are transferred to this relationship") + .build(); public static final Relationship REL_PASSTHROUGH = new Relationship.Builder() - .name("passthrough") - .description( - "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship") + .name("passthrough") + .description( + "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship") .build(); // properties public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name(DIRECTORY_PROP_NAME) - .description("The HDFS directory from which files should be read") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name(DIRECTORY_PROP_NAME) + .description("The HDFS directory from which files should be read") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() - .name("Recurse Subdirectories") - .description("Indicates whether to pull files from subdirectories of the HDFS directory") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Recurse Subdirectories") + .description("Indicates whether to pull files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder() - .name("Keep Source File") - .description("Determines whether to delete the file from HDFS after it has been successfully transferred") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); + .name("Keep Source File") + .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. 
This is intended for testing only.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder() - .name("File Filter Regex") - .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular " - + "Expression will be fetched, otherwise all files will be fetched") + .name("File Filter Regex") + .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular " + + "Expression will be fetched, otherwise all files will be fetched") .required(false) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder() - .name("Filter Match Name Only") - .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename " - + "in the regex comparison") + .name("Filter Match Name Only") + .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename " + + "in the regex comparison") .required(true) .allowableValues("true", "false") .defaultValue("true") .build(); public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() - .name("Ignore Dotted Files") - .description("If true, files whose names begin with a dot (\".\") will be ignored") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Ignore Dotted Files") + .description("If true, files whose names begin with a dot (\".\") will be ignored") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() - .name("Minimum File Age") - .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored") - .required(true) - .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) - .defaultValue("0 sec") - .build(); + .name("Minimum File Age") + .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored") + .required(true) + .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .defaultValue("0 sec") + .build(); public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() - .name("Maximum File Age") - .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored") - .required(false) - .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) - .build(); + .name("Maximum File Age") + .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); public static final 
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of files to pull in each iteration, based on run schedule.") - .required(true) - .defaultValue("100") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); + .name("Batch Size") + .description("The maximum number of files to pull in each iteration, based on run schedule.") + .required(true) + .defaultValue("100") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder() - .name("Polling Interval") - .description("Indicates how long to wait between performing directory listings") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("0 sec") - .build(); + .name("Polling Interval") + .description("Indicates how long to wait between performing directory listings") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("IO Buffer Size") - .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .build(); + .name("IO Buffer Size") + .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); private static final Set<Relationship> relationships; protected static final List<PropertyDescriptor> localProperties; @@ -239,8 +236,8 @@ public class GetHDFS extends AbstractHadoopProcessor { abstractOnScheduled(context); // copy configuration values to pass them around cleanly processorConfig = new ProcessorConfiguration(context); - FileSystem fs = hdfsResources.get().getValue(); - Path dir = new Path(context.getProperty(DIRECTORY).getValue()); + final FileSystem fs = getFileSystem(); + final Path dir = new Path(context.getProperty(DIRECTORY).getValue()); if (!fs.exists(dir)) { throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist."); } @@ -333,8 +330,8 @@ public class GetHDFS extends AbstractHadoopProcessor { protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) { // process the batch of files FSDataInputStream stream = null; - Configuration conf = hdfsResources.get().getKey(); - FileSystem hdfs = hdfsResources.get().getValue(); + Configuration conf = getConfiguration(); + FileSystem hdfs = getFileSystem(); final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
@@ -401,7 +398,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
             try {
-                final FileSystem hdfs = hdfsResources.get().getValue();
+                final FileSystem hdfs = getFileSystem();
                 // get listing
                 listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
                 lastPollTime.set(System.currentTimeMillis());
@@ -464,33 +461,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
     }
     /**
-     * Returns the relative path of the child that does not include the filename or the root path.
-     *
-     * @param root root
-     * @param child child
-     * @return the relative path of the child that does not include the filename or the root path
-     */
-    public static String getPathDifference(final Path root, final Path child) {
-        final int depthDiff = child.depth() - root.depth();
-        if (depthDiff <= 1) {
-            return "".intern();
-        }
-        String lastRoot = root.getName();
-        Path childsParent = child.getParent();
-        final StringBuilder builder = new StringBuilder();
-        builder.append(childsParent.getName());
-        for (int i = (depthDiff - 3); i >= 0; i--) {
-            childsParent = childsParent.getParent();
-            String name = childsParent.getName();
-            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
-                break;
-            }
-            builder.insert(0, Path.SEPARATOR).insert(0, name);
-        }
-        return builder.toString();
-    }
-
-    /**
      * Holder for a snapshot in time of some processor properties that are passed around.
      */
     protected static class ProcessorConfiguration {
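Throughout the GetHDFS hunks above (and in the GetHDFSSequenceFile and PutHDFS hunks below), direct reads of the hdfsResources Tuple, such as hdfsResources.get().getKey() and hdfsResources.get().getValue(), are replaced with calls to the protected getConfiguration() and getFileSystem() accessors. Those accessors live in AbstractHadoopProcessor, which is not part of this diff; the following is only a rough sketch of what they presumably look like, assuming the existing AtomicReference<Tuple<Configuration, FileSystem>> holder (class and field names here are illustrative, not taken from the commit):

    // Illustrative sketch only -- the real accessors are defined in AbstractHadoopProcessor,
    // which is not shown in this diff; names other than getConfiguration()/getFileSystem()
    // are assumptions based on the calling code.
    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.nifi.util.Tuple;

    public abstract class AbstractHadoopProcessorSketch {

        // Populated when the processor is scheduled, matching the code paths that
        // previously dereferenced hdfsResources.get() directly.
        protected final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();

        /** @return the Hadoop Configuration, or null if the processor has not been scheduled yet */
        protected Configuration getConfiguration() {
            final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
            return resources == null ? null : resources.getKey();
        }

        /** @return the HDFS FileSystem, or null if the processor has not been scheduled yet */
        protected FileSystem getFileSystem() {
            final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
            return resources == null ? null : resources.getValue();
        }
    }

Returning null before initialization is consistent with the guard PutHDFS performs further down ("configuration == null || hdfs == null"), and the TestListHDFS test below relies on the same accessor pattern by overriding getFileSystem() to return a mocked file system.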
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java index 22ba36b..f032ee4 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java @@ -22,6 +22,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -34,10 +37,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.apache.nifi.util.StopWatch; -import org.apache.nifi.util.Tuple; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; /** * This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested @@ -80,9 +79,8 @@ public class GetHDFSSequenceFile extends GetHDFS { @Override protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) { - final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get(); - final Configuration conf = hadoopResources.getKey(); - final FileSystem hdfs = hadoopResources.getValue(); + final Configuration conf = getConfiguration(); + final FileSystem hdfs = getFileSystem(); final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue(); final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java new file mode 100644 index 0000000..151cbf2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.util.HDFSListing; +import org.apache.nifi.processors.hadoop.util.StringSerDe; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + + +@TriggerSerially +@TriggerWhenEmpty +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents " + + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only " + + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " + + "all of the data. 
Unlike GetHDFS, this Processor does not delete any data from HDFS.") +@WritesAttributes({ + @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), + @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, " + + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up " + + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), + @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), + @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), + @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), + @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"), + @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, " + + "3 for the group, and 3 for other users. For example rw-rw-r--") +}) +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class ListHDFS extends AbstractHadoopProcessor { + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node " + + "begins pulling data, it won't duplicate all of the work that has been done.") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name(DIRECTORY_PROP_NAME) + .description("The HDFS directory from which files should be read") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .name("Recurse Subdirectories") + .description("Indicates whether to list files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are transferred to this relationship") + .build(); + + private volatile Long lastListingTime = null; + private volatile Set<Path> latestPathsListed = new HashSet<>(); + private volatile boolean electedPrimaryNode = false; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + } + + protected File getPersistenceFile() { + return new File("conf/state/" + getIdentifier()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HADOOP_CONFIGURATION_RESOURCES); + properties.add(DISTRIBUTED_CACHE_SERVICE); + properties.add(DIRECTORY); + properties.add(RECURSE_SUBDIRS); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final 
Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + return relationships; + } + + protected String getKey(final String directory) { + return getIdentifier() + ".lastListingTime." + directory; + } + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange(final PrimaryNodeState newState) { + if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { + electedPrimaryNode = true; + } + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if ( descriptor.equals(DIRECTORY) ) { + lastListingTime = null; // clear lastListingTime so that we have to fetch new time + latestPathsListed = new HashSet<>(); + } + } + + private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, HDFSListing.class); + } + + + private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException { + // Determine the timestamp for the last file that we've listed. + Long minTimestamp = lastListingTime; + if ( minTimestamp == null || electedPrimaryNode ) { + // We haven't yet restored any state from local or distributed state - or it's been at least a minute since + // we have performed a listing. In this case, + // First, attempt to get timestamp from distributed cache service. + try { + final StringSerDe serde = new StringSerDe(); + final String serializedState = client.get(getKey(directory), serde, serde); + if ( serializedState == null || serializedState.isEmpty() ) { + minTimestamp = null; + this.latestPathsListed = Collections.emptySet(); + } else { + final HDFSListing listing = deserialize(serializedState); + this.lastListingTime = listing.getLatestTimestamp().getTime(); + minTimestamp = listing.getLatestTimestamp().getTime(); + this.latestPathsListed = listing.toPaths(); + } + + this.lastListingTime = minTimestamp; + electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. + } catch (final IOException ioe) { + throw ioe; + } + + // Check the persistence file. We want to use the latest timestamp that we have so that + // we don't duplicate data. + try { + final File persistenceFile = getPersistenceFile(); + if ( persistenceFile.exists() ) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + final Properties props = new Properties(); + props.load(fis); + + // get the local timestamp for this directory, if it exists. + final String locallyPersistedValue = props.getProperty(directory); + if ( locallyPersistedValue != null ) { + final HDFSListing listing = deserialize(locallyPersistedValue); + final long localTimestamp = listing.getLatestTimestamp().getTime(); + + // If distributed state doesn't have an entry or the local entry is later than the distributed state, + // update the distributed state so that we are in sync. + if (minTimestamp == null || localTimestamp > minTimestamp) { + minTimestamp = localTimestamp; + + // Our local persistence file shows a later time than the Distributed service. + // Update the distributed service to match our local state. 
+ try { + final StringSerDe serde = new StringSerDe(); + client.put(getKey(directory), locallyPersistedValue, serde, serde); + } catch (final IOException ioe) { + getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " + + "state due to {}. If a new node performs HDFS Listing, data duplication may occur", + new Object[] {directory, locallyPersistedValue, ioe}); + } + } + } + } + } + } catch (final IOException ioe) { + getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); + } + } + + return minTimestamp; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String directory = context.getProperty(DIRECTORY).getValue(); + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + final Long minTimestamp; + try { + minTimestamp = getMinTimestamp(directory, client); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); + context.yield(); + return; + } + + // Pull in any file that is newer than the timestamp that we have. + final FileSystem hdfs = getFileSystem(); + final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); + final Path rootPath = new Path(directory); + + int listCount = 0; + Long latestListingModTime = null; + final Set<FileStatus> statuses; + try { + statuses = getStatuses(rootPath, recursive, hdfs); + for ( final FileStatus status : statuses ) { + // don't get anything where the last modified timestamp is equal to our current timestamp. + // if we do, then we run the risk of multiple files having the same last mod date but us only + // seeing a portion of them. + // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe + // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a + // modified date not before the current time. + final long fileModTime = status.getModificationTime(); + + // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time. + // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those. + boolean fetch = !status.getPath().getName().endsWith("_COPYING_") + && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath()))); + + // Create the FlowFile for this path. + if ( fetch ) { + final Map<String, String> attributes = createAttributes(status); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + listCount++; + + if ( latestListingModTime == null || fileModTime > latestListingModTime ) { + latestListingModTime = fileModTime; + } + } + } + } catch (final IOException ioe) { + getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe}); + return; + } + + if ( listCount > 0 ) { + getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount}); + session.commit(); + + // We have performed a listing and pushed the FlowFiles out. 
+ // Now, we need to persist state about the Last Modified timestamp of the newest file + // that we pulled in. We do this in order to avoid pulling in the same file twice. + // However, we want to save the state both locally and remotely. + // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the + // previously Primary Node left off. + // We also store the state locally so that if the node is restarted, and the node cannot contact + // the distributed state cache, the node can continue to run (if it is primary node). + String serializedState = null; + try { + serializedState = serializeState(latestListingModTime, statuses); + } catch (final Exception e) { + getLogger().error("Failed to serialize state due to {}", new Object[] {e}); + } + + if ( serializedState != null ) { + // Save our state locally. + try { + persistLocalState(directory, serializedState); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); + } + + // Attempt to save state to remote server. + try { + client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); + } + } + + lastListingTime = latestListingModTime; + } else { + getLogger().debug("There is no data to list. Yielding."); + context.yield(); + + // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system + if ( lastListingTime == null ) { + lastListingTime = 0L; + } + + return; + } + } + + private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { + final Set<FileStatus> statusSet = new HashSet<>(); + + final FileStatus[] statuses = hdfs.listStatus(path); + + for ( final FileStatus status : statuses ) { + if ( status.isDirectory() ) { + if ( recursive ) { + try { + statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); + } + } + } else { + statusSet.add(status); + } + } + + return statusSet; + } + + + private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond. 
+ if ( statuses.isEmpty() ) { + return null; + } else { + final List<FileStatus> sortedStatuses = new ArrayList<>(statuses); + Collections.sort(sortedStatuses, new Comparator<FileStatus>() { + @Override + public int compare(final FileStatus o1, final FileStatus o2) { + return Long.compare(o1.getModificationTime(), o2.getModificationTime()); + } + }); + + final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); + final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>(); + for (int i=sortedStatuses.size() - 1; i >= 0; i--) { + final FileStatus status = sortedStatuses.get(i); + if (status.getModificationTime() == latestListingModTime) { + pathsWithModTimeEqualToListingModTime.add(status.getPath()); + } + } + + this.latestPathsListed = pathsWithModTimeEqualToListingModTime; + + final HDFSListing listing = new HDFSListing(); + listing.setLatestTimestamp(new Date(latestListingModTime)); + final Set<String> paths = new HashSet<>(); + for ( final Path path : pathsWithModTimeEqualToListingModTime ) { + paths.add(path.toUri().toString()); + } + listing.setMatchingPaths(paths); + + final ObjectMapper mapper = new ObjectMapper(); + final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); + return serializedState; + } + } + + protected void persistLocalState(final String directory, final String serializedState) throws IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond. + final File persistenceFile = getPersistenceFile(); + final File dir = persistenceFile.getParentFile(); + if ( !dir.exists() && !dir.mkdirs() ) { + throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); + } + + final Properties props = new Properties(); + if ( persistenceFile.exists() ) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } + } + + props.setProperty(directory, serializedState); + + try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { + props.store(fos, null); + } + } + + private String getAbsolutePath(final Path path) { + final Path parent = path.getParent(); + final String prefix = (parent == null || parent.getName().equals("")) ? 
"" : getAbsolutePath(parent); + return prefix + "/" + path.getName(); + } + + private Map<String, String> createAttributes(final FileStatus status) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); + attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); + + attributes.put("hdfs.owner", status.getOwner()); + attributes.put("hdfs.group", status.getGroup()); + attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); + attributes.put("hdfs.length", String.valueOf(status.getLen())); + attributes.put("hdfs.replication", String.valueOf(status.getReplication())); + + final FsPermission permission = status.getPermission(); + final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); + attributes.put("hdfs.permissions", perms); + return attributes; + } + + private String getPerms(final FsAction action) { + final StringBuilder sb = new StringBuilder(); + if ( action.implies(FsAction.READ) ) { + sb.append("r"); + } else { + sb.append("-"); + } + + if ( action.implies(FsAction.WRITE) ) { + sb.append("w"); + } else { + sb.append("-"); + } + + if ( action.implies(FsAction.EXECUTE) ) { + sb.append("x"); + } else { + sb.append("-"); + } + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 057f786..52cf475 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ipc.RemoteException; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -54,7 +54,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; -import org.apache.nifi.util.Tuple; /** * This processor copies FlowFiles to HDFS. 
@@ -183,8 +182,7 @@ public class PutHDFS extends AbstractHadoopProcessor { } else { dfsUmask = FsPermission.DEFAULT_UMASK; } - final Tuple<Configuration, FileSystem> resources = hdfsResources.get(); - final Configuration conf = resources.getKey(); + final Configuration conf = getConfiguration(); FsPermission.setUMask(conf, new FsPermission(dfsUmask)); } @@ -195,26 +193,23 @@ public class PutHDFS extends AbstractHadoopProcessor { return; } - final Tuple<Configuration, FileSystem> resources = hdfsResources.get(); - if (resources == null || resources.getKey() == null || resources.getValue() == null) { + final Configuration configuration = getConfiguration(); + final FileSystem hdfs = getFileSystem(); + if (configuration == null || hdfs == null) { getLogger().error("HDFS not configured properly"); session.transfer(flowFile, REL_FAILURE); context.yield(); return; } - final Configuration conf = resources.getKey(); - final FileSystem hdfs = resources.getValue(); - final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile) - .getValue()); + final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue()); final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B); final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath); final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); - final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, - BUFFER_SIZE_DEFAULT); + final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT); final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger(); final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs @@ -230,7 +225,7 @@ public class PutHDFS extends AbstractHadoopProcessor { // Create destination directory if it does not exist try { - if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) { + if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) { throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory"); } } catch (FileNotFoundException fe) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java new file mode 100644 index 0000000..49957f5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop.util; + +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; + +import org.apache.hadoop.fs.Path; + +/** + * A simple POJO for maintaining state about the last HDFS Listing that was performed so that + * we can avoid pulling the same file multiple times + */ +@XmlType(name = "listing") +public class HDFSListing { + private Date latestTimestamp; + private Collection<String> matchingPaths; + + /** + * @return the modification date of the newest file that was contained in the HDFS Listing + */ + public Date getLatestTimestamp() { + return latestTimestamp; + } + + /** + * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing + * + * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing + */ + public void setLatestTimestamp(Date latestTimestamp) { + this.latestTimestamp = latestTimestamp; + } + + /** + * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date + * was equal to {@link #getLatestTimestamp()} + */ + @XmlTransient + public Collection<String> getMatchingPaths() { + return matchingPaths; + } + + /** + * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()} + */ + public Set<Path> toPaths() { + final Set<Path> paths = new HashSet<>(matchingPaths.size()); + for ( final String pathname : matchingPaths ) { + paths.add(new Path(pathname)); + } + return paths; + } + + /** + * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was + * equal to {@link #getLatestTimestamp()} + * @param matchingPaths the paths that have last modified date matching the latest timestamp + */ + public void setMatchingPaths(Collection<String> matchingPaths) { + this.matchingPaths = matchingPaths; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java new file mode 100644 index 0000000..229f26c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class LongSerDe implements Serializer<Long>, Deserializer<Long> { + + @Override + public Long deserialize(final byte[] input) throws DeserializationException, IOException { + if ( input == null || input.length == 0 ) { + return null; + } + + final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input)); + return dis.readLong(); + } + + @Override + public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException { + final DataOutputStream dos = new DataOutputStream(out); + dos.writeLong(value); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java new file mode 100644 index 0000000..ca1c548 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class StringSerDe implements Serializer<String>, Deserializer<String> { + + @Override + public String deserialize(final byte[] value) throws DeserializationException, IOException { + if ( value == null ) { + return null; + } + + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index da16ef7..4b359e8 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.hadoop.GetHDFS -org.apache.nifi.processors.hadoop.PutHDFS org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile +org.apache.nifi.processors.hadoop.FetchHDFS +org.apache.nifi.processors.hadoop.GetHDFS org.apache.nifi.processors.hadoop.GetHDFSSequenceFile +org.apache.nifi.processors.hadoop.ListHDFS +org.apache.nifi.processors.hadoop.PutHDFS http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java new file mode 100644 index 0000000..5822fc5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestListHDFS { + + private TestRunner runner; + private ListHDFSWithMockedFileSystem proc; + private MockCacheClient service; + + @Before + public void setup() throws InitializationException { + proc = new ListHDFSWithMockedFileSystem(); + runner = TestRunners.newTestRunner(proc); + + service = new MockCacheClient(); + runner.addControllerService("service", service); + runner.enableControllerService(service); + + runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml"); + runner.setProperty(ListHDFS.DIRECTORY, "/test"); + runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service"); + } + + @Test + public void testListingHasCorrectAttributes() { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); + mff.assertAttributeEquals("path", "/test"); + mff.assertAttributeEquals("filename", "testFile.txt"); + } + + + @Test + public void testRecursive() { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS); + for (int i=0; i < 2; i++) { + final MockFlowFile ff = flowFiles.get(i); + final String filename = ff.getAttribute("filename"); + + if (filename.equals("testFile.txt")) { + ff.assertAttributeEquals("path", "/test"); + } else if ( filename.equals("1.txt")) { + ff.assertAttributeEquals("path", "/test/testDir"); + } else { + Assert.fail("filename was " + filename); + } + } + } + + @Test + public void testNotRecursive() { + runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false"); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); + mff1.assertAttributeEquals("path", "/test"); + mff1.assertAttributeEquals("filename", "testFile.txt"); + } + + + @Test + public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); + mff1.assertAttributeEquals("path", "/test"); + mff1.assertAttributeEquals("filename", "testFile.txt"); + + runner.clearTransferState(); + + // add new file to pull + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt"))); + + // trigger primary node change + proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); + + // cause calls to service to fail + service.failOnCalls = true; + + runner.run(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + final String key = proc.getKey("/test"); + + // wait just to a bit to ensure that the timestamp changes when we update the service + final Object curVal = service.values.get(key); + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + } + + service.failOnCalls = false; + runner.run(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + // ensure state saved both locally & remotely + assertTrue(proc.localStateSaved); + assertNotNull(service.values.get(key)); + assertNotSame(curVal, service.values.get(key)); + } + + + private FsPermission create777() { + return new FsPermission((short) 0777); + } + + + private class ListHDFSWithMockedFileSystem extends ListHDFS { + private final MockFileSystem fileSystem = new MockFileSystem(); + private boolean localStateSaved = false; + + @Override + protected FileSystem getFileSystem() { + return fileSystem; + } + + @Override + protected File getPersistenceFile() { + return new File("target/conf/state-file"); + } + + @Override 
+ protected FileSystem getFileSystem(final Configuration config) throws IOException { + return fileSystem; + } + + @Override + protected void persistLocalState(final String directory, final String serializedState) throws IOException { + super.persistLocalState(directory, serializedState); + localStateSaved = true; + } + } + + + private class MockFileSystem extends FileSystem { + private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>(); + + public void addFileStatus(final Path parent, final FileStatus child) { + Set<FileStatus> children = fileStatuses.get(parent); + if ( children == null ) { + children = new HashSet<>(); + fileStatuses.put(parent, children); + } + + children.add(child); + } + + + @Override + public long getDefaultBlockSize() { + return 1024L; + } + + @Override + public short getDefaultReplication() { + return 1; + } + + @Override + public URI getUri() { + return null; + } + + @Override + public FSDataInputStream open(final Path f, final int bufferSize) throws IOException { + return null; + } + + @Override + public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, + final long blockSize, final Progressable progress) throws IOException { + return null; + } + + @Override + public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { + return null; + } + + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + return false; + } + + @Override + public boolean delete(final Path f, final boolean recursive) throws IOException { + return false; + } + + @Override + public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException { + final Set<FileStatus> statuses = fileStatuses.get(f); + if ( statuses == null ) { + return new FileStatus[0]; + } + + return statuses.toArray(new FileStatus[statuses.size()]); + } + + @Override + public void setWorkingDirectory(final Path new_dir) { + + } + + @Override + public Path getWorkingDirectory() { + return new Path(new File(".").getAbsolutePath()); + } + + @Override + public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { + return false; + } + + @Override + public FileStatus getFileStatus(final Path f) throws IOException { + return null; + } + + } + + + private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { + private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>(); + private boolean failOnCalls = false; + + private void verifyNotFail() throws IOException { + if ( failOnCalls ) { + throw new IOException("Could not call to remote service because Unit Test marked service unavailable"); + } + } + + @Override + public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + verifyNotFail(); + final Object retValue = values.putIfAbsent(key, value); + return (retValue == null); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, + final Deserializer<V> valueDeserializer) throws IOException { + verifyNotFail(); + return (V) values.putIfAbsent(key, value); + } + + @Override + public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { + verifyNotFail(); + return 
values.containsKey(key); + } + + @Override + public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + verifyNotFail(); + values.put(key, value); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + verifyNotFail(); + return (V) values.get(key); + } + + @Override + public void close() throws IOException { + } + + @Override + public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { + verifyNotFail(); + values.remove(key); + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java index 3beab65..9ea793d 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java @@ -121,13 +121,13 @@ public class IndexConfiguration { } } - public File getWritableIndexDirectory(final File provenanceLogFile) { + public File getWritableIndexDirectory(final File provenanceLogFile, final long newIndexTimestamp) { lock.lock(); try { final File storageDirectory = provenanceLogFile.getParentFile(); List<File> indexDirectories = this.indexDirectoryMap.get(storageDirectory); if (indexDirectories == null) { - final File newDir = addNewIndex(storageDirectory, provenanceLogFile); + final File newDir = addNewIndex(storageDirectory, provenanceLogFile, newIndexTimestamp); indexDirectories = new ArrayList<>(); indexDirectories.add(newDir); indexDirectoryMap.put(storageDirectory, indexDirectories); @@ -135,7 +135,7 @@ public class IndexConfiguration { } if (indexDirectories.isEmpty()) { - final File newDir = addNewIndex(storageDirectory, provenanceLogFile); + final File newDir = addNewIndex(storageDirectory, provenanceLogFile, newIndexTimestamp); indexDirectories.add(newDir); return newDir; } @@ -143,7 +143,7 @@ public class IndexConfiguration { final File lastDir = indexDirectories.get(indexDirectories.size() - 1); final long size = getSize(lastDir); if (size > repoConfig.getDesiredIndexSize()) { - final File newDir = addNewIndex(storageDirectory, provenanceLogFile); + final File newDir = addNewIndex(storageDirectory, provenanceLogFile, newIndexTimestamp); indexDirectories.add(newDir); return newDir; } else { @@ -154,14 +154,14 @@ public class IndexConfiguration { } } - private File addNewIndex(final File storageDirectory, final File provenanceLogFile) { + private File addNewIndex(final File storageDirectory, final File provenanceLogFile, final long newIndexTimestamp) { // Build the event time of the first record into the index's filename so that we can determine // which index files to look at when we perform a 
search. We use the timestamp of the first record // in the Provenance Log file, rather than the current time, because we may perform the Indexing // retroactively. Long firstEntryTime = getFirstEntryTime(provenanceLogFile); if (firstEntryTime == null) { - firstEntryTime = System.currentTimeMillis(); + firstEntryTime = newIndexTimestamp; } return new File(storageDirectory, "index-" + firstEntryTime); } @@ -222,7 +222,7 @@ public class IndexConfiguration { } }); - for (File indexDir : sortedIndexDirectories) { + for (final File indexDir : sortedIndexDirectories) { // If the index was last modified before the start time, we know that it doesn't // contain any data for us to query. if (startTime != null && indexDir.lastModified() < startTime) { @@ -282,7 +282,7 @@ public class IndexConfiguration { } boolean foundIndexCreatedLater = false; - for (File indexDir : sortedIndexDirectories) { + for (final File indexDir : sortedIndexDirectories) { // If the index was last modified before the log file was created, we know the index doesn't include // any data for the provenance log. if (indexDir.lastModified() < firstEntryTime) {
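The addNewIndex() change above keeps the convention of naming each index directory "index-" plus the event time of the first record in the provenance log, but now falls back to the caller-supplied newIndexTimestamp rather than System.currentTimeMillis() when that first entry time cannot be determined, so indexes built retroactively are not labeled with the current wall-clock time. A minimal, self-contained sketch of that naming-and-fallback idea follows; it is illustrative only, and IndexNamingSketch and its helper methods are not part of the NiFi codebase.

import java.io.File;

// Illustrative sketch only (not NiFi code): shows how an index directory name
// can embed the timestamp of the first event it covers, and how a caller-supplied
// fallback timestamp is used when that first-entry time is unknown.
public class IndexNamingSketch {

    // Mirrors the convention in the diff: "index-" + firstEntryTime.
    static File indexDirectoryFor(final File storageDirectory, final Long firstEntryTime, final long fallbackTimestamp) {
        // If the first event time of the log cannot be determined, use the
        // fallback chosen by the caller (the diff passes newIndexTimestamp
        // instead of System.currentTimeMillis()).
        final long effectiveTime = (firstEntryTime == null) ? fallbackTimestamp : firstEntryTime;
        return new File(storageDirectory, "index-" + effectiveTime);
    }

    // Recovers the embedded timestamp from a directory name, or -1 if the name
    // does not follow the "index-<timestamp>" convention. A search can use this
    // to decide which index directories are worth opening for a given time range.
    static long timestampOf(final File indexDirectory) {
        final String name = indexDirectory.getName();
        if (!name.startsWith("index-")) {
            return -1L;
        }
        try {
            return Long.parseLong(name.substring("index-".length()));
        } catch (final NumberFormatException nfe) {
            return -1L;
        }
    }

    public static void main(final String[] args) {
        final File storage = new File("target/provenance");
        final File indexDir = indexDirectoryFor(storage, null, 1234567890L);
        System.out.println(indexDir.getName());      // index-1234567890
        System.out.println(timestampOf(indexDir));   // 1234567890
    }
}

Encoding the first-event time in the directory name is what lets the timestamp checks in the hunks above skip index directories that cannot contain events for the requested time range.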