NIFI-533: Refactored for Unit Tests and added unit tests for ListHDFS
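
The heart of the change is a test seam: rather than reaching into the shared hdfsResources tuple or calling FileSystem.get() inline, the processors now go through protected accessor/factory methods (getFileSystem(), getConfiguration(), getPersistenceFile(), persistLocalState()) that a unit test can override. Condensed from the diff below, with the class bodies trimmed to the relevant overrides:

    // Production code: a protected factory method replaces the inline FileSystem.get() call.
    public abstract class AbstractHadoopProcessor extends AbstractProcessor {
        protected FileSystem getFileSystem(final Configuration config) throws IOException {
            return FileSystem.get(config); // opens a real HDFS connection
        }
    }

    // Test code (an inner class of TestListHDFS below): subclass the processor under
    // test and return a mock, so no real connection (and none of the multi-minute
    // socket timeouts noted in the javadoc) is ever attempted.
    class ListHDFSWithMockedFileSystem extends ListHDFS {
        private final MockFileSystem fileSystem = new MockFileSystem();

        @Override
        protected FileSystem getFileSystem(final Configuration config) {
            return fileSystem;
        }
    }

The same seam covers local state: ListHDFS resolves its state file through getPersistenceFile(), which the test overrides to point at target/conf/state-file instead of conf/state/<processor-id>.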
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e4f43156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e4f43156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e4f43156

Branch: refs/heads/develop
Commit: e4f431561e6af7a603a5c8b7a82910f28dc6d600
Parents: dc7f7a8
Author: Mark Payne <[email protected]>
Authored: Tue Apr 28 08:46:38 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Apr 28 08:46:38 2015 -0400

----------------------------------------------------------------------
 .../hadoop/AbstractHadoopProcessor.java         |  24 +-
 .../hadoop/CreateHadoopSequenceFile.java        |   2 +-
 .../nifi/processors/hadoop/FetchHDFS.java       |   2 +-
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  10 +-
 .../processors/hadoop/GetHDFSSequenceFile.java  |  12 +-
 .../apache/nifi/processors/hadoop/ListHDFS.java |  42 ++-
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  21 +-
 .../nifi/processors/hadoop/TestListHDFS.java    | 347 +++++++++++++++++++
 8 files changed, 416 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 91c21a0..355950f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -70,7 +70,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     // variables shared by all threads of this processor
     // Hadoop Configuration and FileSystem
-    protected final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
+    private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -153,7 +153,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
             config.set(disableCacheName, "true");
 
-            final FileSystem fs = FileSystem.get(config);
+            final FileSystem fs = getFileSystem(config);
             getLogger().info(
                     "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                     new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
@@ -165,6 +165,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
     }
 
+    /**
+     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
+     * for UDP packets to be received
+     *
+     * @param config the configuration to use
+     * @return the FileSystem that is created for the given Configuration
+     * @throws IOException if unable to create the FileSystem
+     */
+    protected FileSystem getFileSystem(final Configuration config) throws IOException {
+        return FileSystem.get(config);
+    }
+
     /*
      * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
      */
@@ -243,4 +255,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
         return builder.toString();
     }
+
+    protected Configuration getConfiguration() {
+        return hdfsResources.get().getKey();
+    }
+
+    protected FileSystem getFileSystem() {
+        return hdfsResources.get().getValue();
+    }
 }


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index f462277..186a290 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -156,7 +156,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
         try {
-            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
+            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
             session.transfer(flowFile, RELATIONSHIP_SUCCESS);
             getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
         } catch (ProcessException e) {


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index b5efce0..4a52fb7 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -99,7 +99,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final FileSystem hdfs = hdfsResources.get().getValue();
+        final FileSystem hdfs = getFileSystem();
         final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
         final URI uri = path.toUri();


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index f7894d9..7aa534f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -236,8 +236,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
         abstractOnScheduled(context);
         // copy configuration values to pass them around cleanly
         processorConfig = new ProcessorConfiguration(context);
-        FileSystem fs = hdfsResources.get().getValue();
-        Path dir = new Path(context.getProperty(DIRECTORY).getValue());
+        final FileSystem fs = getFileSystem();
+        final Path dir = new Path(context.getProperty(DIRECTORY).getValue());
         if (!fs.exists(dir)) {
             throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
         }
@@ -330,8 +330,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
         FSDataInputStream stream = null;
-        Configuration conf = hdfsResources.get().getKey();
-        FileSystem hdfs = hdfsResources.get().getValue();
+        Configuration conf = getConfiguration();
+        FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
@@ -398,7 +398,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
             try {
-                final FileSystem hdfs = hdfsResources.get().getValue();
+                final FileSystem hdfs = getFileSystem();
                 // get listing
                 listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
                 lastPollTime.set(System.currentTimeMillis());


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 22ba36b..f032ee4 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -34,10 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested
@@ -80,9 +79,8 @@ public class GetHDFSSequenceFile extends GetHDFS {
 
     @Override
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
-        final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get();
-        final Configuration conf = hadoopResources.getKey();
-        final FileSystem hdfs = hadoopResources.getValue();
+        final Configuration conf = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
         final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 56a128a..151cbf2 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -118,12 +118,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private volatile Long lastListingTime = null;
     private volatile Set<Path> latestPathsListed = new HashSet<>();
     private volatile boolean electedPrimaryNode = false;
-    private File persistenceFile = null;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
-        persistenceFile = new File("conf/state/" + getIdentifier());
+    }
+
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
     }
 
     @Override
@@ -143,7 +145,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return relationships;
     }
 
-    private String getKey(final String directory) {
+    protected String getKey(final String directory) {
         return getIdentifier() + ".lastListingTime." + directory;
     }
 
@@ -169,18 +171,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final String directory = context.getProperty(DIRECTORY).getValue();
-
+    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
         // Determine the timestamp for the last file that we've listed.
         Long minTimestamp = lastListingTime;
         if ( minTimestamp == null || electedPrimaryNode ) {
             // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
             // we have performed a listing. In this case,
             // First, attempt to get timestamp from distributed cache service.
-            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
             try {
                 final StringSerDe serde = new StringSerDe();
                 final String serializedState = client.get(getKey(directory), serde, serde);
@@ -197,14 +194,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
                 this.lastListingTime = minTimestamp;
                 electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
             } catch (final IOException ioe) {
-                getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
-                context.yield();
-                return;
+                throw ioe;
             }
 
             // Check the persistence file. We want to use the latest timestamp that we have so that
             // we don't duplicate data.
             try {
+                final File persistenceFile = getPersistenceFile();
                 if ( persistenceFile.exists() ) {
                     try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                         final Properties props = new Properties();
@@ -240,9 +236,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
             }
         }
 
+        return minTimestamp;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String directory = context.getProperty(DIRECTORY).getValue();
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(directory, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+            context.yield();
+            return;
+        }
+
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = hdfsResources.get().getValue();
+        final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);
 
@@ -311,7 +323,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
             }
 
             // Attempt to save state to remote server.
-            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
             try {
                 client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
             } catch (final IOException ioe) {
@@ -397,11 +408,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    private void persistLocalState(final String directory, final String serializedState) throws IOException {
+    protected void persistLocalState(final String directory, final String serializedState) throws IOException {
         // we need to keep track of all files that we pulled in that had a modification time equal to
         // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
         // that have a mod time equal to that timestamp because more files may come in with the same timestamp
         // later in the same millisecond.
+        final File persistenceFile = getPersistenceFile();
         final File dir = persistenceFile.getParentFile();
         if ( !dir.exists() && !dir.mkdirs() ) {
             throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 057f786..52cf475 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -54,7 +54,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -183,8 +182,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         } else {
             dfsUmask = FsPermission.DEFAULT_UMASK;
         }
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        final Configuration conf = resources.getKey();
+        final Configuration conf = getConfiguration();
         FsPermission.setUMask(conf, new FsPermission(dfsUmask));
     }
 
@@ -195,26 +193,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        if (resources == null || resources.getKey() == null || resources.getValue() == null) {
+        final Configuration configuration = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
+        if (configuration == null || hdfs == null) {
             getLogger().error("HDFS not configured properly");
             session.transfer(flowFile, REL_FAILURE);
             context.yield();
             return;
         }
-        final Configuration conf = resources.getKey();
-        final FileSystem hdfs = resources.getValue();
 
-        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
-                .getValue());
+        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
         final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
 
         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
         final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
 
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
-                BUFFER_SIZE_DEFAULT);
+        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
 
         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
         final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
@@ -230,7 +225,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
         // Create destination directory if it does not exist
         try {
-            if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+            if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
                 throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
             }
         } catch (FileNotFoundException fe) {


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
new file mode 100644
index 0000000..499fd51
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestListHDFS {
+
+    private TestRunner runner;
+    private ListHDFSWithMockedFileSystem proc;
+    private MockCacheClient service;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ListHDFSWithMockedFileSystem();
+        runner = TestRunners.newTestRunner(proc);
+
+        service = new MockCacheClient();
+        runner.addControllerService("service", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
+        runner.setProperty(ListHDFS.DIRECTORY, "/test");
+        runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
+    }
+
+    @Test
+    public void testListingHasCorrectAttributes() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testRecursive() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+
+        final MockFlowFile mff2 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(1);
+        mff2.assertAttributeEquals("path", "/test/testDir");
+        mff2.assertAttributeEquals("filename", "1.txt");
+    }
+
+    @Test
+    public void testNotRecursive() {
+        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+
+        runner.clearTransferState();
+
+        // add new file to pull
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+
+        // trigger primary node change
+        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+        // cause calls to service to fail
+        service.failOnCalls = true;
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        final String key = proc.getKey("/test");
+
+        // wait just a bit to ensure that the timestamp changes when we update the service
+        final Object curVal = service.values.get(key);
+        try {
+            Thread.sleep(10L);
+        } catch (final InterruptedException ie) {
+        }
+
+        service.failOnCalls = false;
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        // ensure state saved both locally & remotely
+        assertTrue(proc.localStateSaved);
+        assertNotNull(service.values.get(key));
+        assertNotSame(curVal, service.values.get(key));
+    }
+
+
+    private FsPermission create777() {
+        return new FsPermission((short) 0777);
+    }
+
+
+    private class ListHDFSWithMockedFileSystem extends ListHDFS {
+        private final MockFileSystem fileSystem = new MockFileSystem();
+        private boolean localStateSaved = false;
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        @Override
+        protected File getPersistenceFile() {
+            return new File("target/conf/state-file");
+        }
+
+        @Override
+        protected FileSystem getFileSystem(Configuration config) throws IOException {
+            return fileSystem;
+        }
+
+        @Override
+        protected void persistLocalState(String directory, String serializedState) throws IOException {
+            super.persistLocalState(directory, serializedState);
+            localStateSaved = true;
+        }
+    }
+
+
+    private class MockFileSystem extends FileSystem {
+        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+        public void addFileStatus(final Path parent, final FileStatus child) {
+            Set<FileStatus> children = fileStatuses.get(parent);
+            if ( children == null ) {
+                children = new HashSet<>();
+                fileStatuses.put(parent, children);
+            }
+
+            children.add(child);
+        }
+
+
+        @Override
+        public long getDefaultBlockSize() {
+            return 1024L;
+        }
+
+        @Override
+        public short getDefaultReplication() {
+            return 1;
+        }
+
+        @Override
+        public URI getUri() {
+            return null;
+        }
+
+        @Override
+        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean rename(Path src, Path dst) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(Path f, boolean recursive) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+            final Set<FileStatus> statuses = fileStatuses.get(f);
+            if ( statuses == null ) {
+                return new FileStatus[0];
+            }
+
+            return statuses.toArray(new FileStatus[statuses.size()]);
+        }
+
+        @Override
+        public void setWorkingDirectory(Path new_dir) {
+
+        }
+
+        @Override
+        public Path getWorkingDirectory() {
+            return new Path(new File(".").getAbsolutePath());
+        }
+
+        @Override
+        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path f) throws IOException {
+            return null;
+        }
+
+    }
+
+
+    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
+        private ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+        private boolean failOnCalls = false;
+
+        private void verifyNotFail() throws IOException {
+            if ( failOnCalls ) {
+                throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
+            }
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            final Object retValue = values.putIfAbsent(key, value);
+            return (retValue == null);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.putIfAbsent(key, value);
+        }
+
+        @Override
+        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+            verifyNotFail();
+            return values.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            values.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+            verifyNotFail();
+            values.remove(key);
+            return true;
+        }
+    }
+}
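
The new test runs with plain Maven/Surefire test selection from the processor module (standard Maven usage; no cluster or HDFS instance is required, since the file system and cache service are both mocked):

    cd nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors
    mvn test -Dtest=TestListHDFS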
