APEX-28 #resolve - Added the ability to specify attributes in a properties xml as dt.attr.MY_ATTR - Added handling for ambiguous attributes - Added the ability to specify attributes for operators and ports on parent elements like applications - Cleaned up commented out code and warnings in LogicalPlanConfiguration
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/434a7170 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/434a7170 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/434a7170 Branch: refs/heads/devel-3 Commit: 434a7170e4a4e2da013e283706b5bf42f853f6a4 Parents: d748ed4 Author: Timothy Farkas <[email protected]> Authored: Tue Aug 11 10:50:57 2015 -0700 Committer: Timothy Farkas <[email protected]> Committed: Sun Aug 30 18:19:40 2015 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/api/Attribute.java | 11 +- .../main/java/com/datatorrent/api/Context.java | 10 + .../stram/plan/logical/LogicalPlan.java | 8 +- .../plan/logical/LogicalPlanConfiguration.java | 1145 ++++++++++++++---- .../plan/LogicalPlanConfigurationTest.java | 1034 +++++++++++++++- .../datatorrent/stram/plan/LogicalPlanTest.java | 2 + .../stram/plan/logical/MockStorageAgent.java | 67 + engine/src/test/resources/testTopology.json | 4 +- 8 files changed, 2008 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/api/src/main/java/com/datatorrent/api/Attribute.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index a7492b5..4c16a2a 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -277,6 +277,13 @@ public class Attribute<T> implements Serializable if (map.containsKey(clazz)) { return 0; } + + map.put(clazz, getAttributesNoSave(clazz)); + return (long)clazz.getModifiers() << 32 | clazz.hashCode(); + } + + public static Set<Attribute<Object>> getAttributesNoSave(Class<?> clazz) + { 
Set<Attribute<Object>> set = new HashSet<Attribute<Object>>(); try { for (Field f: clazz.getDeclaredFields()) { @@ -323,8 +330,8 @@ public class Attribute<T> implements Serializable catch (Exception ex) { DTThrowable.rethrow(ex); } - map.put(clazz, set); - return (long)clazz.getModifiers() << 32 | clazz.hashCode(); + + return set; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index cd10398..c2d974a 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -33,6 +33,16 @@ import com.datatorrent.api.annotation.Stateless; */ public interface Context { + /* + * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that + * Attribute is required to be the same across all Context classes. This is required because if a simple attribute + * name is specified in a properties file at the top level context then that attribute needs to be set in all child configurations. If + * there were multiple Attributes specified in different Contexts with the same name, but a different type, then + * it would not be possible to set the values of Attributes specified by a simple attribute name in the root + * context of a properties file. If this were the case, then adding another Attribute with the same name as a pre-existing Attribute to a new Context + * class would be a backwards incompatible change. + */ + /** * Get the attributes associated with this context. * The returned map does not contain any attributes that may have been defined in the parent context of this context. 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 6741d37..8826896 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1430,8 +1430,12 @@ public class LogicalPlan implements Serializable, DAG } for (StreamMeta s: streams.values()) { - if (s.source == null || (s.sinks.isEmpty())) { - throw new ValidationException(String.format("stream not connected: %s", s.getName())); + if (s.source == null) { + throw new ValidationException(String.format("stream source not connected: %s", s.getName())); + } + + if (s.sinks.isEmpty()) { + throw new ValidationException(String.format("stream sink not connected: %s", s.getName())); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 46291a8..a3a18c2 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -20,12 +20,17 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.*; import java.util.Map.Entry; +import 
jline.internal.Preconditions; + import javax.validation.ValidationException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -37,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.beanutils.BeanMap; import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.commons.lang3.StringUtils; @@ -44,6 +50,7 @@ import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.*; import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer; +import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.annotation.ApplicationAnnotation; @@ -54,6 +61,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement; import com.datatorrent.stram.util.ObjectMapperFactory; /** @@ -88,30 +96,51 @@ public class LogicalPlanConfiguration { public static final String TEMPLATE_classNameRegExp = "matchClassNameRegExp"; public static final String CLASS = "class"; + public static final String KEY_SEPARATOR = "."; + public static final String KEY_SEPARATOR_SPLIT_REGEX = "\\."; private static final String CLASS_SUFFIX = "." + CLASS; private static final String WILDCARD = "*"; private static final String WILDCARD_PATTERN = ".*"; + /** + * This is done to initialize the serial id of these interfaces. 
+ */ static { Object serial[] = new Object[] {Context.DAGContext.serialVersionUID, OperatorContext.serialVersionUID, PortContext.serialVersionUID}; LOG.debug("Initialized attributes {}", serial); } - private enum StramElement { + /** + * This represents an element that can be referenced in a DT property. + */ + protected enum StramElement { APPLICATION("application"), GATEWAY("gateway"), TEMPLATE("template"), OPERATOR("operator"),STREAM("stream"), PORT("port"), INPUT_PORT("inputport"),OUTPUT_PORT("outputport"), ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path"); private final String value; + /** + * Creates a {@link StramElement} with the corresponding name. + * @param value The name of the {@link StramElement}. + */ StramElement(String value) { this.value = value; } + /** + * Gets the name of the {@link StramElement}. + * @return The name of the {@link StramElement}. + */ public String getValue() { return value; } + /** + * Gets the {@link StramElement} corresponding to the given name. + * @param value The name for which a {@link StramElement} is desired. + * @return The {@link StramElement} corresponding to the given name. + */ public static StramElement fromValue(String value) { StramElement velement = null; for (StramElement element : StramElement.values()) { @@ -124,7 +153,754 @@ public class LogicalPlanConfiguration { } } - + + /** + * This is an enum which represents a type of configuration. 
+ */ + protected enum ConfElement + { + @SuppressWarnings("SetReplaceableByEnumSet") + STRAM(null, + null, + new HashSet<StramElement>(), + null), + @SuppressWarnings("SetReplaceableByEnumSet") + APPLICATION(StramElement.APPLICATION, + STRAM, + new HashSet<StramElement>(), + DAGContext.class), + @SuppressWarnings("SetReplaceableByEnumSet") + TEMPLATE(StramElement.TEMPLATE, + STRAM, + new HashSet<StramElement>(), + null), + @SuppressWarnings("SetReplaceableByEnumSet") + GATEWAY(StramElement.GATEWAY, + ConfElement.APPLICATION, + new HashSet<StramElement>(), + null), + @SuppressWarnings("SetReplaceableByEnumSet") + OPERATOR(StramElement.OPERATOR, + ConfElement.APPLICATION, + new HashSet<StramElement>(), + OperatorContext.class), + @SuppressWarnings("SetReplaceableByEnumSet") + STREAM(StramElement.STREAM, + ConfElement.APPLICATION, + new HashSet<StramElement>(), + null), + PORT(StramElement.PORT, + ConfElement.OPERATOR, + Sets.newHashSet(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), + PortContext.class); + + public static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap(); + public static final Map<Class<? 
extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap(); + + static { + initialize(); + } + + protected static void initialize() + { + STRAM.setChildren(Sets.newHashSet(APPLICATION, TEMPLATE)); + APPLICATION.setChildren(Sets.newHashSet(GATEWAY, OPERATOR, STREAM)); + OPERATOR.setChildren(Sets.newHashSet(PORT)); + + STRAM_ELEMENT_TO_CONF_ELEMENT.clear(); + + //Initialize StramElement to ConfElement + for (ConfElement confElement: ConfElement.values()) { + STRAM_ELEMENT_TO_CONF_ELEMENT.put(confElement.getStramElement(), confElement); + + for (StramElement sElement: confElement.getAllRelatedElements()) { + STRAM_ELEMENT_TO_CONF_ELEMENT.put(sElement, confElement); + } + } + + //Initialize attributes + for (ConfElement confElement: ConfElement.values()) { + if (confElement.getParent() == null) { + continue; + } + + setAmbiguousAttributes(confElement); + } + + // build context to conf element map + CONTEXT_TO_CONF_ELEMENT.clear(); + + for (ConfElement confElement: ConfElement.values()) { + CONTEXT_TO_CONF_ELEMENT.put(confElement.getContextClass(), confElement); + } + + //Check if all the context classes are accounted for + Set<Class<? extends Context>> confElementContextClasses = Sets.newHashSet(); + + for (ConfElement confElement: ConfElement.values()) { + if (confElement.getContextClass() == null) { + continue; + } + + confElementContextClasses.add(confElement.getContextClass()); + } + + if (!ContextUtils.CONTEXT_CLASSES.equals(confElementContextClasses)) { + throw new IllegalStateException("All the context classes " + + ContextUtils.CONTEXT_CLASSES + + " found in " + + Context.class + + " are not used by ConfElements " + + confElementContextClasses); + } + } + + /** + * This is a recursive method to initialize the ambiguous elements for each + * {@link ConfElement}. + * + * @param element The current {@link ConfElement} at which to start initializing + * the ambiguous elements. 
+ * @return The set of all simple attribute names encountered up to this point. + */ + public static Set<String> setAmbiguousAttributes(ConfElement element) + { + Set<String> ambiguousAttributes = Sets.newHashSet(); + Set<String> allChildAttributes = Sets.newHashSet(element.getContextAttributes()); + + for (ConfElement childElement: element.getChildren()) { + Set<String> allAttributes = setAmbiguousAttributes(childElement); + ambiguousAttributes.addAll(childElement.getAmbiguousAttributes()); + + @SuppressWarnings("unchecked") + Set<String> intersection = (Set<String>)Sets.newHashSet(CollectionUtils.intersection(allChildAttributes, + allAttributes)); + ambiguousAttributes.addAll(intersection); + allChildAttributes.addAll(allAttributes); + } + + element.setAmbiguousAttributes(ambiguousAttributes); + element.setAllChildAttributes(allChildAttributes); + + return allChildAttributes; + } + + private final StramElement element; + private final ConfElement parent; + private Set<ConfElement> children = Sets.newHashSet(); + private final Set<StramElement> allRelatedElements = Sets.newHashSet(); + private final Class<? extends Context> contextClass; + private Set<String> ambiguousAttributes = Sets.newHashSet(); + private Set<String> contextAttributes = Sets.newHashSet(); + private Set<String> allChildAttributes = Sets.newHashSet(); + + /** + * This creates a {@link ConfElement}. + * + * @param element The current {@link StramElement} representing a {@link ConfElement}. + * @param parent The parent {@link ConfElement}. + * @param additionalRelatedElements Any additional {@link StramElement} that could be + * related to this {@link ConfElement}. + * @param contextClass The {@link Context} class that contains all the attributes to + * be used by this {@link ConfElement}. + */ + ConfElement(StramElement element, + ConfElement parent, + Set<StramElement> additionalRelatedElements, + Class<? 
extends Context> contextClass) + { + this.element = element; + this.parent = parent; + + this.allRelatedElements.addAll(additionalRelatedElements); + this.allRelatedElements.add(element); + + this.contextClass = contextClass; + + if (contextClass != null) { + this.contextAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass); + } else { + this.contextAttributes = Sets.newHashSet(); + } + } + + private void setAllChildAttributes(Set<String> allChildAttributes) + { + this.allChildAttributes = Preconditions.checkNotNull(allChildAttributes); + } + + public Set<String> getAllChildAttributes() + { + return allChildAttributes; + } + + private void setAmbiguousAttributes(Set<String> ambiguousAttributes) + { + this.ambiguousAttributes = Preconditions.checkNotNull(ambiguousAttributes); + } + + /** + * Gets the simple names of attributes which are specified under multiple configurations which + * include this configuration or any child configurations. + * + * @return The set of ambiguous simple attribute names. + */ + public Set<String> getAmbiguousAttributes() + { + return ambiguousAttributes; + } + + /** + * Gets the {@link Context} class that corresponds to this {@link ConfElement}. + * + * @return The {@link Context} class that corresponds to this {@link ConfElement}. + */ + public Class<? extends Context> getContextClass() + { + return contextClass; + } + + /** + * Gets the {@link StramElement} representing this {@link ConfElement}. + * + * @return The {@link StramElement} corresponding to this {@link ConfElement}. + */ + public StramElement getStramElement() + { + return element; + } + + /** + * Gets the attributes contained in the {@link Context} associated with this {@link ConfElement}. + * + * @return A {@link java.util.Set} containing the simple attribute names of all of the attributes + * contained in the {@link Context} associated with this {@link ConfElement}. 
+ */ + public Set<String> getContextAttributes() + { + return contextAttributes; + } + + /** + * Gets the {@link ConfElement} that is the parent of this {@link ConfElement}. + * + * @return The {@link ConfElement} that is the parent of this {@link ConfElement}. + */ + public ConfElement getParent() + { + return parent; + } + + /** + * Sets the child {@link ConfElement}s of this {@link ConfElement}. + * + * @param children The child {@link ConfElement}s of this {@link ConfElement}. + */ + private void setChildren(Set<ConfElement> children) + { + this.children = Preconditions.checkNotNull(children); + } + + /** + * Gets the child {@link ConfElement}s of this {@link ConfElement}. + * + * @return The child {@link ConfElement} of this {@link ConfElement} + */ + public Set<ConfElement> getChildren() + { + return children; + } + + /** + * Gets all the {@link StramElement}s that are represented by this {@link ConfElement}. + * + * @return All the {@link StramElement}s that are represented by this {@link ConfElement}. + */ + public Set<StramElement> getAllRelatedElements() + { + return allRelatedElements; + } + + /** + * Gets the {@link StramElement} representing the {@link Conf} type which can be a parent of the {@link Conf} type + * represented by the given {@link StramElement}. + * + * @param conf The {@link StramElement} representing the {@link Conf} type of interest. + * @return The {@link StramElement} representing the {@link Conf} type which can be a parent of the given {@link Conf} type. + */ + public static StramElement getAllowedParentConf(StramElement conf) + { + ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(conf); + + if (confElement == null) { + throw new IllegalArgumentException(conf + " is not a valid conf element."); + } + + return confElement.getParent().getStramElement(); + } + + /** + * Creates a list of {@link StramElement}s which represent the path from the current {@link Conf} type to + * a root {@link Conf} type. 
This path includes the current {@link Conf} type as well as the root. + * + * @param conf The current {@link Conf} type. + * @return A path from the current {@link Conf} type to a root {@link Conf} type, which includes the current and root + * {@link Conf} + * types. + */ + public static List<StramElement> getPathFromChildToRootInclusive(StramElement conf) + { + ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(conf); + + if (confElement == null) { + throw new IllegalArgumentException(conf + " does not represent a valid configuration type."); + } + + List<StramElement> path = Lists.newArrayList(); + + for (; confElement != null; confElement = confElement.getParent()) { + path.add(confElement.getStramElement()); + } + + return path; + } + + /** + * Creates a list of {@link StramElement}s which represent the path from the root {@link Conf} type to + * the current {@link Conf} type. This path includes the root {@link Conf} type as well as the current {@link Conf} type. + * + * @param conf The current {@link Conf} type. + * @return A path from the root {@link Conf} type to the current {@link Conf} type, which includes the current and root + * {@link Conf} + * types. + */ + public static List<StramElement> getPathFromRootToChildInclusive(StramElement conf) + { + List<StramElement> path = getPathFromChildToRootInclusive(conf); + return Lists.reverse(path); + } + + /** + * Creates a list of {@link StramElement}s which represent the path from the current {@link Conf} type to + * a parent {@link Conf} type. This path includes the current {@link Conf} type as well as the parent. + * + * @param child The current {@link Conf} type. + * @param parent The parent {@link Conf} type. + * @return A path from the current {@link Conf} type to a parent {@link Conf} type, which includes the current and parent + * {@link Conf} + * types. 
+ */ + public static List<StramElement> getPathFromChildToParentInclusive(StramElement child, + StramElement parent) + { + ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(child); + + if (confElement == null) { + throw new IllegalArgumentException(child + " does not represent a valid configuration type."); + } + + List<StramElement> path = Lists.newArrayList(); + + if (child == parent) { + path.add(child); + return path; + } + + for (; confElement != null; confElement = confElement.getParent()) { + path.add(confElement.getStramElement()); + + if (confElement.getStramElement() == parent) { + break; + } + } + + if (path.get(path.size() - 1) != parent) { + throw new IllegalArgumentException(parent + " is not a valid parent of " + child); + } + + return path; + } + + /** + * Creates a list of {@link StramElement}s which represent the path from the parent {@link Conf} type to + * a child {@link Conf} type. This path includes the parent {@link Conf} type as well as the current {@link Conf} type. + * + * @param child The current {@link Conf} type. + * @param parent The parent {@link Conf} type. + * @return A path from the parent {@link Conf} type to the current {@link Conf} type, which includes the current and parent + * {@link Conf} + * types. + */ + public static List<StramElement> getPathFromParentToChildInclusive(StramElement child, + StramElement parent) + { + List<StramElement> path = getPathFromChildToParentInclusive(child, + parent); + return Lists.reverse(path); + } + + /** + * This method searches the current {@link ConfElement} and its children to find a {@link ConfElement} + * that contains the given simple {@link Attribute} name. + * + * @param current The current {@link ConfElement}. + * @param simpleAttributeName The simple {@link Attribute} name to search for. + * @return The {@link ConfElement} that contains the given attribute, or null if no {@link ConfElement} contains + * the given attribute. 
+ */ + public static ConfElement findConfElementWithAttribute(ConfElement current, + String simpleAttributeName) + { + if (current.getContextAttributes().contains(simpleAttributeName)) { + return current; + } + + for (ConfElement childConfElement: current.getChildren()) { + ConfElement result = findConfElementWithAttribute(childConfElement, + simpleAttributeName); + + if (result != null) { + return result; + } + } + + return null; + } + + protected static Conf addConfs(Conf parentConf, ConfElement childConfElement) + { + //Figure out what configurations need to be added to hold this attribute + List<StramElement> path = ConfElement.getPathFromParentToChildInclusive(childConfElement.getStramElement(), + parentConf.getConfElement().getStramElement()); + + for (int pathIndex = 1; + pathIndex < path.size(); + pathIndex++) { + LOG.debug("Adding conf"); + StramElement pathElement = path.get(pathIndex); + //Add the configurations we need to hold this attribute + parentConf = addConf(pathElement, WILDCARD, parentConf); + } + + return parentConf; + } + + } + + /** + * Utility class that holds methods for handling {@link Context} classes. + */ + @SuppressWarnings("unchecked") + protected static class ContextUtils + { + public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES; + public static final Set<Class<? extends Context>> CONTEXT_CLASSES; + public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE; + + static { + CONTEXT_CLASSES = Sets.newHashSet(); + + for (Class<?> clazz: Context.class.getDeclaredClasses()) { + if (!Context.class.isAssignableFrom(clazz)) { + continue; + } + + CONTEXT_CLASSES.add((Class<? extends Context>)clazz); + } + + CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap(); + + for (Class<? 
extends Context> contextClass: CONTEXT_CLASSES) { + Set<String> contextAttributes = Sets.newHashSet(); + + Field[] fields = contextClass.getDeclaredFields(); + + for (Field field: fields) { + if (!Attribute.class.isAssignableFrom(field.getType())) { + continue; + } + + contextAttributes.add(field.getName()); + } + + CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, contextAttributes); + } + + CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap(); + + for (Class<? extends Context> contextClass: CONTEXT_CLASSES) { + Map<String, Attribute<?>> simpleAttributeNameToAttribute = Maps.newHashMap(); + CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, simpleAttributeNameToAttribute); + + Set<Attribute<Object>> attributes = AttributeInitializer.getAttributesNoSave(contextClass); + + LOG.debug("context class {} and attributes {}", contextClass, attributes); + + for (Attribute<Object> attribute: attributes) { + simpleAttributeNameToAttribute.put(AttributeParseUtils.getSimpleName(attribute), attribute); + } + } + } + + private ContextUtils() + { + } + + /** + * This method is only used for testing. + * + * @param contextClass + * @param attribute + */ + @VisibleForTesting + protected static void addAttribute(Class<? extends Context> contextClass, Attribute<?> attribute) + { + Set<String> attributeNames = CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass); + + if (attributeNames == null) { + attributeNames = Sets.newHashSet(); + CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, attributeNames); + } + + attributeNames.add(attribute.getSimpleName()); + + CONTEXT_CLASSES.add(contextClass); + Map<String, Attribute<?>> attributeMap = CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(contextClass); + + if (attributeMap == null) { + attributeMap = Maps.newHashMap(); + CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, attributeMap); + } + + attributeMap.put(attribute.getSimpleName(), attribute); + } + + /** + * This method is only used for testing. 
+ * + * @param contextClass + * @param attribute + */ + @VisibleForTesting + protected static void removeAttribute(Class<? extends Context> contextClass, Attribute<?> attribute) + { + Set<String> attributeNames = CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass); + + if (attributeNames != null) { + attributeNames.remove(attribute.getSimpleName()); + + if (attributeNames.isEmpty()) { + CONTEXT_CLASS_TO_ATTRIBUTES.remove(contextClass); + } + } + + if (!CONTEXT_CLASS_TO_ATTRIBUTES.keySet().contains(contextClass)) { + CONTEXT_CLASSES.remove(contextClass); + } + + Map<String, Attribute<?>> attributeMap = CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(contextClass); + + if (attributeMap != null) { + attributeMap.remove(attribute.getSimpleName()); + + if (attributeMap.isEmpty()) { + CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.remove(contextClass); + } + } + } + + } + + /** + * Utility class that holds methods for parsing. + */ + protected static class AttributeParseUtils + { + public static final Set<String> ALL_SIMPLE_ATTRIBUTE_NAMES; + + static { + ALL_SIMPLE_ATTRIBUTE_NAMES = Sets.newHashSet(); + + initialize(); + } + + public static void initialize() + { + ALL_SIMPLE_ATTRIBUTE_NAMES.clear(); + + for (Map.Entry<Class<? extends Context>, Set<String>> entry: ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.entrySet()) { + ALL_SIMPLE_ATTRIBUTE_NAMES.addAll(entry.getValue()); + } + } + + private AttributeParseUtils() + { + } + + /** + * This method creates all the appropriate child {@link Conf}s of the given parent {@link Conf} and adds the given + * attribute to the parent {@link Conf} if appropriate as well as all the child {@link Conf}s of the parent if + * appropriate. + * + * @param conf The parent {@link Conf}. + * @param attributeName The simple name of the attribute to add. + * @param attrValue The value of the attribute. 
+ */ + protected static void processAllConfsForAttribute(Conf conf, String attributeName, String attrValue) + { + ConfElement confElement = conf.getConfElement(); + + LOG.debug("Current confElement {} and name {}", confElement.getStramElement(), conf.getId()); + + if (confElement.getContextAttributes().contains(attributeName)) { + LOG.debug("Adding attribute"); + @SuppressWarnings("unchecked") + Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confElement.getContextClass()).get(attributeName); + conf.setAttribute(attr, attrValue); + } + + for (ConfElement childConfElement: confElement.getChildren()) { + + if (!childConfElement.getAllChildAttributes().contains(attributeName)) { + continue; + } + + Conf childConf = addConf(childConfElement.getStramElement(), WILDCARD, conf); + processAllConfsForAttribute(childConf, attributeName, attrValue); + } + } + + /** + * This extracts the name of an attribute from the given set of keys. + * + * @param element The {@link StramElement} corresponding to the current element being parsed. + * @param keys The split keys that are being parsed. + * @param index The current key that the parser is on. + * @return The FQN name of an attribute or just the name of an Attribute. + */ + public static String getAttributeName(StramElement element, String[] keys, int index) + { + + if (element != null && element != StramElement.ATTR) { + throw new IllegalArgumentException("The given " + + StramElement.class + + " must either have a value of null or " + + StramElement.ATTR + + " but it had a value of " + element); + } + + String attributeName; + + if (element == StramElement.ATTR) { + attributeName = getCompleteKey(keys, index + 1); + } else { + attributeName = getCompleteKey(keys, index); + } + + return attributeName; + } + + /** + * This method checks to see if the attribute name is simple or is prefixed with the FQCN of the {@link Context} + * class which contains it. 
+ * + * @param attributeName The attribute name to check. + * @return True if the attribute name is simple. False otherwise. + */ + public static boolean isSimpleAttributeName(String attributeName) + { + return !attributeName.contains(KEY_SEPARATOR); + } + + /** + * Gets the {@link Context} class that the given attributeName belongs to. + * + * @param attributeName The {@link Attribute} name whose {@link Context} class needs to be + * discovered. + * @return The {@link Context} class that the given {@link Attribute} name belongs to. + */ + @SuppressWarnings("unchecked") + public static Class<? extends Context> getContainingContextClass(String attributeName) + { + if (isSimpleAttributeName(attributeName)) { + throw new IllegalArgumentException("The given attribute name " + + attributeName + + " is simple."); + } + + LOG.debug("Attribute Name {}", attributeName); + + int lastSeparator = attributeName.lastIndexOf(KEY_SEPARATOR); + String contextClassName = attributeName.substring(0, lastSeparator); + + int lastPeriod = contextClassName.lastIndexOf(KEY_SEPARATOR); + + StringBuilder sb = new StringBuilder(contextClassName); + sb.setCharAt(lastPeriod, '$'); + contextClassName = sb.toString(); + + Class<? extends Context> contextClass; + + try { + Class<?> clazz = Class.forName(contextClassName); + + if (Context.class.isAssignableFrom(clazz)) { + contextClass = (Class<? 
extends Context>)clazz; + } else { + throw new IllegalArgumentException("The provided context class name " + + contextClassName + + " is not valid."); + } + } catch (ClassNotFoundException ex) { + throw new IllegalArgumentException(ex); + } + + String simpleAttributeName = getSimpleAttributeName(attributeName); + + if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(simpleAttributeName)) { + throw new ValidationException(simpleAttributeName + + " is not a valid attribute of " + + contextClass); + } + + return contextClass; + } + + /** + * This extracts the simple {@link Attribute} name from the given {@link Attribute} name. + * + * @param attributeName The attribute name to extract a simple attribute name from. + * @return The simple attribute name. + */ + public static String getSimpleAttributeName(String attributeName) + { + if (isSimpleAttributeName(attributeName)) { + return attributeName; + } + + if (attributeName.endsWith(KEY_SEPARATOR)) { + throw new IllegalArgumentException("The given attribute name ends with \"" + + KEY_SEPARATOR + + "\" so a simple name cannot be extracted."); + } + + return attributeName.substring(attributeName.lastIndexOf(KEY_SEPARATOR) + 1, attributeName.length()); + } + + /** + * Gets the simple name of an {@link Attribute}, which does not include the FQCN of the {@link Context} class + * which contains it. + * + * @param attribute The {@link Attribute} of interest. + * @return The name of an {@link Attribute}. 
+ */ + public static String getSimpleName(Attribute<?> attribute) + { + return getSimpleAttributeName(attribute.name); + } + + } + public class JSONObject2String implements StringCodec<Object>, Serializable { private static final long serialVersionUID = -664977453308585878L; @@ -132,11 +908,11 @@ public class LogicalPlanConfiguration { @Override public Object fromString(String jsonObj) { - + LOG.debug("JONString {}", jsonObj); ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer(); try { return mapper.readValue(jsonObj, Object.class); - } catch (Exception e) { + } catch (IOException e) { throw new RuntimeException("Error parsing json content", e); } } @@ -147,13 +923,16 @@ public class LogicalPlanConfiguration { ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer(); try { return mapper.writeValueAsString(pojo); - } catch (Exception e) { + } catch (IOException e) { throw new RuntimeException("Error writing object as json", e); } } - + } + /** + * This is an abstract class representing the configuration applied to an element in the DAG. 
+ */ private static abstract class Conf { protected Conf parentConf = null; @@ -184,7 +963,7 @@ public class LogicalPlanConfiguration { @SuppressWarnings("unchecked") public <T extends Conf> T getAncestorConf(StramElement ancestorElement) { - if (getElement() == ancestorElement) { + if (getConfElement().getStramElement() == ancestorElement) { return (T)this; } if (parentConf == null) { @@ -221,7 +1000,7 @@ public class LogicalPlanConfiguration { } public <T extends Conf> List<T> getMatchingChildConf(String name, StramElement childType) { - List<T> childConfs = new ArrayList<T>(); + List<T> childConfs = new ArrayList<>(); Map<String, T> elChildren = getChildren(childType); for (Map.Entry<String, T> entry : elChildren.entrySet()) { String key = entry.getKey(); @@ -267,14 +1046,18 @@ public class LogicalPlanConfiguration { conf = declaredConstructor.newInstance(new Object[] {}); conf.setId(id); map.put(id, conf); - } catch (Exception e) { + } catch (IllegalAccessException | + IllegalArgumentException | + InstantiationException | + NoSuchMethodException | + SecurityException | + InvocationTargetException e) { LOG.error("Error instantiating configuration", e); } } return conf; } - //public abstract Conf getChild(String id, StramElement childType); public <T extends Conf> T getChild(String id, StramElement childType) { T conf = null; @SuppressWarnings("unchecked") @@ -290,7 +1073,7 @@ public class LogicalPlanConfiguration { // Always return non null so caller will not have to do extra check as expected Map<String, T> elChildren = (Map<String, T>)children.get(childType); if (elChildren == null) { - elChildren = new HashMap<String, T>(); + elChildren = Maps.newHashMap(); children.put(childType, elChildren); } return elChildren; @@ -301,10 +1084,6 @@ public class LogicalPlanConfiguration { public void parseElement(StramElement element, String[] keys, int index, String propertyValue) { } - public Class<? 
extends Context> getAttributeContextClass() { - return null; - } - public boolean isAllowedChild(StramElement childType) { StramElement[] childElements = getChildElements(); if (childElements != null) { @@ -318,7 +1097,7 @@ public class LogicalPlanConfiguration { } public StramElement getDefaultChildElement() { - if ((getAttributeContextClass() == null) && isAllowedChild(StramElement.PROP)) { + if ((getConfElement().getContextClass() == null) && isAllowedChild(StramElement.PROP)) { return StramElement.PROP; } return null; @@ -330,8 +1109,7 @@ public class LogicalPlanConfiguration { public abstract StramElement[] getChildElements(); - public abstract StramElement getElement(); - + public abstract ConfElement getConfElement(); } private static class StramConf extends Conf { @@ -345,21 +1123,20 @@ public class LogicalPlanConfiguration { } @Override - public StramElement getElement() + public StramElement[] getChildElements() { - return null; + return CHILD_ELEMENTS; } @Override - public StramElement[] getChildElements() + public ConfElement getConfElement() { - return CHILD_ELEMENTS; + return ConfElement.STRAM; } - } /** - * App configuration + * This holds the configuration information for an Apex application. */ private static class AppConf extends Conf { @@ -372,12 +1149,6 @@ public class LogicalPlanConfiguration { } @Override - public StramElement getElement() - { - return StramElement.APPLICATION; - } - - @Override public void parseElement(StramElement element, String[] keys, int index, String propertyValue) { if ((element == StramElement.CLASS) || (element == StramElement.PATH)) { StramConf stramConf = getParentConf(); @@ -391,17 +1162,17 @@ public class LogicalPlanConfiguration { return CHILD_ELEMENTS; } - @Override - public Class<? 
extends Context> getAttributeContextClass() - { - return Context.DAGContext.class; - } @Override public StramElement getDefaultChildElement() { return StramElement.PROP; } + @Override + public ConfElement getConfElement() + { + return ConfElement.APPLICATION; + } } private static class GatewayConf extends Conf { @@ -419,11 +1190,10 @@ public class LogicalPlanConfiguration { } @Override - public StramElement getElement() + public ConfElement getConfElement() { - return StramElement.GATEWAY; + return ConfElement.GATEWAY; } - } /** @@ -444,11 +1214,10 @@ public class LogicalPlanConfiguration { return CHILD_ELEMENTS; } - @Override - public StramElement getElement() + public ConfElement getConfElement() { - return StramElement.TEMPLATE; + return ConfElement.TEMPLATE; } @Override @@ -472,23 +1241,23 @@ public class LogicalPlanConfiguration { } /** - * + * This holds the configuration information for a stream that connects two operators in an Apex application. */ private static class StreamConf extends Conf { private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.TEMPLATE, StramElement.PROP}; private OperatorConf sourceNode; - private final Set<OperatorConf> targetNodes = new HashSet<OperatorConf>(); + private final Set<OperatorConf> targetNodes = new HashSet<>(); @SuppressWarnings("unused") StreamConf() { } @Override - public StramElement getElement() + public ConfElement getConfElement() { - return StramElement.STREAM; + return ConfElement.STREAM; } /** @@ -534,7 +1303,6 @@ public class LogicalPlanConfiguration { throw new IllegalArgumentException("Duplicate " + name); } String[] parts = getNodeAndPortId(value); - //setSource(parts[1], getOrAddOperator(appConf, parts[0])); setSource(parts[1], appConf.getOrAddChild(parts[0], StramElement.OPERATOR, OperatorConf.class)); } else if (STREAM_SINKS.equals(name)) { String[] targetPorts = value.split(","); @@ -575,7 +1343,7 @@ public class LogicalPlanConfiguration { } /** - * + * This is a simple 
extension of {@link java.util.Properties} which allows you to specify default properties. */ private static class PropertiesWithModifiableDefaults extends Properties { private static final long serialVersionUID = -4675421720308249982L; @@ -589,7 +1357,7 @@ public class LogicalPlanConfiguration { } /** - * Operator configuration + * This holds the configuration information for an operator in an Apex application. */ private static class OperatorConf extends Conf { @@ -599,14 +1367,14 @@ public class LogicalPlanConfiguration { @SuppressWarnings("unused") OperatorConf() { } - private final Map<String, StreamConf> inputs = new HashMap<String, StreamConf>(); - private final Map<String, StreamConf> outputs = new HashMap<String, StreamConf>(); + private final Map<String, StreamConf> inputs = Maps.newHashMap(); + private final Map<String, StreamConf> outputs = Maps.newHashMap(); private String templateRef; @Override - public StramElement getElement() + public ConfElement getConfElement() { - return StramElement.OPERATOR; + return ConfElement.OPERATOR; } @Override @@ -651,12 +1419,6 @@ public class LogicalPlanConfiguration { return StramElement.PROP; } - @Override - public Class<? extends Context> getAttributeContextClass() - { - return OperatorContext.class; - } - /** * * @return String @@ -671,7 +1433,7 @@ public class LogicalPlanConfiguration { } /** - * Port configuration + * This holds the configuration information for a port on an operator in an Apex application. */ private static class PortConf extends Conf { @@ -682,23 +1444,16 @@ public class LogicalPlanConfiguration { } @Override - public StramElement getElement() - { - return StramElement.PORT; - } - - @Override public StramElement[] getChildElements() { return CHILD_ELEMENTS; } @Override - public Class<? extends Context> getAttributeContextClass() + public ConfElement getConfElement() { - return PortContext.class; + return ConfElement.PORT; } - } private static final Map<StramElement, Class<? 
extends Conf>> elementMaps = Maps.newHashMap(); @@ -715,8 +1470,8 @@ public class LogicalPlanConfiguration { elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class); } - private Conf getConf(StramElement element, Conf ancestorConf) { - if (element == ancestorConf.getElement()) { + private static Conf getConf(StramElement element, Conf ancestorConf) { + if (element == ancestorConf.getConfElement().getStramElement()) { return ancestorConf; } // If top most element is reached and didnt match ancestor conf @@ -724,13 +1479,13 @@ public class LogicalPlanConfiguration { if (element == null) { return null; } - StramElement parentElement = getAllowedParentElement(element, ancestorConf); + StramElement parentElement = ConfElement.getAllowedParentConf(element); Conf parentConf = getConf(parentElement, ancestorConf); return parentConf.getOrAddChild(WILDCARD, element, elementMaps.get(element)); } - private Conf addConf(StramElement element, String name, Conf ancestorConf) { - StramElement parentElement = getAllowedParentElement(element, ancestorConf); + private static Conf addConf(StramElement element, String name, Conf ancestorConf) { + StramElement parentElement = ConfElement.getAllowedParentConf(element); Conf conf1 = null; Conf parentConf = getConf(parentElement, ancestorConf); if (parentConf != null) { @@ -739,28 +1494,8 @@ public class LogicalPlanConfiguration { return conf1; } - private StramElement getAllowedParentElement(StramElement element, Conf ancestorConf) { - StramElement parentElement = null; - if ((element == StramElement.APPLICATION)) { - parentElement = null; - } else if ((element == StramElement.GATEWAY) || (element == StramElement.OPERATOR) || (element == StramElement.STREAM)) { - parentElement = StramElement.APPLICATION; - } else if ((element == StramElement.PORT) || (element == StramElement.INPUT_PORT) || (element == StramElement.OUTPUT_PORT)) { - parentElement = StramElement.OPERATOR; - } else if (element == StramElement.TEMPLATE) { - parentElement = 
null; - } - return parentElement; - } - - /* - private boolean isApplicationTypeConf(Conf conf) { - return (conf.getElement() == null) || (conf.getElement() == StramElement.APPLICATION); - } - */ - private <T extends Conf> List<T> getMatchingChildConf(List<? extends Conf> confs, String name, StramElement childType) { - List<T> childConfs = new ArrayList<T>(); + List<T> childConfs = Lists.newArrayList(); for (Conf conf1 : confs) { List<T> matchingConfs = conf1.getMatchingChildConf(name, childType); childConfs.addAll(matchingConfs); @@ -813,7 +1548,7 @@ public class LogicalPlanConfiguration { public String getAppAlias(String appPath) { String appAlias; if (appPath.endsWith(CLASS_SUFFIX)) { - appPath = appPath.replace("/", ".").substring(0, appPath.length() - CLASS_SUFFIX.length()); + appPath = appPath.replace("/", KEY_SEPARATOR).substring(0, appPath.length() - CLASS_SUFFIX.length()); } appAlias = stramConf.appAliases.get(appPath); if (appAlias == null) { @@ -836,11 +1571,11 @@ public class LogicalPlanConfiguration { JSONArray operatorArray = json.getJSONArray("operators"); for (int i = 0; i < operatorArray.length(); i++) { JSONObject operator = operatorArray.getJSONObject(i); - String operatorPrefix = StreamingApplication.DT_PREFIX + StramElement.OPERATOR.getValue() + "." 
+ operator.getString("name") + "."; + String operatorPrefix = StreamingApplication.DT_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + "."; prop.setProperty(operatorPrefix + "classname", operator.getString("class")); JSONObject operatorProperties = operator.optJSONObject("properties"); if (operatorProperties != null) { - String propertiesPrefix = operatorPrefix + StramElement.PROP.getValue() + "."; + String propertiesPrefix = operatorPrefix + StramElement.PROP.getValue() + KEY_SEPARATOR; @SuppressWarnings("unchecked") Iterator<String> iter = operatorProperties.keys(); while (iter.hasNext()) { @@ -850,7 +1585,7 @@ public class LogicalPlanConfiguration { } JSONObject operatorAttributes = operator.optJSONObject("attributes"); if (operatorAttributes != null) { - String attributesPrefix = operatorPrefix + StramElement.ATTR.getValue() + "."; + String attributesPrefix = operatorPrefix + StramElement.ATTR.getValue() + KEY_SEPARATOR; @SuppressWarnings("unchecked") Iterator<String> iter = operatorAttributes.keys(); while (iter.hasNext()) { @@ -860,12 +1595,12 @@ public class LogicalPlanConfiguration { } JSONArray portArray = operator.optJSONArray("ports"); if (portArray != null) { - String portsPrefix = operatorPrefix + StramElement.PORT.getValue() + "."; + String portsPrefix = operatorPrefix + StramElement.PORT.getValue() + KEY_SEPARATOR; for (int j = 0; j < portArray.length(); j++) { JSONObject port = portArray.getJSONObject(j); JSONObject portAttributes = port.optJSONObject("attributes"); if (portAttributes != null) { - String portAttributePrefix = portsPrefix + port.getString("name") + "." 
+ StramElement.ATTR.getValue() + "."; + String portAttributePrefix = portsPrefix + port.getString("name") + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR; @SuppressWarnings("unchecked") Iterator<String> iter = portAttributes.keys(); while (iter.hasNext()) { @@ -876,10 +1611,10 @@ public class LogicalPlanConfiguration { } } } - + JSONObject appAttributes = json.optJSONObject("attributes"); if (appAttributes != null) { - String attributesPrefix = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + "."; + String attributesPrefix = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + KEY_SEPARATOR; @SuppressWarnings("unchecked") Iterator<String> iter = appAttributes.keys(); while (iter.hasNext()) { @@ -892,9 +1627,9 @@ public class LogicalPlanConfiguration { for (int i = 0; i < streamArray.length(); i++) { JSONObject stream = streamArray.getJSONObject(i); String name = stream.optString("name", "stream-" + i); - String streamPrefix = StreamingApplication.DT_PREFIX + StramElement.STREAM.getValue() + "." + name + "."; + String streamPrefix = StreamingApplication.DT_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR; JSONObject source = stream.getJSONObject("source"); - prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + "." 
+ source.getString("portName")); + prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + KEY_SEPARATOR + source.getString("portName")); JSONArray sinks = stream.getJSONArray("sinks"); StringBuilder sinkPropertyValue = new StringBuilder(); for (int j = 0; j < sinks.length(); j++) { @@ -902,7 +1637,7 @@ public class LogicalPlanConfiguration { sinkPropertyValue.append(","); } JSONObject sink = sinks.getJSONObject(j); - sinkPropertyValue.append(sink.getString("operatorName")).append(".").append(sink.getString("portName")); + sinkPropertyValue.append(sink.getString("operatorName")).append(KEY_SEPARATOR).append(sink.getString("portName")); } prop.setProperty(streamPrefix + STREAM_SINKS, sinkPropertyValue.toString()); String locality = stream.optString("locality", null); @@ -937,13 +1672,21 @@ public class LogicalPlanConfiguration { String propertyValue = props.getProperty(propertyName); this.properties.setProperty(propertyName, propertyValue); if (propertyName.startsWith(StreamingApplication.DT_PREFIX)) { - String[] keyComps = propertyName.split("\\."); + String[] keyComps = propertyName.split(KEY_SEPARATOR_SPLIT_REGEX); parseStramPropertyTokens(keyComps, 1, propertyName, propertyValue, stramConf); } } return this; } + /** + * This method is used to parse an Apex property name. + * @param keys The keys into which an Apex property is split. + * @param index The current index that the parser is on for processing the property name. + * @param propertyName The original unsplit Apex property name. + * @param propertyValue The value corresponding to the Apex property. 
+ * @param conf + */ private void parseStramPropertyTokens(String[] keys, int index, String propertyName, String propertyValue, Conf conf) { if (index < keys.length) { String key = keys[index]; @@ -973,14 +1716,65 @@ public class LogicalPlanConfiguration { LOG.error("Invalid configuration key: {}", propertyName); } } else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) { - if (conf.getElement() == null) { + String attributeName = AttributeParseUtils.getAttributeName(element, keys, index); + + if (element != StramElement.ATTR) { + String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName; + LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName); + } + + if (conf.getConfElement().getStramElement() == null) { conf = addConf(StramElement.APPLICATION, WILDCARD, conf); } + if (conf != null) { - // Supporting current implementation where attribute can be directly specified under stram - // Re-composing complete key for nested keys which are used in templates - // Implement it better way to not pre-tokenize the property string and parse progressively - parseAttribute(conf, keys, index, element, propertyValue); + if (AttributeParseUtils.isSimpleAttributeName(attributeName)) { + //The provided attribute name was a simple name + + if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) { + throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0)); + } + + if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) { + throw new ValidationException(attributeName + + " is not defined for the " + + conf.getConfElement().getStramElement() + + " or any of its child configurations."); + } + + if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) { + //If the attribute name is ambiguous at this configuration 
level we should tell the user. + LOG.warn("The attribute " + + attributeName + + " is ambiguous when specified on an " + conf.getConfElement().getStramElement()); + } + + if (conf.getConfElement().getContextAttributes().contains(attributeName)) { + @SuppressWarnings("unchecked") + Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName); + conf.setAttribute(attr, propertyValue); + } else { + AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue); + } + } else { + //This is a FQ attribute name + Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName); + + //Convert to a simple name + attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName); + + if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) { + throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName()); + } + + ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass); + + conf = ConfElement.addConfs(conf, confWithAttr); + + @SuppressWarnings("unchecked") + Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName); + conf.setAttribute(attr, propertyValue); + } } else { LOG.error("Invalid configuration key: {}", propertyName); } @@ -992,12 +1786,6 @@ public class LogicalPlanConfiguration { prop = getCompleteKey(keys, index+1); } else { prop = getCompleteKey(keys, index); - /* - if (conf.getAttributeContextClass() != null) { - LOG.warn("Please specify the property {} using the {} keyword as {}", prop, StramElement.PROP.getValue(), - getCompleteKey(keys, 0, index) + "." + StramElement.PROP.getValue() + "." 
+ getCompleteKey(keys, index)); - } - */ } if (prop != null) { conf.setProperty(prop, propertyValue); @@ -1023,15 +1811,30 @@ public class LogicalPlanConfiguration { return element; } - private String getCompleteKey(String[] keys, int start) { + /** + * This constructs a string from the keys in the given keys array starting from + * the start index inclusive until the end of the array. + * @param keys The keys from which to construct a string. + * @param start The token to start creating a string from. + * @return The completed key. + */ + private static String getCompleteKey(String[] keys, int start) { return getCompleteKey(keys, start, keys.length); } - private String getCompleteKey(String[] keys, int start, int end) { + /** + * This constructs a string from the keys in the given keys array starting from + * the start index inclusive until the specified end index exclusive. + * @param keys The keys from which to construct a string. + * @param start The token to start creating a string from. + * @param end 1 + the last index to include in the concatenation. + * @return The completed key. 
+ */ + private static String getCompleteKey(String[] keys, int start, int end) { StringBuilder sb = new StringBuilder(1024); for (int i = start; i < end; ++i) { if (i > start) { - sb.append("."); + sb.append(KEY_SEPARATOR); } sb.append(keys[i]); } @@ -1099,7 +1902,7 @@ public class LogicalPlanConfiguration { Map<String, OperatorConf> operators = appConf.getChildren(StramElement.OPERATOR); - Map<OperatorConf, Operator> nodeMap = new HashMap<OperatorConf, Operator>(operators.size()); + Map<OperatorConf, Operator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size()); // add all operators first for (Map.Entry<String, OperatorConf> nodeConfEntry : operators.entrySet()) { OperatorConf nodeConf = nodeConfEntry.getValue(); @@ -1117,7 +1920,7 @@ public class LogicalPlanConfiguration { nd = dag.addOperator(nodeConfEntry.getKey(), nodeClass); } setOperatorProperties(nd, nodeConf.getProperties()); - } catch (Exception e) { + } catch (IOException e) { throw new IllegalArgumentException("Error setting operator properties " + e.getMessage(), e); } nodeMap.put(nodeConf, nd); @@ -1178,9 +1981,9 @@ public class LogicalPlanConfiguration { /** * Populate the logical plan from the streaming application definition and configuration. * Configuration is resolved based on application alias, if any. - * @param app - * @param dag - * @param name + * @param app The {@link StreamingApplication} to be run. + * @param dag This will hold the {@link LogicalPlan} representation of the given {@link StreamingApplication}. + * @param name The path of the application class in the jar. */ public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name) { @@ -1211,10 +2014,6 @@ public class LogicalPlanConfiguration { return props; } - private String getSimpleName(Attribute<?> attribute) { - return attribute.name.substring(attribute.name.lastIndexOf('.')+1); - } - /** * Get the configuration opProps for the given operator. 
* These can be operator specific settings or settings from matching templates. @@ -1229,7 +2028,7 @@ public class LogicalPlanConfiguration { } private Map<String,String> getApplicationProperties(List<AppConf> appConfs){ - Map<String, String> appProps = new HashMap<String, String>(); + Map<String, String> appProps = Maps.newHashMap(); // Apply the configurations in reverse order since the higher priority ones are at the beginning for(int i = appConfs.size()-1; i >= 0; i--){ AppConf conf1 = appConfs.get(i); @@ -1246,7 +2045,7 @@ public class LogicalPlanConfiguration { */ private Map<String, String> getProperties(OperatorMeta ow, List<OperatorConf> opConfs, String appName) { - Map<String, String> opProps = new HashMap<String, String>(); + Map<String, String> opProps = Maps.newHashMap(); Map<String, TemplateConf> templates = stramConf.getChildren(StramElement.TEMPLATE); // list of all templates that match operator, ordered by priority if (!templates.isEmpty()) { @@ -1269,12 +2068,11 @@ public class LogicalPlanConfiguration { Conf conf1 = opConfs.get(i); opProps.putAll(Maps.fromProperties(conf1.properties)); } - //properties.remove(OPERATOR_CLASSNAME); return opProps; } private List<TemplateConf> getDirectTemplates(List<OperatorConf> opConfs, Map<String, TemplateConf> templates) { - List<TemplateConf> refTemplates = new ArrayList<TemplateConf>(); + List<TemplateConf> refTemplates = Lists.newArrayList(); for (TemplateConf t : templates.values()) { for (OperatorConf opConf : opConfs) { if (t.id.equals(opConf.templateRef)) { @@ -1293,13 +2091,8 @@ public class LogicalPlanConfiguration { * @return TreeMap<Integer, TemplateConf> */ private TreeMap<Integer, TemplateConf> getMatchingTemplates(OperatorMeta ow, String appName, Map<String, TemplateConf> templates) { - TreeMap<Integer, TemplateConf> tm = new TreeMap<Integer, TemplateConf>(); + TreeMap<Integer, TemplateConf> tm = Maps.newTreeMap(); for (TemplateConf t : templates.values()) { - /*if (t.id == nodeConf.templateRef) { 
- // directly assigned applies last - tm.put(1, t); - continue; - } else*/ if ((t.idRegExp != null && ow.getName().matches(t.idRegExp))) { tm.put(1, t); } else if (appName != null && t.appNameRegExp != null @@ -1326,10 +2119,7 @@ public class LogicalPlanConfiguration { BeanUtils.populate(operator, properties); return operator; } - catch (IllegalAccessException e) { - throw new IllegalArgumentException("Error setting operator properties", e); - } - catch (InvocationTargetException e) { + catch (IllegalAccessException | InvocationTargetException e) { throw new IllegalArgumentException("Error setting operator properties", e); } } @@ -1340,10 +2130,7 @@ public class LogicalPlanConfiguration { BeanUtils.populate(application, properties); return application; } - catch (IllegalAccessException e) { - throw new IllegalArgumentException("Error setting application properties", e); - } - catch (InvocationTargetException e) { + catch (IllegalAccessException | InvocationTargetException e) { throw new IllegalArgumentException("Error setting application properties", e); } } @@ -1369,21 +2156,7 @@ public class LogicalPlanConfiguration { setOperatorProperties(ow.getOperator(), opProps); } } -/* - private static final Map<String, Attribute<?>> legacyKeyMap = Maps.newHashMap(); - static { - legacyKeyMap.put("appName", Context.DAGContext.APPLICATION_NAME); - legacyKeyMap.put("libjars", Context.DAGContext.LIBRARY_JARS); - legacyKeyMap.put("maxContainers", Context.DAGContext.CONTAINERS_MAX_COUNT); - legacyKeyMap.put("containerMemoryMB", Context.DAGContext.CONTAINER_MEMORY_MB); - legacyKeyMap.put("containerJvmOpts", Context.DAGContext.CONTAINER_JVM_OPTIONS); - legacyKeyMap.put("masterMemoryMB", Context.DAGContext.MASTER_MEMORY_MB); - legacyKeyMap.put("windowSizeMillis", Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); - legacyKeyMap.put("appPath", Context.DAGContext.APPLICATION_PATH); - legacyKeyMap.put("allocateResourceTimeoutMillis", 
Context.DAGContext.RESOURCE_ALLOCATION_TIMEOUT_MILLIS); - } -*/ /** * Set the application configuration. * @param dag @@ -1397,18 +2170,7 @@ public class LogicalPlanConfiguration { } private void setApplicationConfiguration(final LogicalPlan dag, List<AppConf> appConfs,StreamingApplication app) { - // Make the gateway address available as an application attribute -// for (Conf appConf : appConfs) { -// Conf gwConf = appConf.getChild(null, StramElement.GATEWAY); -// if (gwConf != null) { -// String gatewayAddress = gwConf.properties.getProperty(GATEWAY_LISTEN_ADDRESS_PROP); -// if (gatewayAddress != null) { -// dag.setAttribute(DAGContext.GATEWAY_CONNECT_ADDRESS, gatewayAddress); -// break; -// } -// } -// } - setAttributes(Context.DAGContext.class, appConfs, dag.getAttributes()); + setAttributes(appConfs, dag.getAttributes()); if (app != null) { Map<String, String> appProps = getApplicationProperties(appConfs); setApplicationProperties(app, appProps); @@ -1420,7 +2182,7 @@ public class LogicalPlanConfiguration { List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR); // Set the operator attributes - setAttributes(OperatorContext.class, opConfs, ow.getAttributes()); + setAttributes(opConfs, ow.getAttributes()); // Set the operator opProps Map<String, String> opProps = getProperties(ow, opConfs, appName); setOperatorProperties(ow.getOperator(), opProps); @@ -1432,7 +2194,7 @@ public class LogicalPlanConfiguration { // Add the generic port attributes as well List<PortConf> portConfs = getMatchingChildConf(opConfs, im.getPortName(), StramElement.PORT); inPortConfs.addAll(portConfs); - setAttributes(PortContext.class, inPortConfs, im.getAttributes()); + setAttributes(inPortConfs, im.getAttributes()); } for (Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : ow.getOutputStreams().entrySet()) { @@ -1441,7 +2203,7 @@ public class LogicalPlanConfiguration { // Add the generic port attributes as well List<PortConf> 
portConfs = getMatchingChildConf(opConfs, om.getPortName(), StramElement.PORT); outPortConfs.addAll(portConfs); - setAttributes(PortContext.class, outPortConfs, om.getAttributes()); + setAttributes(outPortConfs, om.getAttributes()); } ow.populateAggregatorMeta(); } @@ -1460,36 +2222,7 @@ public class LogicalPlanConfiguration { } } - private final Map<Class<? extends Context>, Map<String, Attribute<Object>>> attributeMap = Maps.newHashMap(); - - private void parseAttribute(Conf conf, String[] keys, int index, StramElement element, String attrValue) - { - String configKey = (element == StramElement.ATTR) ? getCompleteKey(keys, index + 1) : getCompleteKey(keys, index); - boolean isDeprecated = false; - Class<? extends Context> clazz = conf.getAttributeContextClass(); - Map<String, Attribute<Object>> m = attributeMap.get(clazz); - if (!attributeMap.containsKey(clazz)) { - Set<Attribute<Object>> attributes = AttributeInitializer.getAttributes(clazz); - m = Maps.newHashMapWithExpectedSize(attributes.size()); - for (Attribute<Object> attr : attributes) { - m.put(getSimpleName(attr), attr); - } - attributeMap.put(clazz, m); - } - Attribute<Object> attr = m.get(configKey); - if (attr == null) { - throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0)); - } - - if (element != StramElement.ATTR || isDeprecated) { - String expName = getCompleteKey(keys, 0, index) + "." + StramElement.ATTR.getValue() + "." + getSimpleName(attr); - LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName); - } - - conf.setAttribute(attr, attrValue); - } - - private void setAttributes(Class<?> clazz, List<? extends Conf> confs, Attribute.AttributeMap attributeMap) { + private void setAttributes(List<? 
extends Conf> confs, Attribute.AttributeMap attributeMap) { Set<Attribute<Object>> processedAttributes = Sets.newHashSet(); //json object codec for complex attributes JSONObject2String jsonCodec = new JSONObject2String();
