http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index 8bb25c8..633cce2 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -19,6 +19,8 @@ package org.apache.nifi.minifi.bootstrap.util; import org.apache.nifi.controller.FlowSerializationException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.w3c.dom.DOMException; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -37,6 +39,8 @@ import javax.xml.transform.stream.StreamResult; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; @@ -151,11 +155,19 @@ public final class ConfigTransformer { // Verify the parsed object is a Map structure if (loadedObject instanceof Map) { final Map<String, Object> result = (Map<String, Object>) loadedObject; + + // Create nifi.properties and flow.xml.gz in memory + ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream(); + writeNiFiProperties(result, nifiPropertiesOutputStream); + + DOMSource flowXml = createFlowXml(result); + // Write nifi.properties and flow.xml.gz - writeNiFiProperties(result, destPath); - writeFlowXml(result, destPath); + writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath); + + writeFlowXmlFile(flowXml, destPath); } else { - throw new IllegalArgumentException("Provided YAML configuration is malformed."); + throw new IllegalArgumentException("Provided YAML configuration is not a Map."); } } finally { if (sourceStream != null) { @@ -164,20 +176,49 @@ public final class ConfigTransformer { } } - private static void writeNiFiProperties(Map<String, Object> topLevelYaml, String path) throws FileNotFoundException, UnsupportedEncodingException { + private static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException { + try { + final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties"); + FileOutputStream nifiProperties = new FileOutputStream(new File(nifiPropertiesPath.toString())); + nifiProperties.write(nifiPropertiesOutputStream.getUnderlyingBuffer()); + } finally { + if (nifiPropertiesOutputStream != null){ + nifiPropertiesOutputStream.flush(); + nifiPropertiesOutputStream.close(); + } + } + } + + private static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException { + + final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz")); + final OutputStream outStream = new GZIPOutputStream(fileOut); + final StreamResult streamResult = new StreamResult(outStream); + + // configure the transformer and convert the DOM + final TransformerFactory transformFactory = TransformerFactory.newInstance(); + final Transformer transformer = transformFactory.newTransformer(); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + + // transform the document to byte stream + transformer.transform(domSource, streamResult); + outStream.flush(); + outStream.close(); + } + + private static void writeNiFiProperties(Map<String, Object> topLevelYaml, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException { PrintWriter writer = null; try { - final Path nifiPropertiesPath = Paths.get(path, "nifi.properties"); - writer = new PrintWriter(nifiPropertiesPath.toFile(), "UTF-8"); + writer = new PrintWriter(outputStream, true); - Map<String,Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY); - Map<String,Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY); + Map<String, Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY); + Map<String, Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY); Map<String, Object> swapProperties = (Map<String, Object>) flowfileRepo.get(SWAP_PROPS_KEY); - Map<String,Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY); - Map<String,Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY); - Map<String,Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); - Map<String,Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY); - + Map<String, Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY); + Map<String, Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY); + Map<String, Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); + Map<String, Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY); writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE); writer.println("# Core Properties #"); @@ -284,6 +325,8 @@ public final class ConfigTransformer { writer.println(); writer.println("# cluster manager properties (only configure for cluster manager) #"); writer.println("nifi.cluster.is.manager=false"); + } catch (NullPointerException e) { + throw new ConfigurationChangeException("Failed to parse the config YAML while creating the nifi.properties", e); } finally { if (writer != null){ writer.flush(); @@ -291,7 +334,7 @@ public final class ConfigTransformer { } } } - private static void writeFlowXml(Map<String, Object> topLevelYaml, String path) throws Exception { + private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) throws IOException, ConfigurationChangeException { try { // create a new, empty document final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); @@ -304,39 +347,32 @@ public final class ConfigTransformer { final Element rootNode = doc.createElement("flowController"); doc.appendChild(rootNode); Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY); - addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY)); - addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY)); + addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1")); + addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1")); addProcessGroup(rootNode, topLevelYaml, "rootGroup"); Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); - String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY); - if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) { - final Element controllerServicesNode = doc.createElement("controllerServices"); - rootNode.appendChild(controllerServicesNode); - addSSLControllerService(controllerServicesNode, securityProps); + if (securityProps != null) { + String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY); + if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) { + final Element controllerServicesNode = doc.createElement("controllerServices"); + rootNode.appendChild(controllerServicesNode); + addSSLControllerService(controllerServicesNode, securityProps); + } + } + + Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY); + if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) { + final Element reportingTasksNode = doc.createElement("reportingTasks"); + rootNode.appendChild(reportingTasksNode); + addProvenanceReportingTask(reportingTasksNode, topLevelYaml); } - final Element reportingTasksNode = doc.createElement("reportingTasks"); - rootNode.appendChild(reportingTasksNode); - addProvenanceReportingTask(reportingTasksNode, topLevelYaml); - - final DOMSource domSource = new DOMSource(doc); - final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz")); - final OutputStream outStream = new GZIPOutputStream(fileOut); - final StreamResult streamResult = new StreamResult(outStream); - - // configure the transformer and convert the DOM - final TransformerFactory transformFactory = TransformerFactory.newInstance(); - final Transformer transformer = transformFactory.newTransformer(); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - - // transform the document to byte stream - transformer.transform(domSource, streamResult); - outStream.flush(); - outStream.close(); - } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) { + return new DOMSource(doc); + } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) { throw new FlowSerializationException(e); + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while writing the top level of the flow xml", e); } } @@ -345,109 +381,140 @@ public final class ConfigTransformer { return value == null ? "" : value.toString(); } - private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) { - final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); - addTextElement(serviceElement, "id", "SSL-Context-Service"); - addTextElement(serviceElement, "name", "SSL-Context-Service"); - addTextElement(serviceElement, "comment", ""); - addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService"); - - addTextElement(serviceElement, "enabled", "true"); - - Map<String, Object> attributes = new HashMap<>(); - attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY)); - attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY)); - attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY)); - attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY)); - attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY)); - attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY)); - attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY)); - - addConfiguration(serviceElement, attributes); + private static <K> String getValueString(Map<K,Object> map, K key, String theDefault){ + Object value = null; + if (map != null){ + value = map.get(key); + } + return value == null ? theDefault : value.toString(); + } - element.appendChild(serviceElement); + private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) throws ConfigurationChangeException { + try { + final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); + addTextElement(serviceElement, "id", "SSL-Context-Service"); + addTextElement(serviceElement, "name", "SSL-Context-Service"); + addTextElement(serviceElement, "comment", ""); + addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService"); + + addTextElement(serviceElement, "enabled", "true"); + + Map<String, Object> attributes = new HashMap<>(); + attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY)); + attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY)); + attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY)); + attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY)); + attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY)); + attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY)); + attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY)); + + addConfiguration(serviceElement, attributes); + + element.appendChild(serviceElement); + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to create an SSL Controller Service", e); + } } - private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) { - Map<String,Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY); + private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) throws ConfigurationChangeException { + try { + Map<String, Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY); - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", "Root-Group"); - addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY) ); - addPosition(element); - addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY)); + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement(elementName); + parentElement.appendChild(element); + addTextElement(element, "id", "Root-Group"); + addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY)); + addPosition(element); + addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY)); - Map<String,Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY); - addProcessor(element, processorConfig); + Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY); + addProcessor(element, processorConfig); - Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY); - addRemoteProcessGroup(element, remoteProcessingGroup); + Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY); + addRemoteProcessGroup(element, remoteProcessingGroup); - addConnection(element, topLevelYaml); + addConnection(element, topLevelYaml); + } catch (ConfigurationChangeException e){ + throw e; + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e); + } } - private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) { + private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) throws ConfigurationChangeException { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("processor"); - parentElement.appendChild(element); - addTextElement(element, "id", "Processor"); - addTextElement(element, "name", getValueString(processorConfig, NAME_KEY)); - - addPosition(element); - addStyle(element); - - addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY)); - addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY)); - addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY)); - addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY)); - addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY)); - addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY)); - addTextElement(element, "bulletinLevel", "WARN"); - addTextElement(element, "lossTolerant", "false"); - addTextElement(element, "scheduledState", "RUNNING"); - addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY)); - addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY)); - - addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY)); - - Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY); - if (autoTerminatedRelationships != null) { - for (String rel : autoTerminatedRelationships) { - addTextElement(element, "autoTerminatedRelationship", rel); + try { + if (processorConfig.get(CLASS_KEY) == null) { + // Only add a processor if it has a class + return; + } + + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("processor"); + parentElement.appendChild(element); + addTextElement(element, "id", "Processor"); + addTextElement(element, "name", getValueString(processorConfig, NAME_KEY)); + + addPosition(element); + addStyle(element); + + addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY)); + addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY)); + addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY)); + addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY)); + addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY)); + addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY)); + addTextElement(element, "bulletinLevel", "WARN"); + addTextElement(element, "lossTolerant", "false"); + addTextElement(element, "scheduledState", "RUNNING"); + addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY)); + addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY)); + + addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY)); + + Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY); + if (autoTerminatedRelationships != null) { + for (String rel : autoTerminatedRelationships) { + addTextElement(element, "autoTerminatedRelationship", rel); + } } + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Processor", e); } } - private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) { - Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY); - final Element taskElement = element.getOwnerDocument().createElement("reportingTask"); - addTextElement(taskElement, "id", "Provenance-Reporting"); - addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting"); - addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY)); - addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask"); - addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY)); - addTextElement(taskElement, "scheduledState", "RUNNING"); - addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY)); - - Map<String, Object> attributes = new HashMap<>(); - attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY)); - attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY)); - attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY)); - attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY)); - attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY)); - - Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); - String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY); - if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) { - attributes.put("SSL Context Service", "SSL-Context-Service"); - } + private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) throws ConfigurationChangeException { + try { + Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY); + final Element taskElement = element.getOwnerDocument().createElement("reportingTask"); + addTextElement(taskElement, "id", "Provenance-Reporting"); + addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting"); + addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY)); + addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask"); + addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY)); + addTextElement(taskElement, "scheduledState", "RUNNING"); + addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY)); + + Map<String, Object> attributes = new HashMap<>(); + attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY)); + attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY)); + attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY)); + attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY)); + attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY)); + + Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); + String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY); + if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) { + attributes.put("SSL Context Service", "SSL-Context-Service"); + } - addConfiguration(taskElement, attributes); + addConfiguration(taskElement, attributes); - element.appendChild(taskElement); + element.appendChild(taskElement); + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e); + } } private static void addConfiguration(final Element element, Map<String, Object> elementConfig) { @@ -472,75 +539,103 @@ public final class ConfigTransformer { parentElement.appendChild(element); } - private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) { - - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("remoteProcessGroup"); - parentElement.appendChild(element); - addTextElement(element, "id", "Remote-Process-Group"); - addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY)); - addPosition(element); - addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY)); - addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY)); - addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY)); - addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY)); - addTextElement(element, "transmitting", "true"); - - Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY); - addRemoteGroupPort(element, inputPort, "inputPort"); + private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException { + try { + if (remoteProcessingGroup.get(URL_KEY) == null) { + // Only add an an RPG if it has a URL + return; + } - parentElement.appendChild(element); + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("remoteProcessGroup"); + parentElement.appendChild(element); + addTextElement(element, "id", "Remote-Process-Group"); + addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY)); + addPosition(element); + addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY)); + addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY)); + addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY)); + addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY)); + addTextElement(element, "transmitting", "true"); + + Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY); + addRemoteGroupPort(element, inputPort, "inputPort"); + + parentElement.appendChild(element); + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Remote Process Group", e); + } } - private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", getValueString(inputPort, ID_KEY)); - addTextElement(element, "name", getValueString(inputPort, NAME_KEY)); - addPosition(element); - addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY)); - addTextElement(element, "scheduledState", "RUNNING"); - addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY)); - addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY)); + private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) throws ConfigurationChangeException { - parentElement.appendChild(element); + try { + if (inputPort.get(ID_KEY) == null) { + // Only add an input port if it has an ID + return; + } + + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement(elementName); + parentElement.appendChild(element); + addTextElement(element, "id", getValueString(inputPort, ID_KEY)); + addTextElement(element, "name", getValueString(inputPort, NAME_KEY)); + addPosition(element); + addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY)); + addTextElement(element, "scheduledState", "RUNNING"); + addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY)); + addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY)); + + parentElement.appendChild(element); + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the input port of the Remote Process Group", e); + } } - private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) { - Map<String,Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY); - Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY); - Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY); - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("connection"); - parentElement.appendChild(element); - addTextElement(element, "id", "Connection"); - addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY)); + private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) throws ConfigurationChangeException { + try { + Map<String, Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY); + Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY); + Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY); + Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY); - final Element bendPointsElement = doc.createElement("bendPoints"); - element.appendChild(bendPointsElement); + if (inputPort.get(ID_KEY) == null || processorConfig.get(CLASS_KEY) == null) { + // Only add the connection if the input port and processor config are created + return; + } - addTextElement(element, "labelIndex", "1"); - addTextElement(element, "zIndex", "0"); + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("connection"); + parentElement.appendChild(element); + addTextElement(element, "id", "Connection"); + addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY)); - addTextElement(element, "sourceId", "Processor"); - addTextElement(element, "sourceGroupId", "Root-Group"); - addTextElement(element, "sourceType", "PROCESSOR"); + final Element bendPointsElement = doc.createElement("bendPoints"); + element.appendChild(bendPointsElement); - addTextElement(element, "destinationId", getValueString(inputPort,ID_KEY)); - addTextElement(element, "destinationGroupId", "Remote-Process-Group"); - addTextElement(element, "destinationType", "REMOTE_INPUT_PORT"); + addTextElement(element, "labelIndex", "1"); + addTextElement(element, "zIndex", "0"); - addTextElement(element, "relationship", "success"); + addTextElement(element, "sourceId", "Processor"); + addTextElement(element, "sourceGroupId", "Root-Group"); + addTextElement(element, "sourceType", "PROCESSOR"); - addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY)); - addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY)); + addTextElement(element, "destinationId", getValueString(inputPort, ID_KEY)); + addTextElement(element, "destinationGroupId", "Remote-Process-Group"); + addTextElement(element, "destinationType", "REMOTE_INPUT_PORT"); - addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY)); - addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY)); + addTextElement(element, "relationship", "success"); + addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY)); + addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY)); - parentElement.appendChild(element); + addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY)); + addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY)); + + parentElement.appendChild(element); + } catch (Exception e){ + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the connection from the Processor to the input port of the Remote Process Group", e); + } } private static void addPosition(final Element parentElement) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java deleted file mode 100644 index 9432a2f..0000000 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.minifi.bootstrap.configuration; - -import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.InputStream; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -public class TestFileChangeNotifier { - - private static final String CONFIG_FILENAME = "config.yml"; - private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml"; - - private FileChangeNotifier notifierSpy; - private WatchService mockWatchService; - private Properties testProperties; - - @Before - public void setUp() throws Exception { - mockWatchService = Mockito.mock(WatchService.class); - notifierSpy = Mockito.spy(new FileChangeNotifier()); - notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH)); - notifierSpy.setWatchService(mockWatchService); - - testProperties = new Properties(); - testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH); - testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL); - } - - @After - public void tearDown() throws Exception { - notifierSpy.close(); - } - - @Test(expected = IllegalStateException.class) - public void testInitialize_invalidFile() throws Exception { - testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe"); - notifierSpy.initialize(testProperties); - } - - @Test - public void testInitialize_validFile() throws Exception { - notifierSpy.initialize(testProperties); - } - - @Test(expected = IllegalStateException.class) - public void testInitialize_invalidPollingPeriod() throws Exception { - testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc"); - notifierSpy.initialize(testProperties); - } - - @Test - public void testInitialize_useDefaultPolling() throws Exception { - testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY); - notifierSpy.initialize(testProperties); - } - - - @Test - public void testNotifyListeners() throws Exception { - final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); - boolean wasRegistered = notifierSpy.registerListener(testListener); - - Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); - Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); - - notifierSpy.notifyListeners(); - - verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class)); - } - - @Test - public void testRegisterListener() throws Exception { - final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class); - boolean wasRegistered = notifierSpy.registerListener(firstListener); - - Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); - Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); - - final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class); - wasRegistered = notifierSpy.registerListener(secondListener); - Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2); - - } - - @Test - public void testRegisterDuplicateListener() throws Exception { - final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class); - boolean wasRegistered = notifierSpy.registerListener(firstListener); - - Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); - Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); - - wasRegistered = notifierSpy.registerListener(firstListener); - - Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); - Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered); - } - - /* Verify handleChange events */ - @Test - public void testTargetChangedNoModification() throws Exception { - final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); - - // In this case the WatchKey is null because there were no events found - establishMockEnvironmentForChangeTests(testListener, null); - - verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class)); - } - - @Test - public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception { - final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); - - // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored - final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml"); - - establishMockEnvironmentForChangeTests(testListener, mockWatchKey); - - notifierSpy.targetChanged(); - - verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class)); - } - - @Test - public void testTargetChangedWithModificationEvent() throws Exception { - final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); - - final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME); - // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class - establishMockEnvironmentForChangeTests(testListener, mockWatchKey); - - // Invoke the method of interest - notifierSpy.run(); - - verify(mockWatchService, Mockito.atLeastOnce()).poll(); - verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class)); - } - - /* Helper methods to establish mock environment */ - private WatchKey createMockWatchKeyForPath(String configFilePath) { - final WatchKey mockWatchKey = Mockito.mock(WatchKey.class); - final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class); - when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents); - when(mockWatchKey.reset()).thenReturn(true); - - final Iterator mockIterator = Mockito.mock(Iterator.class); - when(mockWatchEvents.iterator()).thenReturn(mockIterator); - - final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class); - when(mockIterator.hasNext()).thenReturn(true, false); - when(mockIterator.next()).thenReturn(mockWatchEvent); - - // In this case, we receive a trigger event for the directory monitored, and it was the file monitored - when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath)); - when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY); - - return mockWatchKey; - } - - private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception { - final boolean wasRegistered = notifierSpy.registerListener(listener); - - // Establish the file mock and its parent directory - final Path mockConfigFilePath = Mockito.mock(Path.class); - final Path mockConfigFileParentPath = Mockito.mock(Path.class); - - // When getting the parent of the file, get the directory - when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath); - - Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); - Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); - - when(mockWatchService.poll()).thenReturn(watchKey); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java deleted file mode 100644 index 75b44e3..0000000 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.bootstrap.configuration; - - -import com.squareup.okhttp.OkHttpClient; -import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import java.net.MalformedURLException; -import java.util.Properties; - - -public class TestRestChangeNotifier extends TestRestChangeNotifierCommon { - - @BeforeClass - public static void setUp() throws InterruptedException, MalformedURLException { - Properties properties = new Properties(); - restChangeNotifier = new RestChangeNotifier(); - restChangeNotifier.initialize(properties); - restChangeNotifier.registerListener(mockChangeListener); - restChangeNotifier.start(); - - client = new OkHttpClient(); - - url = restChangeNotifier.getURI().toURL().toString(); - Thread.sleep(1000); - } - - @AfterClass - public static void stop() throws Exception { - restChangeNotifier.close(); - client = null; - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java deleted file mode 100644 index 908e693..0000000 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.bootstrap.configuration; - - -import com.squareup.okhttp.OkHttpClient; -import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; -import java.io.IOException; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.Properties; - - -public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon { - - - @BeforeClass - public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException { - Properties properties = new Properties(); - properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks"); - properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest"); - properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS"); - properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks"); - properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest"); - properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS"); - properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true"); - restChangeNotifier = new RestChangeNotifier(); - restChangeNotifier.initialize(properties); - restChangeNotifier.registerListener(mockChangeListener); - restChangeNotifier.start(); - - client = new OkHttpClient(); - - SSLContext sslContext = SSLContext.getInstance("TLS"); - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks")); - - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray()); - - sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom()); - client.setSslSocketFactory(sslContext.getSocketFactory()); - - url = restChangeNotifier.getURI().toURL().toString(); - Thread.sleep(1000); - } - - @AfterClass - public static void stop() throws Exception { - restChangeNotifier.close(); - client = null; - } - - private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException { - KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); - - char[] password = "localtest".toCharArray(); - - java.io.FileInputStream fis = null; - try { - fis = new java.io.FileInputStream(path); - ks.load(fis, password); - } finally { - if (fis != null) { - fis.close(); - } - } - return ks; - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java new file mode 100644 index 0000000..145c2fe --- /dev/null +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.configuration.notifiers; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.InputStream; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFileChangeNotifier { + + private static final String CONFIG_FILENAME = "config.yml"; + private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml"; + + private FileChangeNotifier notifierSpy; + private WatchService mockWatchService; + private Properties testProperties; + + @Before + public void setUp() throws Exception { + mockWatchService = Mockito.mock(WatchService.class); + notifierSpy = Mockito.spy(new FileChangeNotifier()); + notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH)); + notifierSpy.setWatchService(mockWatchService); + + testProperties = new Properties(); + testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH); + testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL); + } + + @After + public void tearDown() throws Exception { + notifierSpy.close(); + } + + @Test(expected = IllegalStateException.class) + public void testInitialize_invalidFile() throws Exception { + testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe"); + notifierSpy.initialize(testProperties); + } + + @Test + public void testInitialize_validFile() throws Exception { + notifierSpy.initialize(testProperties); + } + + @Test(expected = IllegalStateException.class) + public void testInitialize_invalidPollingPeriod() throws Exception { + testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc"); + notifierSpy.initialize(testProperties); + } + + @Test + public void testInitialize_useDefaultPolling() throws Exception { + testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY); + notifierSpy.initialize(testProperties); + } + + + @Test + public void testNotifyListeners() throws Exception { + final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); + boolean wasRegistered = notifierSpy.registerListener(testListener); + + Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); + Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); + + notifierSpy.notifyListeners(); + + verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class)); + } + + @Test + public void testRegisterListener() throws Exception { + final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class); + boolean wasRegistered = notifierSpy.registerListener(firstListener); + + Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); + Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); + + final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class); + wasRegistered = notifierSpy.registerListener(secondListener); + Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2); + + } + + @Test + public void testRegisterDuplicateListener() throws Exception { + final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class); + boolean wasRegistered = notifierSpy.registerListener(firstListener); + + Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); + Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); + + wasRegistered = notifierSpy.registerListener(firstListener); + + Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); + Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered); + } + + /* Verify handleChange events */ + @Test + public void testTargetChangedNoModification() throws Exception { + final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); + + // In this case the WatchKey is null because there were no events found + establishMockEnvironmentForChangeTests(testListener, null); + + verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class)); + } + + @Test + public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception { + final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); + + // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored + final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml"); + + establishMockEnvironmentForChangeTests(testListener, mockWatchKey); + + notifierSpy.targetChanged(); + + verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class)); + } + + @Test + public void testTargetChangedWithModificationEvent() throws Exception { + final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class); + + final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME); + // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class + establishMockEnvironmentForChangeTests(testListener, mockWatchKey); + + // Invoke the method of interest + notifierSpy.run(); + + verify(mockWatchService, Mockito.atLeastOnce()).poll(); + verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class)); + } + + /* Helper methods to establish mock environment */ + private WatchKey createMockWatchKeyForPath(String configFilePath) { + final WatchKey mockWatchKey = Mockito.mock(WatchKey.class); + final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class); + when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents); + when(mockWatchKey.reset()).thenReturn(true); + + final Iterator mockIterator = Mockito.mock(Iterator.class); + when(mockWatchEvents.iterator()).thenReturn(mockIterator); + + final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class); + when(mockIterator.hasNext()).thenReturn(true, false); + when(mockIterator.next()).thenReturn(mockWatchEvent); + + // In this case, we receive a trigger event for the directory monitored, and it was the file monitored + when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath)); + when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY); + + return mockWatchKey; + } + + private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception { + final boolean wasRegistered = notifierSpy.registerListener(listener); + + // Establish the file mock and its parent directory + final Path mockConfigFilePath = Mockito.mock(Path.class); + final Path mockConfigFileParentPath = Mockito.mock(Path.class); + + // When getting the parent of the file, get the directory + when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath); + + Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered); + Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1); + + when(mockWatchService.poll()).thenReturn(watchKey); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java new file mode 100644 index 0000000..1cd37fd --- /dev/null +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration.notifiers; + + +import com.squareup.okhttp.OkHttpClient; +import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.net.MalformedURLException; +import java.util.Properties; + + +public class TestRestChangeNotifier extends TestRestChangeNotifierCommon { + + @BeforeClass + public static void setUp() throws InterruptedException, MalformedURLException { + Properties properties = new Properties(); + restChangeNotifier = new RestChangeNotifier(); + restChangeNotifier.initialize(properties); + restChangeNotifier.registerListener(mockChangeListener); + restChangeNotifier.start(); + + client = new OkHttpClient(); + + url = restChangeNotifier.getURI().toURL().toString(); + Thread.sleep(1000); + } + + @AfterClass + public static void stop() throws Exception { + restChangeNotifier.close(); + client = null; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java new file mode 100644 index 0000000..6073a6f --- /dev/null +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration.notifiers; + + +import com.squareup.okhttp.OkHttpClient; +import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.Properties; + + +public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon { + + + @BeforeClass + public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException { + Properties properties = new Properties(); + properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks"); + properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest"); + properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS"); + properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks"); + properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest"); + properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS"); + properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true"); + restChangeNotifier = new RestChangeNotifier(); + restChangeNotifier.initialize(properties); + restChangeNotifier.registerListener(mockChangeListener); + restChangeNotifier.start(); + + client = new OkHttpClient(); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks")); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray()); + + sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom()); + client.setSslSocketFactory(sslContext.getSocketFactory()); + + url = restChangeNotifier.getURI().toURL().toString(); + Thread.sleep(1000); + } + + @AfterClass + public static void stop() throws Exception { + restChangeNotifier.close(); + client = null; + } + + private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException { + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + + char[] password = "localtest".toCharArray(); + + java.io.FileInputStream fis = null; + try { + fis = new java.io.FileInputStream(path); + ks.load(fis, password); + } finally { + if (fis != null) { + fis.close(); + } + } + return ks; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java new file mode 100644 index 0000000..eae5872 --- /dev/null +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; + +import java.io.IOException; +import java.io.InputStream; + +public class MockChangeListener implements ConfigurationChangeListener { + String confFile; + + @Override + public void handleChange(InputStream inputStream) { + try { + confFile = IOUtils.toString(inputStream, "UTF-8"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getDescriptor() { + return "MockChangeListener"; + } + + public String getConfFile() { + return confFile; + } + + public void setConfFile(String confFile) { + this.confFile = confFile; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java new file mode 100644 index 0000000..78f6cd5 --- /dev/null +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util; + +import com.squareup.okhttp.Headers; +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; +import org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public abstract class TestRestChangeNotifierCommon { + + public static OkHttpClient client; + public static RestChangeNotifier restChangeNotifier; + public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8"); + public static String url; + public static MockChangeListener mockChangeListener = new MockChangeListener(); + + @Test + public void testGet() throws Exception { + assertEquals(1, restChangeNotifier.getChangeListeners().size()); + + Request request = new Request.Builder() + .url(url) + .build(); + + Response response = client.newCall(request).execute(); + if (!response.isSuccessful()) throw new IOException("Unexpected code " + response); + + Headers responseHeaders = response.headers(); + for (int i = 0; i < responseHeaders.size(); i++) { + System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i)); + } + + assertEquals(RestChangeNotifier.GET_TEXT, response.body().string()); + } + + @Test + public void testFileUpload() throws Exception { + assertEquals(1, restChangeNotifier.getChangeListeners().size()); + + File file = new File("src/test/resources/testUploadFile.txt"); + assertTrue(file.exists()); + assertTrue(file.canRead()); + + Request request = new Request.Builder() + .url(url) + .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file)) + .addHeader("charset","UTF-8") + .build(); + + Response response = client.newCall(request).execute(); + if (!response.isSuccessful()) throw new IOException("Unexpected code " + response); + + Headers responseHeaders = response.headers(); + for (int i = 0; i < responseHeaders.size(); i++) { + System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i)); + } + + assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string()); + + assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile()); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java deleted file mode 100644 index 6843889..0000000 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.bootstrap.configuration.util; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; - -import java.io.IOException; -import java.io.InputStream; - -public class MockChangeListener implements ConfigurationChangeListener { - String confFile; - - @Override - public void handleChange(InputStream inputStream) { - try { - confFile = IOUtils.toString(inputStream, "UTF-8"); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public String getConfFile() { - return confFile; - } - - public void setConfFile(String confFile) { - this.confFile = confFile; - } - -} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java deleted file mode 100644 index b3c4f54..0000000 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.bootstrap.configuration.util; - -import com.squareup.okhttp.Headers; -import com.squareup.okhttp.MediaType; -import com.squareup.okhttp.OkHttpClient; -import com.squareup.okhttp.Request; -import com.squareup.okhttp.RequestBody; -import com.squareup.okhttp.Response; -import org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public abstract class TestRestChangeNotifierCommon { - - public static OkHttpClient client; - public static RestChangeNotifier restChangeNotifier; - public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8"); - public static String url; - public static MockChangeListener mockChangeListener = new MockChangeListener(); - - @Test - public void testGet() throws Exception { - assertEquals(1, restChangeNotifier.getChangeListeners().size()); - - Request request = new Request.Builder() - .url(url) - .build(); - - Response response = client.newCall(request).execute(); - if (!response.isSuccessful()) throw new IOException("Unexpected code " + response); - - Headers responseHeaders = response.headers(); - for (int i = 0; i < responseHeaders.size(); i++) { - System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i)); - } - - assertEquals(RestChangeNotifier.GET_TEXT, response.body().string()); - } - - @Test - public void testFileUpload() throws Exception { - assertEquals(1, restChangeNotifier.getChangeListeners().size()); - - File file = new File("src/test/resources/testUploadFile.txt"); - assertTrue(file.exists()); - assertTrue(file.canRead()); - - Request request = new Request.Builder() - .url(url) - .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file)) - .addHeader("charset","UTF-8") - .build(); - - Response response = client.newCall(request).execute(); - if (!response.isSuccessful()) throw new IOException("Unexpected code " + response); - - Headers responseHeaders = response.headers(); - for (int i = 0; i < responseHeaders.size(); i++) { - System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i)); - } - - assertEquals(RestChangeNotifier.POST_TEXT, response.body().string()); - - assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile()); - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java index 1a7f261..d0a7d71 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileInputStream; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; import org.junit.Assert; import org.junit.Test; @@ -63,6 +64,24 @@ public class TestConfigTransformer { flowXml.deleteOnExit(); } + @Test + public void doesTransformOnDefaultFile() throws Exception { + + ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/"); + File nifiPropertiesFile = new File("./target/nifi.properties"); + + assertTrue(nifiPropertiesFile.exists()); + assertTrue(nifiPropertiesFile.canRead()); + + nifiPropertiesFile.deleteOnExit(); + + File flowXml = new File("./target/flow.xml.gz"); + assertTrue(flowXml.exists()); + assertTrue(flowXml.canRead()); + + flowXml.deleteOnExit(); + } + @Test(expected = IllegalArgumentException.class) public void handleTransformInvalidFile() throws Exception { @@ -70,4 +89,12 @@ public class TestConfigTransformer { Assert.fail("Invalid configuration file was not detected."); } + + @Test(expected = ConfigurationChangeException.class) + public void handleTransformEmptyFile() throws Exception { + + ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/"); + + Assert.fail("Invalid configuration file was not detected."); + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/config-empty.yml ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/resources/config-empty.yml b/minifi-bootstrap/src/test/resources/config-empty.yml new file mode 100644 index 0000000..fbbbeb9 --- /dev/null +++ b/minifi-bootstrap/src/test/resources/config-empty.yml @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Flow Controller: + name: MiNiFi Flow + comment: \ No newline at end of file
