MINIFI-107 - Process group support This closes #50
Signed-off-by: Joseph Percivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/31855bbc Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/31855bbc Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/31855bbc Branch: refs/heads/master Commit: 31855bbc7ee7016b460226d24790760523e07885 Parents: 1bbeedf Author: Bryan Rosander <[email protected]> Authored: Tue Nov 1 12:54:17 2016 -0400 Committer: Joseph Percivall <[email protected]> Committed: Tue Nov 8 15:06:47 2016 -0500 ---------------------------------------------------------------------- .../bootstrap/util/ConfigTransformer.java | 183 ++++-- .../bootstrap/util/ParentGroupIdResolver.java | 87 +++ .../bootstrap/util/ConfigTransformerTest.java | 166 ++++- .../util/ParentGroupIdResolverTest.java | 129 ++++ .../bootstrap/util/TestConfigTransformer.java | 3 - .../test/resources/config-process-groups.yml | 276 ++++++++ .../minifi/commons/schema/ConfigSchema.java | 124 ++-- .../minifi/commons/schema/ConnectionSchema.java | 41 +- .../nifi/minifi/commons/schema/PortSchema.java | 31 + .../commons/schema/ProcessGroupSchema.java | 153 +++++ .../minifi/commons/schema/ProcessorSchema.java | 26 +- .../commons/schema/RemoteInputPortSchema.java | 13 +- .../schema/RemoteProcessingGroupSchema.java | 15 +- .../commons/schema/common/BaseSchema.java | 52 +- .../schema/common/BaseSchemaWithIdAndName.java | 20 +- .../commons/schema/common/CollectionUtil.java | 39 ++ .../schema/common/CommonPropertyKeys.java | 1 + .../commons/schema/common/StringUtil.java | 34 +- .../exception/SchemaInstantiatonException.java | 30 + .../commons/schema/ProcessGroupSchemaTest.java | 64 ++ .../schema/serialization/SchemaLoaderTest.java | 4 +- .../commons/schema/v1/ConfigSchemaV1Test.java | 6 +- .../schema/v1/ConnectionSchemaV1Test.java | 2 +- .../src/main/markdown/System_Admin_Guide.md | 35 +- .../src/main/resources/conf/config.yml | 3 + .../toolkit/configuration/ConfigMain.java | 135 ++-- .../configuration/dto/ConfigSchemaFunction.java | 67 +- .../dto/ConnectionSchemaFunction.java | 6 +- .../configuration/dto/PortSchemaFunction.java | 46 ++ .../dto/ProcessorSchemaFunction.java | 6 +- .../toolkit/configuration/ConfigMainTest.java | 30 +- .../dto/PortSchemaFunctionTest.java | 74 +++ .../src/test/resources/CsvToJson.yml | 3 + .../resources/DecompressionCircularFlow.yml | 3 + .../resources/InvokeHttpMiNiFiTemplateTest.yml | 3 + .../test/resources/MultipleRelationships.yml | 3 + .../ProcessGroupsAndRemoteProcessGroups.xml | 648 +++++++++++++++++++ .../ProcessGroupsAndRemoteProcessGroups.yml | 276 ++++++++ ...aceTextExpressionLanguageCSVReformatting.yml | 3 + .../src/test/resources/StressTestFramework.yml | 3 + 40 files changed, 2532 insertions(+), 311 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index 9794415..9fa7f05 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -27,11 +27,15 @@ import org.apache.nifi.minifi.commons.schema.ContentRepositorySchema; import org.apache.nifi.minifi.commons.schema.CorePropertiesSchema; import org.apache.nifi.minifi.commons.schema.FlowControllerSchema; import org.apache.nifi.minifi.commons.schema.FlowFileRepositorySchema; +import org.apache.nifi.minifi.commons.schema.PortSchema; +import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema; import org.apache.nifi.minifi.commons.schema.ProcessorSchema; import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema; import org.apache.nifi.minifi.commons.schema.ProvenanceRepositorySchema; import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema; import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.common.Schema; import org.apache.nifi.minifi.commons.schema.common.StringUtil; import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema; @@ -68,13 +72,13 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; public final class ConfigTransformer { // Underlying version of NIFI will be using public static final String NIFI_VERSION = "0.6.1"; + public static final String ROOT_GROUP = "Root-Group"; // Final util classes should have private constructor private ConfigTransformer() { @@ -88,22 +92,25 @@ public final class ConfigTransformer { } public static void transformConfigFile(InputStream sourceStream, String destPath) throws Exception { - ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(sourceStream); - if (!configSchema.isValid()) { - throw new InvalidConfigurationException("Failed to transform config file due to:[" - + configSchema.getValidationIssues().stream().sorted().collect(Collectors.joining("], [")) + "]"); - } + ConvertableSchema<ConfigSchema> convertableSchema = throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(sourceStream)); + ConfigSchema configSchema = throwIfInvalid(convertableSchema.convert()); // Create nifi.properties and flow.xml.gz in memory ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream(); writeNiFiProperties(configSchema, nifiPropertiesOutputStream); - DOMSource flowXml = createFlowXml(configSchema); + writeFlowXmlFile(configSchema, destPath); // Write nifi.properties and flow.xml.gz writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath); + } - writeFlowXmlFile(flowXml, destPath); + private static <T extends Schema> T throwIfInvalid(T schema) throws InvalidConfigurationException { + if (!schema.isValid()) { + throw new InvalidConfigurationException("Failed to transform config file due to:[" + + schema.getValidationIssues().stream().sorted().collect(Collectors.joining("], [")) + "]"); + } + return schema; } protected static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException { @@ -118,10 +125,8 @@ public final class ConfigTransformer { } } - protected static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException { - final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz")); - final OutputStream outStream = new GZIPOutputStream(fileOut); - final StreamResult streamResult = new StreamResult(outStream); + protected static void writeFlowXmlFile(ConfigSchema configSchema, OutputStream outputStream) throws TransformerException, ConfigTransformerException, ConfigurationChangeException, IOException { + final StreamResult streamResult = new StreamResult(outputStream); // configure the transformer and convert the DOM final TransformerFactory transformFactory = TransformerFactory.newInstance(); @@ -130,9 +135,15 @@ public final class ConfigTransformer { transformer.setOutputProperty(OutputKeys.INDENT, "yes"); // transform the document to byte stream - transformer.transform(domSource, streamResult); - outStream.flush(); - outStream.close(); + transformer.transform(createFlowXml(configSchema), streamResult); + } + + protected static void writeFlowXmlFile(ConfigSchema configSchema, String path) throws IOException, TransformerException, ConfigurationChangeException, ConfigTransformerException { + try (OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"))) { + try (OutputStream outStream = new GZIPOutputStream(fileOut)) { + writeFlowXmlFile(configSchema, outStream); + } + } } protected static void writeNiFiProperties(ConfigSchema configSchema, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException { @@ -280,7 +291,18 @@ public final class ConfigTransformer { CorePropertiesSchema coreProperties = configSchema.getCoreProperties(); addTextElement(rootNode, "maxTimerDrivenThreadCount", String.valueOf(coreProperties.getMaxConcurrentThreads())); addTextElement(rootNode, "maxEventDrivenThreadCount", String.valueOf(coreProperties.getMaxConcurrentThreads())); - addProcessGroup(rootNode, configSchema, "rootGroup"); + + FlowControllerSchema flowControllerProperties = configSchema.getFlowControllerProperties(); + + final Element element = doc.createElement("rootGroup"); + rootNode.appendChild(element); + + ProcessGroupSchema processGroupSchema = configSchema.getProcessGroupSchema(); + processGroupSchema.setId(ROOT_GROUP); + processGroupSchema.setName(flowControllerProperties.getName()); + processGroupSchema.setComment(flowControllerProperties.getComment()); + + addProcessGroup(doc, element, processGroupSchema, new ParentGroupIdResolver(processGroupSchema)); SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties(); if (securityProperties.useSSL()) { @@ -331,37 +353,38 @@ public final class ConfigTransformer { } } - protected static void addProcessGroup(final Element parentElement, ConfigSchema configSchema, final String elementName) throws ConfigurationChangeException { + protected static void addProcessGroup(Document doc, Element element, ProcessGroupSchema processGroupSchema, ParentGroupIdResolver parentGroupIdResolver) throws ConfigurationChangeException { try { - FlowControllerSchema flowControllerProperties = configSchema.getFlowControllerProperties(); - - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", "Root-Group"); - addTextElement(element, "name", flowControllerProperties.getName()); + String processGroupId = processGroupSchema.getId(); + addTextElement(element, "id", processGroupId); + addTextElement(element, "name", processGroupSchema.getName()); addPosition(element); - addTextElement(element, "comment", flowControllerProperties.getComment()); + addTextElement(element, "comment", processGroupSchema.getComment()); - List<ProcessorSchema> processors = configSchema.getProcessors(); - if (processors != null) { - for (ProcessorSchema processorConfig : processors) { - addProcessor(element, processorConfig); - } + for (ProcessorSchema processorConfig : processGroupSchema.getProcessors()) { + addProcessor(element, processorConfig); } - List<RemoteProcessingGroupSchema> remoteProcessingGroups = configSchema.getRemoteProcessingGroups(); - if (remoteProcessingGroups != null) { - for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) { - addRemoteProcessGroup(element, remoteProcessingGroupSchema); - } + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : processGroupSchema.getRemoteProcessingGroups()) { + addRemoteProcessGroup(element, remoteProcessingGroupSchema); } - List<ConnectionSchema> connections = configSchema.getConnections(); - if (connections != null) { - for (ConnectionSchema connectionConfig : connections) { - addConnection(element, connectionConfig, configSchema); - } + for (PortSchema portSchema : processGroupSchema.getInputPortSchemas()) { + addPort(doc, element, portSchema, "inputPort"); + } + + for (PortSchema portSchema : processGroupSchema.getOutputPortSchemas()) { + addPort(doc, element, portSchema, "outputPort"); + } + + for (ProcessGroupSchema child : processGroupSchema.getProcessGroupSchemas()) { + Element processGroups = doc.createElement("processGroup"); + element.appendChild(processGroups); + addProcessGroup(doc, processGroups, child, parentGroupIdResolver); + } + + for (ConnectionSchema connectionConfig : processGroupSchema.getConnections()) { + addConnection(element, connectionConfig, parentGroupIdResolver); } } catch (ConfigurationChangeException e) { throw e; @@ -370,6 +393,19 @@ public final class ConfigTransformer { } } + protected static void addPort(Document doc, Element parentElement, PortSchema portSchema, String tag) { + Element element = doc.createElement(tag); + parentElement.appendChild(element); + + addTextElement(element, "id", portSchema.getId()); + addTextElement(element, "name", portSchema.getName()); + + addPosition(element); + addTextElement(element, "comments", null); + + addTextElement(element, "scheduledState", "RUNNING"); + } + protected static void addProcessor(final Element parentElement, ProcessorSchema processorConfig) throws ConfigurationChangeException { try { final Document doc = parentElement.getOwnerDocument(); @@ -511,7 +547,7 @@ public final class ConfigTransformer { } } - protected static void addConnection(final Element parentElement, ConnectionSchema connectionProperties, ConfigSchema configSchema) throws ConfigurationChangeException { + protected static void addConnection(final Element parentElement, ConnectionSchema connectionProperties, ParentGroupIdResolver parentGroupIdResolver) throws ConfigurationChangeException { try { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement("connection"); @@ -526,23 +562,16 @@ public final class ConfigTransformer { addTextElement(element, "labelIndex", "1"); addTextElement(element, "zIndex", "0"); - addTextElement(element, "sourceId", connectionProperties.getSourceId()); - addTextElement(element, "sourceGroupId", "Root-Group"); - addTextElement(element, "sourceType", "PROCESSOR"); + addConnectionSourceOrDestination(element, "source", connectionProperties.getSourceId(), parentGroupIdResolver); + addConnectionSourceOrDestination(element, "destination", connectionProperties.getDestinationId(), parentGroupIdResolver); - final String connectionDestinationId = connectionProperties.getDestinationId(); - addTextElement(element, "destinationId", connectionDestinationId); - final Optional<String> parentGroup = findInputPortParentGroup(connectionDestinationId, configSchema); - if (parentGroup.isPresent()) { - addTextElement(element, "destinationGroupId", parentGroup.get()); - addTextElement(element, "destinationType", "REMOTE_INPUT_PORT"); + List<String> sourceRelationshipNames = connectionProperties.getSourceRelationshipNames(); + if (sourceRelationshipNames.isEmpty()) { + addTextElement(element, "relationship", null); } else { - addTextElement(element, "destinationGroupId", "Root-Group"); - addTextElement(element, "destinationType", "PROCESSOR"); - } - - for (String relationshipName : connectionProperties.getSourceRelationshipNames()) { - addTextElement(element, "relationship", relationshipName); + for (String relationshipName : sourceRelationshipNames) { + addTextElement(element, "relationship", relationshipName); + } } addTextElement(element, "maxWorkQueueSize", String.valueOf(connectionProperties.getMaxWorkQueueSize())); @@ -557,22 +586,36 @@ public final class ConfigTransformer { } } - // Locate the associated parent group for a given input port by its id - protected static Optional<String> findInputPortParentGroup(String inputPortId, ConfigSchema configSchema) { - final List<RemoteProcessingGroupSchema> remoteProcessingGroups = configSchema.getRemoteProcessingGroups(); - if (remoteProcessingGroups != null) { - for (final RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) { - final List<RemoteInputPortSchema> remoteInputPorts = remoteProcessingGroupSchema.getInputPorts(); - for (final RemoteInputPortSchema remoteInputPortSchema : remoteInputPorts) { - if (remoteInputPortSchema != null && inputPortId.equals(remoteInputPortSchema.getId())) { - return Optional.of(remoteProcessingGroupSchema.getName()); - - } + protected static void addConnectionSourceOrDestination(Element element, String sourceOrDestination, String id, ParentGroupIdResolver parentGroupIdResolver) { + String idTag = sourceOrDestination + "Id"; + String groupIdTag = sourceOrDestination + "GroupId"; + String typeTag = sourceOrDestination + "Type"; + + String parentId = parentGroupIdResolver.getRemoteInputPortParentId(id); + String type; + + if (parentId != null) { + type = "REMOTE_INPUT_PORT"; + } else { + parentId = parentGroupIdResolver.getInputPortParentId(id); + if (parentId != null) { + type = "INPUT_PORT"; + } else { + parentId = parentGroupIdResolver.getOutputPortParentId(id); + if (parentId != null) { + type = "OUTPUT_PORT"; + } else { + parentId = parentGroupIdResolver.getProcessorParentId(id); + type = "PROCESSOR"; } } } - return Optional.empty(); + addTextElement(element, idTag, id); + if (parentId != null) { + addTextElement(element, groupIdTag, parentId); + } + addTextElement(element, typeTag, type); } protected static void addPosition(final Element parentElement) { @@ -583,9 +626,7 @@ public final class ConfigTransformer { } protected static void addTextElementIfNotNullOrEmpty(final Element element, final String name, final String value) { - if (!StringUtil.isNullOrEmpty(value)) { - addTextElement(element, name, value); - } + StringUtil.doIfNotNullOrEmpty(value, s -> addTextElement(element, name, value)); } protected static void addTextElement(final Element element, final String name, final String value) { http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java new file mode 100644 index 0000000..71088ee --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java @@ -0,0 +1,87 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.bootstrap.util; + +import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema; +import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema; +import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema; +import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +public class ParentGroupIdResolver { + private final Map<String, String> processorIdToParentIdMap; + private final Map<String, String> inputPortIdToParentIdMap; + private final Map<String, String> outputPortIdToParentIdMap; + private final Map<String, String> remoteInputPortIdToParentIdMap; + + public ParentGroupIdResolver(ProcessGroupSchema processGroupSchema) { + this.processorIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getProcessors); + this.inputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getInputPortSchemas); + this.outputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getOutputPortSchemas); + this.remoteInputPortIdToParentIdMap = getRemoteInputPortParentIdMap(processGroupSchema); + } + + protected static Map<String, String> getParentIdMap(ProcessGroupSchema processGroupSchema, Function<ProcessGroupSchema, Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) { + Map<String, String> map = new HashMap<>(); + getParentIdMap(processGroupSchema, map, schemaAccessor); + return map; + } + + protected static void getParentIdMap(ProcessGroupSchema processGroupSchema, Map<String, String> output, Function<ProcessGroupSchema, + Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) { + schemaAccessor.apply(processGroupSchema).forEach(p -> output.put(p.getId(), processGroupSchema.getId())); + processGroupSchema.getProcessGroupSchemas().forEach(p -> getParentIdMap(p, output, schemaAccessor)); + } + + protected static Map<String, String> getRemoteInputPortParentIdMap(ProcessGroupSchema processGroupSchema) { + Map<String, String> result = new HashMap<>(); + getRemoteInputPortParentIdMap(processGroupSchema, result); + return result; + } + + protected static void getRemoteInputPortParentIdMap(ProcessGroupSchema processGroupSchema, Map<String, String> output) { + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : processGroupSchema.getRemoteProcessingGroups()) { + for (RemoteInputPortSchema remoteInputPortSchema : remoteProcessingGroupSchema.getInputPorts()) { + output.put(remoteInputPortSchema.getId(), remoteProcessingGroupSchema.getName()); + } + } + processGroupSchema.getProcessGroupSchemas().forEach(p -> getRemoteInputPortParentIdMap(p, output)); + } + + public String getRemoteInputPortParentId(String id) { + return remoteInputPortIdToParentIdMap.get(id); + } + + public String getInputPortParentId(String id) { + return inputPortIdToParentIdMap.get(id); + } + + public String getOutputPortParentId(String id) { + return outputPortIdToParentIdMap.get(id); + } + + public String getProcessorParentId(String id) { + return processorIdToParentIdMap.get(id); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java index 40dbe10..cac9d16 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java @@ -20,20 +20,35 @@ package org.apache.nifi.minifi.bootstrap.util; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; import org.apache.nifi.minifi.commons.schema.ConfigSchema; import org.apache.nifi.minifi.commons.schema.ConnectionSchema; +import org.apache.nifi.minifi.commons.schema.PortSchema; +import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema; +import org.apache.nifi.minifi.commons.schema.ProcessorSchema; +import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema; +import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema; +import org.apache.nifi.minifi.commons.schema.common.StringUtil; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; import org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer; import org.junit.Before; import org.junit.Test; +import org.w3c.dom.Document; import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import javax.xml.xpath.XPath; import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -41,17 +56,21 @@ import static org.junit.Assert.assertNull; public class ConfigTransformerTest { private XPathFactory xPathFactory; + private Document document; private Element config; + private DocumentBuilder documentBuilder; @Before public void setup() throws ParserConfigurationException { - config = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument().createElement("config"); + documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + document = documentBuilder.newDocument(); + config = document.createElement("config"); xPathFactory = XPathFactory.newInstance(); } @Test public void testNullQueuePrioritizerNotWritten() throws ConfigurationChangeException, XPathExpressionException { - ConfigTransformer.addConnection(config, new ConnectionSchema(Collections.emptyMap()), new ConfigSchema(Collections.emptyMap())); + ConfigTransformer.addConnection(config, new ConnectionSchema(Collections.emptyMap()), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME))); XPath xpath = xPathFactory.newXPath(); String expression = "connection/queuePrioritizerClass"; assertNull(xpath.evaluate(expression, config, XPathConstants.NODE)); @@ -62,7 +81,7 @@ public class ConfigTransformerTest { Map<String, Object> map = new HashMap<>(); map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, ""); - ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ConfigSchema(Collections.emptyMap())); + ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME))); XPath xpath = xPathFactory.newXPath(); String expression = "connection/queuePrioritizerClass"; assertNull(xpath.evaluate(expression, config, XPathConstants.NODE)); @@ -73,9 +92,148 @@ public class ConfigTransformerTest { Map<String, Object> map = new HashMap<>(); map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, FirstInFirstOutPrioritizer.class.getCanonicalName()); - ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ConfigSchema(Collections.emptyMap())); + ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME))); XPath xpath = xPathFactory.newXPath(); String expression = "connection/queuePrioritizerClass/text()"; assertEquals(FirstInFirstOutPrioritizer.class.getCanonicalName(), xpath.evaluate(expression, config, XPathConstants.STRING)); } + + @Test + public void testProcessGroupsTransform() throws Exception { + ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream("config-process-groups.yml")); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ConfigTransformer.writeFlowXmlFile(configSchema, outputStream); + Document document = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray())); + + testProcessGroup((Element) xPathFactory.newXPath().evaluate("flowController/rootGroup", document, XPathConstants.NODE), configSchema.getProcessGroupSchema()); + } + + private void testProcessGroup(Element element, ProcessGroupSchema processGroupSchema) throws XPathExpressionException { + assertEquals(processGroupSchema.getId(), getText(element, "id")); + assertEquals(processGroupSchema.getName(), getText(element, "name")); + assertEquals(nullToEmpty(processGroupSchema.getComment()), nullToEmpty(getText(element, "comment"))); + + NodeList processorElements = (NodeList) xPathFactory.newXPath().evaluate("processor", element, XPathConstants.NODESET); + assertEquals(processGroupSchema.getProcessors().size(), processorElements.getLength()); + for (int i = 0; i < processorElements.getLength(); i++) { + testProcessor((Element) processorElements.item(i), processGroupSchema.getProcessors().get(i)); + } + + NodeList remoteProcessGroupElements = (NodeList) xPathFactory.newXPath().evaluate("remoteProcessGroup", element, XPathConstants.NODESET); + assertEquals(processGroupSchema.getRemoteProcessingGroups().size(), remoteProcessGroupElements.getLength()); + for (int i = 0; i < remoteProcessGroupElements.getLength(); i++) { + testRemoteProcessGroups((Element) remoteProcessGroupElements.item(i), processGroupSchema.getRemoteProcessingGroups().get(i)); + } + + NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET); + assertEquals(processGroupSchema.getInputPortSchemas().size(), inputPortElements.getLength()); + for (int i = 0; i < inputPortElements.getLength(); i++) { + testPort((Element) inputPortElements.item(i), processGroupSchema.getInputPortSchemas().get(i)); + } + + NodeList outputPortElements = (NodeList) xPathFactory.newXPath().evaluate("outputPort", element, XPathConstants.NODESET); + assertEquals(processGroupSchema.getOutputPortSchemas().size(), outputPortElements.getLength()); + for (int i = 0; i < outputPortElements.getLength(); i++) { + testPort((Element) outputPortElements.item(i), processGroupSchema.getOutputPortSchemas().get(i)); + } + + NodeList processGroupElements = (NodeList) xPathFactory.newXPath().evaluate("processGroup", element, XPathConstants.NODESET); + assertEquals(processGroupSchema.getProcessGroupSchemas().size(), processGroupElements.getLength()); + for (int i = 0; i < processGroupElements.getLength(); i++) { + testProcessGroup((Element) processGroupElements.item(i), processGroupSchema.getProcessGroupSchemas().get(i)); + } + + NodeList connectionElements = (NodeList) xPathFactory.newXPath().evaluate("connection", element, XPathConstants.NODESET); + assertEquals(processGroupSchema.getConnections().size(), connectionElements.getLength()); + for (int i = 0; i < connectionElements.getLength(); i++) { + testConnection((Element) connectionElements.item(i), processGroupSchema.getConnections().get(i)); + } + } + + private void testProcessor(Element element, ProcessorSchema processorSchema) throws XPathExpressionException { + assertEquals(processorSchema.getId(), getText(element, "id")); + assertEquals(processorSchema.getName(), getText(element, "name")); + assertEquals(processorSchema.getProcessorClass(), getText(element, "class")); + assertEquals(processorSchema.getMaxConcurrentTasks().toString(), getText(element, "maxConcurrentTasks")); + assertEquals(processorSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod")); + assertEquals(processorSchema.getPenalizationPeriod(), getText(element, "penalizationPeriod")); + assertEquals(processorSchema.getYieldPeriod(), getText(element, "yieldPeriod")); + assertEquals(processorSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy")); + assertEquals(processorSchema.getRunDurationNanos().toString(), getText(element, "runDurationNanos")); + + testProperties(element, processorSchema.getProperties()); + } + + private void testRemoteProcessGroups(Element element, RemoteProcessingGroupSchema remoteProcessingGroupSchema) throws XPathExpressionException { + assertEquals(remoteProcessingGroupSchema.getName(), getText(element, "id")); + assertEquals(remoteProcessingGroupSchema.getName(), getText(element, "name")); + assertEquals(remoteProcessingGroupSchema.getComment(), getText(element, "comment")); + assertEquals(remoteProcessingGroupSchema.getUrl(), getText(element, "url")); + assertEquals(remoteProcessingGroupSchema.getTimeout(), getText(element, "timeout")); + assertEquals(remoteProcessingGroupSchema.getYieldPeriod(), getText(element, "yieldPeriod")); + + + NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET); + assertEquals(remoteProcessingGroupSchema.getInputPorts().size(), inputPortElements.getLength()); + for (int i = 0; i < inputPortElements.getLength(); i++) { + testRemoteInputPort((Element) inputPortElements.item(i), remoteProcessingGroupSchema.getInputPorts().get(i)); + } + } + + private void testRemoteInputPort(Element element, RemoteInputPortSchema remoteInputPortSchema) throws XPathExpressionException { + assertEquals(remoteInputPortSchema.getId(), getText(element, "id")); + assertEquals(remoteInputPortSchema.getName(), getText(element, "name")); + assertEquals(remoteInputPortSchema.getComment(), getText(element, "comment")); + assertEquals(remoteInputPortSchema.getMax_concurrent_tasks().toString(), getText(element, "maxConcurrentTasks")); + assertEquals(remoteInputPortSchema.getUseCompression(), Boolean.parseBoolean(getText(element, "useCompression"))); + } + + private void testPort(Element element, PortSchema portSchema) throws XPathExpressionException { + assertEquals(portSchema.getId(), getText(element, "id")); + assertEquals(portSchema.getName(), getText(element, "name")); + assertEquals("RUNNING", getText(element, "scheduledState")); + } + + private void testConnection(Element element, ConnectionSchema connectionSchema) throws XPathExpressionException { + assertEquals(connectionSchema.getId(), getText(element, "id")); + assertEquals(connectionSchema.getName(), getText(element, "name")); + + assertEquals(connectionSchema.getSourceId(), getText(element, "sourceId")); + assertEquals(connectionSchema.getDestinationId(), getText(element, "destinationId")); + + NodeList relationshipNodes = (NodeList) xPathFactory.newXPath().evaluate("relationship", element, XPathConstants.NODESET); + Set<String> sourceRelationships = new HashSet<>(); + for (int i = 0; i < relationshipNodes.getLength(); i++) { + String textContent = relationshipNodes.item(i).getTextContent(); + if (!StringUtil.isNullOrEmpty(textContent)) { + sourceRelationships.add(textContent); + } + } + + assertEquals(new HashSet<>(connectionSchema.getSourceRelationshipNames()), sourceRelationships); + + assertEquals(connectionSchema.getMaxWorkQueueSize().toString(), getText(element, "maxWorkQueueSize")); + assertEquals(connectionSchema.getMaxWorkQueueDataSize(), getText(element, "maxWorkQueueDataSize")); + assertEquals(connectionSchema.getFlowfileExpiration(), getText(element, "flowFileExpiration")); + assertEquals(connectionSchema.getQueuePrioritizerClass(), getText(element, "queuePrioritizerClass")); + } + + private void testProperties(Element element, Map<String, Object> expected) throws XPathExpressionException { + NodeList propertyElements = (NodeList) xPathFactory.newXPath().evaluate("property", element, XPathConstants.NODESET); + Map<String, String> properties = new HashMap<>(); + for (int i = 0; i < propertyElements.getLength(); i++) { + Element item = (Element) propertyElements.item(i); + properties.put(getText(item, "name"), getText(item, "value")); + } + assertEquals(expected.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> nullToEmpty(e.getValue()))), properties); + } + + private String getText(Element element, String path) throws XPathExpressionException { + return (String) xPathFactory.newXPath().evaluate(path + "/text()", element, XPathConstants.STRING); + } + + private String nullToEmpty(Object val) { + return val == null ? "" : val.toString(); + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java new file mode 100644 index 0000000..6f557b7 --- /dev/null +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java @@ -0,0 +1,129 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.bootstrap.util; + +import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ParentGroupIdResolverTest { + + @Test + public void testRemoteInputPortParentId() throws IOException, SchemaLoaderException { + List<String> configLines = new ArrayList<>(); + configLines.add("MiNiFi Config Version: 2"); + configLines.add("Remote Processing Groups:"); + configLines.add("- name: rpgOne"); + configLines.add(" Input Ports:"); + configLines.add(" - id: one"); + configLines.add("Process Groups:"); + configLines.add("- Remote Processing Groups:"); + configLines.add(" - name: rpgTwo"); + configLines.add(" Input Ports:"); + configLines.add(" - id: two"); + ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines); + assertEquals("rpgOne", parentGroupIdResolver.getRemoteInputPortParentId("one")); + assertEquals("rpgTwo", parentGroupIdResolver.getRemoteInputPortParentId("two")); + assertNull(parentGroupIdResolver.getInputPortParentId("one")); + assertNull(parentGroupIdResolver.getInputPortParentId("two")); + assertNull(parentGroupIdResolver.getOutputPortParentId("one")); + assertNull(parentGroupIdResolver.getOutputPortParentId("two")); + assertNull(parentGroupIdResolver.getProcessorParentId("one")); + assertNull(parentGroupIdResolver.getProcessorParentId("two")); + } + + @Test + public void testInputPortParentId() throws IOException, SchemaLoaderException { + List<String> configLines = new ArrayList<>(); + configLines.add("MiNiFi Config Version: 2"); + configLines.add("Input Ports:"); + configLines.add("- id: one"); + configLines.add("Process Groups:"); + configLines.add("- id: pgTwo"); + configLines.add(" Input Ports:"); + configLines.add(" - id: two"); + ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines); + assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one")); + assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two")); + assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getInputPortParentId("one")); + assertEquals("pgTwo", parentGroupIdResolver.getInputPortParentId("two")); + assertNull(parentGroupIdResolver.getOutputPortParentId("one")); + assertNull(parentGroupIdResolver.getOutputPortParentId("two")); + assertNull(parentGroupIdResolver.getProcessorParentId("one")); + assertNull(parentGroupIdResolver.getProcessorParentId("two")); + } + + @Test + public void testOutputPortParentId() throws IOException, SchemaLoaderException { + List<String> configLines = new ArrayList<>(); + configLines.add("MiNiFi Config Version: 2"); + configLines.add("Output Ports:"); + configLines.add("- id: one"); + configLines.add("Process Groups:"); + configLines.add("- id: pgTwo"); + configLines.add(" Output Ports:"); + configLines.add(" - id: two"); + ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines); + assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one")); + assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two")); + assertNull(parentGroupIdResolver.getInputPortParentId("one")); + assertNull(parentGroupIdResolver.getInputPortParentId("two")); + assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getOutputPortParentId("one")); + assertEquals("pgTwo", parentGroupIdResolver.getOutputPortParentId("two")); + assertNull(parentGroupIdResolver.getProcessorParentId("one")); + assertNull(parentGroupIdResolver.getProcessorParentId("two")); + } + + @Test + public void testProcessorParentId() throws IOException, SchemaLoaderException { + List<String> configLines = new ArrayList<>(); + configLines.add("MiNiFi Config Version: 2"); + configLines.add("Processors:"); + configLines.add("- id: one"); + configLines.add("Process Groups:"); + configLines.add("- id: pgTwo"); + configLines.add(" Processors:"); + configLines.add(" - id: two"); + ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines); + assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one")); + assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two")); + assertNull(parentGroupIdResolver.getInputPortParentId("one")); + assertNull(parentGroupIdResolver.getInputPortParentId("two")); + assertNull(parentGroupIdResolver.getOutputPortParentId("one")); + assertNull(parentGroupIdResolver.getOutputPortParentId("two")); + assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getProcessorParentId("one")); + assertEquals("pgTwo", parentGroupIdResolver.getProcessorParentId("two")); + } + + private ParentGroupIdResolver createParentGroupIdResolver(List<String> configLines) throws IOException, SchemaLoaderException { + return new ParentGroupIdResolver(SchemaLoader.loadConfigSchemaFromYaml(new ByteArrayInputStream(configLines.stream().collect(Collectors.joining("\n")) + .getBytes(StandardCharsets.UTF_8))).getProcessGroupSchema()); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java index 617da90..a0077fe 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java @@ -18,8 +18,6 @@ package org.apache.nifi.minifi.bootstrap.util; import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException; -import org.apache.nifi.minifi.commons.schema.ConnectionSchema; -import org.apache.nifi.minifi.commons.schema.common.BaseSchema; import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; import org.junit.Test; @@ -217,7 +215,6 @@ public class TestConfigTransformer { } catch (InvalidConfigurationException e){ assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required], " + "['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], " + - "[" + BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, "Connections", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED) + "], " + "['source name' in section 'Connections' because it was not found and it is required]", e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/resources/config-process-groups.yml ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/resources/config-process-groups.yml b/minifi-bootstrap/src/test/resources/config-process-groups.yml new file mode 100644 index 0000000..e0e5ef3 --- /dev/null +++ b/minifi-bootstrap/src/test/resources/config-process-groups.yml @@ -0,0 +1,276 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +MiNiFi Config Version: 2 +Flow Controller: + name: ProcessGroupsAndRemoteProcessGroups + comment: '' +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: + key: + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC +Processors: +- id: 207748d1-0158-1000-0000-000000000000 + name: GenerateFlowFile + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Batch Size: '1' + Data Format: Binary + File Size: 1 b + Unique FlowFiles: 'false' +- id: 2079e8bd-0158-1000-0000-000000000000 + name: LogAttribute + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + - success + Properties: + Attributes to Ignore: + Attributes to Log: + Log Level: info + Log Payload: 'false' + Log prefix: +- id: 2077ab1e-0158-1000-0000-000000000000 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + top: top +Process Groups: +- id: 207888b1-0158-1000-0000-000000000000 + name: middle + Processors: + - id: 2078f34e-0158-1000-0000-000000000000 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + middle: middle + Process Groups: + - id: 20794cd4-0158-1000-0000-000000000000 + name: bottom + Processors: + - id: 207a89ba-0158-1000-0000-000000000000 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + bottom: bottom + Process Groups: [] + Input Ports: + - id: 207a5f50-0158-1000-0000-000000000000 + name: input + Output Ports: + - id: 207a6d92-0158-1000-0000-000000000000 + name: output + Connections: + - id: 21a6abb9-0158-1000-0000-000000000000 + name: UpdateAttribute/success/21a39aba-0158-1000-a1a0-1b55bcddcd72 + source id: 207a89ba-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 21a39aba-0158-1000-a1a0-1b55bcddcd72 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207ad5e9-0158-1000-0000-000000000000 + name: UpdateAttribute/success/null + source id: 207a89ba-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 207a6d92-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207aca0d-0158-1000-0000-000000000000 + name: null//UpdateAttribute + source id: 207a5f50-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 207a89ba-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + Remote Processing Groups: + - name: http://localhost:9091/nifi + url: http://localhost:9091/nifi + comment: '' + timeout: 30 sec + yield period: 10 sec + Input Ports: + - id: 21a39aba-0158-1000-a1a0-1b55bcddcd72 + name: input2 + comment: '' + max concurrent tasks: 1 + use compression: false + Input Ports: + - id: 2078c936-0158-1000-0000-000000000000 + name: input + Output Ports: + - id: 2079b327-0158-1000-0000-000000000000 + name: output + Connections: + - id: 21a5b1f1-0158-1000-0000-000000000000 + name: UpdateAttribute/success/21a2fb5e-0158-1000-3b5e-5a7d3aaee01b + source id: 2078f34e-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207b0eb1-0158-1000-0000-000000000000 + name: UpdateAttribute/success/null + source id: 2078f34e-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 207a5f50-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 20792ec2-0158-1000-0000-000000000000 + name: null//UpdateAttribute + source id: 2078c936-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 2078f34e-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207b1880-0158-1000-0000-000000000000 + name: null//null + source id: 207a6d92-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 2079b327-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + Remote Processing Groups: + - name: http://localhost:9090/nifi + url: http://localhost:9090/nifi + comment: '' + timeout: 30 sec + yield period: 10 sec + Input Ports: + - id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b + name: input + comment: '' + max concurrent tasks: 1 + use compression: false +Input Ports: [] +Output Ports: [] +Connections: +- id: 2077bf8f-0158-1000-0000-000000000000 + name: GenerateFlowFile/success/UpdateAttribute + source id: 207748d1-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 2077ab1e-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: 2079cf6f-0158-1000-0000-000000000000 + name: UpdateAttribute/success/null + source id: 2077ab1e-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 2078c936-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: 2079faa0-0158-1000-0000-000000000000 + name: null//LogAttribute + source id: 2079b327-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 2079e8bd-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' +Remote Processing Groups: [] http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java index 8dfd9d4..abd6a6c 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java @@ -23,6 +23,8 @@ import org.apache.nifi.minifi.commons.schema.common.StringUtil; import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -31,31 +33,27 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY; -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY; -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY; -/** - * - */ public class ConfigSchema extends BaseSchema implements WritableSchema, ConvertableSchema<ConfigSchema> { - public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: "; - public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: "; - public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: "; - public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: "; - public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: "; public static final int CONFIG_VERSION = 2; public static final String VERSION = "MiNiFi Config Version"; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS = "Found the following duplicate input port ids: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS = "Found the following duplicate output port ids: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in more than one Processor(s), Input Port(s), Output Port(s) and/or Remote Input Port(s): "; public static final String CONNECTION_WITH_ID = "Connection with id "; public static final String HAS_INVALID_SOURCE_ID = " has invalid source id "; public static final String HAS_INVALID_DESTINATION_ID = " has invalid destination id "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: "; + public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: "; public static String TOP_LEVEL_NAME = "top level"; private FlowControllerSchema flowControllerProperties; private CorePropertiesSchema coreProperties; @@ -63,9 +61,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta private ContentRepositorySchema contentRepositoryProperties; private ComponentStatusRepositorySchema componentStatusRepositoryProperties; private SecurityPropertiesSchema securityProperties; - private List<ProcessorSchema> processors; - private List<ConnectionSchema> connections; - private List<RemoteProcessingGroupSchema> remoteProcessingGroups; + private ProcessGroupSchema processGroupSchema; private ProvenanceReportingSchema provenanceReportingProperties; private ProvenanceRepositorySchema provenanceRepositorySchema; @@ -85,13 +81,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false); securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false); - processors = convertListToType(getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), "processor", ProcessorSchema.class, PROCESSORS_KEY); - - remoteProcessingGroups = convertListToType(getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), "remote processing group", - RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY); - - connections = convertListToType(getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), - "connection", ConnectionSchema.class, CONNECTIONS_KEY); + processGroupSchema = new ProcessGroupSchema(map, TOP_LEVEL_NAME); provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false); @@ -101,47 +91,63 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta addIssuesIfNotNull(contentRepositoryProperties); addIssuesIfNotNull(componentStatusRepositoryProperties); addIssuesIfNotNull(securityProperties); + addIssuesIfNotNull(processGroupSchema); addIssuesIfNotNull(provenanceReportingProperties); addIssuesIfNotNull(provenanceRepositorySchema); - addIssuesIfNotNull(processors); - addIssuesIfNotNull(connections); - addIssuesIfNotNull(remoteProcessingGroups); - Set<String> processorIds = new HashSet<>(); - List<String> processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList()); - processorIds.addAll(processorIdList); + List<ProcessGroupSchema> allProcessGroups = getAllProcessGroups(processGroupSchema); + List<ConnectionSchema> allConnectionSchemas = allProcessGroups.stream().flatMap(p -> p.getConnections().stream()).collect(Collectors.toList()); + List<RemoteProcessingGroupSchema> allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList()); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, connections.stream().map(ConnectionSchema::getId).collect(Collectors.toList())); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, - remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList())); - - Set<String> remoteInputPortIds = new HashSet<>(); - List<String> remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null) + List<String> allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList()); + List<String> allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList()); + List<String> allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList()); + List<String> allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null) .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList()); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList); - remoteInputPortIds.addAll(remoteInputPortIdList); - - Set<String> duplicateIds = new HashSet<>(processorIds); - duplicateIds.retainAll(remoteInputPortIds); - if (duplicateIds.size() > 0) { - addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + duplicateIds.stream().sorted().collect(Collectors.joining(", "))); + List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList()); + List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList()); + + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS, allInputPortIds); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds); + + // Potential connection sources and destinations need to have unique ids + OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds)); + if (overlapResults.duplicates.size() > 0) { + addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", "))); } - Set<String> connectableIds = new HashSet<>(processorIds); - connectableIds.addAll(remoteInputPortIds); - connections.forEach(c -> { + allConnectionSchemas.forEach(c -> { String destinationId = c.getDestinationId(); - if (!StringUtil.isNullOrEmpty(destinationId) && !connectableIds.contains(destinationId)) { + if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.seen.contains(destinationId)) { addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_DESTINATION_ID + destinationId); } String sourceId = c.getSourceId(); - if (!StringUtil.isNullOrEmpty(sourceId) && !connectableIds.contains(sourceId)) { + if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.seen.contains(sourceId)) { addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_SOURCE_ID + sourceId); } }); } + protected static <T> OverlapResults<T> findOverlap(Collection<T>... collections) { + Set<T> seen = new HashSet<>(); + return new OverlapResults<>(seen, Arrays.stream(collections).flatMap(c -> c.stream()).sequential().filter(s -> !seen.add(s)).collect(Collectors.toSet())); + } + + public static List<ProcessGroupSchema> getAllProcessGroups(ProcessGroupSchema processGroupSchema) { + List<ProcessGroupSchema> result = new ArrayList<>(); + addProcessGroups(processGroupSchema, result); + return result; + } + + private static void addProcessGroups(ProcessGroupSchema processGroupSchema, List<ProcessGroupSchema> result) { + result.add(processGroupSchema); + processGroupSchema.getProcessGroupSchemas().forEach(p -> addProcessGroups(p, result)); + } + public Map<String, Object> toMap() { Map<String, Object> result = mapSupplier.get(); result.put(VERSION, getVersion()); @@ -152,9 +158,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta putIfNotNull(result, PROVENANCE_REPO_KEY, provenanceRepositorySchema); putIfNotNull(result, COMPONENT_STATUS_REPO_KEY, componentStatusRepositoryProperties); putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties); - putListIfNotNull(result, PROCESSORS_KEY, processors); - putListIfNotNull(result, CONNECTIONS_KEY, connections); - putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups); + result.putAll(processGroupSchema.toMap()); putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties); return result; } @@ -179,16 +183,8 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta return securityProperties; } - public List<ProcessorSchema> getProcessors() { - return processors; - } - - public List<ConnectionSchema> getConnections() { - return connections; - } - - public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() { - return remoteProcessingGroups; + public ProcessGroupSchema getProcessGroupSchema() { + return processGroupSchema; } public ProvenanceReportingSchema getProvenanceReportingProperties() { @@ -212,4 +208,14 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta public ConfigSchema convert() { return this; } + + private static class OverlapResults<T> { + private final Set<T> seen; + private final Set<T> duplicates; + + private OverlapResults(Set<T> seen, Set<T> duplicates) { + this.seen = seen; + this.duplicates = duplicates; + } + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java index 768213c..47a87d9 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java @@ -25,8 +25,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; - public class ConnectionSchema extends BaseSchemaWithIdAndName { public static final String SOURCE_ID_KEY = "source id"; public static final String SOURCE_RELATIONSHIP_NAMES_KEY = "source relationship names"; @@ -41,6 +39,7 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName { public static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec"; private String sourceId; + private boolean needsSourceRelationships = true; private List<String> sourceRelationshipNames; private String destinationId; @@ -50,21 +49,19 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName { private String queuePrioritizerClass; public ConnectionSchema(Map map) { - super(map, CONNECTIONS_KEY); + super(map, "Connection(id: {id}, name: {name})"); + String wrapperName = getWrapperName(); // In case of older version, these may not be available until after construction, validated in getValidationIssues() - sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, CONNECTIONS_KEY, ""); - destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, CONNECTIONS_KEY, ""); - - sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, CONNECTIONS_KEY, new ArrayList<>()); - if (sourceRelationshipNames.isEmpty()) { - addValidationIssue("Expected at least one value in " + SOURCE_RELATIONSHIP_NAMES_KEY + " for " + CONNECTIONS_KEY + " " + getName()); - } - - maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, DEFAULT_MAX_WORK_QUEUE_SIZE); - maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, DEFAULT_MAX_QUEUE_DATA_SIZE); - flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, DEFAULT_FLOWFILE_EXPIRATION); - queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, ""); + sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, wrapperName, ""); + destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, wrapperName, ""); + + // This could be empty if the source is a port. + sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, wrapperName, new ArrayList<>()); + maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, wrapperName, DEFAULT_MAX_WORK_QUEUE_SIZE); + maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, wrapperName, DEFAULT_MAX_QUEUE_DATA_SIZE); + flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, wrapperName, DEFAULT_FLOWFILE_EXPIRATION); + queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, wrapperName, ""); } @Override @@ -117,14 +114,18 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName { return queuePrioritizerClass; } + public void setNeedsSourceRelationships(boolean needsSourceRelationships) { + this.needsSourceRelationships = needsSourceRelationships; + } + @Override public List<String> getValidationIssues() { + String wrapperName = getWrapperName(); List<String> validationIssues = super.getValidationIssues(); - if (StringUtil.isNullOrEmpty(getSourceId())) { - validationIssues.add(getIssueText(SOURCE_ID_KEY, CONNECTIONS_KEY, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED)); - } - if (StringUtil.isNullOrEmpty(getDestinationId())) { - validationIssues.add(getIssueText(DESTINATION_ID_KEY, CONNECTIONS_KEY, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED)); + StringUtil.doIfNullOrEmpty(getSourceId(), id -> validationIssues.add(getIssueText(SOURCE_ID_KEY, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED))); + StringUtil.doIfNullOrEmpty(getDestinationId(), id -> validationIssues.add(getIssueText(DESTINATION_ID_KEY, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED))); + if (needsSourceRelationships && sourceRelationshipNames.isEmpty()) { + validationIssues.add("Expected at least one value in " + SOURCE_RELATIONSHIP_NAMES_KEY + " for " + wrapperName + " " + getName()); } return Collections.unmodifiableList(validationIssues); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java new file mode 100644 index 0000000..27c6afc --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java @@ -0,0 +1,31 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName; + +import java.util.Map; + +public class PortSchema extends BaseSchemaWithIdAndName { + + public PortSchema(Map map, String wrapperName) { + super(map, wrapperName); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java new file mode 100644 index 0000000..88688fc --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java @@ -0,0 +1,153 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.common.StringUtil; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.OUTPUT_PORTS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY; + +public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements WritableSchema, ConvertableSchema<ProcessGroupSchema> { + + public static final String PROCESS_GROUPS_KEY = "Process Groups"; + public static final String ID_DEFAULT = "Root-Group"; + + private String comment; + private List<ProcessorSchema> processors; + private List<ConnectionSchema> connections; + private List<RemoteProcessingGroupSchema> remoteProcessingGroups; + private List<ProcessGroupSchema> processGroupSchemas; + private List<PortSchema> inputPortSchemas; + private List<PortSchema> outputPortSchemas; + + public ProcessGroupSchema(Map map, String wrapperName) { + super(map, wrapperName); + + processors = getOptionalKeyAsList(map, PROCESSORS_KEY, ProcessorSchema::new, wrapperName); + remoteProcessingGroups = getOptionalKeyAsList(map, REMOTE_PROCESSING_GROUPS_KEY, RemoteProcessingGroupSchema::new, wrapperName); + connections = getOptionalKeyAsList(map, CONNECTIONS_KEY, ConnectionSchema::new, wrapperName); + inputPortSchemas = getOptionalKeyAsList(map, INPUT_PORTS_KEY, m -> new PortSchema(m, "InputPort(id: {id}, name: {name})"), wrapperName); + outputPortSchemas = getOptionalKeyAsList(map, OUTPUT_PORTS_KEY, m -> new PortSchema(m, "OutputPort(id: {id}, name: {name})"), wrapperName); + processGroupSchemas = getOptionalKeyAsList(map, PROCESS_GROUPS_KEY, m -> new ProcessGroupSchema(m, "ProcessGroup(id: {id}, name: {name})"), wrapperName); + + if (ConfigSchema.TOP_LEVEL_NAME.equals(wrapperName)) { + if (inputPortSchemas.size() > 0) { + addValidationIssue(INPUT_PORTS_KEY, wrapperName, "must be empty in root group as external input/output ports are currently unsupported"); + } + if (outputPortSchemas.size() > 0) { + addValidationIssue(OUTPUT_PORTS_KEY, wrapperName, "must be empty in root group as external input/output ports are currently unsupported"); + } + } else if (ID_DEFAULT.equals(getId())) { + addValidationIssue(ID_KEY, wrapperName, "must be set to a value not " + ID_DEFAULT + " if not in root group"); + } + + Set<String> portIds = getPortIds(); + connections.stream().filter(c -> portIds.contains(c.getSourceId())).forEachOrdered(c -> c.setNeedsSourceRelationships(false)); + + addIssuesIfNotNull(processors); + addIssuesIfNotNull(remoteProcessingGroups); + addIssuesIfNotNull(processGroupSchemas); + addIssuesIfNotNull(connections); + } + + public Map<String, Object> toMap() { + Map<String, Object> result = mapSupplier.get(); + String id = getId(); + if (!ID_DEFAULT.equals(id)) { + result.put(ID_KEY, id); + } + StringUtil.doIfNotNullOrEmpty(getName(), name -> result.put(NAME_KEY, name)); + putListIfNotNull(result, PROCESSORS_KEY, processors); + putListIfNotNull(result, PROCESS_GROUPS_KEY, processGroupSchemas); + putListIfNotNull(result, INPUT_PORTS_KEY, inputPortSchemas); + putListIfNotNull(result, OUTPUT_PORTS_KEY, outputPortSchemas); + putListIfNotNull(result, CONNECTIONS_KEY, connections); + putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups); + return result; + } + + public List<ProcessorSchema> getProcessors() { + return processors; + } + + public List<ConnectionSchema> getConnections() { + return connections; + } + + public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() { + return remoteProcessingGroups; + } + + public List<ProcessGroupSchema> getProcessGroupSchemas() { + return processGroupSchemas; + } + + public Set<String> getPortIds() { + Set<String> result = new HashSet<>(); + inputPortSchemas.stream().map(PortSchema::getId).forEachOrdered(result::add); + outputPortSchemas.stream().map(PortSchema::getId).forEachOrdered(result::add); + processGroupSchemas.stream().flatMap(p -> p.getPortIds().stream()).forEachOrdered(result::add); + return result; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + @Override + protected String getId(Map map, String wrapperName) { + return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, ID_DEFAULT); + } + + @Override + public ProcessGroupSchema convert() { + return this; + } + + @Override + public int getVersion() { + return ConfigSchema.CONFIG_VERSION; + } + + public List<PortSchema> getOutputPortSchemas() { + return outputPortSchemas; + } + + public List<PortSchema> getInputPortSchemas() { + return inputPortSchemas; + } +}
