MINIFI-30 Added changes to the config transformer to support multiple 
processors, multiple RPGs, multiple Input Ports in RPGS, and greater validation 
on the config yaml

This closes #17


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/6f528bbc
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/6f528bbc
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/6f528bbc

Branch: refs/heads/master
Commit: 6f528bbc1454ee66f2fa26d7c01c8de28290f649
Parents: fb55481
Author: Joseph Percivall <[email protected]>
Authored: Tue Apr 26 15:11:00 2016 -0400
Committer: Joseph Percivall <[email protected]>
Committed: Tue Jun 14 15:27:10 2016 -0400

----------------------------------------------------------------------
 .../InvalidConfigurationException.java          |  38 ++
 .../bootstrap/util/ConfigTransformer.java       | 484 ++++++++-----------
 .../schema/ComponentStatusRepositorySchema.java |  50 ++
 .../bootstrap/util/schema/ConfigSchema.java     | 157 ++++++
 .../bootstrap/util/schema/ConnectionSchema.java |  93 ++++
 .../util/schema/ContentRepositorySchema.java    |  57 +++
 .../util/schema/CorePropertiesSchema.java       |  73 +++
 .../util/schema/FlowControllerSchema.java       |  51 ++
 .../util/schema/FlowFileRepositorySchema.java   |  68 +++
 .../bootstrap/util/schema/ProcessorSchema.java  | 127 +++++
 .../util/schema/ProvenanceReportingSchema.java  | 112 +++++
 .../util/schema/ProvenanceRepositorySchema.java |  43 ++
 .../util/schema/RemoteInputPortSchema.java      |  72 +++
 .../schema/RemoteProcessingGroupSchema.java     |  88 ++++
 .../util/schema/SecurityPropertiesSchema.java   | 164 +++++++
 .../util/schema/SensitivePropsSchema.java       |  59 +++
 .../bootstrap/util/schema/SwapSchema.java       |  75 +++
 .../util/schema/common/BaseSchema.java          | 134 +++++
 .../util/schema/common/CommonPropertyKeys.java  |  49 ++
 .../bootstrap/util/TestConfigTransformer.java   | 122 ++++-
 .../src/test/resources/config-empty.yml         |   6 +-
 .../test/resources/config-malformed-field.yml   | 109 +++++
 .../src/test/resources/config-minimal.yml       |  35 ++
 .../resources/config-missing-required-field.yml | 109 +++++
 .../src/test/resources/config-multiple-RPGs.yml | 127 +++++
 .../resources/config-multiple-input-ports.yml   | 121 +++++
 .../test/resources/config-multiple-problems.yml | 111 +++++
 .../resources/config-multiple-processors.yml    | 165 +++++++
 minifi-bootstrap/src/test/resources/config.yml  |  68 +--
 minifi-bootstrap/src/test/resources/default.yml |  46 +-
 minifi-docs/Properties_Guide.md                 | 246 ++++++++--
 .../src/main/resources/conf/config.yml          |  46 +-
 32 files changed, 2865 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java
