APEX-28 #resolve - Rename of files requires a separate commit to preserve attribution. - Improved documentation - Added unit test to make sure that attributes declared in multiple contexts have the same type.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/977093e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/977093e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/977093e1 Branch: refs/heads/feature-module Commit: 977093e171f1183985ae80d42b0d6dbc3af6cbc5 Parents: 434a717 Author: Timothy Farkas <[email protected]> Authored: Tue Aug 25 18:03:08 2015 -0700 Committer: Timothy Farkas <[email protected]> Committed: Wed Sep 16 15:31:44 2015 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/api/Attribute.java | 11 +- .../main/java/com/datatorrent/api/Context.java | 10 - .../stram/plan/logical/LogicalPlan.java | 6 +- .../plan/logical/LogicalPlanConfiguration.java | 472 +++-- .../plan/LogicalPlanConfigurationTest.java | 1788 ------------------ .../datatorrent/stram/plan/LogicalPlanTest.java | 990 ---------- .../logical/LogicalPlanConfigurationTest.java | 1511 +++++++++++++++ .../stram/plan/logical/LogicalPlanTest.java | 988 ++++++++++ .../src/test/resources/schemaTestTopology.json | 2 +- 9 files changed, 2785 insertions(+), 2993 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/api/src/main/java/com/datatorrent/api/Attribute.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index 4c16a2a..a7492b5 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -277,13 +277,6 @@ public class Attribute<T> implements Serializable if (map.containsKey(clazz)) { return 0; } - - map.put(clazz, getAttributesNoSave(clazz)); - return (long)clazz.getModifiers() << 32 | clazz.hashCode(); - } - - public static Set<Attribute<Object>> getAttributesNoSave(Class<?> clazz) - { Set<Attribute<Object>> set = new HashSet<Attribute<Object>>(); try { for (Field f: clazz.getDeclaredFields()) { @@ -330,8 +323,8 @@ public class Attribute<T> implements Serializable catch (Exception ex) { DTThrowable.rethrow(ex); } - - return set; + map.put(clazz, set); + return (long)clazz.getModifiers() << 32 | clazz.hashCode(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index c2d974a..cd10398 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -33,16 +33,6 @@ import com.datatorrent.api.annotation.Stateless; */ public interface Context { - /* - * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that - * Attribute is required to be the same accross all Context classes. This is required because if a simple attribute - * name is specified in a properties file at the top level context then that attribute needs to be set in all child configurations. If - * there were multiple Attributes specified in different Contexts with the same name, but a different type, then - * it would not be possible to set the values of Attributes specified by a simple attribute name in the root - * context of a properties file. If this were the case, then adding another Attribute with the same name as a pre-existing Attribute to a new Context - * class would be a backwards incompatible change. - */ - /** * Get the attributes associated with this context. * The returned map does not contain any attributes that may have been defined in the parent context of this context. http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 8826896..94d18ba 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1088,7 +1088,7 @@ public class LogicalPlan implements Serializable, DAG if (e.getKey().getOperatorWrapper() == om) { stream.sinks.remove(e.getKey()); } - // If persistStream was enabled for stream, reset stream when sink removed + // If persistStream was enabled for stream, reset stream when sink removed stream.resetStreamPersistanceOnSinkRemoval(e.getKey()); } this.operators.remove(om.getName()); @@ -1431,11 +1431,11 @@ public class LogicalPlan implements Serializable, DAG for (StreamMeta s: streams.values()) { if (s.source == null) { - throw new ValidationException(String.format("stream source not connected: %s", s.getName())); + throw new ValidationException("Stream source not connected: " + s.getName()); } if (s.sinks.isEmpty()) { - throw new ValidationException(String.format("stream sink not connected: %s", s.getName())); + throw new ValidationException("Stream sink not connected: " + s.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index a3a18c2..7a53cd7 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -15,6 +15,7 @@ */ package com.datatorrent.stram.plan.logical; + import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -22,14 +23,17 @@ import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + import java.util.*; import java.util.Map.Entry; -import jline.internal.Preconditions; import javax.validation.ValidationException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -61,7 +65,6 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; -import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement; import com.datatorrent.stram.util.ObjectMapperFactory; /** @@ -159,43 +162,16 @@ public class LogicalPlanConfiguration { */ protected enum ConfElement { - @SuppressWarnings("SetReplaceableByEnumSet") - STRAM(null, - null, - new HashSet<StramElement>(), - null), - @SuppressWarnings("SetReplaceableByEnumSet") - APPLICATION(StramElement.APPLICATION, - STRAM, - new HashSet<StramElement>(), - DAGContext.class), - @SuppressWarnings("SetReplaceableByEnumSet") - TEMPLATE(StramElement.TEMPLATE, - STRAM, - new HashSet<StramElement>(), - null), - @SuppressWarnings("SetReplaceableByEnumSet") - GATEWAY(StramElement.GATEWAY, - ConfElement.APPLICATION, - new HashSet<StramElement>(), - null), - @SuppressWarnings("SetReplaceableByEnumSet") - OPERATOR(StramElement.OPERATOR, - ConfElement.APPLICATION, - new HashSet<StramElement>(), - OperatorContext.class), - @SuppressWarnings("SetReplaceableByEnumSet") - STREAM(StramElement.STREAM, - ConfElement.APPLICATION, - new HashSet<StramElement>(), - null), - PORT(StramElement.PORT, - ConfElement.OPERATOR, - Sets.newHashSet(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), - PortContext.class); - - public static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap(); - public static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap(); + STRAM(null, null, null, null), + APPLICATION(StramElement.APPLICATION, STRAM, null, DAGContext.class), + TEMPLATE(StramElement.TEMPLATE, STRAM, null, null), + GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null), + OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class), + STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null), + PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class); + + protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap(); + protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap(); static { initialize(); @@ -246,12 +222,8 @@ public class LogicalPlanConfiguration { } if (!ContextUtils.CONTEXT_CLASSES.equals(confElementContextClasses)) { - throw new IllegalStateException("All the context classes " - + ContextUtils.CONTEXT_CLASSES - + " found in " - + Context.class - + " are not used by ConfElements " - + confElementContextClasses); + throw new IllegalStateException("All the context classes " + ContextUtils.CONTEXT_CLASSES + " found in " + + Context.class + " are not used by ConfElements " + confElementContextClasses); } } @@ -312,16 +284,15 @@ public class LogicalPlanConfiguration { this.element = element; this.parent = parent; - this.allRelatedElements.addAll(additionalRelatedElements); + if (additionalRelatedElements != null) { + this.allRelatedElements.addAll(additionalRelatedElements); + } + this.allRelatedElements.add(element); this.contextClass = contextClass; - if (contextClass != null) { - this.contextAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass); - } else { - this.contextAttributes = Sets.newHashSet(); - } + this.contextAttributes = contextClass != null ? ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass) : new HashSet<String>(); } private void setAllChildAttributes(Set<String> allChildAttributes) @@ -445,8 +416,7 @@ public class LogicalPlanConfiguration { * * @param conf The current {@link Conf} type. * @return A path from the current {@link Conf} type to a root {@link Conf} type, which includes the current and root - * { - * @lin Conf} types. + * {@link Conf} types. */ public static List<StramElement> getPathFromChildToRootInclusive(StramElement conf) { @@ -471,8 +441,7 @@ public class LogicalPlanConfiguration { * * @param conf The current {@link Conf} type. * @return A path from the root {@link Conf} type to the current {@link Conf} type, which includes the current and root - * { - * @lin Conf} types. + * {@link Conf} types. */ public static List<StramElement> getPathFromRootToChildInclusive(StramElement conf) { @@ -487,11 +456,9 @@ public class LogicalPlanConfiguration { * @param child The current {@link Conf} type. * @param parent The parent {@link Conf} type. * @return A path from the current {@link Conf} type to a parent {@link Conf} type, which includes the current and parent - * { - * @lin Conf} types. + * {@link Conf} types. */ - public static List<StramElement> getPathFromChildToParentInclusive(StramElement child, - StramElement parent) + public static List<StramElement> getPathFromChildToParentInclusive(StramElement child, StramElement parent) { ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(child); @@ -528,11 +495,9 @@ public class LogicalPlanConfiguration { * @param child The current {@link Conf} type. * @param parent The parent {@link Conf} type. * @return A path from the parent {@link Conf} type to the current {@link Conf} type, which includes the current and parent - * { - * @lin Conf} types. + * {@link Conf} types. */ - public static List<StramElement> getPathFromParentToChildInclusive(StramElement child, - StramElement parent) + public static List<StramElement> getPathFromParentToChildInclusive(StramElement child, StramElement parent) { List<StramElement> path = getPathFromChildToParentInclusive(child, parent); @@ -548,8 +513,7 @@ public class LogicalPlanConfiguration { * @return The {@link ConfElement} that contains the given attribute, or null if no {@link ConfElement} contains * the given attribute. */ - public static ConfElement findConfElementWithAttribute(ConfElement current, - String simpleAttributeName) + public static ConfElement findConfElementWithAttribute(ConfElement current, String simpleAttributeName) { if (current.getContextAttributes().contains(simpleAttributeName)) { return current; @@ -573,9 +537,7 @@ public class LogicalPlanConfiguration { List<StramElement> path = ConfElement.getPathFromParentToChildInclusive(childConfElement.getStramElement(), parentConf.getConfElement().getStramElement()); - for (int pathIndex = 1; - pathIndex < path.size(); - pathIndex++) { + for (int pathIndex = 1; pathIndex < path.size(); pathIndex++) { LOG.debug("Adding conf"); StramElement pathElement = path.get(pathIndex); //Add the configurations we need to hold this attribute @@ -593,12 +555,19 @@ public class LogicalPlanConfiguration { @SuppressWarnings("unchecked") protected static class ContextUtils { - public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES; - public static final Set<Class<? extends Context>> CONTEXT_CLASSES; - public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE; + private static final Map<String, Type> ATTRIBUTES_TO_TYPE = Maps.newHashMap(); + public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap(); + public static final Set<Class<? extends Context>> CONTEXT_CLASSES = Sets.newHashSet(); + public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap(); static { - CONTEXT_CLASSES = Sets.newHashSet(); + initialize(); + } + + @VisibleForTesting + protected static void initialize() + { + CONTEXT_CLASSES.clear(); for (Class<?> clazz: Context.class.getDeclaredClasses()) { if (!Context.class.isAssignableFrom(clazz)) { @@ -608,9 +577,17 @@ public class LogicalPlanConfiguration { CONTEXT_CLASSES.add((Class<? extends Context>)clazz); } - CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap(); + buildAttributeMaps(CONTEXT_CLASSES); + } - for (Class<? extends Context> contextClass: CONTEXT_CLASSES) { + @VisibleForTesting + protected static void buildAttributeMaps(Set<Class<? extends Context>> contextClasses) + { + CONTEXT_CLASS_TO_ATTRIBUTES.clear(); + CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.clear(); + ATTRIBUTES_TO_TYPE.clear(); + + for (Class<? extends Context> contextClass: contextClasses) { Set<String> contextAttributes = Sets.newHashSet(); Field[] fields = contextClass.getDeclaredFields(); @@ -620,19 +597,29 @@ public class LogicalPlanConfiguration { continue; } + Type fieldType = ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0]; contextAttributes.add(field.getName()); + + Type existingType = ATTRIBUTES_TO_TYPE.get(field.getName()); + + if (existingType != null && !existingType.equals(fieldType)) { + throw new ValidationException("The attribute " + field.getName() + + " is defined with two different types in two different context classes: " + + fieldType + " and " + existingType + "\n" + + "Attributes with the same name are required to have the same type accross all Context classes."); + } + + ATTRIBUTES_TO_TYPE.put(field.getName(), fieldType); } CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, contextAttributes); } - CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap(); - - for (Class<? extends Context> contextClass: CONTEXT_CLASSES) { + for (Class<? extends Context> contextClass: contextClasses) { Map<String, Attribute<?>> simpleAttributeNameToAttribute = Maps.newHashMap(); CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, simpleAttributeNameToAttribute); - Set<Attribute<Object>> attributes = AttributeInitializer.getAttributesNoSave(contextClass); + Set<Attribute<Object>> attributes = AttributeInitializer.getAttributes(contextClass); LOG.debug("context class {} and attributes {}", contextClass, attributes); @@ -644,6 +631,7 @@ public class LogicalPlanConfiguration { private ContextUtils() { + //Private construct to prevent instantiation of utility class } /** @@ -735,6 +723,7 @@ public class LogicalPlanConfiguration { private AttributeParseUtils() { + //Private construct to prevent instantiation of utility class } /** @@ -782,11 +771,7 @@ public class LogicalPlanConfiguration { { if (element != null && element != StramElement.ATTR) { - throw new IllegalArgumentException("The given " - + StramElement.class - + " must either have a value of null or " - + StramElement.ATTR - + " but it had a value of " + element); + throw new IllegalArgumentException("The given " + StramElement.class + " must either have a value of null or " + StramElement.ATTR + " but it had a value of " + element); } String attributeName; @@ -823,9 +808,7 @@ public class LogicalPlanConfiguration { public static Class<? extends Context> getContainingContextClass(String attributeName) { if (isSimpleAttributeName(attributeName)) { - throw new IllegalArgumentException("The given attribute name " - + attributeName - + " is simple."); + throw new IllegalArgumentException("The given attribute name " + attributeName + " is simple."); } LOG.debug("Attribute Name {}", attributeName); @@ -847,9 +830,7 @@ public class LogicalPlanConfiguration { if (Context.class.isAssignableFrom(clazz)) { contextClass = (Class<? extends Context>)clazz; } else { - throw new IllegalArgumentException("The provided context class name " - + contextClassName - + " is not valid."); + throw new IllegalArgumentException("The provided context class name " + contextClassName + " is not valid."); } } catch (ClassNotFoundException ex) { throw new IllegalArgumentException(ex); @@ -858,9 +839,7 @@ public class LogicalPlanConfiguration { String simpleAttributeName = getSimpleAttributeName(attributeName); if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(simpleAttributeName)) { - throw new ValidationException(simpleAttributeName - + " is not a valid attribute of " - + contextClass); + throw new ValidationException(simpleAttributeName + " is not a valid attribute of " + contextClass); } return contextClass; @@ -879,9 +858,7 @@ public class LogicalPlanConfiguration { } if (attributeName.endsWith(KEY_SEPARATOR)) { - throw new IllegalArgumentException("The given attribute name ends with \"" - + KEY_SEPARATOR - + "\" so a simple name cannot be extracted."); + throw new IllegalArgumentException("The given attribute name ends with \"" + KEY_SEPARATOR + "\" so a simple name cannot be extracted."); } return attributeName.substring(attributeName.lastIndexOf(KEY_SEPARATOR) + 1, attributeName.length()); @@ -961,6 +938,13 @@ public class LogicalPlanConfiguration { return (T)parentConf; } + /** + * Gets an ancestor {@link Conf} of this {@link Conf} of the given {@link StramElement} type. + * @param <T> The {@link Conf} Class of the ancestor conf + * @param ancestorElement The {@link StramElement} representing the type of the ancestor {@link Conf}. + * @return The ancestor {@link Conf} of the corresponding {@link StramElement} type, or null if no ancestor {@link Conf} with + * the given {@link StramElement} type exists. + */ @SuppressWarnings("unchecked") public <T extends Conf> T getAncestorConf(StramElement ancestorElement) { if (getConfElement().getStramElement() == ancestorElement) { @@ -973,6 +957,16 @@ public class LogicalPlanConfiguration { } } + /** + * This method retrieves a child {@link Conf} of the given {@link StramElement} type with the given name. If + * a child {@link Conf} with the given name and {@link StramElement} type doesn't exist, then it is added. + * @param <T> The type of the child {@link Conf}. + * @param id The name of the child {@link Conf}. + * @param childType The {@link StramElement} representing the type of the child {@link Conf}. + * @param clazz The {@link java.lang.Class} of the child {@link Conf} to add if a {@link Conf} of the given id + * and {@link StramElement} type is not present. + * @return A child {@link Conf} of this {@link Conf} with the given id and {@link StramElement} type. + */ public <T extends Conf> T getOrAddChild(String id, StramElement childType, Class<T> clazz) { @SuppressWarnings("unchecked") Map<String, T> elChildren = (Map<String, T>)children.get(childType); @@ -999,6 +993,15 @@ public class LogicalPlanConfiguration { properties.setDefaultProperties(defaults); } + /** + * This method returns a list of all the child {@link Conf}s of this {@link Conf} with the matching name + * and {@link StramElement} type. + * @param <T> The types of the child {@link Conf}s. + * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf} + * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched. + * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}. + * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type. + */ public <T extends Conf> List<T> getMatchingChildConf(String name, StramElement childType) { List<T> childConfs = new ArrayList<>(); Map<String, T> elChildren = getChildren(childType); @@ -1038,6 +1041,17 @@ public class LogicalPlanConfiguration { return childConfs; } + /** + * Returns the {@link Conf} corresponding to the given id from the given map. If a {@link Conf} with the + * given id is not present in the given map, then a new {@link Conf} of the given class is created and added + * to the map. + * @param <T> The type of the {@link Conf}s contained in the map. + * @param map The map to retrieve a {@link Conf} from or add a {@link Conf} to. + * @param id The name of the {@link Conf} to retrieve from or add to the given map. + * @param clazz The {@link java.lang.Class} of the {@link Conf} to add to the given map, if a {@link Conf} with + * the given name is not present in the given map. + * @return A {@link Conf} with the given name, contained in the given map. + */ protected <T extends Conf> T getOrAddConf(Map<String, T> map, String id, Class<T> clazz) { T conf = map.get(id); if (conf == null) { @@ -1046,12 +1060,7 @@ public class LogicalPlanConfiguration { conf = declaredConstructor.newInstance(new Object[] {}); conf.setId(id); map.put(id, conf); - } catch (IllegalAccessException | - IllegalArgumentException | - InstantiationException | - NoSuchMethodException | - SecurityException | - InvocationTargetException e) { + } catch (IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e) { LOG.error("Error instantiating configuration", e); } } @@ -1470,6 +1479,19 @@ public class LogicalPlanConfiguration { elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class); } + /** + * This is a helper method which performs the following checks:<br/><br/> + * <ol> + * <li>If the given {@link StramElement} corresponds to a {@link Conf} type which is + * the same as the type of the given {@link Conf}, then the given {@link Conf} is returned.</li> + * <li>If the given {@link StramElement} corresponds to a {@link Conf} type which is + * a valid parent {@link Conf} type for the given ancestorConf, then the given ancestor {@link Conf} is + * returned.</li> + * @param element The {@link StramElement} type corresponding to this {@link Conf} or + * to a valid ancestor {@link Conf}. + * @param ancestorConf The {@link Conf} to return. + * @return The given {@link Conf}, or null if the first call to this method passes a null {@link StramElement}. + */ private static Conf getConf(StramElement element, Conf ancestorConf) { if (element == ancestorConf.getConfElement().getStramElement()) { return ancestorConf; @@ -1481,9 +1503,23 @@ public class LogicalPlanConfiguration { } StramElement parentElement = ConfElement.getAllowedParentConf(element); Conf parentConf = getConf(parentElement, ancestorConf); + + if(parentConf == null) { + throw new IllegalArgumentException("The given StramElement is not the same type as the given ancestorConf, " + + "and it is not a valid type for a parent conf."); + } + return parentConf.getOrAddChild(WILDCARD, element, elementMaps.get(element)); } + /** + * This method adds a child {@link Conf} with the given {@link StramElement} type and name to the given + * ancestorConf. + * @param element The {@link StramElement} of the child {@link Conf} to add to the given ancestorConf. + * @param name The name of the child {@link Conf} to add to the given ancestorConf. + * @param ancestorConf The {@link Conf} to add a child {@link Conf} to. + * @return The child {@link Conf} that was added to the given ancestorConf. + */ private static Conf addConf(StramElement element, String name, Conf ancestorConf) { StramElement parentElement = ConfElement.getAllowedParentConf(element); Conf conf1 = null; @@ -1494,6 +1530,16 @@ public class LogicalPlanConfiguration { return conf1; } + /** + * This method returns a list of all the child {@link Conf}s of the given {@link List} of {@link Conf}s with the matching name + * and {@link StramElement} type. + * @param <T> The types of the child {@link Conf}s. + * @param confs The list of {@link Conf}s whose children will be searched. + * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf} + * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched. + * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}. + * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type. + */ private <T extends Conf> List<T> getMatchingChildConf(List<? extends Conf> confs, String name, StramElement childType) { List<T> childConfs = Lists.newArrayList(); for (Conf conf1 : confs) { @@ -1685,7 +1731,7 @@ public class LogicalPlanConfiguration { * @param index The current index that the parser is on for processing the property name. * @param propertyName The original unsplit Apex property name. * @param propertyValue The value corresponding to the Apex property. - * @param conf + * @param conf The current {@link Conf} to add properties to. */ private void parseStramPropertyTokens(String[] keys, int index, String propertyName, String propertyValue, Conf conf) { if (index < keys.length) { @@ -1697,104 +1743,141 @@ public class LogicalPlanConfiguration { if ((element == StramElement.APPLICATION) || (element == StramElement.OPERATOR) || (element == StramElement.STREAM) || (element == StramElement.PORT) || (element == StramElement.INPUT_PORT) || (element == StramElement.OUTPUT_PORT) || (element == StramElement.TEMPLATE)) { - if ((index + 1) < keys.length) { - String name = keys[index+1]; - Conf elConf = addConf(element, name, conf); - if (elConf != null) { - parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, elConf); - } else { - LOG.error("Invalid configuration key: {}", propertyName); - } - } else { - LOG.warn("Invalid configuration key: {}", propertyName); - } - } else if ((element == StramElement.GATEWAY)) { - Conf elConf = addConf(element, null, conf); - if (elConf != null) { - parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf); - } else { - LOG.error("Invalid configuration key: {}", propertyName); - } + parseAppElement(index, keys, element, conf, propertyName, propertyValue); + } else if (element == StramElement.GATEWAY) { + parseGatewayElement(element, conf, keys, index, propertyName, propertyValue); } else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) { - String attributeName = AttributeParseUtils.getAttributeName(element, keys, index); - - if (element != StramElement.ATTR) { - String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName; - LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName); - } - - if (conf.getConfElement().getStramElement() == null) { - conf = addConf(StramElement.APPLICATION, WILDCARD, conf); - } - - if (conf != null) { - if (AttributeParseUtils.isSimpleAttributeName(attributeName)) { - //The provided attribute name was a simple name - - if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) { - throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0)); - } - - if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) { - throw new ValidationException(attributeName - + " is not defined for the " - + conf.getConfElement().getStramElement() - + " or any of its child configurations."); - } - - if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) { - //If the attribute name is ambiguous at this configuration level we should tell the user. - LOG.warn("The attribute " - + attributeName - + " is ambiguous when specified on an " + conf.getConfElement().getStramElement()); - } - - if (conf.getConfElement().getContextAttributes().contains(attributeName)) { - @SuppressWarnings("unchecked") - Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName); - conf.setAttribute(attr, propertyValue); - } else { - AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue); - } - } else { - //This is a FQ attribute name - Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName); - - //Convert to a simple name - attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName); - - if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) { - throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName()); - } + parseAttributeElement(element, keys, index, conf, propertyValue, propertyName); + } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) { + parsePropertyElement(element, keys, index, conf, propertyValue, propertyName); + } else if (element != null) { + conf.parseElement(element, keys, index, propertyValue); + } + } + } - ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass); + /** + * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an app element. + * @param element The current {@link StramElement} of the property being parsed. + * @param keys The keys that the property being parsed was split into. + * @param index The current key that the parser is on. + * @param propertyValue The value associated with the property being parsed. + * @param propertyName The complete unprocessed name of the property being parsed. + */ + private void parseAppElement(int index, String[] keys, StramElement element, Conf conf1, String propertyName, String propertyValue) + { + if ((index + 1) < keys.length) { + String name = keys[index+1]; + Conf elConf = addConf(element, name, conf1); + if (elConf != null) { + parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, elConf); + } else { + LOG.error("Invalid configuration key: {}", propertyName); + } + } else { + LOG.warn("Invalid configuration key: {}", propertyName); + } + } - conf = ConfElement.addConfs(conf, confWithAttr); + /** + * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a gateway element. + * @param element The current {@link StramElement} of the property being parsed. + * @param keys The keys that the property being parsed was split into. + * @param index The current key that the parser is on. + * @param propertyValue The value associated with the property being parsed. + * @param propertyName The complete unprocessed name of the property being parsed. + */ + private void parseGatewayElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue) + { + Conf elConf = addConf(element, null, conf1); + if (elConf != null) { + parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf); + } else { + LOG.error("Invalid configuration key: {}", propertyName); + } + } - @SuppressWarnings("unchecked") - Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName); - conf.setAttribute(attr, propertyValue); - } - } else { - LOG.error("Invalid configuration key: {}", propertyName); + /** + * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an attribute. + * @param element The current {@link StramElement} of the property being parsed. + * @param keys The keys that the property being parsed was split into. + * @param index The current key that the parser is on. + * @param conf The current {@link Conf}. + * @param propertyValue The value associated with the property being parsed. + * @param propertyName The complete unprocessed name of the property being parsed. + */ + private void parseAttributeElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName) + { + String attributeName = AttributeParseUtils.getAttributeName(element, keys, index); + if (element != StramElement.ATTR) { + String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName; + LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName); + } + if (conf.getConfElement().getStramElement() == null) { + conf = addConf(StramElement.APPLICATION, WILDCARD, conf); + } + if (conf != null) { + if (AttributeParseUtils.isSimpleAttributeName(attributeName)) { + //The provided attribute name was a simple name + if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) { + throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0)); } - } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) { - // Currently opProps are only supported on operators and streams - // Supporting current implementation where property can be directly specified under operator - String prop; - if (element == StramElement.PROP) { - prop = getCompleteKey(keys, index+1); - } else { - prop = getCompleteKey(keys, index); + if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) { + throw new ValidationException(attributeName + " is not defined for the " + conf.getConfElement().getStramElement() + " or any of its child configurations."); + } + if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) { + //If the attribute name is ambiguous at this configuration level we should tell the user. + LOG.warn("The attribute " + attributeName + " is ambiguous when specified on an " + conf.getConfElement().getStramElement()); } - if (prop != null) { - conf.setProperty(prop, propertyValue); + if (conf.getConfElement().getContextAttributes().contains(attributeName)) { + @SuppressWarnings(value = "unchecked") + Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName); + conf.setAttribute(attr, propertyValue); } else { - LOG.warn("Invalid property specification, no property name specified for {}", propertyName); + AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue); } - } else if (element != null) { - conf.parseElement(element, keys, index, propertyValue); + } else { + //This is a FQ attribute name + Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName); + //Convert to a simple name + attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName); + if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) { + throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName()); + } + ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass); + conf = ConfElement.addConfs(conf, confWithAttr); + @SuppressWarnings("unchecked") + Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName); + conf.setAttribute(attr, propertyValue); } + } else { + LOG.error("Invalid configuration key: {}", propertyName); + } + } + + /** + * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a prop. + * @param element The current {@link StramElement} of the property being parsed. + * @param keys The keys that the property being parsed was split into. + * @param index The current key that the parser is on. + * @param conf The current {@link Conf}. + * @param propertyValue The value associated with the property being parsed. + * @param propertyName The complete unprocessed name of the property being parsed. + */ + private void parsePropertyElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName) + { + // Currently opProps are only supported on operators and streams + // Supporting current implementation where property can be directly specified under operator + String prop; + if (element == StramElement.PROP) { + prop = getCompleteKey(keys, index+1); + } else { + prop = getCompleteKey(keys, index); + } + if (prop != null) { + conf.setProperty(prop, propertyValue); + } else { + LOG.warn("Invalid property specification, no property name specified for {}", propertyName); } } @@ -1831,7 +1914,12 @@ public class LogicalPlanConfiguration { * @return The completed key. */ private static String getCompleteKey(String[] keys, int start, int end) { - StringBuilder sb = new StringBuilder(1024); + int length = 0; + for (int keyIndex = 0; keyIndex < keys.length; keyIndex++) { + length += keys[keyIndex].length(); + } + + StringBuilder sb = new StringBuilder(length); for (int i = start; i < end; ++i) { if (i > start) { sb.append(KEY_SEPARATOR);
