http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java index 048027c..6f2ff8e 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.TreeMap; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY; -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY; @@ -59,22 +58,23 @@ public class ProcessorSchema extends BaseSchemaWithIdAndName { private String annotationData = ""; public ProcessorSchema(Map map) { - super(map, PROCESSORS_KEY); - processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, PROCESSORS_KEY); - schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROCESSORS_KEY); + super(map, "Processor(id: {id}, name: {name})"); + String wrapperName = getWrapperName(); + processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, wrapperName); + schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, wrapperName); if (schedulingStrategy != null && 
!isSchedulingStrategy(schedulingStrategy)) { - addValidationIssue(SCHEDULING_STRATEGY_KEY, PROCESSORS_KEY, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY); + addValidationIssue(SCHEDULING_STRATEGY_KEY, wrapperName, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY); } - schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROCESSORS_KEY); + schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, wrapperName); - maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_MAX_CONCURRENT_TASKS); - penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_PENALIZATION_PERIOD); - yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_YIELD_DURATION); - runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_RUN_DURATION_NANOS); - autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, PROCESSORS_KEY, DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST); - properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, PROCESSORS_KEY, DEFAULT_PROPERTIES); + maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, wrapperName, DEFAULT_MAX_CONCURRENT_TASKS); + penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, wrapperName, DEFAULT_PENALIZATION_PERIOD); + yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, wrapperName, DEFAULT_YIELD_DURATION); + runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, wrapperName, DEFAULT_RUN_DURATION_NANOS); + autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, wrapperName, DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST); + properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, 
wrapperName, DEFAULT_PROPERTIES); - annotationData = getOptionalKeyAsType(map, ANNOTATION_DATA_KEY, String.class, PROCESSORS_KEY, ""); + annotationData = getOptionalKeyAsType(map, ANNOTATION_DATA_KEY, String.class, wrapperName, ""); } public static boolean isSchedulingStrategy(String string) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java index 6ff8648..736c63f 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java @@ -22,13 +22,9 @@ import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName; import java.util.Map; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY; -import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY; -/** - * - */ public class RemoteInputPortSchema extends BaseSchemaWithIdAndName { public static final String DEFAULT_COMMENT = ""; public static final int DEFAULT_MAX_CONCURRENT_TASKS = 1; @@ -39,11 +35,12 @@ public class RemoteInputPortSchema extends BaseSchemaWithIdAndName { private Boolean useCompression = DEFAULT_USE_COMPRESSION; public RemoteInputPortSchema(Map map) { - super(map, INPUT_PORTS_KEY); + super(map, "RemoteInputPort(id: {id}, name: {name})"); + String wrapperName = getWrapperName(); - comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, INPUT_PORTS_KEY, DEFAULT_COMMENT); - maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, INPUT_PORTS_KEY, 
DEFAULT_MAX_CONCURRENT_TASKS); - useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, INPUT_PORTS_KEY, DEFAULT_USE_COMPRESSION); + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, DEFAULT_COMMENT); + maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, wrapperName, DEFAULT_MAX_CONCURRENT_TASKS); + useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, wrapperName, DEFAULT_USE_COMPRESSION); } @Override http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java index c1d318e..6d2bb20 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java @@ -18,6 +18,7 @@ package org.apache.nifi.minifi.commons.schema; import org.apache.nifi.minifi.commons.schema.common.BaseSchema; +import org.apache.nifi.minifi.commons.schema.common.StringUtil; import org.apache.nifi.minifi.commons.schema.common.WritableSchema; import java.util.List; @@ -29,9 +30,6 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NA import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY; -/** - * - */ public class RemoteProcessingGroupSchema extends BaseSchema implements 
WritableSchema { public static final String URL_KEY = "url"; public static final String TIMEOUT_KEY = "timeout"; @@ -50,17 +48,18 @@ public class RemoteProcessingGroupSchema extends BaseSchema implements WritableS public RemoteProcessingGroupSchema(Map map) { name = getRequiredKeyAsType(map, NAME_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY); - url = getRequiredKeyAsType(map, URL_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY); - inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, REMOTE_PROCESSING_GROUPS_KEY), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY); + String wrapperName = new StringBuilder("RemoteProcessingGroup(name: ").append(StringUtil.isNullOrEmpty(name) ? "unknown" : name).append(")").toString(); + url = getRequiredKeyAsType(map, URL_KEY, String.class, wrapperName); + inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, wrapperName), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY); if (inputPorts != null) { for (RemoteInputPortSchema remoteInputPortSchema: inputPorts) { addIssuesIfNotNull(remoteInputPortSchema); } } - comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_COMMENT); - timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_TIMEOUT); - yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_YIELD_PERIOD); + comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, DEFAULT_COMMENT); + timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, wrapperName, DEFAULT_TIMEOUT); + yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, wrapperName, DEFAULT_YIELD_PERIOD); } @Override http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java 
---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java index 7ba322a..7cd82f7 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java @@ -17,11 +17,12 @@ package org.apache.nifi.minifi.commons.schema.common; +import org.apache.nifi.minifi.commons.schema.exception.SchemaInstantiatonException; + import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -29,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -138,6 +140,40 @@ public abstract class BaseSchema implements Schema { return result; } + public <T> List<T> getOptionalKeyAsList(Map valueMap, String key, Function<Map, T> conversionFunction, String wrapperName) { + return convertListToType(Map.class, (List<Map>) valueMap.get(key), key, conversionFunction, wrapperName, null); + } + + public <InputT, OutputT> List<OutputT> convertListToType(Class<InputT> inputType, List<InputT> list, String simpleListType, Function<InputT, OutputT> conversionFunction, + String wrapperName, Supplier<OutputT> instantiator) { + if (list == null) { + return new ArrayList<>(); + } + List<OutputT> result = new ArrayList<>(list.size()); + for (int i = 0; i < list.size(); i++) { + try { + OutputT val 
= interpretValueAsType(inputType, list.get(i), conversionFunction, instantiator); + if (val != null) { + result.add(val); + } + } catch (SchemaInstantiatonException e) { + addValidationIssue(simpleListType + " number " + i, wrapperName, e.getMessage()); + } + } + return result; + } + + private <InputT, OutputT> OutputT interpretValueAsType(Class<InputT> inputType, InputT input, Function<InputT, OutputT> conversionFunction, Supplier<OutputT> instantiator) + throws SchemaInstantiatonException { + if (input == null && instantiator != null) { + return instantiator.get(); + } + if (!inputType.isInstance(input)) { + throw new SchemaInstantiatonException("was expecting object of type " + inputType + " but was " + input.getClass()); + } + return conversionFunction.apply(input); + } + private <T> T interpretValueAsType(Object obj, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) { if (obj == null) { if (required){ @@ -182,20 +218,6 @@ public abstract class BaseSchema implements Schema { } } - public static <T> List<T> nullToEmpty(List<T> list) { - return list == null ? Collections.emptyList() : list; - } - - public static <T> Set<T> nullToEmpty(Set<T> set) { - return set == null ? Collections.emptySet() : set; - } - - public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) { - return map == null ? 
Collections.emptyMap() : map; - } - - - public static void checkForDuplicates(Consumer<String> duplicateMessageConsumer, String errorMessagePrefix, List<String> strings) { if (strings != null) { Set<String> seen = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java index 9ab6718..a1f7bb5 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java @@ -28,26 +28,22 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NA public abstract class BaseSchemaWithIdAndName extends BaseSchema implements WritableSchema { public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+"); - public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + "): "; + public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + ")"; private final String wrapperName; private String id; private String name; public BaseSchemaWithIdAndName(Map map, String wrapperName) { - id = getId(map, wrapperName); - name = getName(map, wrapperName); this.wrapperName = wrapperName; + id = getId(map, getWrapperName()); + name = getOptionalKeyAsType(map, NAME_KEY, String.class, getWrapperName(), ""); } protected String getId(Map map, String wrapperName) { 
return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, ""); } - protected String getName(Map map, String wrapperName) { - return getOptionalKeyAsType(map, NAME_KEY, String.class, wrapperName, ""); - } - public String getId() { return id; } @@ -60,10 +56,14 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema implements Writ return name; } - protected void setName(String name) { + public void setName(String name) { this.name = name; } + public String getWrapperName() { + return wrapperName.replace("{id}", StringUtil.isNullOrEmpty(id) ? "unkown" : id).replace("{name}", StringUtil.isNullOrEmpty(name) ? "unkown" : name); + } + @Override public Map<String, Object> toMap() { Map<String, Object> map = mapSupplier.get(); @@ -76,9 +76,9 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema implements Writ public List<String> getValidationIssues() { List<String> validationIssues = super.getValidationIssues(); if (StringUtil.isNullOrEmpty(id)) { - validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED)); + validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED)); } else if (!VALID_ID_PATTERN.matcher(id).matches()) { - validationIssues.add(ID_DOES_NOT_MATCH_VALID_ID_PATTERN + id); + validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), ID_DOES_NOT_MATCH_VALID_ID_PATTERN)); } return validationIssues; } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java new 
file mode 100644 index 0000000..5fb9549 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java @@ -0,0 +1,39 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema.common; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CollectionUtil { + public static <T> List<T> nullToEmpty(List<T> list) { + return list == null ? Collections.emptyList() : list; + } + + public static <T> Set<T> nullToEmpty(Set<T> set) { + return set == null ? Collections.emptySet() : set; + } + + public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) { + return map == null ? 
Collections.emptyMap() : map; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java index 5c3a432..a603f3e 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java @@ -31,6 +31,7 @@ public class CommonPropertyKeys { public static final String PROVENANCE_REPORTING_KEY = "Provenance Reporting"; public static final String REMOTE_PROCESSING_GROUPS_KEY = "Remote Processing Groups"; public static final String INPUT_PORTS_KEY = "Input Ports"; + public static final String OUTPUT_PORTS_KEY = "Output Ports"; public static final String PROVENANCE_REPO_KEY = "Provenance Repository"; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java index ae7165e..68b4cc7 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java @@ 
-17,8 +17,40 @@ package org.apache.nifi.minifi.commons.schema.common; +import java.util.function.Consumer; + public class StringUtil { - public static boolean isNullOrEmpty(final String string) { + /** + * Returns true if the string is null or empty + * + * @param string the string + * @return true if the string is null or empty + */ + public static boolean isNullOrEmpty(String string) { return string == null || string.isEmpty(); } + + /** + * Passes the string to the consumer if it is neither null nor empty + * + * @param string the input + * @param consumer the action to perform + */ + public static void doIfNotNullOrEmpty(String string, Consumer<String> consumer) { + if (!isNullOrEmpty(string)) { + consumer.accept(string); + } + } + + /** + * Passes the string to the consumer if it is either null or empty + * + * @param string the input + * @param consumer the action to perform + */ + public static void doIfNullOrEmpty(String string, Consumer<String> consumer) { + if (isNullOrEmpty(string)) { + consumer.accept(string); + } + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java new file mode 100644 index 0000000..d659df8 --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java @@ -0,0 +1,30 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. 
See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema.exception; + +public class SchemaInstantiatonException extends RuntimeException { + public SchemaInstantiatonException(String message) { + super(message); + } + + public SchemaInstantiatonException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java new file mode 100644 index 0000000..53cf32a --- /dev/null +++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java @@ -0,0 +1,64 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. 
+ * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.commons.schema; + +import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +public class ProcessGroupSchemaTest { + @Test + public void testNoPortsRootGroup() { + validateIssuesNumMatches(0, new ProcessGroupSchema(new HashMap<>(), ConfigSchema.TOP_LEVEL_NAME)); + } + + @Test + public void testInputPortsRootGroup() { + Map<String, Object> map = new HashMap<>(); + map.put(CommonPropertyKeys.INPUT_PORTS_KEY, Arrays.asList(createPortSchema("testId", "testName", ConfigSchema.TOP_LEVEL_NAME).toMap())); + validateIssuesNumMatches(1, new ProcessGroupSchema(map, ConfigSchema.TOP_LEVEL_NAME)); + } + + @Test + public void testOutputPortsRootGroup() { + Map<String, Object> map = new HashMap<>(); + map.put(CommonPropertyKeys.OUTPUT_PORTS_KEY, Arrays.asList(createPortSchema("testId", "testName", ConfigSchema.TOP_LEVEL_NAME).toMap())); + validateIssuesNumMatches(1, new ProcessGroupSchema(map, ConfigSchema.TOP_LEVEL_NAME)); + } + + private PortSchema createPortSchema(String id, String name, String wrapperName) { + Map<String, Object> map = new HashMap<>(); + map.put(CommonPropertyKeys.ID_KEY, id); + map.put(CommonPropertyKeys.NAME_KEY, 
name); + return new PortSchema(map, wrapperName); + } + + private void validateIssuesNumMatches(int expected, ProcessGroupSchema processGroupSchema) { + int actual = processGroupSchema.getValidationIssues().size(); + String issues = "[" + System.lineSeparator() + processGroupSchema.getValidationIssues().stream().collect(Collectors.joining("," + System.lineSeparator())) + "]"; + assertEquals("Expected " + expected + " issue(s), got " + actual + ": " + issues, expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java index 5b602ac..928bc03 100644 --- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java +++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java @@ -79,12 +79,12 @@ public class SchemaLoaderTest { private void validateMinimalConfigVersion1Parse(ConfigSchema configSchema) { assertTrue(configSchema instanceof ConfigSchema); - List<ConnectionSchema> connections = configSchema.getConnections(); + List<ConnectionSchema> connections = configSchema.getProcessGroupSchema().getConnections(); assertNotNull(connections); assertEquals(1, connections.size()); assertNotNull(connections.get(0).getId()); - List<ProcessorSchema> processors = configSchema.getProcessors(); + List<ProcessorSchema> processors = configSchema.getProcessGroupSchema().getProcessors(); assertNotNull(processors); assertEquals(2, processors.size()); processors.forEach(p 
-> assertNotNull(p.getId())); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java index 56a8103..7ce0587 100644 --- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java +++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java @@ -69,8 +69,10 @@ public class ConfigSchemaV1Test { ConfigSchema configSchema = new ConfigSchemaV1(yamlAsMap).convert(); List<String> validationIssues = configSchema.getValidationIssues(); assertEquals(4, validationIssues.size()); - assertEquals(BaseSchema.getIssueText(ConnectionSchema.DESTINATION_ID_KEY, CommonPropertyKeys.CONNECTIONS_KEY, BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED), validationIssues.get(0)); - assertEquals(BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, CommonPropertyKeys.CONNECTIONS_KEY, BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED), validationIssues.get(1)); + assertEquals(BaseSchema.getIssueText(ConnectionSchema.DESTINATION_ID_KEY, "Connection(id: TailToSplit, name: TailToSplit)", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED), + validationIssues.get(0)); + assertEquals(BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, "Connection(id: TailToSplit, name: TailToSplit)", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED), + validationIssues.get(1)); assertEquals(ConfigSchemaV1.CONNECTION_WITH_NAME + connection.get(NAME_KEY) + ConfigSchemaV1.HAS_INVALID_DESTINATION_NAME + fakeDestination, validationIssues.get(2)); 
assertEquals(ConfigSchemaV1.CONNECTION_WITH_NAME + connection.get(NAME_KEY) + ConfigSchemaV1.HAS_INVALID_SOURCE_NAME + fakeSource, validationIssues.get(3)); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java index e9acf4a..28b000c 100644 --- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java +++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java @@ -179,7 +179,7 @@ public class ConnectionSchemaV1Test { ConfigSchema configSchema = new ConfigSchemaV1(Collections.singletonMap(CommonPropertyKeys.CONNECTIONS_KEY, listWithKeyValues)).convert(); assertMessageDoesNotExist(configSchema, ConfigSchema.FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS); - List<ConnectionSchema> connections = configSchema.getConnections(); + List<ConnectionSchema> connections = configSchema.getProcessGroupSchema().getConnections(); assertEquals(5, connections.size()); // Generated unique ids http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-docs/src/main/markdown/System_Admin_Guide.md ---------------------------------------------------------------------- diff --git a/minifi-docs/src/main/markdown/System_Admin_Guide.md b/minifi-docs/src/main/markdown/System_Admin_Guide.md index 38c9588..0836443 100644 --- a/minifi-docs/src/main/markdown/System_Admin_Guide.md +++ b/minifi-docs/src/main/markdown/System_Admin_Guide.md @@ -248,7 +248,7 @@ parses and upconverts to the current version without 
issue. 1. Use ids instead of names for processors, connections. 2. Allow multiple source relationships for connections. - +3. Support process groups, input ports, output ports ## Flow Controller @@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection State File: ./conf/state/tail-file Initial Start Position: Beginning of File +## Process Groups + +Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations. + +*Property* | *Description* +----------------------------------- | ------------- +name | The name of what this process group will do. +id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+) +Processors | The processors contained in this Process Group. (Defined above) +Remote Processing Groups | The remote processing groups contained in this Process Group. (Defined below) +Connections | The connections contained in this Process Group. (Defined below) +Input Ports | The input ports contained in this Process Group. (Defined below) +Output Ports | The output ports contained in this Process Group. (Defined below) +Process Groups | The child Process Groups contained in this Process Group. + +## Input Ports + +These ports provide input to the Process Group they reside on. (Currently only for internal Input ports.) + +*Property* | *Description* +-------------------- | ------------- +name | The name of what this input port will do. +id | The id of this input port. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+) + +## Output Ports + +These ports provide output from the Process Group they reside on. (Currently only for internal Output ports.) + +*Property* | *Description* +-------------------- | ------------- +name | The name of what this output port will do. +id | The id of this output port. 
This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+) + ## Connections There can be multiple connections in this version of MiNiFi. The "Connections" subsection is a list of connections. Each connection must specify these properties. http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml index 44d8d4e..c88e47e 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml @@ -56,5 +56,8 @@ Security Properties: algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL provider: BC Processors: [] +Process Groups: [] +Input Ports: [] +Output Ports: [] Connections: [] Remote Processing Groups: [] http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java index 251e9a3..e62392d 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java +++ 
b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java @@ -28,6 +28,8 @@ import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.NiFiComponentDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; @@ -42,14 +44,19 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty; public class ConfigMain { public static final int ERR_INVALID_ARGS = 1; @@ -134,80 +141,82 @@ public class ConfigMain { System.out.println(); } - private static void enrichTemplateDTO(TemplateDTO templateDTO) { - FlowSnippetDTO flowSnippetDTO = templateDTO.getSnippet(); + private static void enrichFlowSnippetDTO(FlowSnippetDTO flowSnippetDTO) { + List<FlowSnippetDTO> allFlowSnippets = getAllFlowSnippets(flowSnippetDTO); - Set<RemoteProcessGroupDTO> remoteProcessGroups = flowSnippetDTO.getRemoteProcessGroups(); - if (remoteProcessGroups != null) { - for (RemoteProcessGroupDTO remoteProcessGroupDTO : remoteProcessGroups) { - if (StringUtil.isNullOrEmpty(remoteProcessGroupDTO.getName())) { - remoteProcessGroupDTO.setName(remoteProcessGroupDTO.getTargetUri()); - } - } + Set<RemoteProcessGroupDTO> 
remoteProcessGroups = getAll(allFlowSnippets, FlowSnippetDTO::getRemoteProcessGroups).collect(Collectors.toSet()); + + // RPGs with no name get Target URI as name + remoteProcessGroups.stream().filter(r -> StringUtil.isNullOrEmpty(r.getName())).forEach(r -> r.setName(r.getTargetUri())); + + Map<String, String> connectableNameMap = getAll(allFlowSnippets, FlowSnippetDTO::getProcessors).collect(Collectors.toMap(NiFiComponentDTO::getId, ProcessorDTO::getName)); + + for (RemoteProcessGroupDTO remoteProcessGroupDTO : remoteProcessGroups) { + RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents(); + addConnectables(connectableNameMap, nullToEmpty(contents.getInputPorts()), RemoteProcessGroupPortDTO::getId, RemoteProcessGroupPortDTO::getId); + addConnectables(connectableNameMap, nullToEmpty(contents.getOutputPorts()), RemoteProcessGroupPortDTO::getId, RemoteProcessGroupPortDTO::getId); } - Set<ConnectionDTO> connections = flowSnippetDTO.getConnections(); - if (connections != null) { - Map<String, String> connectableNameMap = new HashMap<>(); - Set<ProcessorDTO> processorDTOs = flowSnippetDTO.getProcessors(); - if (processorDTOs != null) { - connectableNameMap.putAll(processorDTOs.stream().collect(Collectors.toMap(NiFiComponentDTO::getId, ProcessorDTO::getName))); - } - if (remoteProcessGroups != null) { - for (RemoteProcessGroupDTO remoteProcessGroupDTO : remoteProcessGroups) { - RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents(); - addRemoteProcessGroupPortDTOs(connectableNameMap, contents.getInputPorts()); - addRemoteProcessGroupPortDTOs(connectableNameMap, contents.getOutputPorts()); + addConnectables(connectableNameMap, getAll(allFlowSnippets, FlowSnippetDTO::getInputPorts).collect(Collectors.toList()), PortDTO::getId, PortDTO::getName); + addConnectables(connectableNameMap, getAll(allFlowSnippets, FlowSnippetDTO::getOutputPorts).collect(Collectors.toList()), PortDTO::getId, PortDTO::getName); + + Set<ConnectionDTO> 
connections = getAll(allFlowSnippets, FlowSnippetDTO::getConnections).collect(Collectors.toSet()); + for (ConnectionDTO connection : connections) { + setName(connectableNameMap, connection.getSource()); + setName(connectableNameMap, connection.getDestination()); + } + + for (ConnectionDTO connection : connections) { + if (StringUtil.isNullOrEmpty(connection.getName())) { + StringBuilder name = new StringBuilder(); + ConnectableDTO connectionSource = connection.getSource(); + if (connectionSource != null) { + name.append(connectionSource.getName()); } - } - for (ConnectionDTO connection : connections) { - setName(connectableNameMap, connection.getSource()); - setName(connectableNameMap, connection.getDestination()); - } - for (ConnectionDTO connection : connections) { - if (StringUtil.isNullOrEmpty(connection.getName())) { - StringBuilder name = new StringBuilder(); - ConnectableDTO connectionSource = connection.getSource(); - if (connectionSource != null) { - name.append(connectionSource.getName()); - } - name.append("/"); - if (connection.getSelectedRelationships() != null && connection.getSelectedRelationships().size() > 0) { - name.append(connection.getSelectedRelationships().iterator().next()); - } - name.append("/"); - ConnectableDTO connectionDestination = connection.getDestination(); - if (connectionDestination != null) { - name.append(connectionDestination.getName()); - } - connection.setName(name.toString()); + name.append("/"); + if (connection.getSelectedRelationships() != null && connection.getSelectedRelationships().size() > 0) { + name.append(connection.getSelectedRelationships().iterator().next()); + } + name.append("/"); + ConnectableDTO connectionDestination = connection.getDestination(); + if (connectionDestination != null) { + name.append(connectionDestination.getName()); } + connection.setName(name.toString()); } } + 
nullToEmpty(flowSnippetDTO.getProcessGroups()).stream().map(ProcessGroupDTO::getContents).forEach(ConfigMain::enrichFlowSnippetDTO); } - public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, SchemaLoaderException { - TemplateDTO templateDTO = (TemplateDTO) JAXBContext.newInstance(TemplateDTO.class).createUnmarshaller().unmarshal(source); + private static <T> Stream<T> getAll(List<FlowSnippetDTO> allFlowSnippets, Function<FlowSnippetDTO, Collection<T>> accessor) { + return allFlowSnippets.stream().flatMap(f -> accessor.apply(f).stream()).filter(Objects::nonNull); + } - if (templateDTO.getSnippet().getProcessGroups().size() != 0){ - throw new SchemaLoaderException("Process Groups are not currently supported in MiNiFi. Please remove any from the template and try again."); - } + private static List<FlowSnippetDTO> getAllFlowSnippets(FlowSnippetDTO flowSnippetDTO) { + List<FlowSnippetDTO> result = new ArrayList<>(); + getAllFlowSnippets(flowSnippetDTO, result); + return result; + } - if (templateDTO.getSnippet().getOutputPorts().size() != 0){ - throw new SchemaLoaderException("Output Ports are not currently supported in MiNiFi. Please remove any from the template and try again."); - } + private static void getAllFlowSnippets(FlowSnippetDTO flowSnippetDTO, List<FlowSnippetDTO> result) { + result.add(flowSnippetDTO); + nullToEmpty(flowSnippetDTO.getProcessGroups()).stream().map(ProcessGroupDTO::getContents).forEach(f -> getAllFlowSnippets(f, result)); + } - if (templateDTO.getSnippet().getInputPorts().size() != 0){ - throw new SchemaLoaderException("Input Ports are not currently supported in MiNiFi. 
Please remove any from the template and try again."); - } + public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, IOException, SchemaLoaderException { + try { + TemplateDTO templateDTO = (TemplateDTO) JAXBContext.newInstance(TemplateDTO.class).createUnmarshaller().unmarshal(source); - if (templateDTO.getSnippet().getFunnels().size() != 0){ - throw new SchemaLoaderException("Funnels are not currently supported in MiNiFi. Please remove any from the template and try again."); - } + if (templateDTO.getSnippet().getFunnels().size() != 0){ + throw new SchemaLoaderException("Funnels are not currently supported in MiNiFi. Please remove any from the template and try again."); + } - enrichTemplateDTO(templateDTO); - ConfigSchema configSchema = new ConfigSchemaFunction().apply(templateDTO); - return configSchema; + enrichFlowSnippetDTO(templateDTO.getSnippet()); + ConfigSchema configSchema = new ConfigSchemaFunction().apply(templateDTO); + return configSchema; + } finally { + source.close(); + } } private static void setName(Map<String, String> connectableNameMap, ConnectableDTO connectableDTO) { @@ -219,10 +228,6 @@ public class ConfigMain { } } - private static void addRemoteProcessGroupPortDTOs(Map<String, String> connectableNameMap, Collection<RemoteProcessGroupPortDTO> ports) { - addConnectables(connectableNameMap, ports, RemoteProcessGroupPortDTO::getId, RemoteProcessGroupPortDTO::getId); - } - private static <T> void addConnectables(Map<String, String> connectableNameMap, Collection<T> hasIdAndNames, Function<T, String> idGetter, Function<T, String> nameGetter) { if (hasIdAndNames != null) { for (T hasIdAndName : hasIdAndNames) { http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java ---------------------------------------------------------------------- diff --git 
a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java index 4fc9f5a..9cdccf5 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java +++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java @@ -19,10 +19,12 @@ package org.apache.nifi.minifi.toolkit.configuration.dto; import org.apache.nifi.minifi.commons.schema.ConfigSchema; import org.apache.nifi.minifi.commons.schema.ConnectionSchema; +import org.apache.nifi.minifi.commons.schema.PortSchema; +import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema; import org.apache.nifi.minifi.commons.schema.ProcessorSchema; import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema; -import org.apache.nifi.minifi.commons.schema.common.BaseSchema; import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys; +import org.apache.nifi.minifi.commons.schema.common.StringUtil; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.TemplateDTO; @@ -32,51 +34,96 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.OUTPUT_PORTS_KEY; + public class ConfigSchemaFunction implements Function<TemplateDTO, ConfigSchema> { 
private final FlowControllerSchemaFunction flowControllerSchemaFunction; private final ProcessorSchemaFunction processorSchemaFunction; private final ConnectionSchemaFunction connectionSchemaFunction; private final RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction; + private final PortSchemaFunction inputPortSchemaFunction; + private final PortSchemaFunction outputPortSchemaFunction; public ConfigSchemaFunction() { - this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new RemoteProcessingGroupSchemaFunction(new RemoteInputPortSchemaFunction())); + this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new RemoteProcessingGroupSchemaFunction(new RemoteInputPortSchemaFunction()), + new PortSchemaFunction(INPUT_PORTS_KEY), new PortSchemaFunction(OUTPUT_PORTS_KEY)); } - public ConfigSchemaFunction(FlowControllerSchemaFunction flowControllerSchemaFunction, ProcessorSchemaFunction processorSchemaFunction, - ConnectionSchemaFunction connectionSchemaFunction, RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction) { + public ConfigSchemaFunction(FlowControllerSchemaFunction flowControllerSchemaFunction, ProcessorSchemaFunction processorSchemaFunction, ConnectionSchemaFunction connectionSchemaFunction, + RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction, PortSchemaFunction inputPortSchemaFunction, PortSchemaFunction outputPortSchemaFunction) { this.flowControllerSchemaFunction = flowControllerSchemaFunction; this.processorSchemaFunction = processorSchemaFunction; this.connectionSchemaFunction = connectionSchemaFunction; this.remoteProcessingGroupSchemaFunction = remoteProcessingGroupSchemaFunction; + this.inputPortSchemaFunction = inputPortSchemaFunction; + this.outputPortSchemaFunction = outputPortSchemaFunction; } @Override public ConfigSchema apply(TemplateDTO templateDTO) { Map<String, Object> map = new 
HashMap<>(); + map.put(CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY, flowControllerSchemaFunction.apply(templateDTO).toMap()); + FlowSnippetDTO snippet = templateDTO.getSnippet(); - map.put(CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY, flowControllerSchemaFunction.apply(templateDTO).toMap()); + addSnippet(map, snippet); + + return new ConfigSchema(map); + } + + protected void addSnippet(Map<String, Object> map, FlowSnippetDTO snippet) { + addSnippet(map, null, null, snippet); + } + + protected Map<String, Object> addSnippet(Map<String, Object> map, String id, String name, FlowSnippetDTO snippet) { + if (!StringUtil.isNullOrEmpty(id)) { + map.put(ID_KEY, id); + } - map.put(CommonPropertyKeys.PROCESSORS_KEY, BaseSchema.nullToEmpty(snippet.getProcessors()).stream() + if (!StringUtil.isNullOrEmpty(name)) { + map.put(NAME_KEY, name); + } + + map.put(CommonPropertyKeys.PROCESSORS_KEY, nullToEmpty(snippet.getProcessors()).stream() .map(processorSchemaFunction) .sorted(Comparator.comparing(ProcessorSchema::getName)) .map(ProcessorSchema::toMap) .collect(Collectors.toList())); - map.put(CommonPropertyKeys.CONNECTIONS_KEY, BaseSchema.nullToEmpty(snippet.getConnections()).stream() + + + map.put(CommonPropertyKeys.CONNECTIONS_KEY, nullToEmpty(snippet.getConnections()).stream() .map(connectionSchemaFunction) .sorted(Comparator.comparing(ConnectionSchema::getName)) .map(ConnectionSchema::toMap) .collect(Collectors.toList())); - map.put(CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY, BaseSchema.nullToEmpty(snippet.getRemoteProcessGroups()).stream() + map.put(CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY, nullToEmpty(snippet.getRemoteProcessGroups()).stream() .map(remoteProcessingGroupSchemaFunction) .sorted(Comparator.comparing(RemoteProcessingGroupSchema::getName)) .map(RemoteProcessingGroupSchema::toMap) .collect(Collectors.toList())); - ConfigSchema configSchema = new ConfigSchema(map); - return configSchema; + map.put(INPUT_PORTS_KEY, 
nullToEmpty(snippet.getInputPorts()).stream() + .map(inputPortSchemaFunction) + .sorted(Comparator.comparing(PortSchema::getName)) + .map(PortSchema::toMap) + .collect(Collectors.toList())); + + map.put(OUTPUT_PORTS_KEY, nullToEmpty(snippet.getOutputPorts()).stream() + .map(outputPortSchemaFunction) + .sorted(Comparator.comparing(PortSchema::getName)) + .map(PortSchema::toMap) + .collect(Collectors.toList())); + + map.put(ProcessGroupSchema.PROCESS_GROUPS_KEY, nullToEmpty(snippet.getProcessGroups()).stream() + .map(p -> addSnippet(new HashMap<>(), p.getId(), p.getName(), p.getContents())).collect(Collectors.toList())); + + return map; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java index d3d71e6..7acab41 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java +++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java @@ -19,7 +19,6 @@ package org.apache.nifi.minifi.toolkit.configuration.dto; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.minifi.commons.schema.ConnectionSchema; -import org.apache.nifi.minifi.commons.schema.common.BaseSchema; import org.apache.nifi.web.api.dto.ConnectionDTO; import java.util.HashMap; @@ -29,6 +28,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static 
org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; @@ -40,14 +40,14 @@ public class ConnectionSchemaFunction implements Function<ConnectionDTO, Connect map.put(ID_KEY, connectionDTO.getId()); map.put(NAME_KEY, connectionDTO.getName()); map.put(ConnectionSchema.SOURCE_ID_KEY, connectionDTO.getSource().getId()); - Set<String> selectedRelationships = BaseSchema.nullToEmpty(connectionDTO.getSelectedRelationships()); + Set<String> selectedRelationships = nullToEmpty(connectionDTO.getSelectedRelationships()); map.put(ConnectionSchema.SOURCE_RELATIONSHIP_NAMES_KEY, selectedRelationships.stream().sorted().collect(Collectors.toList())); map.put(ConnectionSchema.DESTINATION_ID_KEY, connectionDTO.getDestination().getId()); map.put(ConnectionSchema.MAX_WORK_QUEUE_SIZE_KEY, connectionDTO.getBackPressureObjectThreshold()); map.put(ConnectionSchema.MAX_WORK_QUEUE_DATA_SIZE_KEY, connectionDTO.getBackPressureDataSizeThreshold()); map.put(ConnectionSchema.FLOWFILE_EXPIRATION__KEY, connectionDTO.getFlowFileExpiration()); - List<String> queuePrioritizers = BaseSchema.nullToEmpty(connectionDTO.getPrioritizers()); + List<String> queuePrioritizers = nullToEmpty(connectionDTO.getPrioritizers()); if (queuePrioritizers.size() > 0) { map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, queuePrioritizers.get(0)); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java ---------------------------------------------------------------------- diff --git 
a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java new file mode 100644 index 0000000..29efac3 --- /dev/null +++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java @@ -0,0 +1,46 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.nifi.minifi.toolkit.configuration.dto; + +import org.apache.nifi.minifi.commons.schema.PortSchema; +import org.apache.nifi.web.api.dto.PortDTO; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; +import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; + +public class PortSchemaFunction implements Function<PortDTO, PortSchema> { + private final String wrapperName; + + public PortSchemaFunction(String wrapperName) { + this.wrapperName = wrapperName; + } + + @Override + public PortSchema apply(PortDTO portDTO) { + Map<String, Object> map = new HashMap<>(); + map.put(ID_KEY, portDTO.getId()); + map.put(NAME_KEY, portDTO.getName()); + return new PortSchema(map, wrapperName); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java index 21b3345..89097cc 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java +++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java @@ -18,7 +18,6 @@ package org.apache.nifi.minifi.toolkit.configuration.dto; import org.apache.nifi.minifi.commons.schema.ProcessorSchema; -import org.apache.nifi.minifi.commons.schema.common.BaseSchema; import 
org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; @@ -29,6 +28,7 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY; @@ -53,11 +53,11 @@ public class ProcessorSchemaFunction implements Function<ProcessorDTO, Processor if (runDurationMillis != null) { map.put(ProcessorSchema.RUN_DURATION_NANOS_KEY, runDurationMillis * 1000); } - map.put(ProcessorSchema.AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, BaseSchema.nullToEmpty(processorDTO.getRelationships()).stream() + map.put(ProcessorSchema.AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, nullToEmpty(processorDTO.getRelationships()).stream() .filter(RelationshipDTO::isAutoTerminate) .map(RelationshipDTO::getName) .collect(Collectors.toList())); - map.put(ProcessorSchema.PROCESSOR_PROPS_KEY, new HashMap<>(BaseSchema.nullToEmpty(processorDTOConfig.getProperties()))); + map.put(ProcessorSchema.PROCESSOR_PROPS_KEY, new HashMap<>(nullToEmpty(processorDTOConfig.getProperties()))); String annotationData = processorDTOConfig.getAnnotationData(); if(annotationData != null && !annotationData.isEmpty()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java 
b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java index cfdc48f..d61a641 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java @@ -148,7 +148,7 @@ public class ConfigMainTest { @Test public void testTransformErrorTransformingTemplate() throws FileNotFoundException { when(pathInputStreamFactory.create(testInput)).thenAnswer(invocation -> - ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithInputPort.xml")); + ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithFunnel.xml")); assertEquals(ConfigMain.ERR_UNABLE_TO_TRANSFORM_TEMPLATE, configMain.execute(new String[]{ConfigMain.TRANSFORM, testInput, testOutput})); } @@ -190,18 +190,23 @@ public class ConfigMainTest { transformRoundTrip("MultipleRelationships"); } - @Test(expected = SchemaLoaderException.class) - public void testFailToTransformProcessGroup() throws IOException, JAXBException, SchemaLoaderException { + @Test + public void testTransformRoundTripProcessGroupsAndRemoteProcessGroups() throws IOException, JAXBException, SchemaLoaderException { + transformRoundTrip("ProcessGroupsAndRemoteProcessGroups"); + } + + @Test + public void testSuccessTransformProcessGroup() throws IOException, JAXBException, SchemaLoaderException { ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithProcessGroup.xml")).toMap(); } - @Test(expected = SchemaLoaderException.class) - public void testFailToTransformInputPort() throws IOException, JAXBException, SchemaLoaderException { + @Test + public void testSuccessTransformInputPort() throws IOException, JAXBException, SchemaLoaderException { 
ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithOutputPort.xml")).toMap(); } - @Test(expected = SchemaLoaderException.class) - public void testFailToTransformOutputPort() throws IOException, JAXBException, SchemaLoaderException { + @Test + public void testSuccessTransformOutputPort() throws IOException, JAXBException, SchemaLoaderException { ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithInputPort.xml")).toMap(); } @@ -278,8 +283,8 @@ public class ConfigMainTest { assertTrue(configSchemaUpgradedFromV1.isValid()); assertEquals(configSchemaConvertableSchema, configSchemaUpgradedFromV1); ConfigSchema configSchemaFromCurrent = new ConfigSchema(yamlMap); - List<ProcessorSchema> currentProcessors = configSchemaFromCurrent.getProcessors(); - List<ProcessorSchema> v1Processors = configSchemaUpgradedFromV1.getProcessors(); + List<ProcessorSchema> currentProcessors = configSchemaFromCurrent.getProcessGroupSchema().getProcessors(); + List<ProcessorSchema> v1Processors = configSchemaUpgradedFromV1.getProcessGroupSchema().getProcessors(); assertEquals(currentProcessors.size(), v1Processors.size()); // V1 doesn't have ids so we need to map the autogenerated ones to the ones from the template @@ -291,11 +296,12 @@ public class ConfigMainTest { v1IdToCurrentIdMap.put(v1Processor.getId(), currentProcessor.getId()); v1Processor.setId(currentProcessor.getId()); } - configSchemaUpgradedFromV1.getRemoteProcessingGroups().stream().flatMap(g -> g.getInputPorts().stream()).map(RemoteInputPortSchema::getId).sequential() + + configSchemaUpgradedFromV1.getProcessGroupSchema().getRemoteProcessingGroups().stream().flatMap(g -> g.getInputPorts().stream()).map(RemoteInputPortSchema::getId).sequential() .forEach(id -> v1IdToCurrentIdMap.put(id, id)); - List<ConnectionSchema> currentConnections = configSchemaFromCurrent.getConnections(); - List<ConnectionSchema> v1Connections = 
configSchemaUpgradedFromV1.getConnections(); + List<ConnectionSchema> currentConnections = configSchemaFromCurrent.getProcessGroupSchema().getConnections(); + List<ConnectionSchema> v1Connections = configSchemaUpgradedFromV1.getProcessGroupSchema().getConnections(); // Update source and dest ids, can set connection id equal because it isn't referenced elsewhere assertEquals(currentConnections.size(), v1Connections.size()); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java new file mode 100644 index 0000000..05c83cb --- /dev/null +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java @@ -0,0 +1,74 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.nifi.minifi.toolkit.configuration.dto; + +import org.apache.nifi.minifi.commons.schema.PortSchema; +import org.apache.nifi.web.api.dto.PortDTO; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PortSchemaFunctionTest { + private String testId; + private String testName; + private String testWrapperName; + private PortDTO portDTO; + private PortSchemaFunction portSchemaFunction; + + @Before + public void setup() { + testId = "testId"; + testName = "testName"; + testWrapperName = "testWrapperName"; + portDTO = new PortDTO(); + portDTO.setId(testId); + portDTO.setName(testName); + portSchemaFunction = new PortSchemaFunction(testWrapperName); + } + + @Test + public void testFullMap() { + PortSchema portSchema = portSchemaFunction.apply(portDTO); + assertEquals(testId, portSchema.getId()); + assertEquals(testName, portSchema.getName()); + assertTrue(portSchema.isValid()); + } + + @Test + public void testNoId() { + portDTO.setId(null); + PortSchema portSchema = portSchemaFunction.apply(portDTO); + assertEquals("", portSchema.getId()); + assertEquals(testName, portSchema.getName()); + assertFalse(portSchema.isValid()); + } + + @Test + public void testNoName() { + portDTO.setName(null); + PortSchema portSchema = portSchemaFunction.apply(portDTO); + assertEquals(testId, portSchema.getId()); + assertEquals("", portSchema.getName()); + assertTrue(portSchema.isValid()); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml 
b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml index b4993e3..d03bd16 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml @@ -148,6 +148,9 @@ Processors: - success Properties: Delete Attributes Expression: +Process Groups: [] +Input Ports: [] +Output Ports: [] Connections: - name: ExtractText/matched/ReplaceText2 id: 56ef3e2e-ee35-4598-9fbe-ae86050960b0 http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml index fdec427..d2f90b2 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml @@ -187,6 +187,9 @@ Processors: Compression Level: Mode: decompress Update Filename: +Process Groups: [] +Input Ports: [] +Output Ports: [] Connections: - name: Compressed?/gzip/Uncompress GZIP id: 5de215d5-9f7e-414b-98aa-2edaa0514d99 http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml index dcd75be..aa7d6d5 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml +++ 
b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml @@ -166,6 +166,9 @@ Processors: Properties: Delete Attributes Expression: q: nifi +Process Groups: [] +Input Ports: [] +Output Ports: [] Connections: - name: Route On Status Code/200/LogAttribute id: 3039718a-bb40-4811-9b74-ecbe926daae8 http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml ---------------------------------------------------------------------- diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml index 2850e67..12ed7e1 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml @@ -114,6 +114,9 @@ Processors: Properties: Delete Attributes Expression: filename: abc +Process Groups: [] +Input Ports: [] +Output Ports: [] Connections: - name: GenerateFlowFile/success/UpdateAttribute id: 7c79cce3-0157-1000-0000-000000000000
