http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 1dd5b91,361f1ed..f7894d9 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@@ -58,13 -58,18 +58,15 @@@ import org.apache.nifi.processor.except import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; -/** - * This processor reads files from HDFS into NiFi FlowFiles. - */ @TriggerWhenEmpty @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) -@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles") +@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.") @WritesAttributes({ - @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."), - @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". 
If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") }) + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."), - @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if " - + "the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse " - + "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")}) -@SeeAlso(PutHDFS.class) ++ @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property " ++ + "is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and " ++ + "a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") }) +@SeeAlso({PutHDFS.class, ListHDFS.class}) public class GetHDFS extends AbstractHadoopProcessor { public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; @@@ -73,105 -78,101 +75,101 @@@ // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() -- .name("success") -- .description("All files retrieved from HDFS are transferred to this relationship") -- .build(); ++ .name("success") ++ .description("All files retrieved from HDFS are transferred to this relationship") ++ .build(); public static final Relationship REL_PASSTHROUGH = new Relationship.Builder() -- .name("passthrough") -- .description( -- "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship") ++ .name("passthrough") ++ .description( ++ "If this processor has an 
input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship") .build(); // properties public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() -- .name(DIRECTORY_PROP_NAME) -- .description("The HDFS directory from which files should be read") -- .required(true) -- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) -- .build(); ++ .name(DIRECTORY_PROP_NAME) ++ .description("The HDFS directory from which files should be read") ++ .required(true) ++ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) ++ .build(); public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() -- .name("Recurse Subdirectories") -- .description("Indicates whether to pull files from subdirectories of the HDFS directory") -- .required(true) -- .allowableValues("true", "false") -- .defaultValue("true") -- .build(); ++ .name("Recurse Subdirectories") ++ .description("Indicates whether to pull files from subdirectories of the HDFS directory") ++ .required(true) ++ .allowableValues("true", "false") ++ .defaultValue("true") ++ .build(); public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder() -- .name("Keep Source File") - .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.") - .description("Determines whether to delete the file from HDFS after it has been successfully transferred") -- .required(true) -- .allowableValues("true", "false") -- .defaultValue("false") -- .build(); ++ .name("Keep Source File") ++ .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. 
This is intended for testing only.") ++ .required(true) ++ .allowableValues("true", "false") ++ .defaultValue("false") ++ .build(); public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder() -- .name("File Filter Regex") - .description( - "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched") - .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular " - + "Expression will be fetched, otherwise all files will be fetched") ++ .name("File Filter Regex") ++ .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular " ++ + "Expression will be fetched, otherwise all files will be fetched") .required(false) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder() -- .name("Filter Match Name Only") - .description( - "If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison") - .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename " - + "in the regex comparison") ++ .name("Filter Match Name Only") ++ .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename " ++ + "in the regex comparison") .required(true) .allowableValues("true", "false") .defaultValue("true") .build(); public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() -- .name("Ignore Dotted Files") -- .description("If true, files whose names begin with a dot (\".\") will be ignored") -- 
.required(true) -- .allowableValues("true", "false") -- .defaultValue("true") -- .build(); ++ .name("Ignore Dotted Files") ++ .description("If true, files whose names begin with a dot (\".\") will be ignored") ++ .required(true) ++ .allowableValues("true", "false") ++ .defaultValue("true") ++ .build(); public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() -- .name("Minimum File Age") - .description( - "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored") - .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored") -- .required(true) - .addValidator( - StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) - .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) -- .defaultValue("0 sec") -- .build(); ++ .name("Minimum File Age") ++ .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored") ++ .required(true) ++ .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) ++ .defaultValue("0 sec") ++ .build(); public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() -- .name("Maximum File Age") - .description( - "The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored") - .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored") -- .required(false) - .addValidator( - StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) - .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) -- .build(); ++ .name("Maximum File Age") ++ .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored") ++ .required(false) ++ .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) ++ .build(); public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() -- .name("Batch Size") -- .description("The maximum number of files to pull in each iteration, based on run schedule.") -- .required(true) -- .defaultValue("100") -- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) -- .build(); ++ .name("Batch Size") ++ .description("The maximum number of files to pull in each iteration, based on run schedule.") ++ .required(true) ++ .defaultValue("100") ++ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) ++ .build(); public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder() -- .name("Polling Interval") -- .description("Indicates how long to wait between performing directory listings") -- .required(true) -- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) -- .defaultValue("0 sec") -- .build(); ++ .name("Polling Interval") ++ .description("Indicates how long to wait between performing directory listings") ++ .required(true) ++ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) ++ .defaultValue("0 sec") ++ .build(); public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder() -- .name("IO Buffer Size") -- .description("Amount of memory to use to buffer file contents during IO. 
This overrides the Hadoop Configuration") -- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) -- .build(); ++ .name("IO Buffer Size") ++ .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration") ++ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) ++ .build(); private static final Set<Relationship> relationships; protected static final List<PropertyDescriptor> localProperties; @@@ -461,11 -463,35 +460,8 @@@ return files; } - - /** - * Holder for a snapshot in time of some processor properties that are - * passed around. - * Returns the relative path of the child that does not include the filename or the root path. - * - * @param root root - * @param child child - * @return the relative path of the child that does not include the filename or the root path - */ - public static String getPathDifference(final Path root, final Path child) { - final int depthDiff = child.depth() - root.depth(); - if (depthDiff <= 1) { - return "".intern(); - } - String lastRoot = root.getName(); - Path childsParent = child.getParent(); - final StringBuilder builder = new StringBuilder(); - builder.append(childsParent.getName()); - for (int i = (depthDiff - 3); i >= 0; i--) { - childsParent = childsParent.getParent(); - String name = childsParent.getName(); - if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { - break; - } - builder.insert(0, Path.SEPARATOR).insert(0, name); - } - return builder.toString(); - } - - /** + * Holder for a snapshot in time of some processor properties that are passed around. */ protected static class ProcessorConfiguration {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 707b50d,0000000..56a128a mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@@ -1,466 -1,0 +1,469 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.util.HDFSListing; +import org.apache.nifi.processors.hadoop.util.StringSerDe; +import org.codehaus.jackson.JsonGenerationException; +import 
org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + + +@TriggerSerially +@TriggerWhenEmpty +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents " - + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only " - + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " - + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.") ++ + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only " ++ + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " ++ + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.") +@WritesAttributes({ - @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), - @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". 
If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), - @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), - @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), - @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), - @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), - @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"), - @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r--") ++ @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), ++ @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, " ++ + "then files picked up from /tmp will have the path attribute set to \"./\". 
If the Recurse Subdirectories property is set to true and a file is picked up " ++ + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), ++ @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), ++ @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), ++ @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), ++ @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), ++ @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"), ++ @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, " ++ + "3 for the group, and 3 for other users. For example rw-rw-r--") +}) +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class ListHDFS extends AbstractHadoopProcessor { - - public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() - .name("Distributed Cache Service") - .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node begins pulling data, it won't duplicate all of the work that has been done.") - .required(true) - .identifiesControllerService(DistributedMapCacheClient.class) - .build(); ++ public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() ++ .name("Distributed Cache Service") ++ .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node " ++ + "begins pulling data, it won't duplicate all of the work that has been done.") ++ .required(true) ++ 
.identifiesControllerService(DistributedMapCacheClient.class) ++ .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name(DIRECTORY_PROP_NAME) - .description("The HDFS directory from which files should be read") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() - .name("Recurse Subdirectories") - .description("Indicates whether to list files from subdirectories of the HDFS directory") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - - ++ .name(DIRECTORY_PROP_NAME) ++ .description("The HDFS directory from which files should be read") ++ .required(true) ++ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) ++ .build(); ++ ++ public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() ++ .name("Recurse Subdirectories") ++ .description("Indicates whether to list files from subdirectories of the HDFS directory") ++ .required(true) ++ .allowableValues("true", "false") ++ .defaultValue("true") ++ .build(); ++ ++ + public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles are transferred to this relationship") - .build(); ++ .name("success") ++ .description("All FlowFiles are transferred to this relationship") ++ .build(); + + private volatile Long lastListingTime = null; + private volatile Set<Path> latestPathsListed = new HashSet<>(); + private volatile boolean electedPrimaryNode = false; + private File persistenceFile = null; - ++ + @Override + protected void init(final ProcessorInitializationContext context) { - super.init(context); - persistenceFile = new File("conf/state/" + getIdentifier()); ++ super.init(context); ++ persistenceFile = new File("conf/state/" + getIdentifier()); ++ } ++ ++ @Override ++ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() { ++ final List<PropertyDescriptor> properties = new ArrayList<>(); ++ properties.add(HADOOP_CONFIGURATION_RESOURCES); ++ properties.add(DISTRIBUTED_CACHE_SERVICE); ++ properties.add(DIRECTORY); ++ properties.add(RECURSE_SUBDIRS); ++ return properties; ++ } ++ ++ @Override ++ public Set<Relationship> getRelationships() { ++ final Set<Relationship> relationships = new HashSet<>(); ++ relationships.add(REL_SUCCESS); ++ return relationships; ++ } ++ ++ private String getKey(final String directory) { ++ return getIdentifier() + ".lastListingTime." + directory; ++ } ++ ++ @OnPrimaryNodeStateChange ++ public void onPrimaryNodeChange(final PrimaryNodeState newState) { ++ if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { ++ electedPrimaryNode = true; ++ } ++ } ++ ++ @Override ++ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { ++ if ( descriptor.equals(DIRECTORY) ) { ++ lastListingTime = null; // clear lastListingTime so that we have to fetch new time ++ latestPathsListed = new HashSet<>(); ++ } + } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(DISTRIBUTED_CACHE_SERVICE); - properties.add(DIRECTORY); - properties.add(RECURSE_SUBDIRS); - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - return relationships; - } - - private String getKey(final String directory) { - return getIdentifier() + ".lastListingTime." 
+ directory; - } - - @OnPrimaryNodeStateChange - public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) { - electedPrimaryNode = true; - } - } - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - if ( descriptor.equals(DIRECTORY) ) { - lastListingTime = null; // clear lastListingTime so that we have to fetch new time - latestPathsListed = new HashSet<>(); - } - } - - private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { - final ObjectMapper mapper = new ObjectMapper(); ++ ++ private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { ++ final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, HDFSListing.class); - } - - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final String directory = context.getProperty(DIRECTORY).getValue(); - - // Determine the timestamp for the last file that we've listed. - Long minTimestamp = lastListingTime; - if ( minTimestamp == null || electedPrimaryNode ) { - // We haven't yet restored any state from local or distributed state - or it's been at least a minute since - // we have performed a listing. In this case, - // First, attempt to get timestamp from distributed cache service. 
- final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - - try { - final StringSerDe serde = new StringSerDe(); - final String serializedState = client.get(getKey(directory), serde, serde); - if ( serializedState == null || serializedState.isEmpty() ) { - minTimestamp = null; - this.latestPathsListed = Collections.emptySet(); - } else { - final HDFSListing listing = deserialize(serializedState); - this.lastListingTime = listing.getLatestTimestamp().getTime(); - minTimestamp = listing.getLatestTimestamp().getTime(); - this.latestPathsListed = listing.toPaths(); - } - - this.lastListingTime = minTimestamp; - electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); - context.yield(); - return; - } - - // Check the persistence file. We want to use the latest timestamp that we have so that - // we don't duplicate data. - try { - if ( persistenceFile.exists() ) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - final Properties props = new Properties(); - props.load(fis); - - // get the local timestamp for this directory, if it exists. - final String locallyPersistedValue = props.getProperty(directory); - if ( locallyPersistedValue != null ) { - final HDFSListing listing = deserialize(locallyPersistedValue); - final long localTimestamp = listing.getLatestTimestamp().getTime(); - - // If distributed state doesn't have an entry or the local entry is later than the distributed state, - // update the distributed state so that we are in sync. - if (minTimestamp == null || localTimestamp > minTimestamp) { - minTimestamp = localTimestamp; - - // Our local persistence file shows a later time than the Distributed service. 
- // Update the distributed service to match our local state. - try { - final StringSerDe serde = new StringSerDe(); - client.put(getKey(directory), locallyPersistedValue, serde, serde); - } catch (final IOException ioe) { - getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " - + "state due to {}. If a new node performs HDFS Listing, data duplication may occur", - new Object[] {directory, locallyPersistedValue, ioe}); - } - } - } - } - } - } catch (final IOException ioe) { - getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); - } - } - - - // Pull in any file that is newer than the timestamp that we have. - final FileSystem hdfs = hdfsResources.get().getValue(); - final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); - final Path rootPath = new Path(directory); - - int listCount = 0; - Long latestListingModTime = null; - final Set<FileStatus> statuses; - try { - statuses = getStatuses(rootPath, recursive, hdfs); - for ( final FileStatus status : statuses ) { - // don't get anything where the last modified timestamp is equal to our current timestamp. - // if we do, then we run the risk of multiple files having the same last mod date but us only - // seeing a portion of them. - // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe - // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a - // modified date not before the current time. - final long fileModTime = status.getModificationTime(); - - // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time. - // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those. 
- boolean fetch = !status.getPath().getName().endsWith("_COPYING_") && - (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath()))); - - // Create the FlowFile for this path. - if ( fetch ) { - final Map<String, String> attributes = createAttributes(status); - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - listCount++; - - if ( latestListingModTime == null || fileModTime > latestListingModTime ) { - latestListingModTime = fileModTime; - } - } - } - } catch (final IOException ioe) { - getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe}); - return; - } - - if ( listCount > 0 ) { - getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount}); - session.commit(); - - // We have performed a listing and pushed the FlowFiles out. - // Now, we need to persist state about the Last Modified timestamp of the newest file - // that we pulled in. We do this in order to avoid pulling in the same file twice. - // However, we want to save the state both locally and remotely. - // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the - // previously Primary Node left off. - // We also store the state locally so that if the node is restarted, and the node cannot contact - // the distributed state cache, the node can continue to run (if it is primary node). - String serializedState = null; - try { - serializedState = serializeState(latestListingModTime, statuses); - } catch (final Exception e) { - getLogger().error("Failed to serialize state due to {}", new Object[] {e}); - } - - if ( serializedState != null ) { - // Save our state locally. - try { - persistLocalState(directory, serializedState); - } catch (final IOException ioe) { - getLogger().warn("Unable to save state locally. 
If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); - } - - // Attempt to save state to remote server. - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - try { - client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); - } catch (final IOException ioe) { - getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); - } - } - - lastListingTime = latestListingModTime; - } else { - getLogger().debug("There is no data to list. Yielding."); - context.yield(); - - // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system - if ( lastListingTime == null ) { - lastListingTime = 0L; - } - - return; - } - } - - private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { - final Set<FileStatus> statusSet = new HashSet<>(); - - final FileStatus[] statuses = hdfs.listStatus(path); - - for ( final FileStatus status : statuses ) { - if ( status.isDirectory() ) { - if ( recursive ) { - try { - statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); - } - } - } else { - statusSet.add(status); - } - } - - return statusSet; - } - - - private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. - if ( statuses.isEmpty() ) { - return null; - } else { - final List<FileStatus> sortedStatuses = new ArrayList<>(statuses); - Collections.sort(sortedStatuses, new Comparator<FileStatus>() { - @Override - public int compare(final FileStatus o1, final FileStatus o2) { - return Long.compare(o1.getModificationTime(), o2.getModificationTime()); - } - }); - - final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); - final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>(); - for (int i=sortedStatuses.size() - 1; i >= 0; i--) { - final FileStatus status = sortedStatuses.get(i); - if (status.getModificationTime() == latestListingModTime) { - pathsWithModTimeEqualToListingModTime.add(status.getPath()); - } - } - - this.latestPathsListed = pathsWithModTimeEqualToListingModTime; - - final HDFSListing listing = new HDFSListing(); - listing.setLatestTimestamp(new Date(latestListingModTime)); - final Set<String> paths = new HashSet<>(); - for ( final Path path : pathsWithModTimeEqualToListingModTime ) { - paths.add(path.toUri().toString()); - } - listing.setMatchingPaths(paths); - - final ObjectMapper mapper = new ObjectMapper(); - final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); - return serializedState; - } - } - - private void persistLocalState(final String directory, final String serializedState) throws IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. 
- final File dir = persistenceFile.getParentFile(); - if ( !dir.exists() && !dir.mkdirs() ) { - throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); - } - - final Properties props = new Properties(); - if ( persistenceFile.exists() ) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - props.load(fis); - } - } - - props.setProperty(directory, serializedState); - - try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { - props.store(fos, null); - } - } - - private String getAbsolutePath(final Path path) { - final Path parent = path.getParent(); - final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent); - return prefix + "/" + path.getName(); - } - - private Map<String, String> createAttributes(final FileStatus status) { - final Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); - attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); - - attributes.put("hdfs.owner", status.getOwner()); - attributes.put("hdfs.group", status.getGroup()); - attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); - attributes.put("hdfs.length", String.valueOf(status.getLen())); - attributes.put("hdfs.replication", String.valueOf(status.getReplication())); - - final FsPermission permission = status.getPermission(); - final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); - attributes.put("hdfs.permissions", perms); - return attributes; - } - - private String getPerms(final FsAction action) { - final StringBuilder sb = new StringBuilder(); - if ( action.implies(FsAction.READ) ) { - sb.append("r"); - } else { - sb.append("-"); - } - - if ( action.implies(FsAction.WRITE) ) { - sb.append("w"); - } else { - sb.append("-"); - } 
- - if ( action.implies(FsAction.EXECUTE) ) { - sb.append("x"); - } else { - sb.append("-"); - } - - return sb.toString(); - } ++ } ++ ++ ++ @Override ++ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { ++ final String directory = context.getProperty(DIRECTORY).getValue(); ++ ++ // Determine the timestamp for the last file that we've listed. ++ Long minTimestamp = lastListingTime; ++ if ( minTimestamp == null || electedPrimaryNode ) { ++ // We haven't yet restored any state from local or distributed state - or it's been at least a minute since ++ // we have performed a listing. In this case, ++ // First, attempt to get timestamp from distributed cache service. ++ final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); ++ ++ try { ++ final StringSerDe serde = new StringSerDe(); ++ final String serializedState = client.get(getKey(directory), serde, serde); ++ if ( serializedState == null || serializedState.isEmpty() ) { ++ minTimestamp = null; ++ this.latestPathsListed = Collections.emptySet(); ++ } else { ++ final HDFSListing listing = deserialize(serializedState); ++ this.lastListingTime = listing.getLatestTimestamp().getTime(); ++ minTimestamp = listing.getLatestTimestamp().getTime(); ++ this.latestPathsListed = listing.toPaths(); ++ } ++ ++ this.lastListingTime = minTimestamp; ++ electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. ++ } catch (final IOException ioe) { ++ getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); ++ context.yield(); ++ return; ++ } ++ ++ // Check the persistence file. We want to use the latest timestamp that we have so that ++ // we don't duplicate data. 
++ try { ++ if ( persistenceFile.exists() ) { ++ try (final FileInputStream fis = new FileInputStream(persistenceFile)) { ++ final Properties props = new Properties(); ++ props.load(fis); ++ ++ // get the local timestamp for this directory, if it exists. ++ final String locallyPersistedValue = props.getProperty(directory); ++ if ( locallyPersistedValue != null ) { ++ final HDFSListing listing = deserialize(locallyPersistedValue); ++ final long localTimestamp = listing.getLatestTimestamp().getTime(); ++ ++ // If distributed state doesn't have an entry or the local entry is later than the distributed state, ++ // update the distributed state so that we are in sync. ++ if (minTimestamp == null || localTimestamp > minTimestamp) { ++ minTimestamp = localTimestamp; ++ ++ // Our local persistence file shows a later time than the Distributed service. ++ // Update the distributed service to match our local state. ++ try { ++ final StringSerDe serde = new StringSerDe(); ++ client.put(getKey(directory), locallyPersistedValue, serde, serde); ++ } catch (final IOException ioe) { ++ getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " ++ + "state due to {}. If a new node performs HDFS Listing, data duplication may occur", ++ new Object[] {directory, locallyPersistedValue, ioe}); ++ } ++ } ++ } ++ } ++ } ++ } catch (final IOException ioe) { ++ getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); ++ } ++ } ++ ++ ++ // Pull in any file that is newer than the timestamp that we have. 
++ final FileSystem hdfs = hdfsResources.get().getValue(); ++ final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); ++ final Path rootPath = new Path(directory); ++ ++ int listCount = 0; ++ Long latestListingModTime = null; ++ final Set<FileStatus> statuses; ++ try { ++ statuses = getStatuses(rootPath, recursive, hdfs); ++ for ( final FileStatus status : statuses ) { ++ // don't get anything where the last modified timestamp is equal to our current timestamp. ++ // if we do, then we run the risk of multiple files having the same last mod date but us only ++ // seeing a portion of them. ++ // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe ++ // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a ++ // modified date not before the current time. ++ final long fileModTime = status.getModificationTime(); ++ ++ // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time. ++ // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those. ++ boolean fetch = !status.getPath().getName().endsWith("_COPYING_") ++ && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath()))); ++ ++ // Create the FlowFile for this path. 
++ if ( fetch ) { ++ final Map<String, String> attributes = createAttributes(status); ++ FlowFile flowFile = session.create(); ++ flowFile = session.putAllAttributes(flowFile, attributes); ++ session.transfer(flowFile, REL_SUCCESS); ++ listCount++; ++ ++ if ( latestListingModTime == null || fileModTime > latestListingModTime ) { ++ latestListingModTime = fileModTime; ++ } ++ } ++ } ++ } catch (final IOException ioe) { ++ getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe}); ++ return; ++ } ++ ++ if ( listCount > 0 ) { ++ getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount}); ++ session.commit(); ++ ++ // We have performed a listing and pushed the FlowFiles out. ++ // Now, we need to persist state about the Last Modified timestamp of the newest file ++ // that we pulled in. We do this in order to avoid pulling in the same file twice. ++ // However, we want to save the state both locally and remotely. ++ // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the ++ // previously Primary Node left off. ++ // We also store the state locally so that if the node is restarted, and the node cannot contact ++ // the distributed state cache, the node can continue to run (if it is primary node). ++ String serializedState = null; ++ try { ++ serializedState = serializeState(latestListingModTime, statuses); ++ } catch (final Exception e) { ++ getLogger().error("Failed to serialize state due to {}", new Object[] {e}); ++ } ++ ++ if ( serializedState != null ) { ++ // Save our state locally. ++ try { ++ persistLocalState(directory, serializedState); ++ } catch (final IOException ioe) { ++ getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); ++ } ++ ++ // Attempt to save state to remote server. 
++ final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); ++ try { ++ client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe()); ++ } catch (final IOException ioe) { ++ getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); ++ } ++ } ++ ++ lastListingTime = latestListingModTime; ++ } else { ++ getLogger().debug("There is no data to list. Yielding."); ++ context.yield(); ++ ++ // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system ++ if ( lastListingTime == null ) { ++ lastListingTime = 0L; ++ } ++ ++ return; ++ } ++ } ++ ++ private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { ++ final Set<FileStatus> statusSet = new HashSet<>(); ++ ++ final FileStatus[] statuses = hdfs.listStatus(path); ++ ++ for ( final FileStatus status : statuses ) { ++ if ( status.isDirectory() ) { ++ if ( recursive ) { ++ try { ++ statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); ++ } catch (final IOException ioe) { ++ getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); ++ } ++ } ++ } else { ++ statusSet.add(status); ++ } ++ } ++ ++ return statusSet; ++ } ++ ++ ++ private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException { ++ // we need to keep track of all files that we pulled in that had a modification time equal to ++ // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files ++ // that have a mod time equal to that timestamp because more files may come in with the same timestamp ++ // later in the same millisecond. 
++ if ( statuses.isEmpty() ) { ++ return null; ++ } else { ++ final List<FileStatus> sortedStatuses = new ArrayList<>(statuses); ++ Collections.sort(sortedStatuses, new Comparator<FileStatus>() { ++ @Override ++ public int compare(final FileStatus o1, final FileStatus o2) { ++ return Long.compare(o1.getModificationTime(), o2.getModificationTime()); ++ } ++ }); ++ ++ final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime(); ++ final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>(); ++ for (int i=sortedStatuses.size() - 1; i >= 0; i--) { ++ final FileStatus status = sortedStatuses.get(i); ++ if (status.getModificationTime() == latestListingModTime) { ++ pathsWithModTimeEqualToListingModTime.add(status.getPath()); ++ } ++ } ++ ++ this.latestPathsListed = pathsWithModTimeEqualToListingModTime; ++ ++ final HDFSListing listing = new HDFSListing(); ++ listing.setLatestTimestamp(new Date(latestListingModTime)); ++ final Set<String> paths = new HashSet<>(); ++ for ( final Path path : pathsWithModTimeEqualToListingModTime ) { ++ paths.add(path.toUri().toString()); ++ } ++ listing.setMatchingPaths(paths); ++ ++ final ObjectMapper mapper = new ObjectMapper(); ++ final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing); ++ return serializedState; ++ } ++ } ++ ++ private void persistLocalState(final String directory, final String serializedState) throws IOException { ++ // we need to keep track of all files that we pulled in that had a modification time equal to ++ // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files ++ // that have a mod time equal to that timestamp because more files may come in with the same timestamp ++ // later in the same millisecond. 
++ final File dir = persistenceFile.getParentFile(); ++ if ( !dir.exists() && !dir.mkdirs() ) { ++ throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); ++ } ++ ++ final Properties props = new Properties(); ++ if ( persistenceFile.exists() ) { ++ try (final FileInputStream fis = new FileInputStream(persistenceFile)) { ++ props.load(fis); ++ } ++ } ++ ++ props.setProperty(directory, serializedState); ++ ++ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { ++ props.store(fos, null); ++ } ++ } ++ ++ private String getAbsolutePath(final Path path) { ++ final Path parent = path.getParent(); ++ final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent); ++ return prefix + "/" + path.getName(); ++ } ++ ++ private Map<String, String> createAttributes(final FileStatus status) { ++ final Map<String, String> attributes = new HashMap<>(); ++ attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); ++ attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); ++ ++ attributes.put("hdfs.owner", status.getOwner()); ++ attributes.put("hdfs.group", status.getGroup()); ++ attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); ++ attributes.put("hdfs.length", String.valueOf(status.getLen())); ++ attributes.put("hdfs.replication", String.valueOf(status.getReplication())); ++ ++ final FsPermission permission = status.getPermission(); ++ final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); ++ attributes.put("hdfs.permissions", perms); ++ return attributes; ++ } ++ ++ private String getPerms(final FsAction action) { ++ final StringBuilder sb = new StringBuilder(); ++ if ( action.implies(FsAction.READ) ) { ++ sb.append("r"); ++ } else { ++ sb.append("-"); ++ } ++ ++ if ( action.implies(FsAction.WRITE) ) { 
++ sb.append("w"); ++ } else { ++ sb.append("-"); ++ } ++ ++ if ( action.implies(FsAction.EXECUTE) ) { ++ sb.append("x"); ++ } else { ++ sb.append("-"); ++ } ++ ++ return sb.toString(); ++ } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java index 9f4d68b,0000000..49957f5 mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@@ -1,83 -1,0 +1,83 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; + +import org.apache.hadoop.fs.Path; + +/** + * A simple POJO for maintaining state about the last HDFS Listing that was performed so that - * we can avoid pulling the same file multiple times ++ * we can avoid pulling the same file multiple times + */ +@XmlType(name = "listing") +public class HDFSListing { - private Date latestTimestamp; - private Collection<String> matchingPaths; - - /** - * @return the modification date of the newest file that was contained in the HDFS Listing - */ - public Date getLatestTimestamp() { - return latestTimestamp; - } - - /** - * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing - * - * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing - */ - public void setLatestTimestamp(Date latestTimestamp) { - this.latestTimestamp = latestTimestamp; - } - - /** - * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date - * was equal to {@link #getLatestTimestamp()} - */ - @XmlTransient - public Collection<String> getMatchingPaths() { - return matchingPaths; - } - - /** - * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()} - */ - public Set<Path> toPaths() { - final Set<Path> paths = new HashSet<>(matchingPaths.size()); - for ( final String pathname : matchingPaths ) { - paths.add(new Path(pathname)); - } - return paths; - } ++ private Date latestTimestamp; ++ private Collection<String> matchingPaths; + - /** - * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was - * equal to {@link #getLatestTimestamp()} - * @param matchingPaths 
- */ - public void setMatchingPaths(Collection<String> matchingPaths) { - this.matchingPaths = matchingPaths; - } ++ /** ++ * @return the modification date of the newest file that was contained in the HDFS Listing ++ */ ++ public Date getLatestTimestamp() { ++ return latestTimestamp; ++ } ++ ++ /** ++ * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing ++ * ++ * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing ++ */ ++ public void setLatestTimestamp(Date latestTimestamp) { ++ this.latestTimestamp = latestTimestamp; ++ } ++ ++ /** ++ * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date ++ * was equal to {@link #getLatestTimestamp()} ++ */ ++ @XmlTransient ++ public Collection<String> getMatchingPaths() { ++ return matchingPaths; ++ } ++ ++ /** ++ * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()} ++ */ ++ public Set<Path> toPaths() { ++ final Set<Path> paths = new HashSet<>(matchingPaths.size()); ++ for ( final String pathname : matchingPaths ) { ++ paths.add(new Path(pathname)); ++ } ++ return paths; ++ } ++ ++ /** ++ * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was ++ * equal to {@link #getLatestTimestamp()} ++ * @param matchingPaths the paths that have last modified date matching the latest timestamp ++ */ ++ public void setMatchingPaths(Collection<String> matchingPaths) { ++ this.matchingPaths = matchingPaths; ++ } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java ---------------------------------------------------------------------- diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java index ef0e590,0000000..229f26c mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java @@@ -1,48 -1,0 +1,48 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class LongSerDe implements Serializer<Long>, Deserializer<Long> { + - @Override - public Long deserialize(final byte[] input) throws DeserializationException, IOException { - if ( input == null || input.length == 0 ) { - return null; - } - - final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input)); - return dis.readLong(); - } ++ @Override ++ public Long deserialize(final byte[] input) throws DeserializationException, IOException { ++ if ( input == null || input.length == 0 ) { ++ return null; ++ } + - @Override - public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException { - final DataOutputStream dos = new DataOutputStream(out); - dos.writeLong(value); - } ++ final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input)); ++ return dis.readLong(); ++ } ++ ++ @Override ++ public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException { ++ final DataOutputStream dos = new DataOutputStream(out); ++ dos.writeLong(value); ++ } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java ---------------------------------------------------------------------- diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java index 848831f,0000000..ca1c548 mode 100644,000000..100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java @@@ -1,44 -1,0 +1,44 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +public class StringSerDe implements Serializer<String>, Deserializer<String> { + - @Override - public String deserialize(final byte[] value) throws DeserializationException, IOException { - if ( value == null ) { - return null; - } - - return new String(value, StandardCharsets.UTF_8); - } ++ @Override ++ public String deserialize(final byte[] value) throws DeserializationException, IOException { ++ if ( value == null ) { ++ return null; ++ } + - @Override - public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { - out.write(value.getBytes(StandardCharsets.UTF_8)); - } ++ return new String(value, StandardCharsets.UTF_8); ++ } ++ ++ @Override ++ public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { ++ out.write(value.getBytes(StandardCharsets.UTF_8)); ++ } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java ---------------------------------------------------------------------- diff --cc 
nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index 975dc63,d816e8c..ea3bb63 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@@ -83,31 -86,15 +86,34 @@@ public interface DistributedMapCacheCli <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException; /** - * @param <K> type of key - * @param <V> type of value + * Adds the specified key and value to the cache, overwriting any value that is + * currently set. - * ++ * ++ * @param <K> the key type ++ * @param <V> the value type + * @param key The key to set + * @param value The value to associate with the given Key + * @param keySerializer the Serializer that will be used to serialize the key into bytes + * @param valueSerializer the Serializer that will be used to serialize the value into bytes - * ++ * + * @throws IOException if unable to communicate with the remote instance + * @throws NullPointerException if the key or either serializer is null + */ + <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException; - ++ + /** + * Returns the value in the cache for the given key, if one exists; + * otherwise returns <code>null</code> + * - * @param <K> - * @param <V> ++ * @param <K> the key type ++ * @param <V> the value type * @param key the key to lookup in the map - * @param keySerializer - * @param valueDeserializer + * @param keySerializer key serializer + * @param valueDeserializer value serializer * - * @return - * @throws IOException + * @return the value in the cache for the given key, if one exists; + * 
otherwise returns <code>null</code> + * @throws IOException if unable to communicate with the remote instance */ <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index 8903046,fad0adb..e9c6f1d --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@@ -22,9 -22,12 +22,14 @@@ import java.nio.ByteBuffer public interface MapCache { MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; + + MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException; ++ boolean containsKey(ByteBuffer key) throws IOException; + ByteBuffer get(ByteBuffer key) throws IOException; + ByteBuffer remove(ByteBuffer key) throws IOException; + void shutdown() throws IOException; } 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index cf8996c,943d6aa..13ed0df --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@@ -55,70 -55,63 +55,70 @@@ public class MapCacheServer extends Abs final String action = dis.readUTF(); try { switch (action) { - case "close": { - return false; - } - case "putIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - dos.writeBoolean(putResult.isSuccessful()); - break; - } - case "containsKey": { - final byte[] key = readValue(dis); - final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); - dos.writeBoolean(contains); - break; - } - case "getAndPutIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - if (putResult.isSuccessful()) { - // Put was successful. There was no old value to get. - dos.writeInt(0); - } else { - // we didn't put. 
Write back the previous value - final byte[] byteArray = putResult.getExistingValue().array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); - } - - break; - } - case "get": { - final byte[] key = readValue(dis); - final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); - if (existingValue == null) { - // there was no existing value; we did a "put". - dos.writeInt(0); - } else { - // a value already existed. we did not update the map - final byte[] byteArray = existingValue.array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); - } - - break; - } - case "remove": { - final byte[] key = readValue(dis); - final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; - dos.writeBoolean(removed); - break; + case "close": { + return false; + } + case "putIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(putResult.isSuccessful()); + break; + } + case "put": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); ++ final byte[] key = readValue(dis); ++ final byte[] value = readValue(dis); ++ cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(true); - break; ++ break; + } + case "containsKey": { + final byte[] key = readValue(dis); + final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); + dos.writeBoolean(contains); + break; + } + case "getAndPutIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + if (putResult.isSuccessful()) { + // Put was successful. There was no old value to get. + dos.writeInt(0); + } else { + // we didn't put. 
Write back the previous value + final byte[] byteArray = putResult.getExistingValue().array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); } - default: { - throw new IOException("Illegal Request"); + + break; + } + case "get": { + final byte[] key = readValue(dis); + final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); + if (existingValue == null) { + // no value is cached for this key; report a length of 0. + dos.writeInt(0); + } else { + // a value is cached for this key; write its length followed by its bytes + final byte[] byteArray = existingValue.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); } + + break; + } + case "remove": { + final byte[] key = readValue(dis); + final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; + dos.writeBoolean(removed); + break; + } + default: { + throw new IOException("Illegal Request"); + } } } finally { dos.flush(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 82b1787,e821fbf..663f441 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@@ -75,33 -75,9 +75,33 @@@ public class PersistentMapCache impleme wali.checkpoint(); } } - + return putResult; } + + 
/** + * Puts the key/value pair into the wrapped cache and, when that put succeeds, journals the change through {@code wali}. + * + * @param key the key to store + * @param value the value to associate with the key + * @return the result returned by the wrapped cache's put + * @throws IOException if the wrapped cache's put throws it (the {@code wali} calls presumably can as well) + */ + @Override + public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException { + final MapPutResult putResult = wrapped.put(key, value); + if ( putResult.isSuccessful() ) { + // The put was successful. + final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); + final List<MapWaliRecord> records = new ArrayList<>(); + records.add(record); + + if ( putResult.getEvictedKey() != null ) { + records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); + } + + // Journal the whole batch so the DELETE for an evicted entry is not dropped. + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if ( modCount > 0 && modCount % 100000 == 0 ) { + wali.checkpoint(); + } + } + + return putResult; + } @Override public boolean containsKey(final ByteBuffer key) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index d8f9c45,9e8bbd1..b167c62 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@@ -105,29 -106,7 +106,29 @@@ public class SimpleMapCache implements writeLock.unlock(); } } - + + + @Override + public MapPutResult put(final ByteBuffer key, final ByteBuffer value) { 
+ writeLock.lock(); + try { - // evict if we need to in order to make room for a new entry. ++ // evict if we need to in order to make room for a new entry. + final MapCacheRecord evicted = evict(); + + final MapCacheRecord record = new MapCacheRecord(key, value); - final MapCacheRecord existing = cache.put(key, record); - inverseCacheMap.put(record, key); - - final ByteBuffer existingValue = (existing == null) ? null : existing.getValue(); - final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey(); - final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue(); - - return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue); ++ final MapCacheRecord existing = cache.put(key, record); ++ inverseCacheMap.put(record, key); ++ ++ final ByteBuffer existingValue = (existing == null) ? null : existing.getValue(); ++ final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey(); ++ final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue(); ++ ++ return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue); + } finally { + writeLock.unlock(); + } + } - ++ @Override public boolean containsKey(final ByteBuffer key) { readLock.lock();
