MINIFI-30 Added changes to the config transformer to support multiple processors, multiple RPGs, multiple Input Ports in RPGs, and greater validation on the config YAML
This closes #17 Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/6f528bbc Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/6f528bbc Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/6f528bbc Branch: refs/heads/master Commit: 6f528bbc1454ee66f2fa26d7c01c8de28290f649 Parents: fb55481 Author: Joseph Percivall <[email protected]> Authored: Tue Apr 26 15:11:00 2016 -0400 Committer: Joseph Percivall <[email protected]> Committed: Tue Jun 14 15:27:10 2016 -0400 ---------------------------------------------------------------------- .../InvalidConfigurationException.java | 38 ++ .../bootstrap/util/ConfigTransformer.java | 484 ++++++++----------- .../schema/ComponentStatusRepositorySchema.java | 50 ++ .../bootstrap/util/schema/ConfigSchema.java | 157 ++++++ .../bootstrap/util/schema/ConnectionSchema.java | 93 ++++ .../util/schema/ContentRepositorySchema.java | 57 +++ .../util/schema/CorePropertiesSchema.java | 73 +++ .../util/schema/FlowControllerSchema.java | 51 ++ .../util/schema/FlowFileRepositorySchema.java | 68 +++ .../bootstrap/util/schema/ProcessorSchema.java | 127 +++++ .../util/schema/ProvenanceReportingSchema.java | 112 +++++ .../util/schema/ProvenanceRepositorySchema.java | 43 ++ .../util/schema/RemoteInputPortSchema.java | 72 +++ .../schema/RemoteProcessingGroupSchema.java | 88 ++++ .../util/schema/SecurityPropertiesSchema.java | 164 +++++++ .../util/schema/SensitivePropsSchema.java | 59 +++ .../bootstrap/util/schema/SwapSchema.java | 75 +++ .../util/schema/common/BaseSchema.java | 134 +++++ .../util/schema/common/CommonPropertyKeys.java | 49 ++ .../bootstrap/util/TestConfigTransformer.java | 122 ++++- .../src/test/resources/config-empty.yml | 6 +- .../test/resources/config-malformed-field.yml | 109 +++++ .../src/test/resources/config-minimal.yml | 35 ++ .../resources/config-missing-required-field.yml | 109 +++++ 
.../src/test/resources/config-multiple-RPGs.yml | 127 +++++ .../resources/config-multiple-input-ports.yml | 121 +++++ .../test/resources/config-multiple-problems.yml | 111 +++++ .../resources/config-multiple-processors.yml | 165 +++++++ minifi-bootstrap/src/test/resources/config.yml | 68 +-- minifi-bootstrap/src/test/resources/default.yml | 46 +- minifi-docs/Properties_Guide.md | 246 ++++++++-- .../src/main/resources/conf/config.yml | 46 +- 32 files changed, 2865 insertions(+), 440 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java new file mode 100644 index 0000000..bdc51d4 --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.exception; + +public class InvalidConfigurationException extends Exception { + + private static final long serialVersionUID = 1L; + + public InvalidConfigurationException() { + super(); + } + + public InvalidConfigurationException(final String message) { + super(message); + } + + public InvalidConfigurationException(final Throwable t) { + super(t); + } + + public InvalidConfigurationException(final String message, final Throwable t) { + super(message, t); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index e7383ad..956f4dc 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -20,6 +20,22 @@ package org.apache.nifi.minifi.bootstrap.util; import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException; +import org.apache.nifi.minifi.bootstrap.util.schema.ComponentStatusRepositorySchema; +import org.apache.nifi.minifi.bootstrap.util.schema.ConfigSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.ConnectionSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.ContentRepositorySchema; +import org.apache.nifi.minifi.bootstrap.util.schema.CorePropertiesSchema; +import 
org.apache.nifi.minifi.bootstrap.util.schema.FlowControllerSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.FlowFileRepositorySchema; +import org.apache.nifi.minifi.bootstrap.util.schema.ProcessorSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.ProvenanceReportingSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.ProvenanceRepositorySchema; +import org.apache.nifi.minifi.bootstrap.util.schema.RemoteInputPortSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.RemoteProcessingGroupSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.SecurityPropertiesSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.SensitivePropsSchema; +import org.apache.nifi.minifi.bootstrap.util.schema.SwapSchema; import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.w3c.dom.DOMException; import org.w3c.dom.Document; @@ -50,96 +66,17 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.zip.GZIPOutputStream; public final class ConfigTransformer { - // Underlying version NIFI POC will be using + // Underlying version of NIFI will be using public static final String NIFI_VERSION = "0.6.0"; - public static final String NAME_KEY = "name"; - public static final String COMMENT_KEY = "comment"; - public static final String ALWAYS_SYNC_KEY = "always sync"; - public static final String YIELD_PERIOD_KEY = "yield period"; - public static final String MAX_CONCURRENT_TASKS_KEY = "max concurrent tasks"; - public static final String ID_KEY = "id"; - - public static final String FLOW_CONTROLLER_PROPS_KEY = "Flow Controller"; - - public static final String CORE_PROPS_KEY = "Core Properties"; - public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow controller graceful shutdown period"; - public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow service write delay interval"; - public static 
final String ADMINISTRATIVE_YIELD_DURATION_KEY = "administrative yield duration"; - public static final String BORED_YIELD_DURATION_KEY = "bored yield duration"; - - public static final String FLOWFILE_REPO_KEY = "FlowFile Repository"; - public static final String PARTITIONS_KEY = "partitions"; - public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval"; - public static final String THRESHOLD_KEY = "queue swap threshold"; - public static final String SWAP_PROPS_KEY = "Swap"; - public static final String IN_PERIOD_KEY = "in period"; - public static final String IN_THREADS_KEY = "in threads"; - public static final String OUT_PERIOD_KEY = "out period"; - public static final String OUT_THREADS_KEY = "out threads"; - - - public static final String PROVENANCE_REPO_KEY = "Provenance Repository"; - public static final String PROVENANCE_REPO_ROLLOVER_TIME_KEY = "provenance rollover time"; - - public static final String CONTENT_REPO_KEY = "Content Repository"; - public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = "content claim max appendable size"; - public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content claim max flow files"; - - public static final String COMPONENT_STATUS_REPO_KEY = "Component Status Repository"; - public static final String BUFFER_SIZE_KEY = "buffer size"; - public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency"; - - public static final String SECURITY_PROPS_KEY = "Security Properties"; - public static final String KEYSTORE_KEY = "keystore"; - public static final String KEYSTORE_TYPE_KEY = "keystore type"; - public static final String KEYSTORE_PASSWORD_KEY = "keystore password"; - public static final String KEY_PASSWORD_KEY = "key password"; - public static final String TRUSTSTORE_KEY = "truststore"; - public static final String TRUSTSTORE_TYPE_KEY = "truststore type"; - public static final String TRUSTSTORE_PASSWORD_KEY = "truststore password"; - public static final String 
SENSITIVE_PROPS_KEY = "Sensitive Props"; - public static final String SENSITIVE_PROPS_KEY__KEY = "key"; - public static final String SENSITIVE_PROPS_ALGORITHM_KEY = "algorithm"; - public static final String SENSITIVE_PROPS_PROVIDER_KEY = "provider"; - - public static final String PROCESSOR_CONFIG_KEY = "Processor Configuration"; - public static final String CLASS_KEY = "class"; - public static final String SCHEDULING_PERIOD_KEY = "scheduling period"; - public static final String PENALIZATION_PERIOD_KEY = "penalization period"; - public static final String SCHEDULING_STRATEGY_KEY = "scheduling strategy"; - public static final String RUN_DURATION_NANOS_KEY = "run duration nanos"; - public static final String AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY = "auto-terminated relationships list"; - - public static final String PROCESSOR_PROPS_KEY = "Properties"; - - public static final String CONNECTION_PROPS_KEY = "Connection Properties"; - public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size"; - public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue data size"; - public static final String FLOWFILE_EXPIRATION__KEY = "flowfile expiration"; - public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue prioritizer class"; - - public static final String REMOTE_PROCESSING_GROUP_KEY = "Remote Processing Group"; - public static final String URL_KEY = "url"; - public static final String TIMEOUT_KEY = "timeout"; - - public static final String INPUT_PORT_KEY = "Input Port"; - public static final String USE_COMPRESSION_KEY = "use compression"; - - public static final String PROVENANCE_REPORTING_KEY = "Provenance Reporting"; - public static final String DESTINATION_URL_KEY = "destination url"; - public static final String PORT_NAME_KEY = "port name"; - public static final String ORIGINATING_URL_KEY = "originating url"; - public static final String BATCH_SIZE_KEY = "batch size"; - - public static final String SSL_PROTOCOL_KEY = "ssl protocol"; - 
// Final util classes should have private constructor - private ConfigTransformer() {} + private ConfigTransformer() { + } public static void transformConfigFile(String sourceFile, String destPath) throws Exception { File ymlConfigFile = new File(sourceFile); @@ -157,20 +94,24 @@ public final class ConfigTransformer { // Verify the parsed object is a Map structure if (loadedObject instanceof Map) { - final Map<String, Object> result = (Map<String, Object>) loadedObject; + final Map<String, Object> loadedMap = (Map<String, Object>) loadedObject; + ConfigSchema configSchema = new ConfigSchema(loadedMap); + if (!configSchema.isValid()) { + throw new InvalidConfigurationException("Failed to transform config file due to:" + configSchema.getValidationIssuesAsString()); + } // Create nifi.properties and flow.xml.gz in memory ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream(); - writeNiFiProperties(result, nifiPropertiesOutputStream); + writeNiFiProperties(configSchema, nifiPropertiesOutputStream); - DOMSource flowXml = createFlowXml(result); + DOMSource flowXml = createFlowXml(configSchema); // Write nifi.properties and flow.xml.gz writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath); writeFlowXmlFile(flowXml, destPath); } else { - throw new IllegalArgumentException("Provided YAML configuration is not a Map."); + throw new InvalidConfigurationException("Provided YAML configuration is not a Map"); } } finally { if (sourceStream != null) { @@ -180,12 +121,11 @@ public final class ConfigTransformer { } private static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException { - try { - final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties"); - FileOutputStream nifiProperties = new FileOutputStream(new File(nifiPropertiesPath.toString())); - nifiProperties.write(nifiPropertiesOutputStream.getUnderlyingBuffer()); + final Path nifiPropertiesPath = Paths.get(destPath, 
"nifi.properties"); + try (FileOutputStream nifiProperties = new FileOutputStream(nifiPropertiesPath.toString())) { + nifiPropertiesOutputStream.writeTo(nifiProperties); } finally { - if (nifiPropertiesOutputStream != null){ + if (nifiPropertiesOutputStream != null) { nifiPropertiesOutputStream.flush(); nifiPropertiesOutputStream.close(); } @@ -210,32 +150,32 @@ public final class ConfigTransformer { outStream.close(); } - private static void writeNiFiProperties(Map<String, Object> topLevelYaml, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException { + private static void writeNiFiProperties(ConfigSchema configSchema, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException { PrintWriter writer = null; try { writer = new PrintWriter(outputStream, true); - Map<String, Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY); - Map<String, Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY); - Map<String, Object> swapProperties = (Map<String, Object>) flowfileRepo.get(SWAP_PROPS_KEY); - Map<String, Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY); - Map<String, Object> provenanceRepo = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPO_KEY); - Map<String, Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY); - Map<String, Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); - Map<String, Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY); + CorePropertiesSchema coreProperties = configSchema.getCoreProperties(); + FlowFileRepositorySchema flowfileRepoSchema = configSchema.getFlowfileRepositoryProperties(); + SwapSchema swapProperties = flowfileRepoSchema.getSwapProperties(); + ContentRepositorySchema contentRepoProperties = 
configSchema.getContentRepositoryProperties(); + ComponentStatusRepositorySchema componentStatusRepoProperties = configSchema.getComponentStatusRepositoryProperties(); + SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties(); + SensitivePropsSchema sensitiveProperties = securityProperties.getSensitiveProps(); + ProvenanceRepositorySchema provenanceRepositorySchema = configSchema.getProvenanceRepositorySchema(); writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE); writer.println("# Core Properties #"); writer.println(); - writer.println("nifi.version="+NIFI_VERSION); + writer.println("nifi.version=" + NIFI_VERSION); writer.println("nifi.flow.configuration.file=./conf/flow.xml.gz"); writer.println("nifi.flow.configuration.archive.dir=./conf/archive/"); writer.println("nifi.flowcontroller.autoResumeState=true"); - writer.println("nifi.flowcontroller.graceful.shutdown.period=" + getValueString(coreProperties, FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY)); - writer.println("nifi.flowservice.writedelay.interval=" + getValueString(coreProperties, FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY)); - writer.println("nifi.administrative.yield.duration=" + getValueString(coreProperties, ADMINISTRATIVE_YIELD_DURATION_KEY)); + writer.println("nifi.flowcontroller.graceful.shutdown.period=" + coreProperties.getFlowControllerGracefulShutdownPeriod()); + writer.println("nifi.flowservice.writedelay.interval=" + coreProperties.getFlowServiceWriteDelayInterval()); + writer.println("nifi.administrative.yield.duration=" + coreProperties.getAdministrativeYieldDuration()); writer.println("# If a component has no work to do (is \"bored\"), how long should we wait before checking again for work?"); - writer.println("nifi.bored.yield.duration=" + getValueString(coreProperties, BORED_YIELD_DURATION_KEY)); + writer.println("nifi.bored.yield.duration=" + coreProperties.getBoredYieldDuration()); writer.println(); 
writer.println("nifi.authority.provider.configuration.file=./conf/authority-providers.xml"); writer.println("nifi.login.identity.provider.configuration.file=./conf/login-identity-providers.xml"); @@ -260,38 +200,38 @@ public final class ConfigTransformer { writer.println("# FlowFile Repository"); writer.println("nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"); writer.println("nifi.flowfile.repository.directory=./flowfile_repository"); - writer.println("nifi.flowfile.repository.partitions=" + getValueString(flowfileRepo, PARTITIONS_KEY)); - writer.println("nifi.flowfile.repository.checkpoint.interval=" + getValueString(flowfileRepo,CHECKPOINT_INTERVAL_KEY)); - writer.println("nifi.flowfile.repository.always.sync=" + getValueString(flowfileRepo,ALWAYS_SYNC_KEY)); + writer.println("nifi.flowfile.repository.partitions=" + flowfileRepoSchema.getPartitions()); + writer.println("nifi.flowfile.repository.checkpoint.interval=" + flowfileRepoSchema.getCheckpointInterval()); + writer.println("nifi.flowfile.repository.always.sync=" + flowfileRepoSchema.getAlwaysSync()); writer.println(); writer.println("nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager"); - writer.println("nifi.queue.swap.threshold=" + getValueString(swapProperties, THRESHOLD_KEY)); - writer.println("nifi.swap.in.period=" + getValueString(swapProperties, IN_PERIOD_KEY)); - writer.println("nifi.swap.in.threads=" + getValueString(swapProperties, IN_THREADS_KEY)); - writer.println("nifi.swap.out.period=" + getValueString(swapProperties, OUT_PERIOD_KEY)); - writer.println("nifi.swap.out.threads=" + getValueString(swapProperties, OUT_THREADS_KEY)); + writer.println("nifi.queue.swap.threshold=" + swapProperties.getThreshold()); + writer.println("nifi.swap.in.period=" + swapProperties.getInPeriod()); + writer.println("nifi.swap.in.threads=" + swapProperties.getInThreads()); + writer.println("nifi.swap.out.period=" + 
swapProperties.getOutPeriod()); + writer.println("nifi.swap.out.threads=" + swapProperties.getOutThreads()); writer.println(); writer.println("# Content Repository"); writer.println("nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository"); - writer.println("nifi.content.claim.max.appendable.size=" + getValueString(contentRepo, CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY)); - writer.println("nifi.content.claim.max.flow.files=" + getValueString(contentRepo, CONTENT_CLAIM_MAX_FLOW_FILES_KEY)); + writer.println("nifi.content.claim.max.appendable.size=" + contentRepoProperties.getContentClaimMaxAppendableSize()); + writer.println("nifi.content.claim.max.flow.files=" + contentRepoProperties.getContentClaimMaxFlowFiles()); writer.println("nifi.content.repository.archive.max.retention.period="); writer.println("nifi.content.repository.archive.max.usage.percentage="); writer.println("nifi.content.repository.archive.enabled=false"); writer.println("nifi.content.repository.directory.default=./content_repository"); - writer.println("nifi.content.repository.always.sync=" + getValueString(contentRepo, ALWAYS_SYNC_KEY)); + writer.println("nifi.content.repository.always.sync=" + contentRepoProperties.getAlwaysSync()); writer.println(); writer.println("# Provenance Repository Properties"); writer.println("nifi.provenance.repository.implementation=org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository"); - writer.println("nifi.provenance.repository.rollover.time=" + getValueString(provenanceRepo, PROVENANCE_REPO_ROLLOVER_TIME_KEY)); + writer.println("nifi.provenance.repository.rollover.time=" + provenanceRepositorySchema.getProvenanceRepoRolloverTimeKey()); writer.println(); writer.println("# Volatile Provenance Respository Properties"); - writer.println("nifi.provenance.repository.buffer.size=100000"); + writer.println("nifi.provenance.repository.buffer.size=10000"); writer.println(); writer.println("# Component Status Repository"); 
writer.println("nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"); - writer.println("nifi.components.status.repository.buffer.size=" + getValueString(componentStatusRepo, BUFFER_SIZE_KEY)); - writer.println("nifi.components.status.snapshot.frequency=" + getValueString(componentStatusRepo, SNAPSHOT_FREQUENCY_KEY)); + writer.println("nifi.components.status.repository.buffer.size=" + componentStatusRepoProperties.getBufferSize()); + writer.println("nifi.components.status.snapshot.frequency=" + componentStatusRepoProperties.getSnapshotFrequency()); writer.println(); writer.println("# web properties #"); writer.println("nifi.web.war.directory=./lib"); @@ -303,17 +243,17 @@ public final class ConfigTransformer { writer.println("nifi.web.jetty.threads=200"); writer.println(); writer.println("# security properties #"); - writer.println("nifi.sensitive.props.key=" + getValueString(sensitiveProperties, SENSITIVE_PROPS_KEY__KEY)); - writer.println("nifi.sensitive.props.algorithm=" + getValueString(sensitiveProperties, SENSITIVE_PROPS_ALGORITHM_KEY)); - writer.println("nifi.sensitive.props.provider=" + getValueString(sensitiveProperties, SENSITIVE_PROPS_PROVIDER_KEY)); + writer.println("nifi.sensitive.props.key=" + sensitiveProperties.getKey()); + writer.println("nifi.sensitive.props.algorithm=" + sensitiveProperties.getAlgorithm()); + writer.println("nifi.sensitive.props.provider=" + sensitiveProperties.getProvider()); writer.println(); - writer.println("nifi.security.keystore=" + getValueString(securityProperties, KEYSTORE_KEY)); - writer.println("nifi.security.keystoreType=" + getValueString(securityProperties, KEYSTORE_TYPE_KEY)); - writer.println("nifi.security.keystorePasswd=" + getValueString(securityProperties, KEYSTORE_PASSWORD_KEY)); - writer.println("nifi.security.keyPasswd=" + getValueString(securityProperties, KEY_PASSWORD_KEY)); - writer.println("nifi.security.truststore=" + 
getValueString(securityProperties, TRUSTSTORE_KEY)); - writer.println("nifi.security.truststoreType=" + getValueString(securityProperties, TRUSTSTORE_TYPE_KEY)); - writer.println("nifi.security.truststorePasswd=" + getValueString(securityProperties, TRUSTSTORE_PASSWORD_KEY)); + writer.println("nifi.security.keystore=" + securityProperties.getKeystore()); + writer.println("nifi.security.keystoreType=" + securityProperties.getKeystoreType()); + writer.println("nifi.security.keystorePasswd=" + securityProperties.getKeystorePassword()); + writer.println("nifi.security.keyPasswd=" + securityProperties.getKeyPassword()); + writer.println("nifi.security.truststore=" + securityProperties.getTruststore()); + writer.println("nifi.security.truststoreType=" + securityProperties.getTruststoreType()); + writer.println("nifi.security.truststorePasswd=" + securityProperties.getTruststorePassword()); writer.println("nifi.security.needClientAuth="); writer.println("nifi.security.user.credential.cache.duration=24 hours"); writer.println("nifi.security.user.authority.provider=file-provider"); @@ -333,13 +273,14 @@ public final class ConfigTransformer { } catch (NullPointerException e) { throw new ConfigurationChangeException("Failed to parse the config YAML while creating the nifi.properties", e); } finally { - if (writer != null){ + if (writer != null) { writer.flush(); writer.close(); } } } - private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) throws IOException, ConfigurationChangeException { + + private static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException { try { // create a new, empty document final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); @@ -351,50 +292,34 @@ public final class ConfigTransformer { // populate document with controller state final Element rootNode = doc.createElement("flowController"); doc.appendChild(rootNode); - Map<String, Object> processorConfig = 
(Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY); - addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1")); - addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1")); - addProcessGroup(rootNode, topLevelYaml, "rootGroup"); - - Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); - if (securityProps != null) { - String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY); - if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) { - final Element controllerServicesNode = doc.createElement("controllerServices"); - rootNode.appendChild(controllerServicesNode); - addSSLControllerService(controllerServicesNode, securityProps); - } + CorePropertiesSchema coreProperties = configSchema.getCoreProperties(); + addTextElement(rootNode, "maxTimerDrivenThreadCount", String.valueOf(coreProperties.getMaxConcurrentThreads())); + addTextElement(rootNode, "maxEventDrivenThreadCount", String.valueOf(coreProperties.getMaxConcurrentThreads())); + addProcessGroup(rootNode, configSchema, "rootGroup"); + + SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties(); + if (securityProperties.useSSL()) { + final Element controllerServicesNode = doc.createElement("controllerServices"); + rootNode.appendChild(controllerServicesNode); + addSSLControllerService(controllerServicesNode, securityProperties); } - Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY); - if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) { + ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties(); + if (provenanceProperties != null) { final Element reportingTasksNode = doc.createElement("reportingTasks"); rootNode.appendChild(reportingTasksNode); - addProvenanceReportingTask(reportingTasksNode, 
topLevelYaml); + addProvenanceReportingTask(reportingTasksNode, configSchema); } return new DOMSource(doc); } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) { throw new FlowSerializationException(e); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while writing the top level of the flow xml", e); } } - private static <K> String getValueString(Map<K,Object> map, K key){ - Object value = map.get(key); - return value == null ? "" : value.toString(); - } - - private static <K> String getValueString(Map<K,Object> map, K key, String theDefault){ - Object value = null; - if (map != null){ - value = map.get(key); - } - return value == null ? theDefault : value.toString(); - } - - private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) throws ConfigurationChangeException { + private static void addSSLControllerService(final Element element, SecurityPropertiesSchema securityProperties) throws ConfigurationChangeException { try { final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); addTextElement(serviceElement, "id", "SSL-Context-Service"); @@ -405,126 +330,133 @@ public final class ConfigTransformer { addTextElement(serviceElement, "enabled", "true"); Map<String, Object> attributes = new HashMap<>(); - attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY)); - attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY)); - attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY)); - attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY)); - attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY)); - attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY)); - attributes.put("SSL Protocol", 
securityProperties.get(SSL_PROTOCOL_KEY)); + attributes.put("Keystore Filename", securityProperties.getKeystore()); + attributes.put("Keystore Type", securityProperties.getKeystoreType()); + attributes.put("Keystore Password", securityProperties.getKeyPassword()); + attributes.put("Truststore Filename", securityProperties.getTruststore()); + attributes.put("Truststore Type", securityProperties.getTruststoreType()); + attributes.put("Truststore Password", securityProperties.getTruststorePassword()); + attributes.put("SSL Protocol", securityProperties.getSslProtocol()); addConfiguration(serviceElement, attributes); element.appendChild(serviceElement); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while trying to create an SSL Controller Service", e); } } - private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) throws ConfigurationChangeException { + private static void addProcessGroup(final Element parentElement, ConfigSchema configSchema, final String elementName) throws ConfigurationChangeException { try { - Map<String, Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY); + FlowControllerSchema flowControllerProperties = configSchema.getFlowControllerProperties(); final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); addTextElement(element, "id", "Root-Group"); - addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY)); + addTextElement(element, "name", flowControllerProperties.getName()); addPosition(element); - addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY)); + addTextElement(element, "comment", flowControllerProperties.getComment()); - Map<String, Object> processorConfig = (Map<String, Object>) 
topLevelYaml.get(PROCESSOR_CONFIG_KEY); - addProcessor(element, processorConfig); + List<ProcessorSchema> processors = configSchema.getProcessors(); + if (processors != null) { + for (ProcessorSchema processorConfig : processors) { + addProcessor(element, processorConfig); + } + } - Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY); - addRemoteProcessGroup(element, remoteProcessingGroup); + List<RemoteProcessingGroupSchema> remoteProcessingGroups = configSchema.getRemoteProcessingGroups(); + if (remoteProcessingGroups != null) { + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema: remoteProcessingGroups) { + addRemoteProcessGroup(element, remoteProcessingGroupSchema); + } + } - addConnection(element, topLevelYaml); - } catch (ConfigurationChangeException e){ + List<ConnectionSchema> connections = configSchema.getConnections(); + if (connections != null) { + for (ConnectionSchema connectionConfig : connections) { + addConnection(element, connectionConfig, configSchema); + } + } + } catch (ConfigurationChangeException e) { throw e; - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e); } } - private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) throws ConfigurationChangeException { - + private static void addProcessor(final Element parentElement, ProcessorSchema processorConfig) throws ConfigurationChangeException { try { - if (processorConfig.get(CLASS_KEY) == null) { - // Only add a processor if it has a class - return; - } - final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement("processor"); parentElement.appendChild(element); - addTextElement(element, "id", "Processor"); - addTextElement(element, "name", getValueString(processorConfig, NAME_KEY)); + addTextElement(element, "id", 
processorConfig.getName()); + addTextElement(element, "name", processorConfig.getName()); addPosition(element); addStyle(element); - addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY)); - addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY)); - addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY)); - addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY)); - addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY)); - addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY)); + addTextElement(element, "comment", ""); + addTextElement(element, "class", processorConfig.getProcessorClass()); + addTextElement(element, "maxConcurrentTasks", String.valueOf(processorConfig.getMaxConcurrentTasks())); + addTextElement(element, "schedulingPeriod", processorConfig.getSchedulingPeriod()); + addTextElement(element, "penalizationPeriod", processorConfig.getPenalizationPeriod()); + addTextElement(element, "yieldPeriod", processorConfig.getYieldPeriod()); addTextElement(element, "bulletinLevel", "WARN"); addTextElement(element, "lossTolerant", "false"); addTextElement(element, "scheduledState", "RUNNING"); - addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY)); - addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY)); + addTextElement(element, "schedulingStrategy", processorConfig.getSchedulingStrategy()); + addTextElement(element, "runDurationNanos", String.valueOf(processorConfig.getRunDurationNanos())); - addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY)); + addConfiguration(element, processorConfig.getProperties()); - Collection<String> autoTerminatedRelationships = (Collection<String>) 
processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY); + Collection<String> autoTerminatedRelationships = processorConfig.getAutoTerminatedRelationshipsList(); if (autoTerminatedRelationships != null) { for (String rel : autoTerminatedRelationships) { addTextElement(element, "autoTerminatedRelationship", rel); } } - } catch (Exception e){ - throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Processor", e); + } catch (Exception e) { + throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add a Processor", e); } } - private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) throws ConfigurationChangeException { + private static void addProvenanceReportingTask(final Element element, ConfigSchema configSchema) throws ConfigurationChangeException { try { - Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY); + ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties(); final Element taskElement = element.getOwnerDocument().createElement("reportingTask"); addTextElement(taskElement, "id", "Provenance-Reporting"); addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting"); - addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY)); + addTextElement(taskElement, "comment", provenanceProperties.getComment()); addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask"); - addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY)); + addTextElement(taskElement, "schedulingPeriod", provenanceProperties.getSchedulingPeriod()); addTextElement(taskElement, "scheduledState", "RUNNING"); - addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY)); + 
addTextElement(taskElement, "schedulingStrategy", provenanceProperties.getSchedulingStrategy()); Map<String, Object> attributes = new HashMap<>(); - attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY)); - attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY)); - attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY)); - attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY)); - attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY)); - - Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY); - String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY); - if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) { + attributes.put("Destination URL", provenanceProperties.getDestinationUrl()); + attributes.put("Input Port Name", provenanceProperties.getPortName()); + attributes.put("MiNiFi URL", provenanceProperties.getOriginatingUrl()); + attributes.put("Compress Events", provenanceProperties.getUseCompression()); + attributes.put("Batch Size", provenanceProperties.getBatchSize()); + attributes.put("Communications Timeout", provenanceProperties.getTimeout()); + + SecurityPropertiesSchema securityProps = configSchema.getSecurityProperties(); + if (securityProps.useSSL()) { attributes.put("SSL Context Service", "SSL-Context-Service"); } addConfiguration(taskElement, attributes); element.appendChild(taskElement); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e); } } private static void addConfiguration(final Element element, Map<String, Object> elementConfig) { final Document doc = element.getOwnerDocument(); - if (elementConfig == null){ + if (elementConfig == null) { return; } for (final Map.Entry<String, Object> entry : elementConfig.entrySet()) { @@ -544,76 +476,57 @@ public final class 
ConfigTransformer { parentElement.appendChild(element); } - private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException { + private static void addRemoteProcessGroup(final Element parentElement, RemoteProcessingGroupSchema remoteProcessingGroupProperties) throws ConfigurationChangeException { try { - if (remoteProcessingGroup.get(URL_KEY) == null) { - // Only add an an RPG if it has a URL - return; - } - final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement("remoteProcessGroup"); parentElement.appendChild(element); - addTextElement(element, "id", "Remote-Process-Group"); - addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY)); + addTextElement(element, "id", remoteProcessingGroupProperties.getName()); + addTextElement(element, "name", remoteProcessingGroupProperties.getName()); addPosition(element); - addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY)); - addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY)); - addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY)); - addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY)); + addTextElement(element, "comment", remoteProcessingGroupProperties.getComment()); + addTextElement(element, "url", remoteProcessingGroupProperties.getUrl()); + addTextElement(element, "timeout", remoteProcessingGroupProperties.getTimeout()); + addTextElement(element, "yieldPeriod", remoteProcessingGroupProperties.getYieldPeriod()); addTextElement(element, "transmitting", "true"); - Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY); - addRemoteGroupPort(element, inputPort, "inputPort"); + List<RemoteInputPortSchema> remoteInputPorts = remoteProcessingGroupProperties.getInputPorts(); + for(RemoteInputPortSchema 
remoteInputPortSchema: remoteInputPorts) { + addRemoteGroupPort(element, remoteInputPortSchema); + } parentElement.appendChild(element); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Remote Process Group", e); } } - private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) throws ConfigurationChangeException { - + private static void addRemoteGroupPort(final Element parentElement, RemoteInputPortSchema inputPort) throws ConfigurationChangeException { try { - if (inputPort.get(ID_KEY) == null) { - // Only add an input port if it has an ID - return; - } - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); + final Element element = doc.createElement("inputPort"); parentElement.appendChild(element); - addTextElement(element, "id", getValueString(inputPort, ID_KEY)); - addTextElement(element, "name", getValueString(inputPort, NAME_KEY)); + addTextElement(element, "id", inputPort.getId()); + addTextElement(element, "name", inputPort.getName()); addPosition(element); - addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY)); + addTextElement(element, "comments", inputPort.getComment()); addTextElement(element, "scheduledState", "RUNNING"); - addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY)); - addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY)); + addTextElement(element, "maxConcurrentTasks", String.valueOf(inputPort.getMax_concurrent_tasks())); + addTextElement(element, "useCompression", String.valueOf(inputPort.getUseCompression())); parentElement.appendChild(element); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the input port of the Remote 
Process Group", e); } } - private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) throws ConfigurationChangeException { + private static void addConnection(final Element parentElement, ConnectionSchema connectionProperties, ConfigSchema configSchema) throws ConfigurationChangeException { try { - Map<String, Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY); - Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY); - Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY); - Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY); - - if (inputPort.get(ID_KEY) == null || processorConfig.get(CLASS_KEY) == null) { - // Only add the connection if the input port and processor config are created - return; - } - final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement("connection"); parentElement.appendChild(element); - addTextElement(element, "id", "Connection"); - addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY)); + addTextElement(element, "id", connectionProperties.getName()); + addTextElement(element, "name", connectionProperties.getName()); final Element bendPointsElement = doc.createElement("bendPoints"); element.appendChild(bendPointsElement); @@ -621,28 +534,55 @@ public final class ConfigTransformer { addTextElement(element, "labelIndex", "1"); addTextElement(element, "zIndex", "0"); - addTextElement(element, "sourceId", "Processor"); + addTextElement(element, "sourceId", connectionProperties.getSourceName()); addTextElement(element, "sourceGroupId", "Root-Group"); addTextElement(element, "sourceType", "PROCESSOR"); - addTextElement(element, "destinationId", getValueString(inputPort, ID_KEY)); - addTextElement(element, "destinationGroupId", "Remote-Process-Group"); - 
addTextElement(element, "destinationType", "REMOTE_INPUT_PORT"); + addTextElement(element, "destinationId", connectionProperties.getDestinationName()); - addTextElement(element, "relationship", "success"); + if (isInputPortId(connectionProperties.getDestinationName(), configSchema)) { + addTextElement(element, "destinationGroupId", "Remote-Process-Group"); + addTextElement(element, "destinationType", "REMOTE_INPUT_PORT"); + } else { + addTextElement(element, "destinationGroupId", "Root-Group"); + addTextElement(element, "destinationType", "PROCESSOR"); + } + + addTextElement(element, "relationship", connectionProperties.getSourceRelationshipName()); - addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY)); - addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY)); + addTextElement(element, "maxWorkQueueSize", String.valueOf(connectionProperties.getMaxWorkQueueSize())); + addTextElement(element, "maxWorkQueueDataSize", connectionProperties.getMaxWorkQueueDataSize()); - addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY)); - addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY)); + addTextElement(element, "flowFileExpiration", connectionProperties.getFlowfileExpiration()); + addTextElement(element, "queuePrioritizerClass", connectionProperties.getQueuePrioritizerClass()); parentElement.appendChild(element); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the connection from the Processor to the input port of the Remote Process Group", e); } } + private static boolean isInputPortId(String id, ConfigSchema configSchema) { + boolean isInputPortId = false; + try { + List<RemoteProcessingGroupSchema> remoteProcessingGroups = 
configSchema.getRemoteProcessingGroups(); + if (remoteProcessingGroups != null) { + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema: remoteProcessingGroups) { + List<RemoteInputPortSchema> remoteInputPorts = remoteProcessingGroupSchema.getInputPorts(); + for (RemoteInputPortSchema remoteInputPortSchema: remoteInputPorts) { + if (remoteInputPortSchema != null && id.equals(remoteInputPortSchema.getId())) { + isInputPortId = true; + break; + } + } + } + } + } catch (Exception e) { + // If an exception was thrown then it isn't the InputPort + } + return isInputPortId; + } + private static void addPosition(final Element parentElement) { final Element element = parentElement.getOwnerDocument().createElement("position"); element.setAttribute("x", String.valueOf("0")); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java new file mode 100644 index 0000000..5b6ac2e --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; + +/** + * + */ +public class ComponentStatusRepositorySchema extends BaseSchema { + public static final String BUFFER_SIZE_KEY = "buffer size"; + public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency"; + + private Number bufferSize = 1440; + private String snapshotFrequency = "1 min"; + + public ComponentStatusRepositorySchema() { + } + + public ComponentStatusRepositorySchema(Map map) { + bufferSize = getOptionalKeyAsType(map, BUFFER_SIZE_KEY, Number.class, COMPONENT_STATUS_REPO_KEY, 1440); + snapshotFrequency = getOptionalKeyAsType(map, SNAPSHOT_FREQUENCY_KEY, String.class, COMPONENT_STATUS_REPO_KEY, "1 min"); + } + + public Number getBufferSize() { + return bufferSize; + } + + public String getSnapshotFrequency() { + return snapshotFrequency; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java new file mode 100644 index 0000000..6d3d7ca --- /dev/null +++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CORE_PROPS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROCESSORS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY; +import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY; + +/** + * + */ +public class ConfigSchema extends BaseSchema { + public static String TOP_LEVEL_NAME = "top level"; + + private FlowControllerSchema flowControllerProperties; + private CorePropertiesSchema coreProperties; + private FlowFileRepositorySchema flowfileRepositoryProperties; + private ContentRepositorySchema contentRepositoryProperties; + private ComponentStatusRepositorySchema componentStatusRepositoryProperties; + private SecurityPropertiesSchema securityProperties; + private List<ProcessorSchema> processors; + private List<ConnectionSchema> connections; + private List<RemoteProcessingGroupSchema> remoteProcessingGroups; + private ProvenanceReportingSchema provenanceReportingProperties; + + private ProvenanceRepositorySchema provenanceRepositorySchema; + + public ConfigSchema() { + } + + public ConfigSchema(Map map) { + Object obj ; + flowControllerProperties = getMapAsType(map, FLOW_CONTROLLER_PROPS_KEY, FlowControllerSchema.class, TOP_LEVEL_NAME, true); + + coreProperties = getMapAsType(map, CORE_PROPS_KEY, CorePropertiesSchema.class, TOP_LEVEL_NAME, false); + flowfileRepositoryProperties = getMapAsType(map, FLOWFILE_REPO_KEY, FlowFileRepositorySchema.class, TOP_LEVEL_NAME, false); + contentRepositoryProperties = getMapAsType(map, CONTENT_REPO_KEY, ContentRepositorySchema.class, TOP_LEVEL_NAME, false); + provenanceRepositorySchema = getMapAsType(map, PROVENANCE_REPO_KEY, ProvenanceRepositorySchema.class, TOP_LEVEL_NAME, false); + componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false); + securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, 
SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false); + + processors = getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, null); + if (processors != null) { + transformListToType(processors, "processor", ProcessorSchema.class, PROCESSORS_KEY); + } + + connections = getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, null); + if (connections != null) { + transformListToType(connections, "connection", ConnectionSchema.class, CONNECTIONS_KEY); + } + + remoteProcessingGroups = getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, null); + if (remoteProcessingGroups != null) { + transformListToType(remoteProcessingGroups, "remote processing group", RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY); + } + + provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false); + + addIssuesIfNotNull(flowControllerProperties); + addIssuesIfNotNull(coreProperties); + addIssuesIfNotNull(flowfileRepositoryProperties); + addIssuesIfNotNull(contentRepositoryProperties); + addIssuesIfNotNull(componentStatusRepositoryProperties); + addIssuesIfNotNull(securityProperties); + addIssuesIfNotNull(provenanceReportingProperties); + addIssuesIfNotNull(provenanceRepositorySchema); + + if (processors != null) { + for (ProcessorSchema processorSchema : processors) { + addIssuesIfNotNull(processorSchema); + } + } + + if (connections != null) { + for (ConnectionSchema connectionSchema : connections) { + addIssuesIfNotNull(connectionSchema); + } + } + + if (remoteProcessingGroups != null) { + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) { + addIssuesIfNotNull(remoteProcessingGroupSchema); + } + } + } + + public FlowControllerSchema getFlowControllerProperties() { + return flowControllerProperties; + } + + public CorePropertiesSchema getCoreProperties() { + return coreProperties; + } + + public 
FlowFileRepositorySchema getFlowfileRepositoryProperties() { + return flowfileRepositoryProperties; + } + + public ContentRepositorySchema getContentRepositoryProperties() { + return contentRepositoryProperties; + } + + public SecurityPropertiesSchema getSecurityProperties() { + return securityProperties; + } + + public List<ProcessorSchema> getProcessors() { + return processors; + } + + public List<ConnectionSchema> getConnections() { + return connections; + } + + public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() { + return remoteProcessingGroups; + } + + public ProvenanceReportingSchema getProvenanceReportingProperties() { + return provenanceReportingProperties; + } + + public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() { + return componentStatusRepositoryProperties; + } + + public ProvenanceRepositorySchema getProvenanceRepositorySchema() { + return provenanceRepositorySchema; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java new file mode 100644 index 0000000..f7024f8 --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.NAME_KEY; + +/** + * + */ +public class ConnectionSchema extends BaseSchema { + public static final String SOURCE_NAME_KEY = "source name"; + public static final String SOURCE_RELATIONSHIP_NAME_KEY = "source relationship name"; + public static final String DESTINATION_NAME_KEY = "destination name"; + public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size"; + public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue data size"; + public static final String FLOWFILE_EXPIRATION__KEY = "flowfile expiration"; + public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue prioritizer class"; + + private String name; + private String sourceName; + private String sourceRelationshipName; + private String destinationName; + private Number maxWorkQueueSize = 0; + private String maxWorkQueueDataSize = "0 MB"; + private String flowfileExpiration = "0 sec"; + private String queuePrioritizerClass = ""; + + public ConnectionSchema() { + } + + public ConnectionSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, CONNECTIONS_KEY); + sourceName = getRequiredKeyAsType(map, SOURCE_NAME_KEY, String.class, CONNECTIONS_KEY); + sourceRelationshipName = 
getRequiredKeyAsType(map, SOURCE_RELATIONSHIP_NAME_KEY, String.class, CONNECTIONS_KEY); + destinationName = getRequiredKeyAsType(map, DESTINATION_NAME_KEY, String.class, CONNECTIONS_KEY); + + maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, 0); + maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, "0 MB"); + flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, "0 sec"); + queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, ""); + } + + public String getName() { + return name; + } + + public String getSourceName() { + return sourceName; + } + + public String getSourceRelationshipName() { + return sourceRelationshipName; + } + + public String getDestinationName() { + return destinationName; + } + + public Number getMaxWorkQueueSize() { + return maxWorkQueueSize; + } + + public String getMaxWorkQueueDataSize() { + return maxWorkQueueDataSize; + } + + public String getFlowfileExpiration() { + return flowfileExpiration; + } + + public String getQueuePrioritizerClass() { + return queuePrioritizerClass; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java new file mode 100644 index 0000000..1a4d07d --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY; + +/** + * + */ +public class ContentRepositorySchema extends BaseSchema { + public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = "content claim max appendable size"; + public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content claim max flow files"; + + private String contentClaimMaxAppendableSize = "10 MB"; + private Number contentClaimMaxFlowFiles = 100; + private Boolean alwaysSync = false; + + public ContentRepositorySchema() { + } + + public ContentRepositorySchema(Map map) { + contentClaimMaxAppendableSize = getOptionalKeyAsType(map, CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY, String.class, CONTENT_REPO_KEY, "10 MB"); + contentClaimMaxFlowFiles = getOptionalKeyAsType(map, CONTENT_CLAIM_MAX_FLOW_FILES_KEY, Number.class, CONTENT_REPO_KEY, 100); + alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, CONTENT_REPO_KEY, false); + } + + public String 
getContentClaimMaxAppendableSize() { + return contentClaimMaxAppendableSize; + } + + public Number getContentClaimMaxFlowFiles() { + return contentClaimMaxFlowFiles; + } + + public boolean getAlwaysSync() { + return alwaysSync; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java new file mode 100644 index 0000000..d4bae0b --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CORE_PROPS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.MAX_CONCURRENT_THREADS_KEY; + +/** + * + */ +public class CorePropertiesSchema extends BaseSchema { + + public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow controller graceful shutdown period"; + public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow service write delay interval"; + public static final String ADMINISTRATIVE_YIELD_DURATION_KEY = "administrative yield duration"; + public static final String BORED_YIELD_DURATION_KEY = "bored yield duration"; + + + private String flowControllerGracefulShutdownPeriod = "10 sec"; + private String flowServiceWriteDelayInterval = "500 ms"; + private String administrativeYieldDuration = "30 sec"; + private String boredYieldDuration = "10 millis"; + private Number maxConcurrentThreads = 1; + + public CorePropertiesSchema() { + } + + public CorePropertiesSchema(Map map) { + flowControllerGracefulShutdownPeriod = getOptionalKeyAsType(map, FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY, String.class, CORE_PROPS_KEY, "10 sec"); + flowServiceWriteDelayInterval = getOptionalKeyAsType(map, FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY, String.class, CORE_PROPS_KEY, "500 ms"); + administrativeYieldDuration = getOptionalKeyAsType(map, ADMINISTRATIVE_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, "30 sec"); + boredYieldDuration = getOptionalKeyAsType(map, BORED_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, "10 millis"); + maxConcurrentThreads = getOptionalKeyAsType(map, MAX_CONCURRENT_THREADS_KEY, Number.class, CORE_PROPS_KEY, 1); + } + + public String getFlowControllerGracefulShutdownPeriod() { + return flowControllerGracefulShutdownPeriod; + } + + public String 
getFlowServiceWriteDelayInterval() { + return flowServiceWriteDelayInterval; + } + + public String getAdministrativeYieldDuration() { + return administrativeYieldDuration; + } + + public String getBoredYieldDuration() { + return boredYieldDuration; + } + + public Number getMaxConcurrentThreads() { + return maxConcurrentThreads; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java new file mode 100644 index 0000000..1d7500a --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.COMMENT_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.NAME_KEY; + +/** + * + */ +public class FlowControllerSchema extends BaseSchema { + + private String name; + private String comment = ""; + + public FlowControllerSchema() { + } + + public FlowControllerSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, FLOW_CONTROLLER_PROPS_KEY); + + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, FLOW_CONTROLLER_PROPS_KEY, ""); + } + + public String getName() { + return name; + } + + public String getComment() { + return comment; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java new file mode 100644 index 0000000..10a8c55 --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; + +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SWAP_PROPS_KEY; + +/** + * + */ +public class FlowFileRepositorySchema extends BaseSchema { + + public static final String PARTITIONS_KEY = "partitions"; + public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval"; + + private Number partitions = 256; + private String checkpointInterval = "2 mins"; + private Boolean alwaysSync = false; + private SwapSchema swapProperties; + + public FlowFileRepositorySchema() { + swapProperties = new SwapSchema(); + } + + public FlowFileRepositorySchema(Map map) { + partitions = getOptionalKeyAsType(map, PARTITIONS_KEY, Number.class, FLOWFILE_REPO_KEY, 256); + checkpointInterval = getOptionalKeyAsType(map, CHECKPOINT_INTERVAL_KEY, String.class, FLOWFILE_REPO_KEY, "2 mins"); + alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, FLOWFILE_REPO_KEY, false); + + swapProperties = getMapAsType(map, SWAP_PROPS_KEY, SwapSchema.class, FLOWFILE_REPO_KEY, false); + 
addIssuesIfNotNull(swapProperties); + } + + public Number getPartitions() { + return partitions; + } + + public String getCheckpointInterval() { + return checkpointInterval; + } + + public boolean getAlwaysSync() { + return alwaysSync; + } + + public SwapSchema getSwapProperties() { + return swapProperties; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java new file mode 100644 index 0000000..bb86c8c --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.util.schema; + +import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema; +import org.apache.nifi.scheduling.SchedulingStrategy; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.NAME_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROCESSORS_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY; +import static org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY; + +/** + * + */ +public class ProcessorSchema extends BaseSchema { + public static final String CLASS_KEY = "class"; + public static final String PENALIZATION_PERIOD_KEY = "penalization period"; + public static final String RUN_DURATION_NANOS_KEY = "run duration nanos"; + public static final String AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY = "auto-terminated relationships list"; + public static final String PROCESSOR_PROPS_KEY = "Properties"; + + + private String name; + private String processorClass; + private Number maxConcurrentTasks = 1; + private String schedulingStrategy; + private String schedulingPeriod; + private String penalizationPeriod = "30 sec"; + private String yieldPeriod = "1 sec"; + private Number runDurationNanos = 0; + private List<String> autoTerminatedRelationshipsList = Collections.emptyList(); + private Map<String, Object> properties = Collections.emptyMap(); + + public ProcessorSchema() { + } + + public ProcessorSchema(Map map) { + name = getRequiredKeyAsType(map, NAME_KEY, String.class, PROCESSORS_KEY); + processorClass = getRequiredKeyAsType(map, 
CLASS_KEY, String.class, PROCESSORS_KEY); + + maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, PROCESSORS_KEY, 1); + + schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROCESSORS_KEY); + try { + SchedulingStrategy.valueOf(schedulingStrategy); + } catch (IllegalArgumentException e) { + addValidationIssue(SCHEDULING_STRATEGY_KEY, PROCESSORS_KEY, "it is not a valid scheduling strategy"); + } + + schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROCESSORS_KEY); + + penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, PROCESSORS_KEY, "30 sec"); + + yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, PROCESSORS_KEY, "1 sec"); + + runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, PROCESSORS_KEY, 0); + + autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, PROCESSORS_KEY, null); + + properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, PROCESSORS_KEY, null); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getProcessorClass() { + return processorClass; + } + + public Number getMaxConcurrentTasks() { + return maxConcurrentTasks; + } + + public String getSchedulingStrategy() { + return schedulingStrategy; + } + + public String getSchedulingPeriod() { + return schedulingPeriod; + } + + public String getPenalizationPeriod() { + return penalizationPeriod; + } + + public String getYieldPeriod() { + return yieldPeriod; + } + + public Number getRunDurationNanos() { + return runDurationNanos; + } + + public List<String> getAutoTerminatedRelationshipsList() { + return autoTerminatedRelationshipsList; + } + + public Map<String, Object> getProperties() { + return properties; + } +}
