http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java
index a86a416..d7bad78 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import javax.xml.stream.FactoryConfigurationError;
 import javax.xml.stream.XMLOutputFactory;
@@ -36,6 +37,7 @@ import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.documentation.DocumentationWriter;
+import org.apache.nifi.nar.ExtensionManager;
 
 /**
  * Generates HTML documentation for a ConfigurableComponent. This class is used
@@ -148,9 +150,7 @@ public class HtmlDocumentationWriter implements 
DocumentationWriter {
                     xmlStreamWriter.writeCharacters(", ");
                 }
 
-                final String link = "../" + linkedComponent.getCanonicalName() 
+ "/index.html";
-
-                writeLink(xmlStreamWriter, linkedComponent.getSimpleName(), 
link);
+                writeLinkForComponent(xmlStreamWriter, linkedComponent);
 
                 ++index;
             }
@@ -434,12 +434,24 @@ public class HtmlDocumentationWriter implements 
DocumentationWriter {
             }
             xmlStreamWriter.writeEndElement();
         } else if (property.getControllerServiceDefinition() != null) {
-            Class<? extends ControllerService> controllerServiceClass = 
property
-                    .getControllerServiceDefinition();
+            Class<? extends ControllerService> controllerServiceClass = 
property.getControllerServiceDefinition();
 
-            writeSimpleElement(xmlStreamWriter, "strong", "Controller Service: 
");
+            writeSimpleElement(xmlStreamWriter, "strong", "Controller Service 
API: ");
             xmlStreamWriter.writeEmptyElement("br");
             
xmlStreamWriter.writeCharacters(controllerServiceClass.getSimpleName());
+
+            final List<Class<? extends ControllerService>> implementations = 
lookupControllerServiceImpls(controllerServiceClass);
+            xmlStreamWriter.writeEmptyElement("br");
+            if (implementations.size() > 0) {
+                final String title = implementations.size() > 1 ? 
"Implementations: " : "Implementation:";
+                writeSimpleElement(xmlStreamWriter, "strong", title);
+                for (int i = 0; i < implementations.size(); i++) {
+                    xmlStreamWriter.writeEmptyElement("br");
+                    writeLinkForComponent(xmlStreamWriter, 
implementations.get(i));
+                }
+            } else {
+                xmlStreamWriter.writeCharacters("No implementations found.");
+            }
         }
     }
 
@@ -519,4 +531,41 @@ public class HtmlDocumentationWriter implements 
DocumentationWriter {
         xmlStreamWriter.writeCharacters(text);
         xmlStreamWriter.writeEndElement();
     }
+
+    /**
+     * Writes a link to another configurable component
+     *
+     * @param xmlStreamWriter the xml stream writer
+     * @param clazz the configurable component to link to
+     * @throws XMLStreamException thrown if there is a problem writing the XML
+     */
+    protected void writeLinkForComponent(final XMLStreamWriter 
xmlStreamWriter, final Class<?> clazz) throws XMLStreamException {
+        writeLink(xmlStreamWriter, clazz.getSimpleName(), "../" + 
clazz.getCanonicalName() + "/index.html");
+    }
+
+    /**
+     * Uses the {@link ExtensionManager} to discover any {@link 
ControllerService} implementations that implement a specific
+     * ControllerService API.
+     *
+     * @param parent the controller service API
+     * @return a list of controller services that implement the controller 
service API
+     */
+    private List<Class<? extends ControllerService>> 
lookupControllerServiceImpls(
+            final Class<? extends ControllerService> parent) {
+
+        final List<Class<? extends ControllerService>> implementations = new 
ArrayList<>();
+
+        // first get all ControllerService implementations
+        final Set<Class> controllerServices = 
ExtensionManager.getExtensions(ControllerService.class);
+
+        // then iterate over all controller services looking for any that is a 
child of the parent
+        // ControllerService API that was passed in as a parameter
+        for (final Class<? extends ControllerService> controllerServiceClass : 
controllerServices) {
+            if (parent.isAssignableFrom(controllerServiceClass)) {
+                implementations.add(controllerServiceClass);
+            }
+        }
+
+        return implementations;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java
index 4320c6e..cd68267 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java
@@ -26,9 +26,9 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.processor.util.StandardValidators;
 
-@CapabilityDescription("A documented controller service that can help you do 
things")
+@CapabilityDescription("A documented controller service that can help you do 
things")
 @Tags({"one", "two", "three"})
-public class FullyDocumentedControllerService extends 
AbstractControllerService {
+public class FullyDocumentedControllerService extends 
AbstractControllerService implements SampleService{
 
     public static final PropertyDescriptor KEYSTORE = new 
PropertyDescriptor.Builder().name("Keystore Filename")
             .description("The fully-qualified filename of the 
Keystore").defaultValue(null)
@@ -53,6 +53,10 @@ public class FullyDocumentedControllerService extends 
AbstractControllerService
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
-    }
+    }
 
+    @Override
+    public void doSomething() {
+        // TODO Auto-generated method stub
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java
index c6ed9fb..6016f95 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java
@@ -30,7 +30,7 @@ import 
org.apache.nifi.documentation.mock.MockProcessorInitializationContext;
 import org.junit.Test;
 
 public class ProcessorDocumentationWriterTest {
-
+
     @Test
     public void testFullyDocumentedProcessor() throws IOException {
         FullyDocumentedProcessor processor = new FullyDocumentedProcessor();
@@ -59,7 +59,7 @@ public class ProcessorDocumentationWriterTest {
         assertContains(results, 
FullyDocumentedProcessor.REL_SUCCESS.getDescription());
         assertContains(results, 
FullyDocumentedProcessor.REL_FAILURE.getName());
         assertContains(results, 
FullyDocumentedProcessor.REL_FAILURE.getDescription());
-        assertContains(results, "Controller Service: ");
+        assertContains(results, "Controller Service API: ");
         assertContains(results, "SampleService");
 
         assertNotContains(results, "iconSecure.png");
@@ -97,6 +97,6 @@ public class ProcessorDocumentationWriterTest {
         // relationships
         assertContains(results, "This processor has no relationships.");
 
-    }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
index 5859e1b..8ba33c5 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
@@ -101,7 +101,7 @@ public class FileBasedClusterNodeFirewall implements 
ClusterNodeFirewall {
             try {
                 ip = InetAddress.getByName(hostOrIp).getHostAddress();
             } catch (final UnknownHostException uhe) {
-                logger.warn("Blocking unknown host: " + hostOrIp, uhe);
+                logger.warn("Blocking unknown host '{}'", hostOrIp, uhe);
                 return false;
             }
 
@@ -113,9 +113,11 @@ public class FileBasedClusterNodeFirewall implements 
ClusterNodeFirewall {
             }
 
             // no match
+            logger.debug("Blocking host '{}' because it does not match our 
allowed list.", hostOrIp);
             return false;
 
         } catch (final IllegalArgumentException iae) {
+            logger.debug("Blocking requested host, '{}', because it is 
malformed.", hostOrIp, iae);
             return false;
         }
     }
@@ -166,30 +168,30 @@ public class FileBasedClusterNodeFirewall implements 
ClusterNodeFirewall {
                 if (ipOrHostLine.contains("/")) {
                     ipCidr = ipOrHostLine;
                 } else if (ipOrHostLine.contains("\\")) {
-                    logger.warn("CIDR IP notation uses forward slashes '/'.  
Replacing backslash '\\' with forward slash'/' for '" + ipOrHostLine + "'");
+                    logger.warn("CIDR IP notation uses forward slashes '/'.  
Replacing backslash '\\' with forward slash'/' for '{}'", ipOrHostLine);
                     ipCidr = ipOrHostLine.replace("\\", "/");
                 } else {
                     try {
                         ipCidr = 
InetAddress.getByName(ipOrHostLine).getHostAddress();
                         if (!ipOrHostLine.equals(ipCidr)) {
-                            logger.debug(String.format("Resolved host '%s' to 
ip '%s'", ipOrHostLine, ipCidr));
+                            logger.debug("Resolved host '{}' to ip '{}'", 
ipOrHostLine, ipCidr);
                         }
                         ipCidr += "/32";
-                        logger.debug("Adding CIDR to exact IP: " + ipCidr);
+                        logger.debug("Adding CIDR to exact IP: '{}'", ipCidr);
                     } catch (final UnknownHostException uhe) {
-                        logger.warn("Firewall is skipping unknown host 
address: " + ipOrHostLine);
+                        logger.warn("Firewall is skipping unknown host 
address: '{}'", ipOrHostLine);
                         continue;
                     }
                 }
 
                 try {
-                    logger.debug("Adding CIDR IP to firewall: " + ipCidr);
+                    logger.debug("Adding CIDR IP to firewall: '{}'", ipCidr);
                     final SubnetUtils subnetUtils = new SubnetUtils(ipCidr);
                     subnetUtils.setInclusiveHostCount(true);
                     subnetInfos.add(subnetUtils.getInfo());
                     totalIpsAdded++;
                 } catch (final IllegalArgumentException iae) {
-                    logger.warn("Firewall is skipping invalid CIDR address: " 
+ ipOrHostLine);
+                    logger.warn("Firewall is skipping invalid CIDR address: 
'{}'", ipOrHostLine);
                 }
 
             }
@@ -197,7 +199,7 @@ public class FileBasedClusterNodeFirewall implements 
ClusterNodeFirewall {
             if (totalIpsAdded == 0) {
                 logger.info("No IPs added to firewall.  Firewall will accept 
all requests.");
             } else {
-                logger.info(String.format("Added %d IP(s) to firewall.  Only 
requests originating from the configured IPs will be accepted.", 
totalIpsAdded));
+                logger.info("Added {} IP(s) to firewall.  Only requests 
originating from the configured IPs will be accepted.", totalIpsAdded);
             }
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
index 441a3b2..5259f4f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java
@@ -17,14 +17,17 @@
 package org.apache.nifi.cluster.firewall.impl;
 
 import java.io.File;
-import java.io.IOException;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.After;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class FileBasedClusterNodeFirewallTest {
 
@@ -38,21 +41,52 @@ public class FileBasedClusterNodeFirewallTest {
 
     private File restoreDirectory;
 
+    @Rule
+    public final TemporaryFolder temp = new TemporaryFolder();
+
+    private static boolean badHostsDoNotResolve = false;
+
+    /**
+     * We have tests that rely on known bad host/ip parameters; make sure DNS 
doesn't resolve them.
+     * This can be a problem i.e. on residential ISPs in the USA because the 
provider will often
+     * wildcard match all possible DNS names in an attempt to serve 
advertising.
+     */
+    @BeforeClass
+    public static void ensureBadHostsDoNotWork() {
+        final InetAddress ip;
+        try {
+            ip = InetAddress.getByName("I typed a search term and my browser 
expected a host.");
+        } catch (final UnknownHostException uhe) {
+            badHostsDoNotResolve = true;
+        }
+    }
+
     @Before
     public void setup() throws Exception {
 
-        ipsConfig = new 
File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt");
-        emptyConfig = new 
File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt");
+        ipsConfig = new 
File(getClass().getResource("/org/apache/nifi/cluster/firewall/impl/ips.txt").toURI());
+        emptyConfig = new 
File(getClass().getResource("/org/apache/nifi/cluster/firewall/impl/empty.txt").toURI());
 
-        restoreDirectory = new File(System.getProperty("java.io.tmpdir") + 
"/firewall_restore");
+        restoreDirectory = temp.newFolder("firewall_restore");
 
         ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, 
restoreDirectory);
         acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig);
     }
 
-    @After
-    public void teardown() throws IOException {
-        deleteFile(restoreDirectory);
+    /**
+     * We have two garbage lines in our test config file, ensure they didn't 
get turned into hosts.
+     */
+    @Test
+    public void ensureBadDataWasIgnored() {
+        assumeTrue(badHostsDoNotResolve);
+        assertFalse("firewall treated our malformed data as a host. If " +
+            "`host \"bad data should be skipped\"` works locally, this test 
should have been " +
+            "skipped.",
+            ipsFirewall.isPermissible("bad data should be skipped"));
+        assertFalse("firewall treated our malformed data as a host. If " +
+            "`host \"more bad data\"` works locally, this test should have 
been " +
+            "skipped.",
+            ipsFirewall.isPermissible("more bad data"));
     }
 
     @Test
@@ -77,7 +111,10 @@ public class FileBasedClusterNodeFirewallTest {
 
     @Test
     public void testIsPermissibleWithMalformedData() {
-        assertFalse(ipsFirewall.isPermissible("abc"));
+        assumeTrue(badHostsDoNotResolve);
+        assertFalse("firewall allowed host 'abc' rather than rejecting as 
malformed. If `host abc` "
+            + "works locally, this test should have been skipped.",
+            ipsFirewall.isPermissible("abc"));
     }
 
     @Test
@@ -87,14 +124,10 @@ public class FileBasedClusterNodeFirewallTest {
 
     @Test
     public void testIsPermissibleWithEmptyConfigWithMalformedData() {
-        assertTrue(acceptAllFirewall.isPermissible("abc"));
-    }
-
-    private boolean deleteFile(final File file) {
-        if (file.isDirectory()) {
-            FileUtils.deleteFilesInDir(file, null, null, true, true);
-        }
-        return FileUtils.deleteFile(file, null, 10);
+        assumeTrue(badHostsDoNotResolve);
+        assertTrue("firewall did not allow malformed host 'abc' under 
permissive configs. If " +
+            "`host abc` works locally, this test should have been skipped.",
+            acceptAllFirewall.isPermissible("abc"));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
index 1a3fdb6..9e4237f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
@@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol.impl;
 
 import java.io.IOException;
 import java.net.InetAddress;
+
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -31,13 +32,17 @@ import 
org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.io.socket.ServerSocketConfiguration;
 import org.apache.nifi.io.socket.SocketConfiguration;
 import org.junit.After;
+
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+
 import org.junit.Before;
 import org.junit.Test;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -57,22 +62,27 @@ public class ClusterManagerProtocolSenderImplTest {
     private ProtocolHandler mockHandler;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() throws IOException, InterruptedException {
 
         address = InetAddress.getLocalHost();
-        ServerSocketConfiguration serverSocketConfiguration = new 
ServerSocketConfiguration();
+        final ServerSocketConfiguration serverSocketConfiguration = new 
ServerSocketConfiguration();
 
         mockHandler = mock(ProtocolHandler.class);
 
-        ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+        final ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
 
         listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
         listener.addHandler(mockHandler);
         listener.start();
 
+        // Need to be sure that we give the listener plenty of time to 
startup. Otherwise, we get intermittent
+        // test failures because the Thread started by listener.start() isn't 
ready to accept connections
+        // before we make them.
+        Thread.sleep(1000L);
+
         port = listener.getPort();
 
-        SocketConfiguration socketConfiguration = new SocketConfiguration();
+        final SocketConfiguration socketConfiguration = new 
SocketConfiguration();
         sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, 
protocolContext);
     }
 
@@ -85,12 +95,11 @@ public class ClusterManagerProtocolSenderImplTest {
 
     @Test
     public void testRequestFlow() throws Exception {
-
         
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
         when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
FlowResponseMessage());
-        FlowRequestMessage request = new FlowRequestMessage();
-        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
-        FlowResponseMessage response = sender.requestFlow(request);
+        final FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
"localhost", port));
+        final FlowResponseMessage response = sender.requestFlow(request);
         assertNotNull(response);
     }
 
@@ -99,12 +108,12 @@ public class ClusterManagerProtocolSenderImplTest {
 
         
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
         when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
-        FlowRequestMessage request = new FlowRequestMessage();
-        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        final FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
"localhost", port));
         try {
             sender.requestFlow(request);
             fail("failed to throw exception");
-        } catch (ProtocolException pe) {
+        } catch (final ProtocolException pe) {
         }
 
     }
@@ -118,17 +127,17 @@ public class ClusterManagerProtocolSenderImplTest {
         
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
         when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<FlowResponseMessage>() {
             @Override
-            public FlowResponseMessage answer(InvocationOnMock invocation) 
throws Throwable {
+            public FlowResponseMessage answer(final InvocationOnMock 
invocation) throws Throwable {
                 Thread.sleep(time * 3);
                 return new FlowResponseMessage();
             }
         });
-        FlowRequestMessage request = new FlowRequestMessage();
-        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        final FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
"localhost", port));
         try {
             sender.requestFlow(request);
             fail("failed to throw exception");
-        } catch (ProtocolException pe) {
+        } catch (final ProtocolException pe) {
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index e241112..6c655cb 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -53,6 +53,8 @@ import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -74,8 +76,8 @@ import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -294,7 +296,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      */
     private final AtomicReference<HeartbeatMessageGeneratorTask> 
heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
 
-    private AtomicReference<NodeBulletinProcessingStrategy> 
nodeBulletinSubscriber;
+    private final AtomicReference<NodeBulletinProcessingStrategy> 
nodeBulletinSubscriber;
 
     // guarded by rwLock
     /**
@@ -447,7 +449,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         this.protocolSender = protocolSender;
         try {
             this.templateManager = new 
TemplateManager(properties.getTemplateDirectory());
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new RuntimeException(e);
         }
 
@@ -792,7 +794,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws NullPointerException if either arg is null
      * @throws ProcessorInstantiationException if the processor cannot be 
instantiated for any reason
      */
-    public ProcessorNode createProcessor(final String type, String id) throws 
ProcessorInstantiationException {
+    public ProcessorNode createProcessor(final String type, final String id) 
throws ProcessorInstantiationException {
         return createProcessor(type, id, true);
     }
 
@@ -1506,7 +1508,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 }
 
                 if (config.getProperties() != null) {
-                    for (Map.Entry<String, String> entry : 
config.getProperties().entrySet()) {
+                    for (final Map.Entry<String, String> entry : 
config.getProperties().entrySet()) {
                         if (entry.getValue() != null) {
                             procNode.setProperty(entry.getKey(), 
entry.getValue());
                         }
@@ -1659,7 +1661,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
         if (ports != null) {
             remotePorts = new LinkedHashSet<>(ports.size());
-            for (RemoteProcessGroupPortDTO port : ports) {
+            for (final RemoteProcessGroupPortDTO port : ports) {
                 final StandardRemoteProcessGroupPortDescriptor descriptor = 
new StandardRemoteProcessGroupPortDescriptor();
                 descriptor.setId(port.getId());
                 descriptor.setName(port.getName());
@@ -3019,6 +3021,18 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
             LOG.info("Setting primary flag from '" + this.primary + "' to '" + 
primary + "'");
 
+            final PrimaryNodeState nodeState = primary ? 
PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
+            final ProcessGroup rootGroup = getGroup(getRootGroupId());
+            for (final ProcessorNode procNode : rootGroup.findAllProcessors()) 
{
+                
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class,
 procNode.getProcessor(), nodeState);
+            }
+            for (final ControllerServiceNode serviceNode : 
getAllControllerServices()) {
+                
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class,
 serviceNode.getControllerServiceImplementation(), nodeState);
+            }
+            for (final ReportingTaskNode reportingTaskNode : 
getAllReportingTasks()) {
+                
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class,
 reportingTaskNode.getReportingTask(), nodeState);
+            }
+
             // update primary
             this.primary = primary;
             eventDrivenWorkerQueue.setPrimary(primary);
@@ -3078,7 +3092,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             public boolean isInputAvailable() {
                 try {
                     return 
contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(),
 event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier()));
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     return false;
                 }
             }
@@ -3087,7 +3101,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             public boolean isOutputAvailable() {
                 try {
                     return 
contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), 
event.getContentClaimSection(), event.getContentClaimIdentifier()));
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     return false;
                 }
             }
@@ -3387,7 +3401,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         private final NodeProtocolSender protocolSender;
         private final DateFormat dateFormatter = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
 
-        public BulletinsTask(NodeProtocolSender protocolSender) {
+        public BulletinsTask(final NodeProtocolSender protocolSender) {
             if (protocolSender == null) {
                 throw new IllegalArgumentException("NodeProtocolSender may not 
be null.");
             }
@@ -3543,7 +3557,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
     private class HeartbeatMessageGeneratorTask implements Runnable {
 
-        private AtomicReference<HeartbeatMessage> heartbeatMessageRef = new 
AtomicReference<>();
+        private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = 
new AtomicReference<>();
 
         @Override
         public void run() {
@@ -3610,7 +3624,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     @Override
-    public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, 
int maxRecords) throws IOException {
+    public List<ProvenanceEventRecord> getProvenanceEvents(final long 
firstEventId, final int maxRecords) throws IOException {
         return new 
ArrayList<ProvenanceEventRecord>(provenanceEventRepository.getEvents(firstEventId,
 maxRecords));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
index f1fef15..2837d35 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -41,7 +42,6 @@ import java.util.jar.Manifest;
 import org.apache.nifi.util.FileUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,22 +61,34 @@ public final class NarUnpacker {
     };
 
     public static ExtensionMapping unpackNars(final NiFiProperties props) {
-        final File narLibraryDir = props.getNarLibraryDirectory();
+        final List<Path> narLibraryDirs = props.getNarLibraryDirectories();
         final File frameworkWorkingDir = props.getFrameworkWorkingDirectory();
         final File extensionsWorkingDir = 
props.getExtensionsWorkingDirectory();
         final File docsWorkingDir = 
props.getComponentDocumentationWorkingDirectory();
 
         try {
+            File unpackedFramework = null;
+            final Set<File> unpackedExtensions = new HashSet<>();
+            final List<File> narFiles = new ArrayList<>();
+
             // make sure the nar directories are there and accessible
-            FileUtils.ensureDirectoryExistAndCanAccess(narLibraryDir);
             FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir);
             FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir);
             FileUtils.ensureDirectoryExistAndCanAccess(docsWorkingDir);
 
-            File unpackedFramework = null;
-            final Set<File> unpackedExtensions = new HashSet<>();
-            final File[] narFiles = narLibraryDir.listFiles(NAR_FILTER);
-            if (narFiles != null) {
+            for (Path narLibraryDir : narLibraryDirs) {
+
+                File narDir = narLibraryDir.toFile();
+                FileUtils.ensureDirectoryExistAndCanAccess(narDir);
+
+                File[] dirFiles = narDir.listFiles(NAR_FILTER);
+                if (dirFiles != null) {
+                    List<File> fileList = Arrays.asList(dirFiles);
+                    narFiles.addAll(fileList);
+                }
+            }
+
+            if (!narFiles.isEmpty()) {
                 for (File narFile : narFiles) {
                     logger.debug("Expanding NAR file: " + 
narFile.getAbsolutePath());
 
@@ -91,7 +103,8 @@ public final class NarUnpacker {
                         // determine if this is the framework
                         if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(narId)) {
                             if (unpackedFramework != null) {
-                                throw new IllegalStateException("Multiple 
framework NARs discovered. Only one framework is permitted.");
+                                throw new IllegalStateException(
+                                        "Multiple framework NARs discovered. 
Only one framework is permitted.");
                             }
 
                             unpackedFramework = unpackNar(narFile, 
frameworkWorkingDir);
@@ -108,7 +121,8 @@ public final class NarUnpacker {
                     throw new IllegalStateException("Framework NAR cannot be 
read.");
                 }
 
-                // Determine if any nars no longer exist and delete their 
working directories. This happens
+                // Determine if any nars no longer exist and delete their
+                // working directories. This happens
                 // if a new version of a nar is dropped into the lib dir.
                 // ensure no old framework are present
                 final File[] frameworkWorkingDirContents = 
frameworkWorkingDir.listFiles();
@@ -131,7 +145,8 @@ public final class NarUnpacker {
                 }
             }
 
-            // attempt to delete any docs files that exist so that any 
components that have been removed
+            // attempt to delete any docs files that exist so that any
+            // components that have been removed
             // will no longer have entries in the docs folder
             final File[] docsFiles = docsWorkingDir.listFiles();
             if (docsFiles != null) {
@@ -144,7 +159,8 @@ public final class NarUnpacker {
             mapExtensions(extensionsWorkingDir, docsWorkingDir, 
extensionMapping);
             return extensionMapping;
         } catch (IOException e) {
-            logger.warn("Unable to load NAR library bundles due to " + e + " 
Will proceed without loading any further Nar bundles");
+            logger.warn("Unable to load NAR library bundles due to " + e
+                    + " Will proceed without loading any further Nar bundles");
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
@@ -153,7 +169,8 @@ public final class NarUnpacker {
         return null;
     }
 
-    private static void mapExtensions(final File workingDirectory, final File 
docsDirectory, final ExtensionMapping mapping) throws IOException {
+    private static void mapExtensions(final File workingDirectory, final File 
docsDirectory,
+            final ExtensionMapping mapping) throws IOException {
         final File[] directoryContents = workingDirectory.listFiles();
         if (directoryContents != null) {
             for (final File file : directoryContents) {
@@ -169,19 +186,24 @@ public final class NarUnpacker {
     /**
      * Unpacks the specified nar into the specified base working directory.
      *
-     * @param nar the nar to unpack
-     * @param baseWorkingDirectory the directory to unpack to
+     * @param nar
+     *            the nar to unpack
+     * @param baseWorkingDirectory
+     *            the directory to unpack to
      * @return the directory to the unpacked NAR
-     * @throws IOException if unable to explode nar
+     * @throws IOException
+     *             if unable to explode nar
      */
-    private static File unpackNar(final File nar, final File 
baseWorkingDirectory) throws IOException {
+    private static File unpackNar(final File nar, final File 
baseWorkingDirectory)
+            throws IOException {
         final File narWorkingDirectory = new File(baseWorkingDirectory, 
nar.getName() + "-unpacked");
 
         // if the working directory doesn't exist, unpack the nar
         if (!narWorkingDirectory.exists()) {
             unpack(nar, narWorkingDirectory, calculateMd5sum(nar));
         } else {
-            // the working directory does exist. Run MD5 sum against the nar 
file and check if the nar has changed since it was deployed.
+            // the working directory does exist. Run MD5 sum against the nar
+            // file and check if the nar has changed since it was deployed.
             final byte[] narMd5 = calculateMd5sum(nar);
             final File workingHashFile = new File(narWorkingDirectory, 
HASH_FILENAME);
             if (!workingHashFile.exists()) {
@@ -190,7 +212,8 @@ public final class NarUnpacker {
             } else {
                 final byte[] hashFileContents = 
Files.readAllBytes(workingHashFile.toPath());
                 if (!Arrays.equals(hashFileContents, narMd5)) {
-                    logger.info("Contents of nar {} have changed. Reloading.", 
new Object[]{nar.getAbsolutePath()});
+                    logger.info("Contents of nar {} have changed. Reloading.",
+                            new Object[] { nar.getAbsolutePath() });
                     FileUtils.deleteFile(narWorkingDirectory, true);
                     unpack(nar, narWorkingDirectory, narMd5);
                 }
@@ -204,11 +227,13 @@ public final class NarUnpacker {
      * Unpacks the NAR to the specified directory. Creates a checksum file that
      * used to determine if future expansion is necessary.
      *
-     * @param workingDirectory the root directory to which the NAR should be
-     * unpacked.
-     * @throws IOException if the NAR could not be unpacked.
+     * @param workingDirectory
+     *            the root directory to which the NAR should be unpacked.
+     * @throws IOException
+     *             if the NAR could not be unpacked.
      */
-    private static void unpack(final File nar, final File workingDirectory, 
final byte[] hash) throws IOException {
+    private static void unpack(final File nar, final File workingDirectory, 
final byte[] hash)
+            throws IOException {
 
         try (JarFile jarFile = new JarFile(nar)) {
             Enumeration<JarEntry> jarEntries = jarFile.entries();
@@ -230,7 +255,8 @@ public final class NarUnpacker {
         }
     }
 
-    private static void unpackDocumentation(final File jar, final File 
docsDirectory, final ExtensionMapping extensionMapping) throws IOException {
+    private static void unpackDocumentation(final File jar, final File 
docsDirectory,
+            final ExtensionMapping extensionMapping) throws IOException {
         // determine the components that may have documentation
         determineDocumentedNiFiComponents(jar, extensionMapping);
 
@@ -240,7 +266,8 @@ public final class NarUnpacker {
                 final String entryName = "docs/" + componentName;
 
                 // go through each entry in this jar
-                for (final Enumeration<JarEntry> jarEnumeration = 
jarFile.entries(); jarEnumeration.hasMoreElements();) {
+                for (final Enumeration<JarEntry> jarEnumeration = 
jarFile.entries(); jarEnumeration
+                        .hasMoreElements();) {
                     final JarEntry jarEntry = jarEnumeration.nextElement();
 
                     // if this entry is documentation for this component
@@ -252,8 +279,10 @@ public final class NarUnpacker {
                             final File componentDocsDirectory = new 
File(docsDirectory, name);
 
                             // ensure the documentation directory can be 
created
-                            if (!componentDocsDirectory.exists() && 
!componentDocsDirectory.mkdirs()) {
-                                logger.warn("Unable to create docs directory " 
+ componentDocsDirectory.getAbsolutePath());
+                            if (!componentDocsDirectory.exists()
+                                    && !componentDocsDirectory.mkdirs()) {
+                                logger.warn("Unable to create docs directory "
+                                        + 
componentDocsDirectory.getAbsolutePath());
                                 break;
                             }
                         } else {
@@ -268,19 +297,27 @@ public final class NarUnpacker {
         }
     }
 
-    private static void determineDocumentedNiFiComponents(final File jar, 
final ExtensionMapping extensionMapping) throws IOException {
+    private static void determineDocumentedNiFiComponents(final File jar,
+            final ExtensionMapping extensionMapping) throws IOException {
         try (final JarFile jarFile = new JarFile(jar)) {
-            final JarEntry processorEntry = 
jarFile.getJarEntry("META-INF/services/org.apache.nifi.processor.Processor");
-            final JarEntry reportingTaskEntry = 
jarFile.getJarEntry("META-INF/services/org.apache.nifi.reporting.ReportingTask");
-            final JarEntry controllerServiceEntry = 
jarFile.getJarEntry("META-INF/services/org.apache.nifi.controller.ControllerService");
-
-            
extensionMapping.addAllProcessors(determineDocumentedNiFiComponents(jarFile, 
processorEntry));
-            
extensionMapping.addAllReportingTasks(determineDocumentedNiFiComponents(jarFile,
 reportingTaskEntry));
-            
extensionMapping.addAllControllerServices(determineDocumentedNiFiComponents(jarFile,
 controllerServiceEntry));
+            final JarEntry processorEntry = jarFile
+                    
.getJarEntry("META-INF/services/org.apache.nifi.processor.Processor");
+            final JarEntry reportingTaskEntry = jarFile
+                    
.getJarEntry("META-INF/services/org.apache.nifi.reporting.ReportingTask");
+            final JarEntry controllerServiceEntry = jarFile
+                    
.getJarEntry("META-INF/services/org.apache.nifi.controller.ControllerService");
+
+            
extensionMapping.addAllProcessors(determineDocumentedNiFiComponents(jarFile,
+                    processorEntry));
+            
extensionMapping.addAllReportingTasks(determineDocumentedNiFiComponents(jarFile,
+                    reportingTaskEntry));
+            
extensionMapping.addAllControllerServices(determineDocumentedNiFiComponents(jarFile,
+                    controllerServiceEntry));
         }
     }
 
-    private static List<String> determineDocumentedNiFiComponents(final 
JarFile jarFile, final JarEntry jarEntry) throws IOException {
+    private static List<String> determineDocumentedNiFiComponents(final 
JarFile jarFile,
+            final JarEntry jarEntry) throws IOException {
         final List<String> componentNames = new ArrayList<>();
 
         if (jarEntry == null) {
@@ -288,13 +325,15 @@ public final class NarUnpacker {
         }
 
         try (final InputStream entryInputStream = 
jarFile.getInputStream(jarEntry);
-                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(entryInputStream))) {
+                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(
+                        entryInputStream))) {
             String line;
             while ((line = reader.readLine()) != null) {
                 final String trimmedLine = line.trim();
                 if (!trimmedLine.isEmpty() && !trimmedLine.startsWith("#")) {
                     final int indexOfPound = trimmedLine.indexOf("#");
-                    final String effectiveLine = (indexOfPound > 0) ? 
trimmedLine.substring(0, indexOfPound) : trimmedLine;
+                    final String effectiveLine = (indexOfPound > 0) ? 
trimmedLine.substring(0,
+                            indexOfPound) : trimmedLine;
                     componentNames.add(effectiveLine);
                 }
             }
@@ -307,12 +346,16 @@ public final class NarUnpacker {
      * Creates the specified file, whose contents will come from the
      * <tt>InputStream</tt>.
      *
-     * @param inputStream the contents of the file to create.
-     * @param file the file to create.
-     * @throws IOException if the file could not be created.
+     * @param inputStream
+     *            the contents of the file to create.
+     * @param file
+     *            the file to create.
+     * @throws IOException
+     *             if the file could not be created.
      */
     private static void makeFile(final InputStream inputStream, final File 
file) throws IOException {
-        try (final InputStream in = inputStream; final FileOutputStream fos = 
new FileOutputStream(file)) {
+        try (final InputStream in = inputStream;
+                final FileOutputStream fos = new FileOutputStream(file)) {
             byte[] bytes = new byte[65536];
             int numRead;
             while ((numRead = in.read(bytes)) != -1) {
@@ -324,9 +367,11 @@ public final class NarUnpacker {
     /**
      * Calculates an md5 sum of the specified file.
      *
-     * @param file to calculate the md5sum of
+     * @param file
+     *            to calculate the md5sum of
      * @return the md5sum bytes
-     * @throws IOException if cannot read file
+     * @throws IOException
+     *             if cannot read file
      */
     private static byte[] calculateMd5sum(final File file) throws IOException {
         try (final FileInputStream inputStream = new FileInputStream(file)) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
new file mode 100644
index 0000000..fa3cd62
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
@@ -0,0 +1,185 @@
+package org.apache.nifi.nar;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class NarUnpackerTest {
+
+    @BeforeClass
+    public static void copyResources() throws IOException {
+
+        final Path sourcePath = Paths.get("./src/test/resources");
+        final Path targetPath = Paths.get("./target");
+
+        Files.walkFileTree(sourcePath, new SimpleFileVisitor<Path>() {
+
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, 
BasicFileAttributes attrs)
+                    throws IOException {
+
+                Path relativeSource = sourcePath.relativize(dir);
+                Path target = targetPath.resolve(relativeSource);
+
+                Files.createDirectories(target);
+
+                return FileVisitResult.CONTINUE;
+
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs)
+                    throws IOException {
+
+                Path relativeSource = sourcePath.relativize(file);
+                Path target = targetPath.resolve(relativeSource);
+
+                Files.copy(file, target, REPLACE_EXISTING);
+
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    @Test
+    public void testUnpackNars() {
+
+        NiFiProperties properties = 
loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+
+        assertEquals("./target/NarUnpacker/lib/",
+                properties.getProperty("nifi.nar.library.directory"));
+        assertEquals("./target/NarUnpacker/lib2/",
+                properties.getProperty("nifi.nar.library.directory.alt"));
+
+        final ExtensionMapping extensionMapping = 
NarUnpacker.unpackNars(properties);
+
+        assertEquals(2, extensionMapping.getAllExtensionNames().size());
+
+        assertTrue(extensionMapping.getAllExtensionNames().contains(
+                "org.apache.nifi.processors.dummy.one"));
+        assertTrue(extensionMapping.getAllExtensionNames().contains(
+                "org.apache.nifi.processors.dummy.two"));
+
+        final File extensionsWorkingDir = 
properties.getExtensionsWorkingDirectory();
+        File[] extensionFiles = extensionsWorkingDir.listFiles();
+
+        assertEquals(2, extensionFiles.length);
+        assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName());
+        assertEquals("dummy-two.nar-unpacked", extensionFiles[1].getName());
+    }
+
+    @Test
+    public void testUnpackNarsFromEmptyDir() throws IOException {
+
+        NiFiProperties properties = 
loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+
+        final File emptyDir = new File("./target/empty/dir");
+        emptyDir.delete();
+        emptyDir.deleteOnExit();
+        assertTrue(emptyDir.mkdirs());
+
+        properties.setProperty("nifi.nar.library.directory.alt", 
emptyDir.toString());
+
+        final ExtensionMapping extensionMapping = 
NarUnpacker.unpackNars(properties);
+
+        assertEquals(1, extensionMapping.getAllExtensionNames().size());
+        assertTrue(extensionMapping.getAllExtensionNames().contains(
+                "org.apache.nifi.processors.dummy.one"));
+
+        final File extensionsWorkingDir = 
properties.getExtensionsWorkingDirectory();
+        File[] extensionFiles = extensionsWorkingDir.listFiles();
+
+        assertEquals(1, extensionFiles.length);
+        assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName());
+    }
+
+    @Test
+    public void testUnpackNarsFromNonExistantDir() {
+
+        final File nonExistantDir = new 
File("./target/this/dir/should/not/exist/");
+        nonExistantDir.delete();
+        nonExistantDir.deleteOnExit();
+
+        NiFiProperties properties = 
loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+        properties.setProperty("nifi.nar.library.directory.alt", 
nonExistantDir.toString());
+
+        final ExtensionMapping extensionMapping = 
NarUnpacker.unpackNars(properties);
+
+        assertTrue(extensionMapping.getAllExtensionNames().contains(
+                "org.apache.nifi.processors.dummy.one"));
+
+        assertEquals(1, extensionMapping.getAllExtensionNames().size());
+
+        final File extensionsWorkingDir = 
properties.getExtensionsWorkingDirectory();
+        File[] extensionFiles = extensionsWorkingDir.listFiles();
+
+        assertEquals(1, extensionFiles.length);
+        assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName());
+    }
+
+    @Test
+    public void testUnpackNarsFromNonDir() throws IOException {
+
+        final File nonDir = new File("./target/file.txt");
+        nonDir.createNewFile();
+        nonDir.deleteOnExit();
+
+        NiFiProperties properties = 
loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+        properties.setProperty("nifi.nar.library.directory.alt", 
nonDir.toString());
+
+        final ExtensionMapping extensionMapping = 
NarUnpacker.unpackNars(properties);
+
+        assertNull(extensionMapping);
+    }
+
+    private NiFiProperties loadSpecifiedProperties(String propertiesFile) {
+        String file = 
NarUnpackerTest.class.getResource(propertiesFile).getFile();
+
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, file);
+
+        NiFiProperties properties = NiFiProperties.getInstance();
+
+        // clear out existing properties
+        for (String prop : properties.stringPropertyNames()) {
+            properties.remove(prop);
+        }
+
+        InputStream inStream = null;
+        try {
+            inStream = new BufferedInputStream(new FileInputStream(file));
+            properties.load(inStream);
+        } catch (final Exception ex) {
+            throw new RuntimeException("Cannot load properties file due to "
+                    + ex.getLocalizedMessage(), ex);
+        } finally {
+            if (null != inStream) {
+                try {
+                    inStream.close();
+                } catch (final Exception ex) {
+                    /**
+                     * do nothing *
+                     */
+                }
+            }
+        }
+
+        return properties;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
new file mode 100644
index 0000000..92e4a92
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Core Properties #
+nifi.version=nifi-test 3.0.0
+nifi.flow.configuration.file=./target/flow.xml.gz
+nifi.flow.configuration.archive.dir=./target/archive/
+nifi.flowcontroller.autoResumeState=true
+nifi.flowcontroller.graceful.shutdown.period=10 sec
+nifi.flowservice.writedelay.interval=2 sec
+nifi.administrative.yield.duration=30 sec
+
+nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
+nifi.controller.service.configuration.file=./target/controller-services.xml
+nifi.templates.directory=./target/templates
+nifi.ui.banner.text=UI Banner Text
+nifi.ui.autorefresh.interval=30 sec
+nifi.nar.library.directory=./target/NarUnpacker/lib/
+nifi.nar.library.directory.alt=./target/NarUnpacker/lib2/
+
+nifi.nar.working.directory=./target/work/nar/
+
+# H2 Settings
+nifi.database.directory=./target/database_repository
+nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+# FlowFile Repository
+nifi.flowfile.repository.directory=./target/test-repo
+nifi.flowfile.repository.partitions=1
+nifi.flowfile.repository.checkpoint.interval=2 mins
+nifi.queue.swap.threshold=20000
+nifi.swap.storage.directory=./target/test-repo/swap
+nifi.swap.in.period=5 sec
+nifi.swap.in.threads=1
+nifi.swap.out.period=5 sec
+nifi.swap.out.threads=4
+
+# Content Repository
+nifi.content.claim.max.appendable.size=10 MB
+nifi.content.claim.max.flow.files=100
+nifi.content.repository.directory.default=./target/content_repository
+
+# Provenance Repository Properties
+nifi.provenance.repository.storage.directory=./target/provenance_repository
+nifi.provenance.repository.max.storage.time=24 hours
+nifi.provenance.repository.max.storage.size=1 GB
+nifi.provenance.repository.rollover.time=5 mins
+nifi.provenance.repository.rollover.size=100 MB
+
+# Site to Site properties
+nifi.remote.input.socket.port=9990
+nifi.remote.input.secure=true
+
+# web properties #
+nifi.web.war.directory=./target/lib
+nifi.web.http.host=
+nifi.web.http.port=8080
+nifi.web.https.host=
+nifi.web.https.port=
+nifi.web.jetty.working.directory=./target/work/jetty
+
+# security properties #
+nifi.sensitive.props.key=key
+nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
+nifi.sensitive.props.provider=BC
+
+nifi.security.keystore=
+nifi.security.keystoreType=
+nifi.security.keystorePasswd=
+nifi.security.keyPasswd=
+nifi.security.truststore=
+nifi.security.truststoreType=
+nifi.security.truststorePasswd=
+nifi.security.needClientAuth=
+nifi.security.authorizedUsers.file=./target/conf/authorized-users.xml
+nifi.security.user.credential.cache.duration=24 hours
+nifi.security.user.authority.provider=nifi.authorization.FileAuthorizationProvider
+nifi.security.support.new.account.requests=
+nifi.security.default.user.roles=
+
+# cluster common properties (cluster manager and nodes must have same values) #
+nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.is.secure=false
+nifi.cluster.protocol.socket.timeout=30 sec
+nifi.cluster.protocol.connection.handshake.timeout=45 sec
+# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties 
must be configured #
+nifi.cluster.protocol.use.multicast=false
+nifi.cluster.protocol.multicast.address=
+nifi.cluster.protocol.multicast.port=
+nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms
+nifi.cluster.protocol.multicast.service.locator.attempts=3
+nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec
+
+# cluster node properties (only configure for cluster nodes) #
+nifi.cluster.is.node=false
+nifi.cluster.node.address=
+nifi.cluster.node.protocol.port=
+nifi.cluster.node.protocol.threads=2
+# if multicast is not used, nifi.cluster.node.unicast.xxx must have same 
values as nifi.cluster.manager.xxx #
+nifi.cluster.node.unicast.manager.address=
+nifi.cluster.node.unicast.manager.protocol.port=
+nifi.cluster.node.unicast.manager.authority.provider.port=
+
+# cluster manager properties (only configure for cluster manager) #
+nifi.cluster.is.manager=false
+nifi.cluster.manager.address=
+nifi.cluster.manager.protocol.port=
+nifi.cluster.manager.authority.provider.port=
+nifi.cluster.manager.authority.provider.threads=10
+nifi.cluster.manager.node.firewall.file=
+nifi.cluster.manager.node.event.history.size=10
+nifi.cluster.manager.node.api.connection.timeout=30 sec
+nifi.cluster.manager.node.api.read.timeout=30 sec
+nifi.cluster.manager.node.api.request.threads=10
+nifi.cluster.manager.flow.retrieval.delay=5 sec
+nifi.cluster.manager.protocol.threads=10
+nifi.cluster.manager.safemode.duration=0 sec

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar
new file mode 100644
index 0000000..598b27f
Binary files /dev/null and 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar
 differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar
new file mode 100644
index 0000000..d2a8b96
Binary files /dev/null and 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar
 differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar
new file mode 100644
index 0000000..a1021ba
Binary files /dev/null and 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar
 differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 982d9ff..773f9cf 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
 
 public class StandardRemoteGroupPort extends RemoteGroupPort {
 
-    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
+    private static final long BATCH_SEND_NANOS = 
TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
 
@@ -98,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         return targetRunning.get();
     }
 
-    public void setTargetRunning(boolean targetRunning) {
+    public void setTargetRunning(final boolean targetRunning) {
         this.targetRunning.set(targetRunning);
     }
 
@@ -126,12 +126,12 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
         super.onSchedulingStart();
 
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-                .url(remoteGroup.getTargetUri().toString())
-                .portIdentifier(getIdentifier())
-                .sslContext(sslContext)
-                .eventReporter(remoteGroup.getEventReporter())
-                .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
-                .build();
+        .url(remoteGroup.getTargetUri().toString())
+        .portIdentifier(getIdentifier())
+        .sslContext(sslContext)
+        .eventReporter(remoteGroup.getEventReporter())
+        .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+        .build();
         clientRef.set(client);
     }
 
@@ -147,7 +147,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             return;
         }
 
-        String url = getRemoteProcessGroup().getTargetUri().toString();
+        final String url = getRemoteProcessGroup().getTargetUri().toString();
 
         // If we are sending data, we need to ensure that we have at least 1 
FlowFile to send. Otherwise,
         // we don't want to create a transaction at all.
@@ -230,7 +230,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
         return remoteGroup.getYieldDuration();
     }
 
-    private int transferFlowFiles(final Transaction transaction, final 
ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) 
throws IOException, ProtocolException {
+    private int transferFlowFiles(final Transaction transaction, final 
ProcessContext context, final ProcessSession session, final FlowFile 
firstFlowFile) throws IOException, ProtocolException {
         FlowFile flowFile = firstFlowFile;
 
         try {
@@ -288,7 +288,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
 
             final String flowFileDescription = (flowFilesSent.size() < 20) ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
             logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds 
at a rate of {}", new Object[]{
-                this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
+                    this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
 
             return flowFilesSent.size();
         } catch (final Exception e) {
@@ -345,7 +345,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesReceived);
             logger.info("{} Successfully received {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[]{
-                this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
+                    this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
         }
 
         return flowFilesReceived.size();
@@ -367,16 +367,16 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
         ValidationResult error = null;
         if (!targetExists.get()) {
             error = new ValidationResult.Builder()
-                    .explanation(String.format("Remote instance indicates that 
port '%s' no longer exists.", getName()))
-                    .subject(String.format("Remote port '%s'", getName()))
-                    .valid(false)
-                    .build();
+            .explanation(String.format("Remote instance indicates that port 
'%s' no longer exists.", getName()))
+            .subject(String.format("Remote port '%s'", getName()))
+            .valid(false)
+            .build();
         } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT 
&& getConnections(Relationship.ANONYMOUS).isEmpty()) {
             error = new ValidationResult.Builder()
-                    .explanation(String.format("Port '%s' has no outbound 
connections", getName()))
-                    .subject(String.format("Remote port '%s'", getName()))
-                    .valid(false)
-                    .build();
+            .explanation(String.format("Port '%s' has no outbound 
connections", getName()))
+            .subject(String.format("Remote port '%s'", getName()))
+            .valid(false)
+            .build();
         }
 
         if (error != null) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index 6d89cbf..ea36abd 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -257,7 +257,7 @@ public final class SnippetUtils {
             final PropertyDescriptor descriptor = entry.getKey();
             final String propertyValue = entry.getValue();
 
-            if (descriptor.getControllerServiceDefinition() != null) {
+            if (descriptor.getControllerServiceDefinition() != null && 
propertyValue != null) {
                 final ControllerServiceNode referencedNode = 
flowController.getControllerServiceNode(propertyValue);
                 if (referencedNode == null) {
                     throw new IllegalStateException("Controller Service with 
ID " + propertyValue + " is referenced in template but cannot be found");

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 3ff1e88..ede32ab 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -42,8 +42,12 @@
             <groupId>org.apache.hadoop</groupId> 
             <artifactId>hadoop-common</artifactId> 
             <scope>provided</scope> 
-        </dependency> 
-        <dependency> 
+        </dependency>
+        <dependency>
+               <groupId>org.apache.nifi</groupId>
+               
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId> 
             <artifactId>hadoop-hdfs</artifactId> 
             <scope>provided</scope> 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 3294ead..355950f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -70,7 +70,7 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
 
     // variables shared by all threads of this processor
     // Hadoop Configuration and FileSystem
-    protected final AtomicReference<Tuple<Configuration, FileSystem>> 
hdfsResources = new AtomicReference<>();
+    private final AtomicReference<Tuple<Configuration, FileSystem>> 
hdfsResources = new AtomicReference<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -153,7 +153,7 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
             String disableCacheName = 
String.format("fs.%s.impl.disable.cache", 
FileSystem.getDefaultUri(config).getScheme());
             config.set(disableCacheName, "true");
 
-            final FileSystem fs = FileSystem.get(config);
+            final FileSystem fs = getFileSystem(config);
             getLogger().info(
                     "Initialized a new HDFS File System with working dir: {} 
default block size: {} default replication: {} config: {}",
                     new Object[]{fs.getWorkingDirectory(), 
fs.getDefaultBlockSize(new Path(dir)),
@@ -165,6 +165,18 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
         }
     }
 
+    /**
+     * This exists in order to allow unit tests to override it so that they 
don't take several minutes waiting
+     * for UDP packets to be received
+     *
+     * @param config the configuration to use
+     * @return the FileSystem that is created for the given Configuration
+     * @throws IOException if unable to create the FileSystem
+     */
+    protected FileSystem getFileSystem(final Configuration config) throws 
IOException {
+        return FileSystem.get(config);
+    }
+
     /*
      * Drastically reduce the timeout of a socket connection from the default 
in FileSystem.get()
      */
@@ -216,4 +228,39 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
         };
     }
 
+
+    /**
+     * Returns the relative path of the child that does not include the 
filename
+     * or the root path.
+     * @param root the path to relativize from
+     * @param child the path to relativize
+     * @return the relative path
+     */
+    public static String getPathDifference(final Path root, final Path child) {
+        final int depthDiff = child.depth() - root.depth();
+        if (depthDiff <= 1) {
+            return "".intern();
+        }
+        String lastRoot = root.getName();
+        Path childsParent = child.getParent();
+        final StringBuilder builder = new StringBuilder();
+        builder.append(childsParent.getName());
+        for (int i = (depthDiff - 3); i >= 0; i--) {
+            childsParent = childsParent.getParent();
+            String name = childsParent.getName();
+            if (name.equals(lastRoot) && 
childsParent.toString().endsWith(root.toString())) {
+                break;
+            }
+            builder.insert(0, Path.SEPARATOR).insert(0, name);
+        }
+        return builder.toString();
+    }
+
+    protected Configuration getConfiguration() {
+        return hdfsResources.get().getKey();
+    }
+
+    protected FileSystem getFileSystem() {
+        return hdfsResources.get().getValue();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index f462277..186a290 100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -156,7 +156,7 @@ public class CreateHadoopSequenceFile extends 
AbstractHadoopProcessor {
         final String fileName = 
flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), fileName);
         try {
-            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, 
hdfsResources.get().getKey(), compressionType);
+            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, 
getConfiguration(), compressionType);
             session.transfer(flowFile, RELATIONSHIP_SUCCESS);
             getLogger().info("Transferred flowfile {} to {}", new 
Object[]{flowFile, RELATIONSHIP_SUCCESS});
         } catch (ProcessException e) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
new file mode 100644
index 0000000..4a52fb7
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+@SupportsBatching
+@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
+@CapabilityDescription("Retrieves a file from HDFS. The content of the 
incoming FlowFile is replaced by the content of the file in HDFS. "
+        + "The file in HDFS is left intact without any changes being made to 
it.")
+@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile 
is routed to 'failure', this attribute is added indicating why the file could "
+        + "not be fetched from HDFS")
+@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
+public class FetchHDFS extends AbstractHadoopProcessor {
+    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+        .name("HDFS Filename")
+        .description("The name of the HDFS file to retrieve")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("${path}/${filename}")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles will be routed to this relationship once they 
have been updated with the content of the HDFS file")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("FlowFiles will be routed to this relationship if the 
content of the HDFS file cannot be retrieved and trying again will likely not 
be helpful. "
+                + "This would occur, for instance, if the file is not found or 
if there is a permissions issue")
+        .build();
+    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
+        .name("comms.failure")
+        .description("FlowFiles will be routed to this relationship if the 
content of the HDFS file cannot be retrieved due to a communications failure. "
+                + "This generally indicates that the Fetch should be tried 
again.")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HADOOP_CONFIGURATION_RESOURCES);
+        properties.add(FILENAME);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_COMMS_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final Path path = new 
Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
+        final URI uri = path.toUri();
+
+        final StopWatch stopWatch = new StopWatch(true);
+        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
+            flowFile = session.importFrom(inStream, flowFile);
+            stopWatch.stop();
+            getLogger().info("Successfully received content from {} for {} in 
{}", new Object[] {uri, flowFile, stopWatch.getDuration()});
+            session.getProvenanceReporter().modifyContent(flowFile, "Fetched 
content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final FileNotFoundException | AccessControlException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due 
to {}; routing to failure", new Object[] {uri, flowFile, e});
+            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", 
e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (final IOException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due 
to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_COMMS_FAILURE);
+        }
+    }
+
+}

Reply via email to