new file mode 100644
index 0000000..bdc51d4
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/exception/InvalidConfigurationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.exception;
+
+public class InvalidConfigurationException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidConfigurationException() {
+        super();
+    }
+
+    public InvalidConfigurationException(final String message) {
+        super(message);
+    }
+
+    public InvalidConfigurationException(final Throwable t) {
+        super(t);
+    }
+
+    public InvalidConfigurationException(final String message, final Throwable 
t) {
+        super(message, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index e7383ad..956f4dc 100644
--- 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -20,6 +20,22 @@ package org.apache.nifi.minifi.bootstrap.util;
 
 import org.apache.nifi.controller.FlowSerializationException;
 import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import 
org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
+import 
org.apache.nifi.minifi.bootstrap.util.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.ConfigSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.ConnectionSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.FlowControllerSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.ProcessorSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.ProvenanceReportingSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.RemoteInputPortSchema;
+import 
org.apache.nifi.minifi.bootstrap.util.schema.RemoteProcessingGroupSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.SecurityPropertiesSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.SensitivePropsSchema;
+import org.apache.nifi.minifi.bootstrap.util.schema.SwapSchema;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
@@ -50,96 +66,17 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.zip.GZIPOutputStream;
 
 public final class ConfigTransformer {
-    // Underlying version NIFI POC will be using
+    // Underlying version of NIFI will be using
     public static final String NIFI_VERSION = "0.6.0";
 
-    public static final String NAME_KEY = "name";
-    public static final String COMMENT_KEY = "comment";
-    public static final String ALWAYS_SYNC_KEY = "always sync";
-    public static final String YIELD_PERIOD_KEY = "yield period";
-    public static final String MAX_CONCURRENT_TASKS_KEY = "max concurrent 
tasks";
-    public static final String ID_KEY = "id";
-
-    public static final String FLOW_CONTROLLER_PROPS_KEY = "Flow Controller";
-
-    public static final String CORE_PROPS_KEY = "Core Properties";
-    public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow 
controller graceful shutdown period";
-    public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow 
service write delay interval";
-    public static final String ADMINISTRATIVE_YIELD_DURATION_KEY = 
"administrative yield duration";
-    public static final String BORED_YIELD_DURATION_KEY = "bored yield 
duration";
-
-    public static final String FLOWFILE_REPO_KEY = "FlowFile Repository";
-    public static final String PARTITIONS_KEY = "partitions";
-    public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval";
-    public static final String THRESHOLD_KEY = "queue swap threshold";
-    public static final String SWAP_PROPS_KEY = "Swap";
-    public static final String IN_PERIOD_KEY = "in period";
-    public static final String IN_THREADS_KEY = "in threads";
-    public static final String OUT_PERIOD_KEY = "out period";
-    public static final String OUT_THREADS_KEY = "out threads";
-
-
-    public static final String PROVENANCE_REPO_KEY = "Provenance Repository";
-    public static final String PROVENANCE_REPO_ROLLOVER_TIME_KEY = "provenance 
rollover time";
-
-    public static final String CONTENT_REPO_KEY = "Content Repository";
-    public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = 
"content claim max appendable size";
-    public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content 
claim max flow files";
-
-    public static final String COMPONENT_STATUS_REPO_KEY = "Component Status 
Repository";
-    public static final String BUFFER_SIZE_KEY = "buffer size";
-    public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency";
-
-    public static final String SECURITY_PROPS_KEY = "Security Properties";
-    public static final String KEYSTORE_KEY = "keystore";
-    public static final String KEYSTORE_TYPE_KEY = "keystore type";
-    public static final String KEYSTORE_PASSWORD_KEY = "keystore password";
-    public static final String KEY_PASSWORD_KEY = "key password";
-    public static final String TRUSTSTORE_KEY = "truststore";
-    public static final String TRUSTSTORE_TYPE_KEY = "truststore type";
-    public static final String TRUSTSTORE_PASSWORD_KEY = "truststore password";
-    public static final String SENSITIVE_PROPS_KEY = "Sensitive Props";
-    public static final String SENSITIVE_PROPS_KEY__KEY = "key";
-    public static final String SENSITIVE_PROPS_ALGORITHM_KEY = "algorithm";
-    public static final String SENSITIVE_PROPS_PROVIDER_KEY = "provider";
-
-    public static final String PROCESSOR_CONFIG_KEY = "Processor 
Configuration";
-    public static final String CLASS_KEY = "class";
-    public static final String SCHEDULING_PERIOD_KEY = "scheduling period";
-    public static final String PENALIZATION_PERIOD_KEY = "penalization period";
-    public static final String SCHEDULING_STRATEGY_KEY = "scheduling strategy";
-    public static final String RUN_DURATION_NANOS_KEY = "run duration nanos";
-    public static final String AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY = 
"auto-terminated relationships list";
-
-    public static final String PROCESSOR_PROPS_KEY = "Properties";
-
-    public static final String CONNECTION_PROPS_KEY = "Connection Properties";
-    public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size";
-    public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue 
data size";
-    public static final String FLOWFILE_EXPIRATION__KEY = "flowfile 
expiration";
-    public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue 
prioritizer class";
-
-    public static final String REMOTE_PROCESSING_GROUP_KEY = "Remote 
Processing Group";
-    public static final String URL_KEY = "url";
-    public static final String TIMEOUT_KEY = "timeout";
-
-    public static final String INPUT_PORT_KEY = "Input Port";
-    public static final String USE_COMPRESSION_KEY = "use compression";
-
-    public static final String PROVENANCE_REPORTING_KEY = "Provenance 
Reporting";
-    public static final String DESTINATION_URL_KEY = "destination url";
-    public static final String PORT_NAME_KEY = "port name";
-    public static final String ORIGINATING_URL_KEY = "originating url";
-    public static final String BATCH_SIZE_KEY = "batch size";
-
-    public static final String SSL_PROTOCOL_KEY = "ssl protocol";
-
     // Final util classes should have private constructor
-    private ConfigTransformer() {}
+    private ConfigTransformer() {
+    }
 
     public static void transformConfigFile(String sourceFile, String destPath) 
throws Exception {
         File ymlConfigFile = new File(sourceFile);
@@ -157,20 +94,24 @@ public final class ConfigTransformer {
 
             // Verify the parsed object is a Map structure
             if (loadedObject instanceof Map) {
-                final Map<String, Object> result = (Map<String, Object>) 
loadedObject;
+                final Map<String, Object> loadedMap = (Map<String, Object>) 
loadedObject;
+                ConfigSchema configSchema = new ConfigSchema(loadedMap);
+                if (!configSchema.isValid()) {
+                    throw new InvalidConfigurationException("Failed to 
transform config file due to:" + configSchema.getValidationIssuesAsString());
+                }
 
                 // Create nifi.properties and flow.xml.gz in memory
                 ByteArrayOutputStream nifiPropertiesOutputStream = new 
ByteArrayOutputStream();
-                writeNiFiProperties(result, nifiPropertiesOutputStream);
+                writeNiFiProperties(configSchema, nifiPropertiesOutputStream);
 
-                DOMSource flowXml = createFlowXml(result);
+                DOMSource flowXml = createFlowXml(configSchema);
 
                 // Write nifi.properties and flow.xml.gz
                 writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath);
 
                 writeFlowXmlFile(flowXml, destPath);
             } else {
-                throw new IllegalArgumentException("Provided YAML 
configuration is not a Map.");
+                throw new InvalidConfigurationException("Provided YAML 
configuration is not a Map");
             }
         } finally {
             if (sourceStream != null) {
@@ -180,12 +121,11 @@ public final class ConfigTransformer {
     }
 
     private static void writeNiFiPropertiesFile(ByteArrayOutputStream 
nifiPropertiesOutputStream, String destPath) throws IOException {
-        try {
-            final Path nifiPropertiesPath = Paths.get(destPath, 
"nifi.properties");
-            FileOutputStream nifiProperties = new FileOutputStream(new 
File(nifiPropertiesPath.toString()));
-            
nifiProperties.write(nifiPropertiesOutputStream.getUnderlyingBuffer());
+        final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties");
+        try (FileOutputStream nifiProperties = new 
FileOutputStream(nifiPropertiesPath.toString())) {
+            nifiPropertiesOutputStream.writeTo(nifiProperties);
         } finally {
-            if (nifiPropertiesOutputStream != null){
+            if (nifiPropertiesOutputStream != null) {
                 nifiPropertiesOutputStream.flush();
                 nifiPropertiesOutputStream.close();
             }
@@ -210,32 +150,32 @@ public final class ConfigTransformer {
         outStream.close();
     }
 
-    private static void writeNiFiProperties(Map<String, Object> topLevelYaml, 
OutputStream outputStream) throws FileNotFoundException, 
UnsupportedEncodingException, ConfigurationChangeException {
+    private static void writeNiFiProperties(ConfigSchema configSchema, 
OutputStream outputStream) throws FileNotFoundException, 
UnsupportedEncodingException, ConfigurationChangeException {
         PrintWriter writer = null;
         try {
             writer = new PrintWriter(outputStream, true);
 
-            Map<String, Object> coreProperties = (Map<String, Object>) 
topLevelYaml.get(CORE_PROPS_KEY);
-            Map<String, Object> flowfileRepo = (Map<String, Object>) 
topLevelYaml.get(FLOWFILE_REPO_KEY);
-            Map<String, Object> swapProperties = (Map<String, Object>) 
flowfileRepo.get(SWAP_PROPS_KEY);
-            Map<String, Object> contentRepo = (Map<String, Object>) 
topLevelYaml.get(CONTENT_REPO_KEY);
-            Map<String, Object> provenanceRepo = (Map<String, Object>) 
topLevelYaml.get(PROVENANCE_REPO_KEY);
-            Map<String, Object> componentStatusRepo = (Map<String, Object>) 
topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
-            Map<String, Object> securityProperties = (Map<String, Object>) 
topLevelYaml.get(SECURITY_PROPS_KEY);
-            Map<String, Object> sensitiveProperties = (Map<String, Object>) 
securityProperties.get(SENSITIVE_PROPS_KEY);
+            CorePropertiesSchema coreProperties = 
configSchema.getCoreProperties();
+            FlowFileRepositorySchema flowfileRepoSchema = 
configSchema.getFlowfileRepositoryProperties();
+            SwapSchema swapProperties = flowfileRepoSchema.getSwapProperties();
+            ContentRepositorySchema contentRepoProperties = 
configSchema.getContentRepositoryProperties();
+            ComponentStatusRepositorySchema componentStatusRepoProperties = 
configSchema.getComponentStatusRepositoryProperties();
+            SecurityPropertiesSchema securityProperties = 
configSchema.getSecurityProperties();
+            SensitivePropsSchema sensitiveProperties = 
securityProperties.getSensitiveProps();
+            ProvenanceRepositorySchema provenanceRepositorySchema = 
configSchema.getProvenanceRepositorySchema();
 
             writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE);
             writer.println("# Core Properties #");
             writer.println();
-            writer.println("nifi.version="+NIFI_VERSION);
+            writer.println("nifi.version=" + NIFI_VERSION);
             writer.println("nifi.flow.configuration.file=./conf/flow.xml.gz");
             
writer.println("nifi.flow.configuration.archive.dir=./conf/archive/");
             writer.println("nifi.flowcontroller.autoResumeState=true");
-            writer.println("nifi.flowcontroller.graceful.shutdown.period=" + 
getValueString(coreProperties, FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY));
-            writer.println("nifi.flowservice.writedelay.interval=" + 
getValueString(coreProperties, FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY));
-            writer.println("nifi.administrative.yield.duration=" + 
getValueString(coreProperties, ADMINISTRATIVE_YIELD_DURATION_KEY));
+            writer.println("nifi.flowcontroller.graceful.shutdown.period=" + 
coreProperties.getFlowControllerGracefulShutdownPeriod());
+            writer.println("nifi.flowservice.writedelay.interval=" + 
coreProperties.getFlowServiceWriteDelayInterval());
+            writer.println("nifi.administrative.yield.duration=" + 
coreProperties.getAdministrativeYieldDuration());
             writer.println("# If a component has no work to do (is \"bored\"), 
how long should we wait before checking again for work?");
-            writer.println("nifi.bored.yield.duration=" + 
getValueString(coreProperties, BORED_YIELD_DURATION_KEY));
+            writer.println("nifi.bored.yield.duration=" + 
coreProperties.getBoredYieldDuration());
             writer.println();
             
writer.println("nifi.authority.provider.configuration.file=./conf/authority-providers.xml");
             
writer.println("nifi.login.identity.provider.configuration.file=./conf/login-identity-providers.xml");
@@ -260,38 +200,38 @@ public final class ConfigTransformer {
             writer.println("# FlowFile Repository");
             
writer.println("nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository");
             
writer.println("nifi.flowfile.repository.directory=./flowfile_repository");
-            writer.println("nifi.flowfile.repository.partitions=" + 
getValueString(flowfileRepo, PARTITIONS_KEY));
-            writer.println("nifi.flowfile.repository.checkpoint.interval=" + 
getValueString(flowfileRepo,CHECKPOINT_INTERVAL_KEY));
-            writer.println("nifi.flowfile.repository.always.sync=" + 
getValueString(flowfileRepo,ALWAYS_SYNC_KEY));
+            writer.println("nifi.flowfile.repository.partitions=" + 
flowfileRepoSchema.getPartitions());
+            writer.println("nifi.flowfile.repository.checkpoint.interval=" + 
flowfileRepoSchema.getCheckpointInterval());
+            writer.println("nifi.flowfile.repository.always.sync=" + 
flowfileRepoSchema.getAlwaysSync());
             writer.println();
             
writer.println("nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager");
-            writer.println("nifi.queue.swap.threshold=" + 
getValueString(swapProperties, THRESHOLD_KEY));
-            writer.println("nifi.swap.in.period=" + 
getValueString(swapProperties, IN_PERIOD_KEY));
-            writer.println("nifi.swap.in.threads=" + 
getValueString(swapProperties, IN_THREADS_KEY));
-            writer.println("nifi.swap.out.period=" + 
getValueString(swapProperties, OUT_PERIOD_KEY));
-            writer.println("nifi.swap.out.threads=" + 
getValueString(swapProperties, OUT_THREADS_KEY));
+            writer.println("nifi.queue.swap.threshold=" + 
swapProperties.getThreshold());
+            writer.println("nifi.swap.in.period=" + 
swapProperties.getInPeriod());
+            writer.println("nifi.swap.in.threads=" + 
swapProperties.getInThreads());
+            writer.println("nifi.swap.out.period=" + 
swapProperties.getOutPeriod());
+            writer.println("nifi.swap.out.threads=" + 
swapProperties.getOutThreads());
             writer.println();
             writer.println("# Content Repository");
             
writer.println("nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository");
-            writer.println("nifi.content.claim.max.appendable.size=" + 
getValueString(contentRepo, CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY));
-            writer.println("nifi.content.claim.max.flow.files=" + 
getValueString(contentRepo, CONTENT_CLAIM_MAX_FLOW_FILES_KEY));
+            writer.println("nifi.content.claim.max.appendable.size=" + 
contentRepoProperties.getContentClaimMaxAppendableSize());
+            writer.println("nifi.content.claim.max.flow.files=" + 
contentRepoProperties.getContentClaimMaxFlowFiles());
             
writer.println("nifi.content.repository.archive.max.retention.period=");
             
writer.println("nifi.content.repository.archive.max.usage.percentage=");
             writer.println("nifi.content.repository.archive.enabled=false");
             
writer.println("nifi.content.repository.directory.default=./content_repository");
-            writer.println("nifi.content.repository.always.sync=" + 
getValueString(contentRepo, ALWAYS_SYNC_KEY));
+            writer.println("nifi.content.repository.always.sync=" + 
contentRepoProperties.getAlwaysSync());
             writer.println();
             writer.println("# Provenance Repository Properties");
             
writer.println("nifi.provenance.repository.implementation=org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository");
-            writer.println("nifi.provenance.repository.rollover.time=" + 
getValueString(provenanceRepo, PROVENANCE_REPO_ROLLOVER_TIME_KEY));
+            writer.println("nifi.provenance.repository.rollover.time=" + 
provenanceRepositorySchema.getProvenanceRepoRolloverTimeKey());
             writer.println();
             writer.println("# Volatile Provenance Respository Properties");
-            writer.println("nifi.provenance.repository.buffer.size=100000");
+            writer.println("nifi.provenance.repository.buffer.size=10000");
             writer.println();
             writer.println("# Component Status Repository");
             
writer.println("nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository");
-            writer.println("nifi.components.status.repository.buffer.size=" + 
getValueString(componentStatusRepo, BUFFER_SIZE_KEY));
-            writer.println("nifi.components.status.snapshot.frequency=" + 
getValueString(componentStatusRepo, SNAPSHOT_FREQUENCY_KEY));
+            writer.println("nifi.components.status.repository.buffer.size=" + 
componentStatusRepoProperties.getBufferSize());
+            writer.println("nifi.components.status.snapshot.frequency=" + 
componentStatusRepoProperties.getSnapshotFrequency());
             writer.println();
             writer.println("# web properties #");
             writer.println("nifi.web.war.directory=./lib");
@@ -303,17 +243,17 @@ public final class ConfigTransformer {
             writer.println("nifi.web.jetty.threads=200");
             writer.println();
             writer.println("# security properties #");
-            writer.println("nifi.sensitive.props.key=" + 
getValueString(sensitiveProperties, SENSITIVE_PROPS_KEY__KEY));
-            writer.println("nifi.sensitive.props.algorithm=" + 
getValueString(sensitiveProperties, SENSITIVE_PROPS_ALGORITHM_KEY));
-            writer.println("nifi.sensitive.props.provider=" + 
getValueString(sensitiveProperties, SENSITIVE_PROPS_PROVIDER_KEY));
+            writer.println("nifi.sensitive.props.key=" + 
sensitiveProperties.getKey());
+            writer.println("nifi.sensitive.props.algorithm=" + 
sensitiveProperties.getAlgorithm());
+            writer.println("nifi.sensitive.props.provider=" + 
sensitiveProperties.getProvider());
             writer.println();
-            writer.println("nifi.security.keystore=" + 
getValueString(securityProperties, KEYSTORE_KEY));
-            writer.println("nifi.security.keystoreType=" + 
getValueString(securityProperties, KEYSTORE_TYPE_KEY));
-            writer.println("nifi.security.keystorePasswd=" + 
getValueString(securityProperties, KEYSTORE_PASSWORD_KEY));
-            writer.println("nifi.security.keyPasswd=" + 
getValueString(securityProperties, KEY_PASSWORD_KEY));
-            writer.println("nifi.security.truststore=" + 
getValueString(securityProperties, TRUSTSTORE_KEY));
-            writer.println("nifi.security.truststoreType=" + 
getValueString(securityProperties, TRUSTSTORE_TYPE_KEY));
-            writer.println("nifi.security.truststorePasswd=" + 
getValueString(securityProperties, TRUSTSTORE_PASSWORD_KEY));
+            writer.println("nifi.security.keystore=" + 
securityProperties.getKeystore());
+            writer.println("nifi.security.keystoreType=" + 
securityProperties.getKeystoreType());
+            writer.println("nifi.security.keystorePasswd=" + 
securityProperties.getKeystorePassword());
+            writer.println("nifi.security.keyPasswd=" + 
securityProperties.getKeyPassword());
+            writer.println("nifi.security.truststore=" + 
securityProperties.getTruststore());
+            writer.println("nifi.security.truststoreType=" + 
securityProperties.getTruststoreType());
+            writer.println("nifi.security.truststorePasswd=" + 
securityProperties.getTruststorePassword());
             writer.println("nifi.security.needClientAuth=");
             writer.println("nifi.security.user.credential.cache.duration=24 
hours");
             
writer.println("nifi.security.user.authority.provider=file-provider");
@@ -333,13 +273,14 @@ public final class ConfigTransformer {
         } catch (NullPointerException e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while creating the nifi.properties", e);
         } finally {
-            if (writer != null){
+            if (writer != null) {
                 writer.flush();
                 writer.close();
             }
         }
     }
-    private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) 
throws IOException, ConfigurationChangeException {
+
+    private static DOMSource createFlowXml(ConfigSchema configSchema) throws 
IOException, ConfigurationChangeException {
         try {
             // create a new, empty document
             final DocumentBuilderFactory docFactory = 
DocumentBuilderFactory.newInstance();
@@ -351,50 +292,34 @@ public final class ConfigTransformer {
             // populate document with controller state
             final Element rootNode = doc.createElement("flowController");
             doc.appendChild(rootNode);
-            Map<String, Object> processorConfig = (Map<String, Object>) 
topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-            addTextElement(rootNode, "maxTimerDrivenThreadCount", 
getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
-            addTextElement(rootNode, "maxEventDrivenThreadCount", 
getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
-            addProcessGroup(rootNode, topLevelYaml, "rootGroup");
-
-            Map<String, Object> securityProps = (Map<String, Object>) 
topLevelYaml.get(SECURITY_PROPS_KEY);
-            if (securityProps != null) {
-                String sslAlgorithm = (String) 
securityProps.get(SSL_PROTOCOL_KEY);
-                if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
-                    final Element controllerServicesNode = 
doc.createElement("controllerServices");
-                    rootNode.appendChild(controllerServicesNode);
-                    addSSLControllerService(controllerServicesNode, 
securityProps);
-                }
+            CorePropertiesSchema coreProperties = 
configSchema.getCoreProperties();
+            addTextElement(rootNode, "maxTimerDrivenThreadCount", 
String.valueOf(coreProperties.getMaxConcurrentThreads()));
+            addTextElement(rootNode, "maxEventDrivenThreadCount", 
String.valueOf(coreProperties.getMaxConcurrentThreads()));
+            addProcessGroup(rootNode, configSchema, "rootGroup");
+
+            SecurityPropertiesSchema securityProperties = 
configSchema.getSecurityProperties();
+            if (securityProperties.useSSL()) {
+                final Element controllerServicesNode = 
doc.createElement("controllerServices");
+                rootNode.appendChild(controllerServicesNode);
+                addSSLControllerService(controllerServicesNode, 
securityProperties);
             }
 
-            Map<String, Object> provenanceProperties = (Map<String, Object>) 
topLevelYaml.get(PROVENANCE_REPORTING_KEY);
-            if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) {
+            ProvenanceReportingSchema provenanceProperties = 
configSchema.getProvenanceReportingProperties();
+            if (provenanceProperties != null) {
                 final Element reportingTasksNode = 
doc.createElement("reportingTasks");
                 rootNode.appendChild(reportingTasksNode);
-                addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
+                addProvenanceReportingTask(reportingTasksNode, configSchema);
             }
 
             return new DOMSource(doc);
         } catch (final ParserConfigurationException | DOMException | 
TransformerFactoryConfigurationError | IllegalArgumentException e) {
             throw new FlowSerializationException(e);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while writing the top level of the flow xml", e);
         }
     }
 
-    private static <K> String getValueString(Map<K,Object> map, K key){
-        Object value = map.get(key);
-        return value == null ? "" : value.toString();
-    }
-
-    private static <K> String getValueString(Map<K,Object> map, K key, String 
theDefault){
-        Object value = null;
-        if (map != null){
-            value = map.get(key);
-        }
-        return value == null ? theDefault : value.toString();
-    }
-
-    private static void addSSLControllerService(final Element element, 
Map<String, Object> securityProperties) throws ConfigurationChangeException {
+    private static void addSSLControllerService(final Element element, 
SecurityPropertiesSchema securityProperties) throws 
ConfigurationChangeException {
         try {
             final Element serviceElement = 
element.getOwnerDocument().createElement("controllerService");
             addTextElement(serviceElement, "id", "SSL-Context-Service");
@@ -405,126 +330,133 @@ public final class ConfigTransformer {
             addTextElement(serviceElement, "enabled", "true");
 
             Map<String, Object> attributes = new HashMap<>();
-            attributes.put("Keystore Filename", 
securityProperties.get(KEYSTORE_KEY));
-            attributes.put("Keystore Type", 
securityProperties.get(KEYSTORE_TYPE_KEY));
-            attributes.put("Keystore Password", 
securityProperties.get(KEYSTORE_PASSWORD_KEY));
-            attributes.put("Truststore Filename", 
securityProperties.get(TRUSTSTORE_KEY));
-            attributes.put("Truststore Type", 
securityProperties.get(TRUSTSTORE_TYPE_KEY));
-            attributes.put("Truststore Password", 
securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
-            attributes.put("SSL Protocol", 
securityProperties.get(SSL_PROTOCOL_KEY));
+            attributes.put("Keystore Filename", 
securityProperties.getKeystore());
+            attributes.put("Keystore Type", 
securityProperties.getKeystoreType());
+            attributes.put("Keystore Password", 
securityProperties.getKeyPassword());
+            attributes.put("Truststore Filename", 
securityProperties.getTruststore());
+            attributes.put("Truststore Type", 
securityProperties.getTruststoreType());
+            attributes.put("Truststore Password", 
securityProperties.getTruststorePassword());
+            attributes.put("SSL Protocol", 
securityProperties.getSslProtocol());
 
             addConfiguration(serviceElement, attributes);
 
             element.appendChild(serviceElement);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to create an SSL Controller Service", e);
         }
     }
 
-    private static void addProcessGroup(final Element parentElement, 
Map<String, Object> topLevelYaml, final String elementName) throws 
ConfigurationChangeException {
+    private static void addProcessGroup(final Element parentElement, 
ConfigSchema configSchema, final String elementName) throws 
ConfigurationChangeException {
         try {
-            Map<String, Object> flowControllerProperties = (Map<String, 
Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
+            FlowControllerSchema flowControllerProperties = 
configSchema.getFlowControllerProperties();
 
             final Document doc = parentElement.getOwnerDocument();
             final Element element = doc.createElement(elementName);
             parentElement.appendChild(element);
             addTextElement(element, "id", "Root-Group");
-            addTextElement(element, "name", 
getValueString(flowControllerProperties, NAME_KEY));
+            addTextElement(element, "name", 
flowControllerProperties.getName());
             addPosition(element);
-            addTextElement(element, "comment", 
getValueString(flowControllerProperties, COMMENT_KEY));
+            addTextElement(element, "comment", 
flowControllerProperties.getComment());
 
-            Map<String, Object> processorConfig = (Map<String, Object>) 
topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-            addProcessor(element, processorConfig);
+            List<ProcessorSchema> processors = configSchema.getProcessors();
+            if (processors != null) {
+                for (ProcessorSchema processorConfig : processors) {
+                    addProcessor(element, processorConfig);
+                }
+            }
 
-            Map<String, Object> remoteProcessingGroup = (Map<String, Object>) 
topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
-            addRemoteProcessGroup(element, remoteProcessingGroup);
+            List<RemoteProcessingGroupSchema> remoteProcessingGroups = 
configSchema.getRemoteProcessingGroups();
+            if (remoteProcessingGroups != null) {
+                for (RemoteProcessingGroupSchema remoteProcessingGroupSchema: 
remoteProcessingGroups) {
+                    addRemoteProcessGroup(element, 
remoteProcessingGroupSchema);
+                }
+            }
 
-            addConnection(element, topLevelYaml);
-        } catch (ConfigurationChangeException e){
+            List<ConnectionSchema> connections = configSchema.getConnections();
+            if (connections != null) {
+                for (ConnectionSchema connectionConfig : connections) {
+                    addConnection(element, connectionConfig, configSchema);
+                }
+            }
+        } catch (ConfigurationChangeException e) {
             throw e;
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to creating the root Process Group", e);
         }
     }
 
-    private static void addProcessor(final Element parentElement, Map<String, 
Object> processorConfig) throws ConfigurationChangeException {
-
+    private static void addProcessor(final Element parentElement, 
ProcessorSchema processorConfig) throws ConfigurationChangeException {
         try {
-            if (processorConfig.get(CLASS_KEY) == null) {
-                // Only add a processor if it has a class
-                return;
-            }
-
             final Document doc = parentElement.getOwnerDocument();
             final Element element = doc.createElement("processor");
             parentElement.appendChild(element);
-            addTextElement(element, "id", "Processor");
-            addTextElement(element, "name", getValueString(processorConfig, 
NAME_KEY));
+            addTextElement(element, "id", processorConfig.getName());
+            addTextElement(element, "name", processorConfig.getName());
 
             addPosition(element);
             addStyle(element);
 
-            addTextElement(element, "comment", getValueString(processorConfig, 
COMMENT_KEY));
-            addTextElement(element, "class", getValueString(processorConfig, 
CLASS_KEY));
-            addTextElement(element, "maxConcurrentTasks", 
getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
-            addTextElement(element, "schedulingPeriod", 
getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
-            addTextElement(element, "penalizationPeriod", 
getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
-            addTextElement(element, "yieldPeriod", 
getValueString(processorConfig, YIELD_PERIOD_KEY));
+            addTextElement(element, "comment", "");
+            addTextElement(element, "class", 
processorConfig.getProcessorClass());
+            addTextElement(element, "maxConcurrentTasks", 
String.valueOf(processorConfig.getMaxConcurrentTasks()));
+            addTextElement(element, "schedulingPeriod", 
processorConfig.getSchedulingPeriod());
+            addTextElement(element, "penalizationPeriod", 
processorConfig.getPenalizationPeriod());
+            addTextElement(element, "yieldPeriod", 
processorConfig.getYieldPeriod());
             addTextElement(element, "bulletinLevel", "WARN");
             addTextElement(element, "lossTolerant", "false");
             addTextElement(element, "scheduledState", "RUNNING");
-            addTextElement(element, "schedulingStrategy", 
getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
-            addTextElement(element, "runDurationNanos", 
getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
+            addTextElement(element, "schedulingStrategy", 
processorConfig.getSchedulingStrategy());
+            addTextElement(element, "runDurationNanos", 
String.valueOf(processorConfig.getRunDurationNanos()));
 
-            addConfiguration(element, (Map<String, Object>) 
processorConfig.get(PROCESSOR_PROPS_KEY));
+            addConfiguration(element, processorConfig.getProperties());
 
-            Collection<String> autoTerminatedRelationships = 
(Collection<String>) 
processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
+            Collection<String> autoTerminatedRelationships = 
processorConfig.getAutoTerminatedRelationshipsList();
             if (autoTerminatedRelationships != null) {
                 for (String rel : autoTerminatedRelationships) {
                     addTextElement(element, "autoTerminatedRelationship", rel);
                 }
             }
-        } catch (Exception e){
-            throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to add the Processor", e);
+        } catch (Exception e) {
+            throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to add a Processor", e);
         }
     }
 
-    private static void addProvenanceReportingTask(final Element element, 
Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+    private static void addProvenanceReportingTask(final Element element, 
ConfigSchema configSchema) throws ConfigurationChangeException {
         try {
-            Map<String, Object> provenanceProperties = (Map<String, Object>) 
topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+            ProvenanceReportingSchema provenanceProperties = 
configSchema.getProvenanceReportingProperties();
             final Element taskElement = 
element.getOwnerDocument().createElement("reportingTask");
             addTextElement(taskElement, "id", "Provenance-Reporting");
             addTextElement(taskElement, "name", 
"Site-To-Site-Provenance-Reporting");
-            addTextElement(taskElement, "comment", 
getValueString(provenanceProperties, COMMENT_KEY));
+            addTextElement(taskElement, "comment", 
provenanceProperties.getComment());
             addTextElement(taskElement, "class", 
"org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
-            addTextElement(taskElement, "schedulingPeriod", 
getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
+            addTextElement(taskElement, "schedulingPeriod", 
provenanceProperties.getSchedulingPeriod());
             addTextElement(taskElement, "scheduledState", "RUNNING");
-            addTextElement(taskElement, "schedulingStrategy", 
getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
+            addTextElement(taskElement, "schedulingStrategy", 
provenanceProperties.getSchedulingStrategy());
 
             Map<String, Object> attributes = new HashMap<>();
-            attributes.put("Destination URL", 
provenanceProperties.get(DESTINATION_URL_KEY));
-            attributes.put("Input Port Name", 
provenanceProperties.get(PORT_NAME_KEY));
-            attributes.put("MiNiFi URL", 
provenanceProperties.get(ORIGINATING_URL_KEY));
-            attributes.put("Compress Events", 
provenanceProperties.get(USE_COMPRESSION_KEY));
-            attributes.put("Batch Size", 
provenanceProperties.get(BATCH_SIZE_KEY));
-
-            Map<String, Object> securityProps = (Map<String, Object>) 
topLevelYaml.get(SECURITY_PROPS_KEY);
-            String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
-            if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+            attributes.put("Destination URL", 
provenanceProperties.getDestinationUrl());
+            attributes.put("Input Port Name", 
provenanceProperties.getPortName());
+            attributes.put("MiNiFi URL", 
provenanceProperties.getOriginatingUrl());
+            attributes.put("Compress Events", 
provenanceProperties.getUseCompression());
+            attributes.put("Batch Size", provenanceProperties.getBatchSize());
+            attributes.put("Communications Timeout", 
provenanceProperties.getTimeout());
+
+            SecurityPropertiesSchema securityProps = 
configSchema.getSecurityProperties();
+            if (securityProps.useSSL()) {
                 attributes.put("SSL Context Service", "SSL-Context-Service");
             }
 
             addConfiguration(taskElement, attributes);
 
             element.appendChild(taskElement);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to add the Provenance Reporting Task", e);
         }
     }
 
     private static void addConfiguration(final Element element, Map<String, 
Object> elementConfig) {
         final Document doc = element.getOwnerDocument();
-        if (elementConfig == null){
+        if (elementConfig == null) {
             return;
         }
         for (final Map.Entry<String, Object> entry : elementConfig.entrySet()) 
{
@@ -544,76 +476,57 @@ public final class ConfigTransformer {
         parentElement.appendChild(element);
     }
 
-    private static void addRemoteProcessGroup(final Element parentElement, 
Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException {
+    private static void addRemoteProcessGroup(final Element parentElement, 
RemoteProcessingGroupSchema remoteProcessingGroupProperties) throws 
ConfigurationChangeException {
         try {
-            if (remoteProcessingGroup.get(URL_KEY) == null) {
-                // Only add an an RPG if it has a URL
-                return;
-            }
-
             final Document doc = parentElement.getOwnerDocument();
             final Element element = doc.createElement("remoteProcessGroup");
             parentElement.appendChild(element);
-            addTextElement(element, "id", "Remote-Process-Group");
-            addTextElement(element, "name", 
getValueString(remoteProcessingGroup, NAME_KEY));
+            addTextElement(element, "id", 
remoteProcessingGroupProperties.getName());
+            addTextElement(element, "name", 
remoteProcessingGroupProperties.getName());
             addPosition(element);
-            addTextElement(element, "comment", 
getValueString(remoteProcessingGroup, COMMENT_KEY));
-            addTextElement(element, "url", 
getValueString(remoteProcessingGroup, URL_KEY));
-            addTextElement(element, "timeout", 
getValueString(remoteProcessingGroup, TIMEOUT_KEY));
-            addTextElement(element, "yieldPeriod", 
getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
+            addTextElement(element, "comment", 
remoteProcessingGroupProperties.getComment());
+            addTextElement(element, "url", 
remoteProcessingGroupProperties.getUrl());
+            addTextElement(element, "timeout", 
remoteProcessingGroupProperties.getTimeout());
+            addTextElement(element, "yieldPeriod", 
remoteProcessingGroupProperties.getYieldPeriod());
             addTextElement(element, "transmitting", "true");
 
-            Map<String, Object> inputPort = (Map<String, Object>) 
remoteProcessingGroup.get(INPUT_PORT_KEY);
-            addRemoteGroupPort(element, inputPort, "inputPort");
+            List<RemoteInputPortSchema> remoteInputPorts = 
remoteProcessingGroupProperties.getInputPorts();
+            for(RemoteInputPortSchema remoteInputPortSchema: remoteInputPorts) 
{
+                addRemoteGroupPort(element, remoteInputPortSchema);
+            }
 
             parentElement.appendChild(element);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to add the Remote Process Group", e);
         }
     }
 
-    private static void addRemoteGroupPort(final Element parentElement, 
Map<String, Object> inputPort, final String elementName) throws 
ConfigurationChangeException {
-
+    private static void addRemoteGroupPort(final Element parentElement, 
RemoteInputPortSchema inputPort) throws ConfigurationChangeException {
         try {
-            if (inputPort.get(ID_KEY) == null) {
-                // Only add an input port if it has an ID
-                return;
-            }
-
             final Document doc = parentElement.getOwnerDocument();
-            final Element element = doc.createElement(elementName);
+            final Element element = doc.createElement("inputPort");
             parentElement.appendChild(element);
-            addTextElement(element, "id", getValueString(inputPort, ID_KEY));
-            addTextElement(element, "name", getValueString(inputPort, 
NAME_KEY));
+            addTextElement(element, "id", inputPort.getId());
+            addTextElement(element, "name", inputPort.getName());
             addPosition(element);
-            addTextElement(element, "comments", getValueString(inputPort, 
COMMENT_KEY));
+            addTextElement(element, "comments", inputPort.getComment());
             addTextElement(element, "scheduledState", "RUNNING");
-            addTextElement(element, "maxConcurrentTasks", 
getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
-            addTextElement(element, "useCompression", 
getValueString(inputPort, USE_COMPRESSION_KEY));
+            addTextElement(element, "maxConcurrentTasks", 
String.valueOf(inputPort.getMax_concurrent_tasks()));
+            addTextElement(element, "useCompression", 
String.valueOf(inputPort.getUseCompression()));
 
             parentElement.appendChild(element);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to add the input port of the Remote Process Group", e);
         }
     }
 
-    private static void addConnection(final Element parentElement, Map<String, 
Object> topLevelYaml) throws ConfigurationChangeException {
+    private static void addConnection(final Element parentElement, 
ConnectionSchema connectionProperties, ConfigSchema configSchema) throws 
ConfigurationChangeException {
         try {
-            Map<String, Object> connectionProperties = (Map<String, Object>) 
topLevelYaml.get(CONNECTION_PROPS_KEY);
-            Map<String, Object> remoteProcessingGroup = (Map<String, Object>) 
topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
-            Map<String, Object> inputPort = (Map<String, Object>) 
remoteProcessingGroup.get(INPUT_PORT_KEY);
-            Map<String, Object> processorConfig = (Map<String, Object>) 
topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-
-            if (inputPort.get(ID_KEY) == null || 
processorConfig.get(CLASS_KEY) == null) {
-                // Only add the connection if the input port and processor 
config are created
-                return;
-            }
-
             final Document doc = parentElement.getOwnerDocument();
             final Element element = doc.createElement("connection");
             parentElement.appendChild(element);
-            addTextElement(element, "id", "Connection");
-            addTextElement(element, "name", 
getValueString(connectionProperties, NAME_KEY));
+            addTextElement(element, "id", connectionProperties.getName());
+            addTextElement(element, "name", connectionProperties.getName());
 
             final Element bendPointsElement = doc.createElement("bendPoints");
             element.appendChild(bendPointsElement);
@@ -621,28 +534,55 @@ public final class ConfigTransformer {
             addTextElement(element, "labelIndex", "1");
             addTextElement(element, "zIndex", "0");
 
-            addTextElement(element, "sourceId", "Processor");
+            addTextElement(element, "sourceId", 
connectionProperties.getSourceName());
             addTextElement(element, "sourceGroupId", "Root-Group");
             addTextElement(element, "sourceType", "PROCESSOR");
 
-            addTextElement(element, "destinationId", getValueString(inputPort, 
ID_KEY));
-            addTextElement(element, "destinationGroupId", 
"Remote-Process-Group");
-            addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
+            addTextElement(element, "destinationId", 
connectionProperties.getDestinationName());
 
-            addTextElement(element, "relationship", "success");
+            if (isInputPortId(connectionProperties.getDestinationName(), 
configSchema)) {
+                addTextElement(element, "destinationGroupId", 
"Remote-Process-Group");
+                addTextElement(element, "destinationType", 
"REMOTE_INPUT_PORT");
+            } else {
+                addTextElement(element, "destinationGroupId", "Root-Group");
+                addTextElement(element, "destinationType", "PROCESSOR");
+            }
+
+            addTextElement(element, "relationship", 
connectionProperties.getSourceRelationshipName());
 
-            addTextElement(element, "maxWorkQueueSize", 
getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
-            addTextElement(element, "maxWorkQueueDataSize", 
getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
+            addTextElement(element, "maxWorkQueueSize", 
String.valueOf(connectionProperties.getMaxWorkQueueSize()));
+            addTextElement(element, "maxWorkQueueDataSize", 
connectionProperties.getMaxWorkQueueDataSize());
 
-            addTextElement(element, "flowFileExpiration", 
getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
-            addTextElement(element, "queuePrioritizerClass", 
getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+            addTextElement(element, "flowFileExpiration", 
connectionProperties.getFlowfileExpiration());
+            addTextElement(element, "queuePrioritizerClass", 
connectionProperties.getQueuePrioritizerClass());
 
             parentElement.appendChild(element);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config 
YAML while trying to add the connection from the Processor to the input port of 
the Remote Process Group", e);
         }
     }
 
+    private static boolean isInputPortId(String id, ConfigSchema configSchema) 
{
+        boolean isInputPortId = false;
+        try {
+            List<RemoteProcessingGroupSchema> remoteProcessingGroups = 
configSchema.getRemoteProcessingGroups();
+            if (remoteProcessingGroups != null) {
+                for (RemoteProcessingGroupSchema remoteProcessingGroupSchema: 
remoteProcessingGroups) {
+                    List<RemoteInputPortSchema> remoteInputPorts = 
remoteProcessingGroupSchema.getInputPorts();
+                    for (RemoteInputPortSchema remoteInputPortSchema: 
remoteInputPorts) {
+                        if (remoteInputPortSchema != null && 
id.equals(remoteInputPortSchema.getId())) {
+                            isInputPortId = true;
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // If an exception was thrown then it isn't the InputPort
+        }
+        return isInputPortId;
+    }
+
     private static void addPosition(final Element parentElement) {
         final Element element = 
parentElement.getOwnerDocument().createElement("position");
         element.setAttribute("x", String.valueOf("0"));

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java
new file mode 100644
index 0000000..5b6ac2e
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ComponentStatusRepositorySchema.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
+
+/**
+ *
+ */
+public class ComponentStatusRepositorySchema extends BaseSchema {
+    public static final String BUFFER_SIZE_KEY = "buffer size";
+    public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency";
+
+    private Number bufferSize = 1440;
+    private String snapshotFrequency = "1 min";
+
+    public ComponentStatusRepositorySchema() {
+    }
+
+    public ComponentStatusRepositorySchema(Map map) {
+        bufferSize = getOptionalKeyAsType(map, BUFFER_SIZE_KEY, Number.class, 
COMPONENT_STATUS_REPO_KEY, 1440);
+        snapshotFrequency = getOptionalKeyAsType(map, SNAPSHOT_FREQUENCY_KEY, 
String.class, COMPONENT_STATUS_REPO_KEY, "1 min");
+    }
+
+    public Number getBufferSize() {
+        return bufferSize;
+    }
+
+    public String getSnapshotFrequency() {
+        return snapshotFrequency;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java
new file mode 100644
index 0000000..6d3d7ca
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConfigSchema.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
+
+/**
+ *
+ */
+public class ConfigSchema extends BaseSchema {
+    public static String TOP_LEVEL_NAME = "top level";
+
+    private FlowControllerSchema flowControllerProperties;
+    private CorePropertiesSchema coreProperties;
+    private FlowFileRepositorySchema flowfileRepositoryProperties;
+    private ContentRepositorySchema contentRepositoryProperties;
+    private ComponentStatusRepositorySchema 
componentStatusRepositoryProperties;
+    private SecurityPropertiesSchema securityProperties;
+    private List<ProcessorSchema> processors;
+    private List<ConnectionSchema> connections;
+    private List<RemoteProcessingGroupSchema> remoteProcessingGroups;
+    private ProvenanceReportingSchema provenanceReportingProperties;
+
+    private ProvenanceRepositorySchema provenanceRepositorySchema;
+
+    public ConfigSchema() {
+    }
+
+    public ConfigSchema(Map map) {
+        Object obj ;
+        flowControllerProperties = getMapAsType(map, 
FLOW_CONTROLLER_PROPS_KEY, FlowControllerSchema.class, TOP_LEVEL_NAME, true);
+
+        coreProperties = getMapAsType(map, CORE_PROPS_KEY, 
CorePropertiesSchema.class, TOP_LEVEL_NAME, false);
+        flowfileRepositoryProperties = getMapAsType(map, FLOWFILE_REPO_KEY, 
FlowFileRepositorySchema.class, TOP_LEVEL_NAME, false);
+        contentRepositoryProperties = getMapAsType(map, CONTENT_REPO_KEY, 
ContentRepositorySchema.class, TOP_LEVEL_NAME, false);
+        provenanceRepositorySchema = getMapAsType(map, PROVENANCE_REPO_KEY, 
ProvenanceRepositorySchema.class, TOP_LEVEL_NAME, false);
+        componentStatusRepositoryProperties = getMapAsType(map, 
COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, 
TOP_LEVEL_NAME, false);
+        securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, 
SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false);
+
+        processors = getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, 
TOP_LEVEL_NAME, null);
+        if (processors != null) {
+            transformListToType(processors, "processor", 
ProcessorSchema.class, PROCESSORS_KEY);
+        }
+
+        connections = getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, 
TOP_LEVEL_NAME, null);
+        if (connections != null) {
+            transformListToType(connections, "connection", 
ConnectionSchema.class, CONNECTIONS_KEY);
+        }
+
+        remoteProcessingGroups = getOptionalKeyAsType(map, 
REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, null);
+        if (remoteProcessingGroups != null) {
+            transformListToType(remoteProcessingGroups, "remote processing 
group", RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY);
+        }
+
+        provenanceReportingProperties = getMapAsType(map, 
PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, 
false);
+
+        addIssuesIfNotNull(flowControllerProperties);
+        addIssuesIfNotNull(coreProperties);
+        addIssuesIfNotNull(flowfileRepositoryProperties);
+        addIssuesIfNotNull(contentRepositoryProperties);
+        addIssuesIfNotNull(componentStatusRepositoryProperties);
+        addIssuesIfNotNull(securityProperties);
+        addIssuesIfNotNull(provenanceReportingProperties);
+        addIssuesIfNotNull(provenanceRepositorySchema);
+
+        if (processors != null) {
+            for (ProcessorSchema processorSchema : processors) {
+                addIssuesIfNotNull(processorSchema);
+            }
+        }
+
+        if (connections != null) {
+            for (ConnectionSchema connectionSchema : connections) {
+                addIssuesIfNotNull(connectionSchema);
+            }
+        }
+
+        if (remoteProcessingGroups != null) {
+            for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : 
remoteProcessingGroups) {
+                addIssuesIfNotNull(remoteProcessingGroupSchema);
+            }
+        }
+    }
+
+    public FlowControllerSchema getFlowControllerProperties() {
+        return flowControllerProperties;
+    }
+
+    public CorePropertiesSchema getCoreProperties() {
+        return coreProperties;
+    }
+
+    public FlowFileRepositorySchema getFlowfileRepositoryProperties() {
+        return flowfileRepositoryProperties;
+    }
+
+    public ContentRepositorySchema getContentRepositoryProperties() {
+        return contentRepositoryProperties;
+    }
+
+    public SecurityPropertiesSchema getSecurityProperties() {
+        return securityProperties;
+    }
+
+    public List<ProcessorSchema> getProcessors() {
+        return processors;
+    }
+
+    public List<ConnectionSchema> getConnections() {
+        return connections;
+    }
+
+    public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() {
+        return remoteProcessingGroups;
+    }
+
+    public ProvenanceReportingSchema getProvenanceReportingProperties() {
+        return provenanceReportingProperties;
+    }
+
+    public ComponentStatusRepositorySchema 
getComponentStatusRepositoryProperties() {
+        return componentStatusRepositoryProperties;
+    }
+
+    public ProvenanceRepositorySchema getProvenanceRepositorySchema() {
+        return provenanceRepositorySchema;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java
new file mode 100644
index 0000000..f7024f8
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ConnectionSchema.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.NAME_KEY;
+
+/**
+ *
+ */
+public class ConnectionSchema extends BaseSchema {
+    public static final String SOURCE_NAME_KEY = "source name";
+    public static final String SOURCE_RELATIONSHIP_NAME_KEY = "source 
relationship name";
+    public static final String DESTINATION_NAME_KEY = "destination name";
+    public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size";
+    public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue 
data size";
+    public static final String FLOWFILE_EXPIRATION__KEY = "flowfile 
expiration";
+    public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue 
prioritizer class";
+
+    private String name;
+    private String sourceName;
+    private String sourceRelationshipName;
+    private String destinationName;
+    private Number maxWorkQueueSize = 0;
+    private String maxWorkQueueDataSize = "0 MB";
+    private String flowfileExpiration = "0 sec";
+    private String queuePrioritizerClass = "";
+
+    public ConnectionSchema() {
+    }
+
+    public ConnectionSchema(Map map) {
+        name = getRequiredKeyAsType(map, NAME_KEY, String.class, 
CONNECTIONS_KEY);
+        sourceName = getRequiredKeyAsType(map, SOURCE_NAME_KEY, String.class, 
CONNECTIONS_KEY);
+        sourceRelationshipName = getRequiredKeyAsType(map, 
SOURCE_RELATIONSHIP_NAME_KEY, String.class, CONNECTIONS_KEY);
+        destinationName = getRequiredKeyAsType(map, DESTINATION_NAME_KEY, 
String.class, CONNECTIONS_KEY);
+
+        maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, 
Number.class, CONNECTIONS_KEY, 0);
+        maxWorkQueueDataSize = getOptionalKeyAsType(map, 
MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, "0 MB");
+        flowfileExpiration = getOptionalKeyAsType(map, 
FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, "0 sec");
+        queuePrioritizerClass = getOptionalKeyAsType(map, 
QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, "");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public String getSourceRelationshipName() {
+        return sourceRelationshipName;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public Number getMaxWorkQueueSize() {
+        return maxWorkQueueSize;
+    }
+
+    public String getMaxWorkQueueDataSize() {
+        return maxWorkQueueDataSize;
+    }
+
+    public String getFlowfileExpiration() {
+        return flowfileExpiration;
+    }
+
+    public String getQueuePrioritizerClass() {
+        return queuePrioritizerClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java
new file mode 100644
index 0000000..1a4d07d
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ContentRepositorySchema.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
+
+/**
+ *
+ */
+public class ContentRepositorySchema extends BaseSchema {
+    public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = 
"content claim max appendable size";
+    public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content 
claim max flow files";
+
+    private String contentClaimMaxAppendableSize = "10 MB";
+    private Number contentClaimMaxFlowFiles = 100;
+    private Boolean alwaysSync = false;
+
+    public ContentRepositorySchema() {
+    }
+
+    public ContentRepositorySchema(Map map) {
+        contentClaimMaxAppendableSize = getOptionalKeyAsType(map, 
CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY, String.class, CONTENT_REPO_KEY, "10 MB");
+        contentClaimMaxFlowFiles = getOptionalKeyAsType(map, 
CONTENT_CLAIM_MAX_FLOW_FILES_KEY, Number.class, CONTENT_REPO_KEY, 100);
+        alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, 
CONTENT_REPO_KEY, false);
+    }
+
+    public String getContentClaimMaxAppendableSize() {
+        return contentClaimMaxAppendableSize;
+    }
+
+    public Number getContentClaimMaxFlowFiles() {
+        return contentClaimMaxFlowFiles;
+    }
+
+    public boolean getAlwaysSync() {
+        return alwaysSync;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java
new file mode 100644
index 0000000..d4bae0b
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/CorePropertiesSchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.MAX_CONCURRENT_THREADS_KEY;
+
+/**
+ *
+ */
+public class CorePropertiesSchema extends BaseSchema {
+
+    public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow 
controller graceful shutdown period";
+    public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow 
service write delay interval";
+    public static final String ADMINISTRATIVE_YIELD_DURATION_KEY = 
"administrative yield duration";
+    public static final String BORED_YIELD_DURATION_KEY = "bored yield 
duration";
+
+
+    private String flowControllerGracefulShutdownPeriod = "10 sec";
+    private String flowServiceWriteDelayInterval = "500 ms";
+    private String administrativeYieldDuration = "30 sec";
+    private String boredYieldDuration = "10 millis";
+    private Number maxConcurrentThreads = 1;
+
+    public CorePropertiesSchema() {
+    }
+
+    public CorePropertiesSchema(Map map) {
+        flowControllerGracefulShutdownPeriod = getOptionalKeyAsType(map, 
FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY, String.class, CORE_PROPS_KEY, "10 sec");
+        flowServiceWriteDelayInterval = getOptionalKeyAsType(map, 
FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY, String.class, CORE_PROPS_KEY, "500 ms");
+        administrativeYieldDuration = getOptionalKeyAsType(map, 
ADMINISTRATIVE_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, "30 sec");
+        boredYieldDuration = getOptionalKeyAsType(map, 
BORED_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, "10 millis");
+        maxConcurrentThreads = getOptionalKeyAsType(map, 
MAX_CONCURRENT_THREADS_KEY, Number.class, CORE_PROPS_KEY, 1);
+    }
+
+    public String getFlowControllerGracefulShutdownPeriod() {
+        return flowControllerGracefulShutdownPeriod;
+    }
+
+    public String getFlowServiceWriteDelayInterval() {
+        return flowServiceWriteDelayInterval;
+    }
+
+    public String getAdministrativeYieldDuration() {
+        return administrativeYieldDuration;
+    }
+
+    public String getBoredYieldDuration() {
+        return boredYieldDuration;
+    }
+
+    public Number getMaxConcurrentThreads() {
+        return maxConcurrentThreads;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java
new file mode 100644
index 0000000..1d7500a
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowControllerSchema.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.NAME_KEY;
+
+/**
+ *
+ */
+public class FlowControllerSchema extends BaseSchema {
+
+    private String name;
+    private String comment = "";
+
+    public FlowControllerSchema() {
+    }
+
+    public FlowControllerSchema(Map map) {
+        name = getRequiredKeyAsType(map, NAME_KEY, String.class, 
FLOW_CONTROLLER_PROPS_KEY);
+
+        comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, 
FLOW_CONTROLLER_PROPS_KEY, "");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java
new file mode 100644
index 0000000..10a8c55
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/FlowFileRepositorySchema.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SWAP_PROPS_KEY;
+
+/**
+ *
+ */
+public class FlowFileRepositorySchema extends BaseSchema {
+
+    public static final String PARTITIONS_KEY = "partitions";
+    public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval";
+
+    private Number partitions = 256;
+    private String checkpointInterval = "2 mins";
+    private Boolean alwaysSync = false;
+    private SwapSchema swapProperties;
+
+    public FlowFileRepositorySchema() {
+        swapProperties = new SwapSchema();
+    }
+
+    public FlowFileRepositorySchema(Map map) {
+        partitions = getOptionalKeyAsType(map, PARTITIONS_KEY, Number.class, 
FLOWFILE_REPO_KEY, 256);
+        checkpointInterval = getOptionalKeyAsType(map, 
CHECKPOINT_INTERVAL_KEY, String.class, FLOWFILE_REPO_KEY, "2 mins");
+        alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, 
FLOWFILE_REPO_KEY, false);
+
+        swapProperties = getMapAsType(map, SWAP_PROPS_KEY, SwapSchema.class, 
FLOWFILE_REPO_KEY, false);
+        addIssuesIfNotNull(swapProperties);
+    }
+
+    public Number getPartitions() {
+        return partitions;
+    }
+
+    public String getCheckpointInterval() {
+        return checkpointInterval;
+    }
+
+    public boolean getAlwaysSync() {
+        return alwaysSync;
+    }
+
+    public SwapSchema getSwapProperties() {
+        return swapProperties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f528bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java
----------------------------------------------------------------------
diff --git 
a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java
 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java
new file mode 100644
index 0000000..bb86c8c
--- /dev/null
+++ 
b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/schema/ProcessorSchema.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.util.schema;
+
+import org.apache.nifi.minifi.bootstrap.util.schema.common.BaseSchema;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.NAME_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY;
+
+/**
+ *
+ */
+public class ProcessorSchema extends BaseSchema {
+    public static final String CLASS_KEY = "class";
+    public static final String PENALIZATION_PERIOD_KEY = "penalization period";
+    public static final String RUN_DURATION_NANOS_KEY = "run duration nanos";
+    public static final String AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY = 
"auto-terminated relationships list";
+    public static final String PROCESSOR_PROPS_KEY = "Properties";
+
+
+    private String name;
+    private String processorClass;
+    private Number maxConcurrentTasks = 1;
+    private String schedulingStrategy;
+    private String schedulingPeriod;
+    private String penalizationPeriod = "30 sec";
+    private String yieldPeriod = "1 sec";
+    private Number runDurationNanos = 0;
+    private List<String> autoTerminatedRelationshipsList = 
Collections.emptyList();
+    private Map<String, Object> properties = Collections.emptyMap();
+
+    public ProcessorSchema() {
+    }
+
+    public ProcessorSchema(Map map) {
+        name = getRequiredKeyAsType(map, NAME_KEY, String.class, 
PROCESSORS_KEY);
+        processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, 
PROCESSORS_KEY);
+
+        maxConcurrentTasks = getOptionalKeyAsType(map, 
MAX_CONCURRENT_TASKS_KEY, Number.class, PROCESSORS_KEY, 1);
+
+        schedulingStrategy = getRequiredKeyAsType(map, 
SCHEDULING_STRATEGY_KEY, String.class, PROCESSORS_KEY);
+        try {
+            SchedulingStrategy.valueOf(schedulingStrategy);
+        } catch (IllegalArgumentException e) {
+            addValidationIssue(SCHEDULING_STRATEGY_KEY, PROCESSORS_KEY, "it is 
not a valid scheduling strategy");
+        }
+
+        schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, 
String.class, PROCESSORS_KEY);
+
+        penalizationPeriod = getOptionalKeyAsType(map, 
PENALIZATION_PERIOD_KEY, String.class, PROCESSORS_KEY, "30 sec");
+
+        yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, 
String.class, PROCESSORS_KEY, "1 sec");
+
+        runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, 
Number.class, PROCESSORS_KEY, 0);
+
+        autoTerminatedRelationshipsList = getOptionalKeyAsType(map, 
AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, PROCESSORS_KEY, null);
+
+        properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, 
PROCESSORS_KEY, null);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getProcessorClass() {
+        return processorClass;
+    }
+
+    public Number getMaxConcurrentTasks() {
+        return maxConcurrentTasks;
+    }
+
+    public String getSchedulingStrategy() {
+        return schedulingStrategy;
+    }
+
+    public String getSchedulingPeriod() {
+        return schedulingPeriod;
+    }
+
+    public String getPenalizationPeriod() {
+        return penalizationPeriod;
+    }
+
+    public String getYieldPeriod() {
+        return yieldPeriod;
+    }
+
+    public Number getRunDurationNanos() {
+        return runDurationNanos;
+    }
+
+    public List<String> getAutoTerminatedRelationshipsList() {
+        return autoTerminatedRelationshipsList;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+}

Reply via email to