http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java index a86a416..d7bad78 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/html/HtmlDocumentationWriter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Set; import javax.xml.stream.FactoryConfigurationError; import javax.xml.stream.XMLOutputFactory; @@ -36,6 +37,7 @@ import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.documentation.DocumentationWriter; +import org.apache.nifi.nar.ExtensionManager; /** * Generates HTML documentation for a ConfigurableComponent. This class is used @@ -148,9 +150,7 @@ public class HtmlDocumentationWriter implements DocumentationWriter { xmlStreamWriter.writeCharacters(", "); } - final String link = "../" + linkedComponent.getCanonicalName() + "/index.html"; - - writeLink(xmlStreamWriter, linkedComponent.getSimpleName(), link); + writeLinkForComponent(xmlStreamWriter, linkedComponent); ++index; } @@ -434,12 +434,24 @@ public class HtmlDocumentationWriter implements DocumentationWriter { } xmlStreamWriter.writeEndElement(); } else if (property.getControllerServiceDefinition() != null) { - Class<? extends ControllerService> controllerServiceClass = property - .getControllerServiceDefinition(); + Class<? extends ControllerService> controllerServiceClass = property.getControllerServiceDefinition(); - writeSimpleElement(xmlStreamWriter, "strong", "Controller Service: "); + writeSimpleElement(xmlStreamWriter, "strong", "Controller Service API: "); xmlStreamWriter.writeEmptyElement("br"); xmlStreamWriter.writeCharacters(controllerServiceClass.getSimpleName()); + + final List<Class<? extends ControllerService>> implementations = lookupControllerServiceImpls(controllerServiceClass); + xmlStreamWriter.writeEmptyElement("br"); + if (implementations.size() > 0) { + final String title = implementations.size() > 1 ? "Implementations: " : "Implementation:"; + writeSimpleElement(xmlStreamWriter, "strong", title); + for (int i = 0; i < implementations.size(); i++) { + xmlStreamWriter.writeEmptyElement("br"); + writeLinkForComponent(xmlStreamWriter, implementations.get(i)); + } + } else { + xmlStreamWriter.writeCharacters("No implementations found."); + } } } @@ -519,4 +531,41 @@ public class HtmlDocumentationWriter implements DocumentationWriter { xmlStreamWriter.writeCharacters(text); xmlStreamWriter.writeEndElement(); } + + /** + * Writes a link to another configurable component + * + * @param xmlStreamWriter the xml stream writer + * @param clazz the configurable component to link to + * @throws XMLStreamException thrown if there is a problem writing the XML + */ + protected void writeLinkForComponent(final XMLStreamWriter xmlStreamWriter, final Class<?> clazz) throws XMLStreamException { + writeLink(xmlStreamWriter, clazz.getSimpleName(), "../" + clazz.getCanonicalName() + "/index.html"); + } + + /** + * Uses the {@link ExtensionManager} to discover any {@link ControllerService} implementations that implement a specific + * ControllerService API. + * + * @param parent the controller service API + * @return a list of controller services that implement the controller service API + */ + private List<Class<? extends ControllerService>> lookupControllerServiceImpls( + final Class<? extends ControllerService> parent) { + + final List<Class<? extends ControllerService>> implementations = new ArrayList<>(); + + // first get all ControllerService implementations + final Set<Class> controllerServices = ExtensionManager.getExtensions(ControllerService.class); + + // then iterate over all controller services looking for any that is a child of the parent + // ControllerService API that was passed in as a parameter + for (final Class<? extends ControllerService> controllerServiceClass : controllerServices) { + if (parent.isAssignableFrom(controllerServiceClass)) { + implementations.add(controllerServiceClass); + } + } + + return implementations; + } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java index 4320c6e..cd68267 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/FullyDocumentedControllerService.java @@ -26,9 +26,9 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.processor.util.StandardValidators; -@CapabilityDescription("A documented controller service that can help you do things") +@CapabilityDescription("A documented controller service that can help you do things") @Tags({"one", "two", "three"}) -public class FullyDocumentedControllerService extends AbstractControllerService { +public class FullyDocumentedControllerService extends AbstractControllerService implements SampleService{ public static final PropertyDescriptor KEYSTORE = new PropertyDescriptor.Builder().name("Keystore Filename") .description("The fully-qualified filename of the Keystore").defaultValue(null) @@ -53,6 +53,10 @@ public class FullyDocumentedControllerService extends AbstractControllerService @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; - } + } + @Override + public void doSomething() { + // TODO Auto-generated method stub + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java index c6ed9fb..6016f95 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/html/ProcessorDocumentationWriterTest.java @@ -30,7 +30,7 @@ import org.apache.nifi.documentation.mock.MockProcessorInitializationContext; import org.junit.Test; public class ProcessorDocumentationWriterTest { - + @Test public void testFullyDocumentedProcessor() throws IOException { FullyDocumentedProcessor processor = new FullyDocumentedProcessor(); @@ -59,7 +59,7 @@ public class ProcessorDocumentationWriterTest { assertContains(results, FullyDocumentedProcessor.REL_SUCCESS.getDescription()); assertContains(results, FullyDocumentedProcessor.REL_FAILURE.getName()); assertContains(results, FullyDocumentedProcessor.REL_FAILURE.getDescription()); - assertContains(results, "Controller Service: "); + assertContains(results, "Controller Service API: "); assertContains(results, "SampleService"); assertNotContains(results, "iconSecure.png"); @@ -97,6 +97,6 @@ public class ProcessorDocumentationWriterTest { // relationships assertContains(results, "This processor has no relationships."); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java index 5859e1b..8ba33c5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java @@ -101,7 +101,7 @@ public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { try { ip = InetAddress.getByName(hostOrIp).getHostAddress(); } catch (final UnknownHostException uhe) { - logger.warn("Blocking unknown host: " + hostOrIp, uhe); + logger.warn("Blocking unknown host '{}'", hostOrIp, uhe); return false; } @@ -113,9 +113,11 @@ public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { } // no match + logger.debug("Blocking host '{}' because it does not match our allowed list.", hostOrIp); return false; } catch (final IllegalArgumentException iae) { + logger.debug("Blocking requested host, '{}', because it is malformed.", hostOrIp, iae); return false; } } @@ -166,30 +168,30 @@ public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { if (ipOrHostLine.contains("/")) { ipCidr = ipOrHostLine; } else if (ipOrHostLine.contains("\\")) { - logger.warn("CIDR IP notation uses forward slashes '/'. Replacing backslash '\\' with forward slash'/' for '" + ipOrHostLine + "'"); + logger.warn("CIDR IP notation uses forward slashes '/'. Replacing backslash '\\' with forward slash'/' for '{}'", ipOrHostLine); ipCidr = ipOrHostLine.replace("\\", "/"); } else { try { ipCidr = InetAddress.getByName(ipOrHostLine).getHostAddress(); if (!ipOrHostLine.equals(ipCidr)) { - logger.debug(String.format("Resolved host '%s' to ip '%s'", ipOrHostLine, ipCidr)); + logger.debug("Resolved host '{}' to ip '{}'", ipOrHostLine, ipCidr); } ipCidr += "/32"; - logger.debug("Adding CIDR to exact IP: " + ipCidr); + logger.debug("Adding CIDR to exact IP: '{}'", ipCidr); } catch (final UnknownHostException uhe) { - logger.warn("Firewall is skipping unknown host address: " + ipOrHostLine); + logger.warn("Firewall is skipping unknown host address: '{}'", ipOrHostLine); continue; } } try { - logger.debug("Adding CIDR IP to firewall: " + ipCidr); + logger.debug("Adding CIDR IP to firewall: '{}'", ipCidr); final SubnetUtils subnetUtils = new SubnetUtils(ipCidr); subnetUtils.setInclusiveHostCount(true); subnetInfos.add(subnetUtils.getInfo()); totalIpsAdded++; } catch (final IllegalArgumentException iae) { - logger.warn("Firewall is skipping invalid CIDR address: " + ipOrHostLine); + logger.warn("Firewall is skipping invalid CIDR address: '{}'", ipOrHostLine); } } @@ -197,7 +199,7 @@ public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { if (totalIpsAdded == 0) { logger.info("No IPs added to firewall. Firewall will accept all requests."); } else { - logger.info(String.format("Added %d IP(s) to firewall. Only requests originating from the configured IPs will be accepted.", totalIpsAdded)); + logger.info("Added {} IP(s) to firewall. Only requests originating from the configured IPs will be accepted.", totalIpsAdded); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java index 441a3b2..5259f4f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java @@ -17,14 +17,17 @@ package org.apache.nifi.cluster.firewall.impl; import java.io.File; -import java.io.IOException; -import org.apache.nifi.util.file.FileUtils; -import org.junit.After; +import java.net.InetAddress; +import java.net.UnknownHostException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; public class FileBasedClusterNodeFirewallTest { @@ -38,21 +41,52 @@ public class FileBasedClusterNodeFirewallTest { private File restoreDirectory; + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private static boolean badHostsDoNotResolve = false; + + /** + * We have tests that rely on known bad host/ip parameters; make sure DNS doesn't resolve them. + * This can be a problem i.e. on residential ISPs in the USA because the provider will often + * wildcard match all possible DNS names in an attempt to serve advertising. + */ + @BeforeClass + public static void ensureBadHostsDoNotWork() { + final InetAddress ip; + try { + ip = InetAddress.getByName("I typed a search term and my browser expected a host."); + } catch (final UnknownHostException uhe) { + badHostsDoNotResolve = true; + } + } + @Before public void setup() throws Exception { - ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt"); - emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt"); + ipsConfig = new File(getClass().getResource("/org/apache/nifi/cluster/firewall/impl/ips.txt").toURI()); + emptyConfig = new File(getClass().getResource("/org/apache/nifi/cluster/firewall/impl/empty.txt").toURI()); - restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore"); + restoreDirectory = temp.newFolder("firewall_restore"); ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory); acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig); } - @After - public void teardown() throws IOException { - deleteFile(restoreDirectory); + /** + * We have two garbage lines in our test config file, ensure they didn't get turned into hosts. + */ + @Test + public void ensureBadDataWasIgnored() { + assumeTrue(badHostsDoNotResolve); + assertFalse("firewall treated our malformed data as a host. If " + + "`host \"bad data should be skipped\"` works locally, this test should have been " + + "skipped.", + ipsFirewall.isPermissible("bad data should be skipped")); + assertFalse("firewall treated our malformed data as a host. If " + + "`host \"more bad data\"` works locally, this test should have been " + + "skipped.", + ipsFirewall.isPermissible("more bad data")); } @Test @@ -77,7 +111,10 @@ public class FileBasedClusterNodeFirewallTest { @Test public void testIsPermissibleWithMalformedData() { - assertFalse(ipsFirewall.isPermissible("abc")); + assumeTrue(badHostsDoNotResolve); + assertFalse("firewall allowed host 'abc' rather than rejecting as malformed. If `host abc` " + + "works locally, this test should have been skipped.", + ipsFirewall.isPermissible("abc")); } @Test @@ -87,14 +124,10 @@ public class FileBasedClusterNodeFirewallTest { @Test public void testIsPermissibleWithEmptyConfigWithMalformedData() { - assertTrue(acceptAllFirewall.isPermissible("abc")); - } - - private boolean deleteFile(final File file) { - if (file.isDirectory()) { - FileUtils.deleteFilesInDir(file, null, null, true, true); - } - return FileUtils.deleteFile(file, null, 10); + assumeTrue(badHostsDoNotResolve); + assertTrue("firewall did not allow malformed host 'abc' under permissive configs. If " + + "`host abc` works locally, this test should have been skipped.", + acceptAllFirewall.isPermissible("abc")); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java index 1a3fdb6..9e4237f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java @@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol.impl; import java.io.IOException; import java.net.InetAddress; + import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; @@ -31,13 +32,17 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.io.socket.ServerSocketConfiguration; import org.apache.nifi.io.socket.SocketConfiguration; import org.junit.After; + import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; + import org.junit.Before; import org.junit.Test; + import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -57,22 +62,27 @@ public class ClusterManagerProtocolSenderImplTest { private ProtocolHandler mockHandler; @Before - public void setup() throws IOException { + public void setup() throws IOException, InterruptedException { address = InetAddress.getLocalHost(); - ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration(); + final ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration(); mockHandler = mock(ProtocolHandler.class); - ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); listener.addHandler(mockHandler); listener.start(); + // Need to be sure that we give the listener plenty of time to startup. Otherwise, we get intermittent + // test failures because the Thread started by listener.start() isn't ready to accept connections + // before we make them. + Thread.sleep(1000L); + port = listener.getPort(); - SocketConfiguration socketConfiguration = new SocketConfiguration(); + final SocketConfiguration socketConfiguration = new SocketConfiguration(); sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext); } @@ -85,12 +95,11 @@ public class ClusterManagerProtocolSenderImplTest { @Test public void testRequestFlow() throws Exception { - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage()); - FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); - FlowResponseMessage response = sender.requestFlow(request); + final FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port)); + final FlowResponseMessage response = sender.requestFlow(request); assertNotNull(response); } @@ -99,12 +108,12 @@ public class ClusterManagerProtocolSenderImplTest { when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); - FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + final FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port)); try { sender.requestFlow(request); fail("failed to throw exception"); - } catch (ProtocolException pe) { + } catch (final ProtocolException pe) { } } @@ -118,17 +127,17 @@ public class ClusterManagerProtocolSenderImplTest { when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() { @Override - public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable { + public FlowResponseMessage answer(final InvocationOnMock invocation) throws Throwable { Thread.sleep(time * 3); return new FlowResponseMessage(); } }); - FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + final FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port)); try { sender.requestFlow(request); fail("failed to throw exception"); - } catch (ProtocolException pe) { + } catch (final ProtocolException pe) { } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index e241112..6c655cb 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -53,6 +53,8 @@ import org.apache.nifi.admin.service.UserService; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.protocol.DataFlow; @@ -74,8 +76,8 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; import org.apache.nifi.connectable.StandardConnection; import org.apache.nifi.controller.exception.CommunicationsException; -import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; @@ -294,7 +296,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); - private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; + private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; // guarded by rwLock /** @@ -447,7 +449,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.protocolSender = protocolSender; try { this.templateManager = new TemplateManager(properties.getTemplateDirectory()); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } @@ -792,7 +794,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws NullPointerException if either arg is null * @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason */ - public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { + public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException { return createProcessor(type, id, true); } @@ -1506,7 +1508,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } if (config.getProperties() != null) { - for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { + for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) { if (entry.getValue() != null) { procNode.setProperty(entry.getKey(), entry.getValue()); } @@ -1659,7 +1661,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R Set<RemoteProcessGroupPortDescriptor> remotePorts = null; if (ports != null) { remotePorts = new LinkedHashSet<>(ports.size()); - for (RemoteProcessGroupPortDTO port : ports) { + for (final RemoteProcessGroupPortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); descriptor.setId(port.getId()); descriptor.setName(port.getName()); @@ -3019,6 +3021,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'"); + final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; + final ProcessGroup rootGroup = getGroup(getRootGroupId()); + for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); + } + for (final ControllerServiceNode serviceNode : getAllControllerServices()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); + } + for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); + } + // update primary this.primary = primary; eventDrivenWorkerQueue.setPrimary(primary); @@ -3078,7 +3092,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isInputAvailable() { try { return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier())); - } catch (IOException e) { + } catch (final IOException e) { return false; } } @@ -3087,7 +3101,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isOutputAvailable() { try { return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier())); - } catch (IOException e) { + } catch (final IOException e) { return false; } } @@ -3387,7 +3401,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final NodeProtocolSender protocolSender; private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); - public BulletinsTask(NodeProtocolSender protocolSender) { + public BulletinsTask(final NodeProtocolSender protocolSender) { if (protocolSender == null) { throw new IllegalArgumentException("NodeProtocolSender may not be null."); } @@ -3543,7 +3557,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private class HeartbeatMessageGeneratorTask implements Runnable { - private AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>(); + private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>(); @Override public void run() { @@ -3610,7 +3624,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, int maxRecords) throws IOException { + public List<ProvenanceEventRecord> getProvenanceEvents(final long firstEventId, final int maxRecords) throws IOException { return new ArrayList<ProvenanceEventRecord>(provenanceEventRepository.getEvents(firstEventId, maxRecords)); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java index f1fef15..2837d35 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.file.Files; +import java.nio.file.Path; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -41,7 +42,6 @@ import java.util.jar.Manifest; import org.apache.nifi.util.FileUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,22 +61,34 @@ public final class NarUnpacker { }; public static ExtensionMapping unpackNars(final NiFiProperties props) { - final File narLibraryDir = props.getNarLibraryDirectory(); + final List<Path> narLibraryDirs = props.getNarLibraryDirectories(); final File frameworkWorkingDir = props.getFrameworkWorkingDirectory(); final File extensionsWorkingDir = props.getExtensionsWorkingDirectory(); final File docsWorkingDir = props.getComponentDocumentationWorkingDirectory(); try { + File unpackedFramework = null; + final Set<File> unpackedExtensions = new HashSet<>(); + final List<File> narFiles = new ArrayList<>(); + // make sure the nar directories are there and accessible - FileUtils.ensureDirectoryExistAndCanAccess(narLibraryDir); FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir); FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir); FileUtils.ensureDirectoryExistAndCanAccess(docsWorkingDir); - File unpackedFramework = null; - final Set<File> unpackedExtensions = new HashSet<>(); - final File[] narFiles = narLibraryDir.listFiles(NAR_FILTER); - if (narFiles != null) { + for (Path narLibraryDir : narLibraryDirs) { + + File narDir = narLibraryDir.toFile(); + FileUtils.ensureDirectoryExistAndCanAccess(narDir); + + File[] dirFiles = narDir.listFiles(NAR_FILTER); + if (dirFiles != null) { + List<File> fileList = Arrays.asList(dirFiles); + narFiles.addAll(fileList); + } + } + + if (!narFiles.isEmpty()) { for (File narFile : narFiles) { logger.debug("Expanding NAR file: " + narFile.getAbsolutePath()); @@ -91,7 +103,8 @@ public final class NarUnpacker { // determine if this is the framework if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(narId)) { if (unpackedFramework != null) { - throw new IllegalStateException("Multiple framework NARs discovered. Only one framework is permitted."); + throw new IllegalStateException( + "Multiple framework NARs discovered. Only one framework is permitted."); } unpackedFramework = unpackNar(narFile, frameworkWorkingDir); @@ -108,7 +121,8 @@ public final class NarUnpacker { throw new IllegalStateException("Framework NAR cannot be read."); } - // Determine if any nars no longer exist and delete their working directories. This happens + // Determine if any nars no longer exist and delete their + // working directories. This happens // if a new version of a nar is dropped into the lib dir. // ensure no old framework are present final File[] frameworkWorkingDirContents = frameworkWorkingDir.listFiles(); @@ -131,7 +145,8 @@ public final class NarUnpacker { } } - // attempt to delete any docs files that exist so that any components that have been removed + // attempt to delete any docs files that exist so that any + // components that have been removed // will no longer have entries in the docs folder final File[] docsFiles = docsWorkingDir.listFiles(); if (docsFiles != null) { @@ -144,7 +159,8 @@ public final class NarUnpacker { mapExtensions(extensionsWorkingDir, docsWorkingDir, extensionMapping); return extensionMapping; } catch (IOException e) { - logger.warn("Unable to load NAR library bundles due to " + e + " Will proceed without loading any further Nar bundles"); + logger.warn("Unable to load NAR library bundles due to " + e + + " Will proceed without loading any further Nar bundles"); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -153,7 +169,8 @@ public final class NarUnpacker { return null; } - private static void mapExtensions(final File workingDirectory, final File docsDirectory, final ExtensionMapping mapping) throws IOException { + private static void mapExtensions(final File workingDirectory, final File docsDirectory, + final ExtensionMapping mapping) throws IOException { final File[] directoryContents = workingDirectory.listFiles(); if (directoryContents != null) { for (final File file : directoryContents) { @@ -169,19 +186,24 @@ public final class NarUnpacker { /** * Unpacks the specified nar into the specified base working directory. * - * @param nar the nar to unpack - * @param baseWorkingDirectory the directory to unpack to + * @param nar + * the nar to unpack + * @param baseWorkingDirectory + * the directory to unpack to * @return the directory to the unpacked NAR - * @throws IOException if unable to explode nar + * @throws IOException + * if unable to explode nar */ - private static File unpackNar(final File nar, final File baseWorkingDirectory) throws IOException { + private static File unpackNar(final File nar, final File baseWorkingDirectory) + throws IOException { final File narWorkingDirectory = new File(baseWorkingDirectory, nar.getName() + "-unpacked"); // if the working directory doesn't exist, unpack the nar if (!narWorkingDirectory.exists()) { unpack(nar, narWorkingDirectory, calculateMd5sum(nar)); } else { - // the working directory does exist. Run MD5 sum against the nar file and check if the nar has changed since it was deployed. + // the working directory does exist. Run MD5 sum against the nar + // file and check if the nar has changed since it was deployed. final byte[] narMd5 = calculateMd5sum(nar); final File workingHashFile = new File(narWorkingDirectory, HASH_FILENAME); if (!workingHashFile.exists()) { @@ -190,7 +212,8 @@ public final class NarUnpacker { } else { final byte[] hashFileContents = Files.readAllBytes(workingHashFile.toPath()); if (!Arrays.equals(hashFileContents, narMd5)) { - logger.info("Contents of nar {} have changed. Reloading.", new Object[]{nar.getAbsolutePath()}); + logger.info("Contents of nar {} have changed. Reloading.", + new Object[] { nar.getAbsolutePath() }); FileUtils.deleteFile(narWorkingDirectory, true); unpack(nar, narWorkingDirectory, narMd5); } @@ -204,11 +227,13 @@ public final class NarUnpacker { * Unpacks the NAR to the specified directory. Creates a checksum file that * used to determine if future expansion is necessary. * - * @param workingDirectory the root directory to which the NAR should be - * unpacked. - * @throws IOException if the NAR could not be unpacked. + * @param workingDirectory + * the root directory to which the NAR should be unpacked. + * @throws IOException + * if the NAR could not be unpacked. */ - private static void unpack(final File nar, final File workingDirectory, final byte[] hash) throws IOException { + private static void unpack(final File nar, final File workingDirectory, final byte[] hash) + throws IOException { try (JarFile jarFile = new JarFile(nar)) { Enumeration<JarEntry> jarEntries = jarFile.entries(); @@ -230,7 +255,8 @@ public final class NarUnpacker { } } - private static void unpackDocumentation(final File jar, final File docsDirectory, final ExtensionMapping extensionMapping) throws IOException { + private static void unpackDocumentation(final File jar, final File docsDirectory, + final ExtensionMapping extensionMapping) throws IOException { // determine the components that may have documentation determineDocumentedNiFiComponents(jar, extensionMapping); @@ -240,7 +266,8 @@ public final class NarUnpacker { final String entryName = "docs/" + componentName; // go through each entry in this jar - for (final Enumeration<JarEntry> jarEnumeration = jarFile.entries(); jarEnumeration.hasMoreElements();) { + for (final Enumeration<JarEntry> jarEnumeration = jarFile.entries(); jarEnumeration + .hasMoreElements();) { final JarEntry jarEntry = jarEnumeration.nextElement(); // if this entry is documentation for this component @@ -252,8 +279,10 @@ public final class NarUnpacker { final File componentDocsDirectory = new File(docsDirectory, name); // ensure the documentation directory can be created - if (!componentDocsDirectory.exists() && !componentDocsDirectory.mkdirs()) { - logger.warn("Unable to create docs directory " + componentDocsDirectory.getAbsolutePath()); + if (!componentDocsDirectory.exists() + && !componentDocsDirectory.mkdirs()) { + logger.warn("Unable to create docs directory " + + componentDocsDirectory.getAbsolutePath()); break; } } else { @@ -268,19 +297,27 @@ public final class NarUnpacker { } } - private static void determineDocumentedNiFiComponents(final File jar, final ExtensionMapping extensionMapping) throws IOException { + private static void determineDocumentedNiFiComponents(final File jar, + final ExtensionMapping extensionMapping) throws IOException { try (final JarFile jarFile = new JarFile(jar)) { - final JarEntry processorEntry = jarFile.getJarEntry("META-INF/services/org.apache.nifi.processor.Processor"); - final JarEntry reportingTaskEntry = jarFile.getJarEntry("META-INF/services/org.apache.nifi.reporting.ReportingTask"); - final JarEntry controllerServiceEntry = jarFile.getJarEntry("META-INF/services/org.apache.nifi.controller.ControllerService"); - - extensionMapping.addAllProcessors(determineDocumentedNiFiComponents(jarFile, processorEntry)); - extensionMapping.addAllReportingTasks(determineDocumentedNiFiComponents(jarFile, reportingTaskEntry)); - extensionMapping.addAllControllerServices(determineDocumentedNiFiComponents(jarFile, controllerServiceEntry)); + final JarEntry processorEntry = jarFile + .getJarEntry("META-INF/services/org.apache.nifi.processor.Processor"); + final JarEntry reportingTaskEntry = jarFile + .getJarEntry("META-INF/services/org.apache.nifi.reporting.ReportingTask"); + final JarEntry controllerServiceEntry = jarFile + .getJarEntry("META-INF/services/org.apache.nifi.controller.ControllerService"); + + extensionMapping.addAllProcessors(determineDocumentedNiFiComponents(jarFile, + processorEntry)); + extensionMapping.addAllReportingTasks(determineDocumentedNiFiComponents(jarFile, + reportingTaskEntry)); + extensionMapping.addAllControllerServices(determineDocumentedNiFiComponents(jarFile, + controllerServiceEntry)); } } - private static List<String> determineDocumentedNiFiComponents(final JarFile jarFile, final JarEntry jarEntry) throws IOException { + private static List<String> determineDocumentedNiFiComponents(final JarFile jarFile, + final JarEntry jarEntry) throws IOException { final List<String> componentNames = new ArrayList<>(); if (jarEntry == null) { @@ -288,13 +325,15 @@ public final class NarUnpacker { } try (final InputStream entryInputStream = jarFile.getInputStream(jarEntry); - final BufferedReader reader = new BufferedReader(new InputStreamReader(entryInputStream))) { + final BufferedReader reader = new BufferedReader(new InputStreamReader( + entryInputStream))) { String line; while ((line = reader.readLine()) != null) { final String trimmedLine = line.trim(); if (!trimmedLine.isEmpty() && !trimmedLine.startsWith("#")) { final int indexOfPound = trimmedLine.indexOf("#"); - final String effectiveLine = (indexOfPound > 0) ? trimmedLine.substring(0, indexOfPound) : trimmedLine; + final String effectiveLine = (indexOfPound > 0) ? trimmedLine.substring(0, + indexOfPound) : trimmedLine; componentNames.add(effectiveLine); } } @@ -307,12 +346,16 @@ public final class NarUnpacker { * Creates the specified file, whose contents will come from the * <tt>InputStream</tt>. * - * @param inputStream the contents of the file to create. - * @param file the file to create. - * @throws IOException if the file could not be created. + * @param inputStream + * the contents of the file to create. + * @param file + * the file to create. + * @throws IOException + * if the file could not be created. */ private static void makeFile(final InputStream inputStream, final File file) throws IOException { - try (final InputStream in = inputStream; final FileOutputStream fos = new FileOutputStream(file)) { + try (final InputStream in = inputStream; + final FileOutputStream fos = new FileOutputStream(file)) { byte[] bytes = new byte[65536]; int numRead; while ((numRead = in.read(bytes)) != -1) { @@ -324,9 +367,11 @@ public final class NarUnpacker { /** * Calculates an md5 sum of the specified file. * - * @param file to calculate the md5sum of + * @param file + * to calculate the md5sum of * @return the md5sum bytes - * @throws IOException if cannot read file + * @throws IOException + * if cannot read file */ private static byte[] calculateMd5sum(final File file) throws IOException { try (final FileInputStream inputStream = new FileInputStream(file)) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java new file mode 100644 index 0000000..fa3cd62 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java @@ -0,0 +1,185 @@ +package org.apache.nifi.nar; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +import org.apache.nifi.util.NiFiProperties; +import org.junit.BeforeClass; +import org.junit.Test; + +public class NarUnpackerTest { + + @BeforeClass + public static void copyResources() throws IOException { + + final Path sourcePath = Paths.get("./src/test/resources"); + final Path targetPath = Paths.get("./target"); + + Files.walkFileTree(sourcePath, new SimpleFileVisitor<Path>() { + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + + Path relativeSource = sourcePath.relativize(dir); + Path target = targetPath.resolve(relativeSource); + + Files.createDirectories(target); + + return FileVisitResult.CONTINUE; + + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + + Path relativeSource = sourcePath.relativize(file); + Path target = targetPath.resolve(relativeSource); + + Files.copy(file, target, REPLACE_EXISTING); + + return FileVisitResult.CONTINUE; + } + }); + } + + @Test + public void testUnpackNars() { + + NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties"); + + assertEquals("./target/NarUnpacker/lib/", + properties.getProperty("nifi.nar.library.directory")); + assertEquals("./target/NarUnpacker/lib2/", + properties.getProperty("nifi.nar.library.directory.alt")); + + final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties); + + assertEquals(2, extensionMapping.getAllExtensionNames().size()); + + assertTrue(extensionMapping.getAllExtensionNames().contains( + "org.apache.nifi.processors.dummy.one")); + assertTrue(extensionMapping.getAllExtensionNames().contains( + "org.apache.nifi.processors.dummy.two")); + + final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); + File[] extensionFiles = extensionsWorkingDir.listFiles(); + + assertEquals(2, extensionFiles.length); + assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName()); + assertEquals("dummy-two.nar-unpacked", extensionFiles[1].getName()); + } + + @Test + public void testUnpackNarsFromEmptyDir() throws IOException { + + NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties"); + + final File emptyDir = new File("./target/empty/dir"); + emptyDir.delete(); + emptyDir.deleteOnExit(); + assertTrue(emptyDir.mkdirs()); + + properties.setProperty("nifi.nar.library.directory.alt", emptyDir.toString()); + + final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties); + + assertEquals(1, extensionMapping.getAllExtensionNames().size()); + assertTrue(extensionMapping.getAllExtensionNames().contains( + "org.apache.nifi.processors.dummy.one")); + + final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); + File[] extensionFiles = extensionsWorkingDir.listFiles(); + + assertEquals(1, extensionFiles.length); + assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName()); + } + + @Test + public void testUnpackNarsFromNonExistantDir() { + + final File nonExistantDir = new File("./target/this/dir/should/not/exist/"); + nonExistantDir.delete(); + nonExistantDir.deleteOnExit(); + + NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties"); + properties.setProperty("nifi.nar.library.directory.alt", nonExistantDir.toString()); + + final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties); + + assertTrue(extensionMapping.getAllExtensionNames().contains( + "org.apache.nifi.processors.dummy.one")); + + assertEquals(1, extensionMapping.getAllExtensionNames().size()); + + final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); + File[] extensionFiles = extensionsWorkingDir.listFiles(); + + assertEquals(1, extensionFiles.length); + assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName()); + } + + @Test + public void testUnpackNarsFromNonDir() throws IOException { + + final File nonDir = new File("./target/file.txt"); + nonDir.createNewFile(); + nonDir.deleteOnExit(); + + NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties"); + properties.setProperty("nifi.nar.library.directory.alt", nonDir.toString()); + + final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties); + + assertNull(extensionMapping); + } + + private NiFiProperties loadSpecifiedProperties(String propertiesFile) { + String file = NarUnpackerTest.class.getResource(propertiesFile).getFile(); + + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, file); + + NiFiProperties properties = NiFiProperties.getInstance(); + + // clear out existing properties + for (String prop : properties.stringPropertyNames()) { + properties.remove(prop); + } + + InputStream inStream = null; + try { + inStream = new BufferedInputStream(new FileInputStream(file)); + properties.load(inStream); + } catch (final Exception ex) { + throw new RuntimeException("Cannot load properties file due to " + + ex.getLocalizedMessage(), ex); + } finally { + if (null != inStream) { + try { + inStream.close(); + } catch (final Exception ex) { + /** + * do nothing * + */ + } + } + } + + return properties; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties new file mode 100644 index 0000000..92e4a92 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Core Properties # +nifi.version=nifi-test 3.0.0 +nifi.flow.configuration.file=./target/flow.xml.gz +nifi.flow.configuration.archive.dir=./target/archive/ +nifi.flowcontroller.autoResumeState=true +nifi.flowcontroller.graceful.shutdown.period=10 sec +nifi.flowservice.writedelay.interval=2 sec +nifi.administrative.yield.duration=30 sec + +nifi.reporting.task.configuration.file=./target/reporting-tasks.xml +nifi.controller.service.configuration.file=./target/controller-services.xml +nifi.templates.directory=./target/templates +nifi.ui.banner.text=UI Banner Text +nifi.ui.autorefresh.interval=30 sec +nifi.nar.library.directory=./target/NarUnpacker/lib/ +nifi.nar.library.directory.alt=./target/NarUnpacker/lib2/ + +nifi.nar.working.directory=./target/work/nar/ + +# H2 Settings +nifi.database.directory=./target/database_repository +nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE + +# FlowFile Repository +nifi.flowfile.repository.directory=./target/test-repo +nifi.flowfile.repository.partitions=1 +nifi.flowfile.repository.checkpoint.interval=2 mins +nifi.queue.swap.threshold=20000 +nifi.swap.storage.directory=./target/test-repo/swap +nifi.swap.in.period=5 sec +nifi.swap.in.threads=1 +nifi.swap.out.period=5 sec +nifi.swap.out.threads=4 + +# Content Repository +nifi.content.claim.max.appendable.size=10 MB +nifi.content.claim.max.flow.files=100 +nifi.content.repository.directory.default=./target/content_repository + +# Provenance Repository Properties +nifi.provenance.repository.storage.directory=./target/provenance_repository +nifi.provenance.repository.max.storage.time=24 hours +nifi.provenance.repository.max.storage.size=1 GB +nifi.provenance.repository.rollover.time=5 mins +nifi.provenance.repository.rollover.size=100 MB + +# Site to Site properties +nifi.remote.input.socket.port=9990 +nifi.remote.input.secure=true + +# web properties # +nifi.web.war.directory=./target/lib +nifi.web.http.host= +nifi.web.http.port=8080 +nifi.web.https.host= +nifi.web.https.port= +nifi.web.jetty.working.directory=./target/work/jetty + +# security properties # +nifi.sensitive.props.key=key +nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL +nifi.sensitive.props.provider=BC + +nifi.security.keystore= +nifi.security.keystoreType= +nifi.security.keystorePasswd= +nifi.security.keyPasswd= +nifi.security.truststore= +nifi.security.truststoreType= +nifi.security.truststorePasswd= +nifi.security.needClientAuth= +nifi.security.authorizedUsers.file=./target/conf/authorized-users.xml +nifi.security.user.credential.cache.duration=24 hours +nifi.security.user.authority.provider=nifi.authorization.FileAuthorizationProvider +nifi.security.support.new.account.requests= +nifi.security.default.user.roles= + +# cluster common properties (cluster manager and nodes must have same values) # +nifi.cluster.protocol.heartbeat.interval=5 sec +nifi.cluster.protocol.is.secure=false +nifi.cluster.protocol.socket.timeout=30 sec +nifi.cluster.protocol.connection.handshake.timeout=45 sec +# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured # +nifi.cluster.protocol.use.multicast=false +nifi.cluster.protocol.multicast.address= +nifi.cluster.protocol.multicast.port= +nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms +nifi.cluster.protocol.multicast.service.locator.attempts=3 +nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec + +# cluster node properties (only configure for cluster nodes) # +nifi.cluster.is.node=false +nifi.cluster.node.address= +nifi.cluster.node.protocol.port= +nifi.cluster.node.protocol.threads=2 +# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx # +nifi.cluster.node.unicast.manager.address= +nifi.cluster.node.unicast.manager.protocol.port= +nifi.cluster.node.unicast.manager.authority.provider.port= + +# cluster manager properties (only configure for cluster manager) # +nifi.cluster.is.manager=false +nifi.cluster.manager.address= +nifi.cluster.manager.protocol.port= +nifi.cluster.manager.authority.provider.port= +nifi.cluster.manager.authority.provider.threads=10 +nifi.cluster.manager.node.firewall.file= +nifi.cluster.manager.node.event.history.size=10 +nifi.cluster.manager.node.api.connection.timeout=30 sec +nifi.cluster.manager.node.api.read.timeout=30 sec +nifi.cluster.manager.node.api.request.threads=10 +nifi.cluster.manager.flow.retrieval.delay=5 sec +nifi.cluster.manager.protocol.threads=10 +nifi.cluster.manager.safemode.duration=0 sec http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar new file mode 100644 index 0000000..598b27f Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar differ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar new file mode 100644 index 0000000..d2a8b96 Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib/nifi-framework-nar.nar differ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar new file mode 100644 index 0000000..a1021ba Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar differ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 982d9ff..773f9cf 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; public class StandardRemoteGroupPort extends RemoteGroupPort { - private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds + private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String CONTENT_TYPE = "application/octet-stream"; @@ -98,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return targetRunning.get(); } - public void setTargetRunning(boolean targetRunning) { + public void setTargetRunning(final boolean targetRunning) { this.targetRunning.set(targetRunning); } @@ -126,12 +126,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { super.onSchedulingStart(); final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url(remoteGroup.getTargetUri().toString()) - .portIdentifier(getIdentifier()) - .sslContext(sslContext) - .eventReporter(remoteGroup.getEventReporter()) - .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) - .build(); + .url(remoteGroup.getTargetUri().toString()) + .portIdentifier(getIdentifier()) + .sslContext(sslContext) + .eventReporter(remoteGroup.getEventReporter()) + .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) + .build(); clientRef.set(client); } @@ -147,7 +147,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return; } - String url = getRemoteProcessGroup().getTargetUri().toString(); + final String url = getRemoteProcessGroup().getTargetUri().toString(); // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise, // we don't want to create a transaction at all. @@ -230,7 +230,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return remoteGroup.getYieldDuration(); } - private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException { + private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, final FlowFile firstFlowFile) throws IOException, ProtocolException { FlowFile flowFile = firstFlowFile; try { @@ -288,7 +288,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); + this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); return flowFilesSent.size(); } catch (final Exception e) { @@ -345,7 +345,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesReceived); logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); + this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); } return flowFilesReceived.size(); @@ -367,16 +367,16 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { ValidationResult error = null; if (!targetExists.get()) { error = new ValidationResult.Builder() - .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); + .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { error = new ValidationResult.Builder() - .explanation(String.format("Port '%s' has no outbound connections", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); + .explanation(String.format("Port '%s' has no outbound connections", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); } if (error != null) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index 6d89cbf..ea36abd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -257,7 +257,7 @@ public final class SnippetUtils { final PropertyDescriptor descriptor = entry.getKey(); final String propertyValue = entry.getValue(); - if (descriptor.getControllerServiceDefinition() != null) { + if (descriptor.getControllerServiceDefinition() != null && propertyValue != null) { final ControllerServiceNode referencedNode = flowController.getControllerServiceNode(propertyValue); if (referencedNode == null) { throw new IllegalStateException("Controller Service with ID " + propertyValue + " is referenced in template but cannot be found"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index 3ff1e88..ede32ab 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -42,8 +42,12 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> - </dependency> - <dependency> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 3294ead..355950f 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -70,7 +70,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { // variables shared by all threads of this processor // Hadoop Configuration and FileSystem - protected final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>(); + private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>(); @Override protected void init(ProcessorInitializationContext context) { @@ -153,7 +153,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme()); config.set(disableCacheName, "true"); - final FileSystem fs = FileSystem.get(config); + final FileSystem fs = getFileSystem(config); getLogger().info( "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), @@ -165,6 +165,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } } + /** + * This exists in order to allow unit tests to override it so that they don't take several minutes waiting + * for UDP packets to be received + * + * @param config the configuration to use + * @return the FileSystem that is created for the given Configuration + * @throws IOException if unable to create the FileSystem + */ + protected FileSystem getFileSystem(final Configuration config) throws IOException { + return FileSystem.get(config); + } + /* * Drastically reduce the timeout of a socket connection from the default in FileSystem.get() */ @@ -216,4 +228,39 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { }; } + + /** + * Returns the relative path of the child that does not include the filename + * or the root path. + * @param root the path to relativize from + * @param child the path to relativize + * @return the relative path + */ + public static String getPathDifference(final Path root, final Path child) { + final int depthDiff = child.depth() - root.depth(); + if (depthDiff <= 1) { + return "".intern(); + } + String lastRoot = root.getName(); + Path childsParent = child.getParent(); + final StringBuilder builder = new StringBuilder(); + builder.append(childsParent.getName()); + for (int i = (depthDiff - 3); i >= 0; i--) { + childsParent = childsParent.getParent(); + String name = childsParent.getName(); + if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { + break; + } + builder.insert(0, Path.SEPARATOR).insert(0, name); + } + return builder.toString(); + } + + protected Configuration getConfiguration() { + return hdfsResources.get().getKey(); + } + + protected FileSystem getFileSystem() { + return hdfsResources.get().getValue(); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java index f462277..186a290 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java @@ -156,7 +156,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor { final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf"; flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName); try { - flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType); + flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType); session.transfer(flowFile, RELATIONSHIP_SUCCESS); getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS}); } catch (ProcessException e) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java new file mode 100644 index 0000000..4a52fb7 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; + +@SupportsBatching +@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) +@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. " + + "The file in HDFS is left intact without any changes being made to it.") +@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " + + "not be fetched from HDFS") +@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class}) +public class FetchHDFS extends AbstractHadoopProcessor { + static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() + .name("HDFS Filename") + .description("The name of the HDFS file to retrieve") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${path}/${filename}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. " + + "This would occur, for instance, if the file is not found or if there is a permissions issue") + .build(); + static final Relationship REL_COMMS_FAILURE = new Relationship.Builder() + .name("comms.failure") + .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. " + + "This generally indicates that the Fetch should be tried again.") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HADOOP_CONFIGURATION_RESOURCES); + properties.add(FILENAME); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_COMMS_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final FileSystem hdfs = getFileSystem(); + final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue()); + final URI uri = path.toUri(); + + final StopWatch stopWatch = new StopWatch(true); + try (final FSDataInputStream inStream = hdfs.open(path, 16384)) { + flowFile = session.importFrom(inStream, flowFile); + stopWatch.stop(); + getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); + session.getProvenanceReporter().modifyContent(flowFile, "Fetched content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final FileNotFoundException | AccessControlException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); + flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } catch (final IOException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_COMMS_FAILURE); + } + } + +}
