MINIFI-117 - Maintainable Configuration Versioning This closes #45
Signed-off-by: Joseph Percivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/1bbeedf6 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/1bbeedf6 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/1bbeedf6 Branch: refs/heads/master Commit: 1bbeedf6fd0d2d28f604b7d6f36ceec15ac66eaf Parents: 8913a90 Author: Bryan Rosander <[email protected]> Authored: Fri Oct 7 13:23:21 2016 -0400 Committer: Joseph Percivall <[email protected]> Committed: Mon Oct 31 13:36:00 2016 -0400 ---------------------------------------------------------------------- .../bootstrap/util/ConfigTransformer.java | 4 +- .../bootstrap/util/TestConfigTransformer.java | 26 +- .../src/test/resources/config-v1.yml | 111 +++++++ minifi-bootstrap/src/test/resources/config.yml | 164 +++++------ .../schema/ComponentStatusRepositorySchema.java | 3 +- .../minifi/commons/schema/ConfigSchema.java | 228 +++------------ .../minifi/commons/schema/ConnectionSchema.java | 42 +-- .../commons/schema/ContentRepositorySchema.java | 3 +- .../commons/schema/CorePropertiesSchema.java | 3 +- .../commons/schema/FlowControllerSchema.java | 3 +- .../schema/FlowFileRepositorySchema.java | 3 +- .../minifi/commons/schema/ProcessorSchema.java | 2 +- .../schema/ProvenanceReportingSchema.java | 3 +- .../schema/ProvenanceRepositorySchema.java | 3 +- .../schema/RemoteProcessingGroupSchema.java | 3 +- .../schema/SecurityPropertiesSchema.java | 3 +- .../commons/schema/SensitivePropsSchema.java | 3 +- .../nifi/minifi/commons/schema/SwapSchema.java | 3 +- .../commons/schema/common/BaseSchema.java | 84 +++--- .../schema/common/BaseSchemaWithIdAndName.java | 24 +- .../schema/common/ConvertableSchema.java | 40 +++ .../minifi/commons/schema/common/Schema.java | 50 ++++ .../commons/schema/common/WritableSchema.java | 34 +++ .../schema/serialization/SchemaLoader.java | 27 +- .../commons/schema/v1/ConfigSchemaV1.java | 263 +++++++++++++++++ .../commons/schema/v1/ConnectionSchemaV1.java | 104 +++++++ .../commons/schema/v1/ProcessorSchemaV1.java | 103 +++++++ .../minifi/commons/schema/ConfigSchemaTest.java | 70 ++--- .../commons/schema/ConnectionSchemaTest.java | 92 +----- .../schema/serialization/SchemaLoaderTest.java | 18 +- .../commons/schema/v1/ConfigSchemaV1Test.java | 94 ++++++ .../schema/v1/ConnectionSchemaV1Test.java | 192 ++++++++++++ .../schema/v1/ProcessorSchemaV1Test.java | 241 +++++++++++++++ .../src/test/resources/config-minimal-v2.yml | 38 +++ .../src/main/markdown/System_Admin_Guide.md | 12 + .../src/main/resources/conf/config.yml | 86 +++--- .../src/main/resources/config.sh | 20 +- .../toolkit/configuration/ConfigMain.java | 257 +++++++++++----- .../toolkit/configuration/ConfigMainTest.java | 126 +++++++- .../configuration/dto/BaseSchemaTester.java | 4 +- .../configuration/dto/ConnectionSchemaTest.java | 4 +- .../src/test/resources/CsvToJson-v1.yml | 178 +++++++++++ .../resources/DecompressionCircularFlow-v1.yml | 293 +++++++++++++++++++ .../InvokeHttpMiNiFiTemplateTest-v1.yml | 262 +++++++++++++++++ ...TextExpressionLanguageCSVReformatting-v1.yml | 146 +++++++++ .../test/resources/StressTestFramework-v1.yml | 114 ++++++++ .../src/test/resources/config-v1.yml | 111 +++++++ .../src/test/resources/config.yml | 164 +++++------ 48 files changed, 3099 insertions(+), 762 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index 87659ea..9794415 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -69,6 +69,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; public final class ConfigTransformer { @@ -89,7 +90,8 @@ public final class ConfigTransformer { public static void transformConfigFile(InputStream sourceStream, String destPath) throws Exception { ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(sourceStream); if (!configSchema.isValid()) { - throw new InvalidConfigurationException("Failed to transform config file due to:" + configSchema.getValidationIssuesAsString()); + throw new InvalidConfigurationException("Failed to transform config file due to:[" + + configSchema.getValidationIssues().stream().sorted().collect(Collectors.joining("], [")) + "]"); } // Create nifi.properties and flow.xml.gz in memory http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java index 8170d54..617da90 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java @@ -18,7 +18,6 @@ package org.apache.nifi.minifi.bootstrap.util; import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException; -import org.apache.nifi.minifi.commons.schema.ConfigSchema; import org.apache.nifi.minifi.commons.schema.ConnectionSchema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; @@ -51,6 +50,23 @@ public class TestConfigTransformer { } @Test + public void doesTransformV1File() throws Exception { + ConfigTransformer.transformConfigFile("./src/test/resources/config-v1.yml", "./target/"); + File nifiPropertiesFile = new File("./target/nifi.properties"); + + assertTrue(nifiPropertiesFile.exists()); + assertTrue(nifiPropertiesFile.canRead()); + + nifiPropertiesFile.deleteOnExit(); + + File flowXml = new File("./target/flow.xml.gz"); + assertTrue(flowXml.exists()); + assertTrue(flowXml.canRead()); + + flowXml.deleteOnExit(); + } + + @Test public void doesTransformInputStream() throws Exception { File inputFile = new File("./src/test/resources/config.yml"); ConfigTransformer.transformConfigFile(new FileInputStream(inputFile), "./target/"); @@ -199,10 +215,10 @@ public class TestConfigTransformer { ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-problems.yml", "./target/"); fail("Invalid configuration file was not detected."); } catch (InvalidConfigurationException e){ - assertEquals("Failed to transform config file due to:[" + ConfigSchema.CONNECTIONS_REFER_TO_PROCESSOR_NAMES_THAT_DONT_EXIST - + "null], ['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], ['class' in section " - + "'Processors' because it was not found and it is required], ['source name' in section 'Connections' because it was not found and it is required], [" - + BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, "Connections", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED) + "]", e.getMessage()); + assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required], " + + "['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], " + + "[" + BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, "Connections", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED) + "], " + + "['source name' in section 'Connections' because it was not found and it is required]", e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-bootstrap/src/test/resources/config-v1.yml ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/resources/config-v1.yml b/minifi-bootstrap/src/test/resources/config-v1.yml new file mode 100644 index 0000000..2af6b9b --- /dev/null +++ b/minifi-bootstrap/src/test/resources/config-v1.yml @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Flow Controller: + name: MiNiFi Flow + comment: + +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 + +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 + +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false + +Provenance Repository: + provenance rollover time: 1 min + +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min + +Security Properties: + keystore: /tmp/ssl/localhost-ks.jks + keystore type: JKS + keystore password: localtest + key password: localtest + truststore: /tmp/ssl/localhost-ts.jks + truststore type: JKS + truststore password: localtest + ssl protocol: TLS + Sensitive Props: + key: + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC + +Processors: + - name: TailFile + class: org.apache.nifi.processors.standard.TailFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + File to Tail: logs/minifi-app.log + Rolling Filename Pattern: minifi-app* + Initial Start Position: Beginning of File + +Connections: + - name: TailToS2S + source name: TailFile + source relationship name: success + destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer + +Remote Processing Groups: + - name: NiFi Flow + comment: + url: https://localhost:8090/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61 + name: tailed log + comments: + max concurrent tasks: 1 + use compression: false + +Provenance Reporting: + comment: + scheduling strategy: TIMER_DRIVEN + scheduling period: 30 sec + destination url: https://localhost:8090/ + port name: provenance + originating url: http://${hostname(true)}:8081/nifi + use compression: true + timeout: 30 secs + batch size: 1000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-bootstrap/src/test/resources/config.yml ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/resources/config.yml b/minifi-bootstrap/src/test/resources/config.yml index 2af6b9b..64a28fe 100644 --- a/minifi-bootstrap/src/test/resources/config.yml +++ b/minifi-bootstrap/src/test/resources/config.yml @@ -13,99 +13,93 @@ # See the License for the specific language governing permissions and # limitations under the License. +MiNiFi Config Version: 2 Flow Controller: - name: MiNiFi Flow - comment: - + name: MiNiFi Flow + comment: '' Core Properties: - flow controller graceful shutdown period: 10 sec - flow service write delay interval: 500 ms - administrative yield duration: 30 sec - bored yield duration: 10 millis - max concurrent threads: 1 - + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 FlowFile Repository: - partitions: 256 - checkpoint interval: 2 mins - always sync: false - Swap: - threshold: 20000 - in period: 5 sec - in threads: 1 - out period: 5 sec - out threads: 4 - + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 Content Repository: - content claim max appendable size: 10 MB - content claim max flow files: 100 - always sync: false - + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false Provenance Repository: - provenance rollover time: 1 min - + provenance rollover time: 1 min Component Status Repository: - buffer size: 1440 - snapshot frequency: 1 min - + buffer size: 1440 + snapshot frequency: 1 min Security Properties: - keystore: /tmp/ssl/localhost-ks.jks - keystore type: JKS - keystore password: localtest - key password: localtest - truststore: /tmp/ssl/localhost-ts.jks - truststore type: JKS - truststore password: localtest - ssl protocol: TLS - Sensitive Props: - key: - algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL - provider: BC - + keystore: /tmp/ssl/localhost-ks.jks + keystore type: JKS + keystore password: localtest + key password: localtest + truststore: /tmp/ssl/localhost-ts.jks + truststore type: JKS + truststore password: localtest + ssl protocol: TLS + Sensitive Props: + key: '' + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC Processors: - - name: TailFile - class: org.apache.nifi.processors.standard.TailFile - max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec - penalization period: 30 sec - yield period: 1 sec - run duration nanos: 0 - auto-terminated relationships list: - Properties: - File to Tail: logs/minifi-app.log - Rolling Filename Pattern: minifi-app* - Initial Start Position: Beginning of File - +- id: TailFile + name: TailFile + class: org.apache.nifi.processors.standard.TailFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + File to Tail: logs/minifi-app.log + Initial Start Position: Beginning of File + Rolling Filename Pattern: minifi-app* Connections: - - name: TailToS2S - source name: TailFile - source relationship name: success - destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61 - max work queue size: 0 - max work queue data size: 1 MB - flowfile expiration: 60 sec - queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer - +- id: TailToS2S + name: TailToS2S + source id: TailFile + source relationship names: + - success + destination id: 8644cbcc-a45c-40e0-964d-5e536e2ada61 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer Remote Processing Groups: - - name: NiFi Flow - comment: - url: https://localhost:8090/nifi - timeout: 30 secs - yield period: 10 sec - Input Ports: - - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61 - name: tailed log - comments: - max concurrent tasks: 1 - use compression: false - +- name: NiFi Flow + url: https://localhost:8090/nifi + comment: '' + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61 + name: tailed log + comment: '' + max concurrent tasks: 1 + use compression: false Provenance Reporting: - comment: - scheduling strategy: TIMER_DRIVEN - scheduling period: 30 sec - destination url: https://localhost:8090/ - port name: provenance - originating url: http://${hostname(true)}:8081/nifi - use compression: true - timeout: 30 secs - batch size: 1000 \ No newline at end of file + comment: '' + scheduling strategy: TIMER_DRIVEN + scheduling period: 30 sec + destination url: https://localhost:8090/ + port name: provenance + originating url: http://${hostname(true)}:8081/nifi + use compression: true + timeout: 30 secs + batch size: 1000 http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java index 02f3a78..7f685fc 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java @@ -18,12 +18,13 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; -public class ComponentStatusRepositorySchema extends BaseSchema { +public class ComponentStatusRepositorySchema extends BaseSchema implements WritableSchema { public static final String BUFFER_SIZE_KEY = "buffer size"; public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java index 6ed2dca..8dfd9d4 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java @@ -18,17 +18,16 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; import org.apache.nifi.minifi.commons.schema.common.StringUtil; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY; @@ -46,20 +45,18 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SE /** * */ -public class ConfigSchema extends BaseSchema { +public class ConfigSchema extends BaseSchema implements WritableSchema, ConvertableSchema<ConfigSchema> { public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: "; public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: "; public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: "; public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: "; public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: "; - public static final String CANNOT_LOOK_UP_PROCESSOR_ID_FROM_PROCESSOR_NAME_DUE_TO_DUPLICATE_PROCESSOR_NAMES = "Cannot look up Processor id from Processor name due to duplicate Processor names: "; public static final int CONFIG_VERSION = 2; - public static final String CONNECTIONS_REFER_TO_PROCESSOR_NAMES_THAT_DONT_EXIST = "Connection(s) refer to Processor names that don't exist: "; - public static String TOP_LEVEL_NAME = "top level"; public static final String VERSION = "MiNiFi Config Version"; - public static final String EMPTY_NAME = "empty_name"; - public static final Pattern ID_REPLACE_PATTERN = Pattern.compile("[^A-Za-z0-9_-]"); - + public static final String CONNECTION_WITH_ID = "Connection with id "; + public static final String HAS_INVALID_SOURCE_ID = " has invalid source id "; + public static final String HAS_INVALID_DESTINATION_ID = " has invalid destination id "; + public static String TOP_LEVEL_NAME = "top level"; private FlowControllerSchema flowControllerProperties; private CorePropertiesSchema coreProperties; private FlowFileRepositorySchema flowfileRepositoryProperties; @@ -74,6 +71,11 @@ public class ConfigSchema extends BaseSchema { private ProvenanceRepositorySchema provenanceRepositorySchema; public ConfigSchema(Map map) { + this(map, Collections.emptyList()); + } + + public ConfigSchema(Map map, List<String> validationIssues) { + validationIssues.stream().forEach(this::addValidationIssue); flowControllerProperties = getMapAsType(map, FLOW_CONTROLLER_PROPS_KEY, FlowControllerSchema.class, TOP_LEVEL_NAME, true); coreProperties = getMapAsType(map, CORE_PROPS_KEY, CorePropertiesSchema.class, TOP_LEVEL_NAME, false); @@ -83,12 +85,13 @@ public class ConfigSchema extends BaseSchema { componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false); securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false); - processors = getProcessorSchemas(getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, null)); + processors = convertListToType(getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), "processor", ProcessorSchema.class, PROCESSORS_KEY); - remoteProcessingGroups = convertListToType(getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, null), "remote processing group", + remoteProcessingGroups = convertListToType(getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), "remote processing group", RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY); - connections = getConnectionSchemas(getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, null)); + connections = convertListToType(getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), + "connection", ConnectionSchema.class, CONNECTIONS_KEY); provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false); @@ -100,170 +103,49 @@ public class ConfigSchema extends BaseSchema { addIssuesIfNotNull(securityProperties); addIssuesIfNotNull(provenanceReportingProperties); addIssuesIfNotNull(provenanceRepositorySchema); + addIssuesIfNotNull(processors); + addIssuesIfNotNull(connections); + addIssuesIfNotNull(remoteProcessingGroups); Set<String> processorIds = new HashSet<>(); - if (processors != null) { - List<String> processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList()); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList); - for (ProcessorSchema processorSchema : processors) { - addIssuesIfNotNull(processorSchema); - } - processorIds.addAll(processorIdList); - } + List<String> processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList()); + processorIds.addAll(processorIdList); - if (connections != null) { - List<String> idList = connections.stream().map(ConnectionSchema::getId).filter(s -> !StringUtil.isNullOrEmpty(s)).collect(Collectors.toList()); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, idList); - for (ConnectionSchema connectionSchema : connections) { - addIssuesIfNotNull(connectionSchema); - } - } + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, connections.stream().map(ConnectionSchema::getId).collect(Collectors.toList())); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, + remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList())); Set<String> remoteInputPortIds = new HashSet<>(); - if (remoteProcessingGroups != null) { - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, - remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList())); - for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) { - addIssuesIfNotNull(remoteProcessingGroupSchema); - } - List<RemoteProcessingGroupSchema> remoteProcessingGroups = getRemoteProcessingGroups(); - if (remoteProcessingGroups != null) { - List<String> remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null) - .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList()); - checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList); - remoteInputPortIds.addAll(remoteInputPortIdList); - } - } + List<String> remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null) + .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList()); + checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList); + remoteInputPortIds.addAll(remoteInputPortIdList); Set<String> duplicateIds = new HashSet<>(processorIds); duplicateIds.retainAll(remoteInputPortIds); if (duplicateIds.size() > 0) { addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + duplicateIds.stream().sorted().collect(Collectors.joining(", "))); } - } - - protected List<ProcessorSchema> getProcessorSchemas(List<Map> processorMaps) { - if (processorMaps == null) { - return null; - } - List<ProcessorSchema> processors = convertListToType(processorMaps, "processor", ProcessorSchema.class, PROCESSORS_KEY); - - Map<String, Integer> idMap = processors.stream().map(ProcessorSchema::getId).filter( - s -> !StringUtil.isNullOrEmpty(s)).collect(Collectors.toMap(Function.identity(), s -> 2, Integer::compareTo)); - - // Set unset ids - processors.stream().filter(connection -> StringUtil.isNullOrEmpty(connection.getId())).forEachOrdered(processor -> processor.setId(getUniqueId(idMap, processor.getName()))); - - return processors; - } - - protected List<ConnectionSchema> getConnectionSchemas(List<Map> connectionMaps) { - if (connectionMaps == null) { - return null; - } - List<ConnectionSchema> connections = convertListToType(connectionMaps, "connection", ConnectionSchema.class, CONNECTIONS_KEY); - Map<String, Integer> idMap = connections.stream().map(ConnectionSchema::getId).filter( - s -> !StringUtil.isNullOrEmpty(s)).collect(Collectors.toMap(Function.identity(), s -> 2, Integer::compareTo)); - - Map<String, String> processorNameToIdMap = new HashMap<>(); - - // We can't look up id by name for names that appear more than once - Set<String> duplicateProcessorNames = new HashSet<>(); - - List<ProcessorSchema> processors = getProcessors(); - if (processors != null) { - processors.stream().forEachOrdered(p -> processorNameToIdMap.put(p.getName(), p.getId())); - - Set<String> processorNames = new HashSet<>(); - processors.stream().map(ProcessorSchema::getName).forEachOrdered(n -> { - if (!processorNames.add(n)) { - duplicateProcessorNames.add(n); - } - }); - } - Set<String> remoteInputPortIds = new HashSet<>(); - List<RemoteProcessingGroupSchema> remoteProcessingGroups = getRemoteProcessingGroups(); - if (remoteProcessingGroups != null) { - remoteInputPortIds.addAll(remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null) - .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toSet())); - } - - Set<String> problematicDuplicateNames = new HashSet<>(); - Set<String> missingProcessorNames = new HashSet<>(); - // Set unset ids - connections.stream().filter(connection -> StringUtil.isNullOrEmpty(connection.getId())).forEachOrdered(connection -> connection.setId(getUniqueId(idMap, connection.getName()))); - - connections.stream().filter(connection -> StringUtil.isNullOrEmpty(connection.getSourceId())).forEach(connection -> { - String sourceName = connection.getSourceName(); - if (remoteInputPortIds.contains(sourceName)) { - connection.setSourceId(sourceName); - } else { - if (duplicateProcessorNames.contains(sourceName)) { - problematicDuplicateNames.add(sourceName); - } - String sourceId = processorNameToIdMap.get(sourceName); - if (StringUtil.isNullOrEmpty(sourceId)) { - missingProcessorNames.add(sourceName); - } else { - connection.setSourceId(sourceId); - } + Set<String> connectableIds = new HashSet<>(processorIds); + connectableIds.addAll(remoteInputPortIds); + connections.forEach(c -> { + String destinationId = c.getDestinationId(); + if (!StringUtil.isNullOrEmpty(destinationId) && !connectableIds.contains(destinationId)) { + addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_DESTINATION_ID + destinationId); } - }); - - connections.stream().filter(connection -> StringUtil.isNullOrEmpty(connection.getDestinationId())) - .forEach(connection -> { - String destinationName = connection.getDestinationName(); - if (remoteInputPortIds.contains(destinationName)) { - connection.setDestinationId(destinationName); - } else { - if (duplicateProcessorNames.contains(destinationName)) { - problematicDuplicateNames.add(destinationName); - } - String destinationId = processorNameToIdMap.get(destinationName); - if (StringUtil.isNullOrEmpty(destinationId)) { - missingProcessorNames.add(destinationName); - } else { - connection.setDestinationId(destinationId); - } - } - }); - - if (problematicDuplicateNames.size() > 0) { - addValidationIssue(CANNOT_LOOK_UP_PROCESSOR_ID_FROM_PROCESSOR_NAME_DUE_TO_DUPLICATE_PROCESSOR_NAMES - + problematicDuplicateNames.stream().collect(Collectors.joining(", "))); - } - if (missingProcessorNames.size() > 0) { - addValidationIssue(CONNECTIONS_REFER_TO_PROCESSOR_NAMES_THAT_DONT_EXIST + missingProcessorNames.stream().sorted().collect(Collectors.joining(", "))); - } - return connections; - } - - protected static void checkForDuplicates(Consumer<String> duplicateMessageConsumer, String errorMessagePrefix, List<String> strings) { - if (strings != null) { - Set<String> seen = new HashSet<>(); - Set<String> duplicates = new TreeSet<>(); - for (String string : strings) { - if (!seen.add(string)) { - duplicates.add(String.valueOf(string)); - } + String sourceId = c.getSourceId(); + if (!StringUtil.isNullOrEmpty(sourceId) && !connectableIds.contains(sourceId)) { + addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_SOURCE_ID + sourceId); } - if (duplicates.size() > 0) { - StringBuilder errorMessage = new StringBuilder(errorMessagePrefix); - for (String duplicateName : duplicates) { - errorMessage.append(duplicateName); - errorMessage.append(", "); - } - errorMessage.setLength(errorMessage.length() - 2); - duplicateMessageConsumer.accept(errorMessage.toString()); - } - } + }); } public Map<String, Object> toMap() { Map<String, Object> result = mapSupplier.get(); result.put(VERSION, getVersion()); - result.put(FLOW_CONTROLLER_PROPS_KEY, flowControllerProperties.toMap()); + putIfNotNull(result, FLOW_CONTROLLER_PROPS_KEY, flowControllerProperties); putIfNotNull(result, CORE_PROPS_KEY, coreProperties); putIfNotNull(result, FLOWFILE_REPO_KEY, flowfileRepositoryProperties); putIfNotNull(result, CONTENT_REPO_KEY, contentRepositoryProperties); @@ -321,31 +203,13 @@ public class ConfigSchema extends BaseSchema { return provenanceRepositorySchema; } + @Override public int getVersion() { return CONFIG_VERSION; } - /** - * Will replace all characters not in [A-Za-z0-9_] with _ - * <p> - * This has potential for collisions so it will also append numbers as necessary to prevent that - * - * @param ids id map of already incremented numbers - * @param name the name - * @return a unique filesystem-friendly id - */ - protected static String getUniqueId(Map<String, Integer> ids, String name) { - String baseId = StringUtil.isNullOrEmpty(name) ? EMPTY_NAME : ID_REPLACE_PATTERN.matcher(name).replaceAll("_"); - String id = baseId; - Integer idNum = ids.get(baseId); - while (ids.containsKey(id)) { - id = baseId + "_" + idNum++; - } - // Using != on a string comparison here is intentional. The two will be reference equal iff the body of the while loop was never executed. - if (id != baseId) { - ids.put(baseId, idNum); - } - ids.put(id, 2); - return id; + @Override + public ConfigSchema convert() { + return this; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java index ceba2ca..768213c 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java @@ -21,7 +21,6 @@ import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName; import org.apache.nifi.minifi.commons.schema.common.StringUtil; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -30,15 +29,12 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CO public class ConnectionSchema extends BaseSchemaWithIdAndName { public static final String SOURCE_ID_KEY = "source id"; - public static final String SOURCE_RELATIONSHIP_NAME_KEY = "source relationship name"; public static final String SOURCE_RELATIONSHIP_NAMES_KEY = "source relationship names"; public static final String DESTINATION_ID_KEY = "destination id"; public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size"; public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue data size"; public static final String FLOWFILE_EXPIRATION__KEY = "flowfile expiration"; public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue prioritizer class"; - public static final String SOURCE_NAME_KEY = "source name"; - public static final String DESTINATION_NAME_KEY = "destination name"; public static final long DEFAULT_MAX_WORK_QUEUE_SIZE = 0; public static final String DEFAULT_MAX_QUEUE_DATA_SIZE = "0 MB"; @@ -48,9 +44,6 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName { private List<String> sourceRelationshipNames; private String destinationId; - private String sourceName; - private String destinationName; - private Number maxWorkQueueSize = DEFAULT_MAX_WORK_QUEUE_SIZE; private String maxWorkQueueDataSize = DEFAULT_MAX_QUEUE_DATA_SIZE; private String flowfileExpiration = DEFAULT_FLOWFILE_EXPIRATION; @@ -59,30 +52,13 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName { public ConnectionSchema(Map map) { super(map, CONNECTIONS_KEY); + // In case of older version, these may not be available until after construction, validated in getValidationIssues() sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, CONNECTIONS_KEY, ""); - if (StringUtil.isNullOrEmpty(sourceId)) { - sourceName = getRequiredKeyAsType(map, SOURCE_NAME_KEY, String.class, CONNECTIONS_KEY); - } - - String sourceRelationshipName = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAME_KEY, String.class, CONNECTIONS_KEY, null); - if (StringUtil.isNullOrEmpty(sourceRelationshipName)) { - sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, CONNECTIONS_KEY, new ArrayList()); - if (sourceRelationshipNames.isEmpty()) { - addValidationIssue(getIssueText(SOURCE_RELATIONSHIP_NAMES_KEY, CONNECTIONS_KEY, "expected at least one relationship to be specified")); - } - } else { - if (map.containsKey(SOURCE_RELATIONSHIP_NAMES_KEY)) { - addValidationIssue("Only one of " + SOURCE_RELATIONSHIP_NAME_KEY + ", " + SOURCE_RELATIONSHIP_NAMES_KEY + " should be set per connection. Found both on " - + (StringUtil.isNullOrEmpty(getName()) ? getId() : getName())); - sourceRelationshipNames = getRequiredKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, CONNECTIONS_KEY); - } else { - sourceRelationshipNames = new ArrayList<>(Arrays.asList(sourceRelationshipName)); - } - } - destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, CONNECTIONS_KEY, ""); - if (StringUtil.isNullOrEmpty(getDestinationId())) { - destinationName = getRequiredKeyAsType(map, DESTINATION_NAME_KEY, String.class, CONNECTIONS_KEY); + + sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, CONNECTIONS_KEY, new ArrayList<>()); + if (sourceRelationshipNames.isEmpty()) { + addValidationIssue("Expected at least one value in " + SOURCE_RELATIONSHIP_NAMES_KEY + " for " + CONNECTIONS_KEY + " " + getName()); } maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, DEFAULT_MAX_WORK_QUEUE_SIZE); @@ -141,14 +117,6 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName { return queuePrioritizerClass; } - public String getSourceName() { - return sourceName; - } - - public String getDestinationName() { - return destinationName; - } - @Override public List<String> getValidationIssues() { List<String> validationIssues = super.getValidationIssues(); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java index 868cb79..e96bb75 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -27,7 +28,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CO /** * */ -public class ContentRepositorySchema extends BaseSchema { +public class ContentRepositorySchema extends BaseSchema implements WritableSchema { public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = "content claim max appendable size"; public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content claim max flow files"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java index ce30d9c..f5f83d8 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -27,7 +28,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MA /** * */ -public class CorePropertiesSchema extends BaseSchema { +public class CorePropertiesSchema extends BaseSchema implements WritableSchema { public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow controller graceful shutdown period"; public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow service write delay interval"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java index 3306029..49bffb9 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -28,7 +29,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NA /** * */ -public class FlowControllerSchema extends BaseSchema { +public class FlowControllerSchema extends BaseSchema implements WritableSchema { private String name; private String comment; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java index cd7f456..bf94e4d 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -28,7 +29,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SW /** * */ -public class FlowFileRepositorySchema extends BaseSchema { +public class FlowFileRepositorySchema extends BaseSchema implements WritableSchema { public static final String PARTITIONS_KEY = "partitions"; public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval"; public static final int DEFAULT_PARTITIONS = 256; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java index e2bde47..048027c 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java @@ -77,7 +77,7 @@ public class ProcessorSchema extends BaseSchemaWithIdAndName { annotationData = getOptionalKeyAsType(map, ANNOTATION_DATA_KEY, String.class, PROCESSORS_KEY, ""); } - private static boolean isSchedulingStrategy(String string) { + public static boolean isSchedulingStrategy(String string) { try { SchedulingStrategy.valueOf(string); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java index b12adb7..6490511 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import org.apache.nifi.scheduling.SchedulingStrategy; import java.util.Map; @@ -32,7 +33,7 @@ import static org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema. /** * */ -public class ProvenanceReportingSchema extends BaseSchema { +public class ProvenanceReportingSchema extends BaseSchema implements WritableSchema { public static final String DESTINATION_URL_KEY = "destination url"; public static final String PORT_NAME_KEY = "port name"; public static final String ORIGINATING_URL_KEY = "originating url"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java index ac858ef..1f1d02a 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java @@ -18,12 +18,13 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY; -public class ProvenanceRepositorySchema extends BaseSchema { +public class ProvenanceRepositorySchema extends BaseSchema implements WritableSchema { public static final String PROVENANCE_REPO_ROLLOVER_TIME_KEY = "provenance rollover time"; public static final String DEFAULT_PROVENANCE_ROLLOVER_TIME = "1 min"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java index 86ed71e..c1d318e 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.List; import java.util.Map; @@ -31,7 +32,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YI /** * */ -public class RemoteProcessingGroupSchema extends BaseSchema { +public class RemoteProcessingGroupSchema extends BaseSchema implements WritableSchema { public static final String URL_KEY = "url"; public static final String TIMEOUT_KEY = "timeout"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java index 6adfdfe..8ad68bb 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java @@ -19,6 +19,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; import org.apache.nifi.minifi.commons.schema.common.StringUtil; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -28,7 +29,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SE /** * */ -public class SecurityPropertiesSchema extends BaseSchema { +public class SecurityPropertiesSchema extends BaseSchema implements WritableSchema { public static final String KEYSTORE_KEY = "keystore"; public static final String KEYSTORE_TYPE_KEY = "keystore type"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java index 93260ea..afbefac 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -26,7 +27,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SE /** * */ -public class SensitivePropsSchema extends BaseSchema { +public class SensitivePropsSchema extends BaseSchema implements WritableSchema { public static final String SENSITIVE_PROPS_KEY_KEY = "key"; public static final String SENSITIVE_PROPS_ALGORITHM_KEY = "algorithm"; public static final String SENSITIVE_PROPS_PROVIDER_KEY = "provider"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java index d38ce7a..ee4b8c6 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.Map; @@ -26,7 +27,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SW /** * */ -public class SwapSchema extends BaseSchema { +public class SwapSchema extends BaseSchema implements WritableSchema { public static final String THRESHOLD_KEY = "threshold"; public static final String IN_PERIOD_KEY = "in period"; public static final String IN_THREADS_KEY = "in threads"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java index 0c269cc..7ba322a 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java @@ -20,18 +20,25 @@ package org.apache.nifi.minifi.commons.schema.common; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Predicate; +import java.util.TreeSet; +import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.regex.Pattern; import java.util.stream.Collectors; -public abstract class BaseSchema { +public abstract class BaseSchema implements Schema { public static final String IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED = "it was not found and it is required"; + public static final String EMPTY_NAME = "empty_name"; + + public static final Pattern ID_REPLACE_PATTERN = Pattern.compile("[^A-Za-z0-9_-]"); + protected final Supplier<Map<String, Object>> mapSupplier; public BaseSchema() { @@ -43,41 +50,16 @@ public abstract class BaseSchema { } /******* Validation Issue helper methods *******/ - private List<String> validationIssues = new LinkedList<>(); + private Collection<String> validationIssues = new HashSet<>(); + @Override public boolean isValid() { return getValidationIssues().isEmpty(); } + @Override public List<String> getValidationIssues() { - return new ArrayList<>(validationIssues); - } - - public String getValidationIssuesAsString() { - StringBuilder stringBuilder = new StringBuilder(); - boolean first = true; - for (String validationIssue : getValidationIssues()) { - if (!first) { - stringBuilder.append(", "); - } - stringBuilder.append("["); - stringBuilder.append(validationIssue); - stringBuilder.append("]"); - first = false; - } - return stringBuilder.toString(); - } - - public <T> T getAndValidateNotNull(Supplier<T> supplier, String keyName, String wrapperName) { - return getAndValidate(supplier, t -> t != null, keyName, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED); - } - - public <T> T getAndValidate(Supplier<T> supplier, Predicate<T> predicate, String keyName, String wrapperName, String reason) { - T result = supplier.get(); - if (!predicate.test(result)) { - addValidationIssue(keyName, wrapperName, reason); - } - return result; + return validationIssues.stream().sorted().collect(Collectors.toList()); } public void addValidationIssue(String issue) { @@ -98,6 +80,12 @@ public abstract class BaseSchema { } } + public void addIssuesIfNotNull(List<? extends BaseSchema> baseSchemas) { + if (baseSchemas != null) { + baseSchemas.forEach(this::addIssuesIfNotNull); + } + } + /******* Value Access/Interpretation helper methods *******/ public <T> T getOptionalKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, T defaultValue) { return getKeyAsType(valueMap, key, targetClass, wrapperName, false, defaultValue); @@ -138,7 +126,7 @@ public abstract class BaseSchema { public <InputT, OutputT> List<OutputT> convertListToType(List<InputT> list, String simpleListType, Class<? extends OutputT> targetClass, String wrapperName){ if (list == null) { - return null; + return new ArrayList<>(); } List<OutputT> result = new ArrayList<>(list.size()); for (int i = 0; i < list.size(); i++) { @@ -182,17 +170,15 @@ public abstract class BaseSchema { return null; } - public abstract Map<String, Object> toMap(); - - public static void putIfNotNull(Map valueMap, String key, BaseSchema schema) { + public static void putIfNotNull(Map valueMap, String key, WritableSchema schema) { if (schema != null) { valueMap.put(key, schema.toMap()); } } - public static void putListIfNotNull(Map valueMap, String key, List<? extends BaseSchema> list) { + public static void putListIfNotNull(Map valueMap, String key, List<? extends WritableSchema> list) { if (list != null) { - valueMap.put(key, list.stream().map(BaseSchema::toMap).collect(Collectors.toList())); + valueMap.put(key, list.stream().map(WritableSchema::toMap).collect(Collectors.toList())); } } @@ -207,4 +193,26 @@ public abstract class BaseSchema { public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) { return map == null ? Collections.emptyMap() : map; } + + + + public static void checkForDuplicates(Consumer<String> duplicateMessageConsumer, String errorMessagePrefix, List<String> strings) { + if (strings != null) { + Set<String> seen = new HashSet<>(); + Set<String> duplicates = new TreeSet<>(); + for (String string : strings) { + if (!seen.add(string)) { + duplicates.add(String.valueOf(string)); + } + } + if (duplicates.size() > 0) { + duplicateMessageConsumer.accept(errorMessagePrefix + duplicates.stream().collect(Collectors.joining(", "))); + } + } + } + + @Override + public void clearValidationIssues() { + validationIssues.clear(); + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java index 8acb167..9ab6718 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java @@ -26,7 +26,7 @@ import java.util.regex.Pattern; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; -public abstract class BaseSchemaWithIdAndName extends BaseSchema { +public abstract class BaseSchemaWithIdAndName extends BaseSchema implements WritableSchema { public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+"); public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + "): "; @@ -40,35 +40,35 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema { this.wrapperName = wrapperName; } - protected String getName(Map map, String wrapperName) { - return getOptionalKeyAsType(map, NAME_KEY, String.class, wrapperName, ""); - } - protected String getId(Map map, String wrapperName) { return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, ""); } - public void setId(String id) { - this.id = id; - } - - protected void setName(String name) { - this.name = name; + protected String getName(Map map, String wrapperName) { + return getOptionalKeyAsType(map, NAME_KEY, String.class, wrapperName, ""); } public String getId() { return id; } + public void setId(String id) { + this.id = id; + } + public String getName() { return name; } + protected void setName(String name) { + this.name = name; + } + @Override public Map<String, Object> toMap() { Map<String, Object> map = mapSupplier.get(); - map.put(NAME_KEY, name); map.put(ID_KEY, id); + map.put(NAME_KEY, name); return map; } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/ConvertableSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/ConvertableSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/ConvertableSchema.java new file mode 100644 index 0000000..61a5b2a --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/ConvertableSchema.java @@ -0,0 +1,40 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema.common; + +/** + * Schema that can be converted to another. Typically used to upconvert older versions to newer. + * @param <T> the type it can be converted to + */ +public interface ConvertableSchema<T extends Schema> extends Schema { + /** + * Converts this instance to the destination type. + * + * @return the converted instance + */ + T convert(); + + /** + * Returns the version of this Schema before conversion. + * + * @return the version of this Schema before conversion. + */ + int getVersion(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/Schema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/Schema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/Schema.java new file mode 100644 index 0000000..f038d76 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/Schema.java @@ -0,0 +1,50 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema.common; + +import java.util.List; + +public interface Schema { + /** + * Returns known problems with the given schema + * + * @return known problems with the given schema + */ + List<String> getValidationIssues(); + + /** + * Adds a validation issue + * + * @param issue the issue to add + */ + void addValidationIssue(String issue); + + /** + * Returns a boolean indicating whether the schema is valid + * + * @return a boolean indicating whether the schema is valid + */ + boolean isValid(); + + /** + * Clears validation issues + */ + void clearValidationIssues(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/WritableSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/WritableSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/WritableSchema.java new file mode 100644 index 0000000..2119e85 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/WritableSchema.java @@ -0,0 +1,34 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema.common; + +import java.util.Map; + +/** + * Schema that can be serialized to a Map + */ +public interface WritableSchema extends Schema { + /** + * Serialize the schema to a Map + * + * @return the output map + */ + Map<String, Object> toMap(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/1bbeedf6/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoader.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoader.java index 9d73bb7..331f40d 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoader.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoader.java @@ -18,7 +18,10 @@ package org.apache.nifi.minifi.commons.schema.serialization; import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.common.StringUtil; import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; +import org.apache.nifi.minifi.commons.schema.v1.ConfigSchemaV1; import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.error.YAMLException; @@ -30,13 +33,13 @@ import java.util.function.Function; import java.util.stream.Collectors; public class SchemaLoader { - private static final Map<String, Function<Map, ConfigSchema>> configSchemaFactories = initConfigSchemaFactories(); + private static final Map<String, Function<Map, ConvertableSchema<ConfigSchema>>> configSchemaFactories = initConfigSchemaFactories(); - private static Map<String, Function<Map, ConfigSchema>> initConfigSchemaFactories() { - Map<String, Function<Map, ConfigSchema>> result = new HashMap<>(); - result.put(String.valueOf((Object)null), ConfigSchema::new); - result.put("", ConfigSchema::new); - result.put("1", ConfigSchema::new); + private static Map<String, Function<Map, ConvertableSchema<ConfigSchema>>> initConfigSchemaFactories() { + Map<String, Function<Map, ConvertableSchema<ConfigSchema>>> result = new HashMap<>(); + result.put(String.valueOf((Object) null), ConfigSchemaV1::new); + result.put("", ConfigSchemaV1::new); + result.put(Integer.toString(ConfigSchemaV1.CONFIG_VERSION), ConfigSchemaV1::new); result.put(Integer.toString(ConfigSchema.CONFIG_VERSION), ConfigSchema::new); return result; } @@ -67,11 +70,19 @@ public class SchemaLoader { } public static ConfigSchema loadConfigSchemaFromYaml(Map<String, Object> yamlAsMap) throws SchemaLoaderException { + return loadConvertableSchemaFromYaml(yamlAsMap).convert(); + } + + public static ConvertableSchema<ConfigSchema> loadConvertableSchemaFromYaml(InputStream inputStream) throws SchemaLoaderException, IOException { + return loadConvertableSchemaFromYaml(loadYamlAsMap(inputStream)); + } + + public static ConvertableSchema<ConfigSchema> loadConvertableSchemaFromYaml(Map<String, Object> yamlAsMap) throws SchemaLoaderException { String version = String.valueOf(yamlAsMap.get(ConfigSchema.VERSION)); - Function<Map, ConfigSchema> schemaFactory = configSchemaFactories.get(version); + Function<Map, ConvertableSchema<ConfigSchema>> schemaFactory = configSchemaFactories.get(version); if (schemaFactory == null) { throw new SchemaLoaderException("YAML configuration version " + version + " not supported. Supported versions: " - + configSchemaFactories.keySet().stream().sorted().collect(Collectors.joining(", "))); + + configSchemaFactories.keySet().stream().filter(s -> !StringUtil.isNullOrEmpty(s) && !String.valueOf((Object) null).equals(s)).sorted().collect(Collectors.joining(", "))); } return schemaFactory.apply(yamlAsMap); }
