Repository: incubator-apex-core Updated Branches: refs/heads/master c77ea114e -> da46ec186
APEXCORE-107 Support for adding modules in the DAG property file and json file. Change module API to make module as an operator. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d4f3a506 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d4f3a506 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d4f3a506 Branch: refs/heads/master Commit: d4f3a506aa200c91a0dc6698645415d54dd67562 Parents: c2903da Author: Tushar R. Gosavi <tushargos...@gmail.com> Authored: Thu May 5 03:24:48 2016 +0530 Committer: Tushar R. Gosavi <tushargos...@gmail.com> Committed: Thu May 5 03:24:48 2016 +0530 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 20 +- .../main/java/com/datatorrent/api/Module.java | 3 +- .../main/java/com/datatorrent/api/Operator.java | 3 +- .../stram/codec/LogicalPlanSerializer.java | 2 +- .../stram/plan/logical/LogicalPlan.java | 198 +++++++------------ .../plan/logical/LogicalPlanConfiguration.java | 76 ++++--- .../stram/plan/logical/Operators.java | 3 +- .../stram/webapp/StramWebServices.java | 2 +- .../plan/logical/module/ModuleAppTest.java | 2 +- .../logical/module/TestModuleExpansion.java | 73 +++++-- .../src/test/resources/testModuleTopology.json | 141 +++++++++++++ .../resources/testModuleTopology.properties | 62 ++++++ 12 files changed, 386 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 74448fd..1518fcf 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -159,14 +159,6 @@ public interface DAG extends DAGContext, Serializable public OutputPortMeta getMeta(Operator.OutputPort<?> port); } - @InterfaceStability.Evolving - interface ModuleMeta extends Serializable, Context - { - String getName(); - - Module getModule(); - } - /** * Add new instance of operator under given name to the DAG. * The operator class must have a default constructor. @@ -272,15 +264,17 @@ public interface DAG extends DAGContext, Serializable */ public abstract OperatorMeta getOperatorMeta(String operatorId); - @InterfaceStability.Evolving - ModuleMeta getModuleMeta(String moduleId); - /** * <p>getMeta.</p> */ public abstract OperatorMeta getMeta(Operator operator); - @InterfaceStability.Evolving - ModuleMeta getMeta(Module module); + /** + * Marker interface for the Node in the DAG. Any object which can be added as a Node in the DAG + * needs to implement this interface. + */ + interface GenericOperator + { + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/api/src/main/java/com/datatorrent/api/Module.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java index 8a85d8b..d93d16f 100644 --- a/api/src/main/java/com/datatorrent/api/Module.java +++ b/api/src/main/java/com/datatorrent/api/Module.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG.GenericOperator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Operator.Unifier; @@ -36,7 +37,7 @@ import com.datatorrent.api.Operator.Unifier; * @since 3.3.0 */ @InterfaceStability.Evolving -public interface Module +public interface Module extends GenericOperator { void populateDAG(DAG dag, Configuration conf); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/api/src/main/java/com/datatorrent/api/Operator.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java index d4a6a90..c016799 100644 --- a/api/src/main/java/com/datatorrent/api/Operator.java +++ b/api/src/main/java/com/datatorrent/api/Operator.java @@ -20,6 +20,7 @@ package com.datatorrent.api; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG.GenericOperator; /** * <p> @@ -27,7 +28,7 @@ import com.datatorrent.api.Context.PortContext; * * @since 0.3.2 */ -public interface Operator extends Component<OperatorContext> +public interface Operator extends Component<OperatorContext>, GenericOperator { /** * One can set attribute on an Operator to indicate the mode in which it processes Tuples. http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java index 9139566..6607321 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java @@ -362,7 +362,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> Map<String, Object> moduleDetailMap = new HashMap<>(); ArrayList<String> operatorArray = new ArrayList<>(); moduleDetailMap.put("name", moduleMeta.getName()); - moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName()); + moduleDetailMap.put("className", moduleMeta.getGenericOperator().getClass().getName()); moduleDetailMap.put("operators", operatorArray); for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 15969b7..af6b1bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -63,13 +63,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.io.input.ClassLoaderObjectInputStream; +import org.apache.commons.lang.ClassUtils; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Sets; import com.datatorrent.api.AffinityRule; @@ -195,7 +195,6 @@ public class LogicalPlan implements Serializable, DAG private final List<OperatorMeta> rootOperators = new ArrayList<>(); private final List<OperatorMeta> leafOperators = new ArrayList<>(); private final Attribute.AttributeMap attributes = new DefaultAttributeMap(); - private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>(); @Override public Attribute.AttributeMap getAttributes() @@ -489,6 +488,10 @@ public class LogicalPlan implements Serializable, DAG @Override public StreamMeta setSource(Operator.OutputPort<?> port) { + if (port instanceof ProxyOutputPort) { + proxySource = port; + return this; + } OutputPortMeta portMeta = assertGetPortMeta(port); OperatorMeta om = portMeta.getOperatorMeta(); if (om.outputStreams.containsKey(portMeta)) { @@ -508,6 +511,10 @@ public class LogicalPlan implements Serializable, DAG @Override public StreamMeta addSink(Operator.InputPort<?> port) { + if (port instanceof ProxyInputPort) { + proxySinks.add(port); + return this; + } InputPortMeta portMeta = assertGetPortMeta(port); OperatorMeta om = portMeta.getOperatorWrapper(); String portName = portMeta.getPortName(); @@ -771,12 +778,40 @@ public class LogicalPlan implements Serializable, DAG removeOperator(persistOpMeta.getOperator()); } } + + private OutputPort<?> proxySource = null; + private List<InputPort<?>> proxySinks = new ArrayList<>(); + + /** + * Go over each Proxy port and find out the actual port connected to the ProxyPort + * and update StreamMeta. + */ + private void resolvePorts() + { + if (proxySource != null && proxySource instanceof ProxyOutputPort) { + OutputPort<?> outputPort = proxySource; + while (outputPort instanceof ProxyOutputPort) { + outputPort = ((ProxyOutputPort<?>)outputPort).get(); + } + setSource(outputPort); + } + + for (InputPort<?> inputPort : proxySinks) { + while (inputPort instanceof ProxyInputPort) { + inputPort = ((ProxyInputPort<?>)inputPort).get(); + } + addSink(inputPort); + } + + proxySource = null; + proxySinks.clear(); + } } /** * Operator meta object. */ - public final class OperatorMeta implements DAG.OperatorMeta, Serializable + public class OperatorMeta implements DAG.OperatorMeta, Serializable { private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>(); private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>(); @@ -789,7 +824,7 @@ public class LogicalPlan implements Serializable, DAG private final LogicalOperatorStatus status; private transient Integer nindex; // for cycle detection private transient Integer lowlink; // for cycle detection - private transient Operator operator; + private transient GenericOperator operator; private MetricAggregatorMeta metricAggregatorMeta; private String moduleName; // Name of the module which has this operator. null if this is a top level operator. @@ -799,13 +834,14 @@ public class LogicalPlan implements Serializable, DAG * other value => represents the root oio node for this node */ private transient Integer oioRoot = null; + private ClassUtils genricOperator; - private OperatorMeta(String name, Operator operator) + private OperatorMeta(String name, GenericOperator operator) { this(name, operator, new DefaultAttributeMap()); } - private OperatorMeta(String name, Operator operator, Attribute.AttributeMap attributeMap) + private OperatorMeta(String name, GenericOperator operator, Attribute.AttributeMap attributeMap) { LOG.debug("Initializing {} as {}", name, operator.getClass().getName()); this.operatorAnnotation = operator.getClass().getAnnotation(OperatorAnnotation.class); @@ -858,7 +894,7 @@ public class LogicalPlan implements Serializable, DAG input.defaultReadObject(); // TODO: not working because - we don't have the storage agent in parent attribuet map //operator = (Operator)getValue2(OperatorContext.STORAGE_AGENT).load(id, Checkpoint.STATELESS_CHECKPOINT_WINDOW_ID); - operator = (Operator)FSStorageAgent.retrieve(input); + operator = (GenericOperator)FSStorageAgent.retrieve(input); } @Override @@ -1000,6 +1036,7 @@ public class LogicalPlan implements Serializable, DAG } } + private class PortMapping implements Operators.OperatorDescriptor { private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new HashMap<>(); @@ -1113,6 +1150,11 @@ public class LogicalPlan implements Serializable, DAG @Override public Operator getOperator() { + return (Operator)operator; + } + + public GenericOperator getGenericOperator() + { return operator; } @@ -1198,90 +1240,28 @@ public class LogicalPlan implements Serializable, DAG /** * Module meta object. */ - public final class ModuleMeta implements DAG.ModuleMeta, Serializable + public final class ModuleMeta extends OperatorMeta implements Serializable { - private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>(); - private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>(); - private final Attribute.AttributeMap attributes; - @NotNull - private String name; - private transient Module module; private ModuleMeta parent; private LogicalPlan dag = null; private transient String fullName; + //type-casted reference to the module. + private transient Module module; + private transient boolean flattened = false; private ModuleMeta(String name, Module module) { - LOG.debug("Initializing {} as {}", name, module.getClass().getName()); - this.name = name; + super(name, module); this.module = module; - this.attributes = new DefaultAttributeMap(); + LOG.debug("Initializing {} as {}", name, module.getClass().getName()); this.dag = new LogicalPlan(); } - @Override - public String getName() - { - return name; - } - - @Override - public Module getModule() - { - return module; - } - - @Override - public Attribute.AttributeMap getAttributes() - { - return attributes; - } - - @Override - public <T> T getValue(Attribute<T> key) - { - return attributes.get(key); - } - - @Override - public void setCounters(Object counters) - { - - } - - @Override - public void sendMetrics(Collection<String> metricNames) - { - - } - - public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams() - { - return inputStreams; - } - - public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams() - { - return outputStreams; - } - public LogicalPlan getDag() { return dag; } - private void writeObject(ObjectOutputStream out) throws IOException - { - out.defaultWriteObject(); - FSStorageAgent.store(out, module); - } - - private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException - { - input.defaultReadObject(); - module = (Module)FSStorageAgent.retrieve(input); - } - /** * Expand the module and add its operator to the parentDAG. After this method finishes the module is expanded fully * with all its submodules also expanded. The parentDAG contains the operator added by all the modules. @@ -1291,6 +1271,10 @@ public class LogicalPlan implements Serializable, DAG */ public void flattenModule(LogicalPlan parentDAG, Configuration conf) { + if (flattened) { + return; + } + module.populateDAG(dag, conf); for (ModuleMeta subModuleMeta : dag.getAllModules()) { subModuleMeta.setParent(this); @@ -1298,6 +1282,7 @@ public class LogicalPlan implements Serializable, DAG } dag.applyStreamLinks(); parentDAG.addDAGToCurrentDAG(this); + flattened = true; } /** @@ -1317,9 +1302,9 @@ public class LogicalPlan implements Serializable, DAG } if (parent == null) { - fullName = name; + fullName = getName(); } else { - fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name; + fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + getName(); } return fullName; } @@ -1401,53 +1386,25 @@ public class LogicalPlan implements Serializable, DAG public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks) { StreamMeta s = addStream(id); - id = s.id; - ArrayListMultimap<OutputPort<?>, InputPort<?>> streamMap = ArrayListMultimap.create(); - if (!(source instanceof ProxyOutputPort)) { - s.setSource(source); - } + s.setSource(source); for (Operator.InputPort<?> sink : sinks) { - if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) { - streamMap.put(source, sink); - streamLinks.put(id, streamMap); - } else { - if (s.getSource() == null) { - s.setSource(source); - } - s.addSink(sink); - } + s.addSink(sink); } return s; } /** - * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with - * the actual ports that they refer to This method adds sources and sinks for the StreamMeta objects which were left - * empty in the addStream call. + * This will be called once the Logical Dag is expanded, and proxy input and proxy output ports are populated + * with actual ports they refer to. */ public void applyStreamLinks() { - for (String id : streamLinks.keySet()) { - StreamMeta s = getStream(id); - for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) { - if (s.getSource() == null) { - Operator.OutputPort<?> outputPort = pair.getKey(); - while (outputPort instanceof ProxyOutputPort) { - outputPort = ((ProxyOutputPort<?>)outputPort).get(); - } - s.setSource(outputPort); - } - - Operator.InputPort<?> inputPort = pair.getValue(); - while (inputPort instanceof ProxyInputPort) { - inputPort = ((ProxyInputPort<?>)inputPort).get(); - } - s.addSink(inputPort); - } + for (StreamMeta smeta : streams.values()) { + smeta.resolvePorts(); } } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) private void addDAGToCurrentDAG(ModuleMeta moduleMeta) { LogicalPlan subDag = moduleMeta.getDag(); @@ -1594,12 +1551,6 @@ public class LogicalPlan implements Serializable, DAG } @Override - public ModuleMeta getModuleMeta(String moduleName) - { - return this.modules.get(moduleName); - } - - @Override public OperatorMeta getMeta(Operator operator) { // TODO: cache mapping @@ -1611,17 +1562,6 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("Operator not associated with the DAG: " + operator); } - @Override - public ModuleMeta getMeta(Module module) - { - for (ModuleMeta m : getAllModules()) { - if (m.module == module) { - return m; - } - } - throw new IllegalArgumentException("Module not associated with the DAG: " + module); - } - public int getMaxContainerCount() { return this.getValue(CONTAINERS_MAX_COUNT); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 628fce2..bab414f 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -69,6 +69,8 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.GenericOperator; +import com.datatorrent.api.Module; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.StringCodec; @@ -2111,28 +2113,28 @@ public class LogicalPlanConfiguration Map<String, OperatorConf> operators = appConf.getChildren(StramElement.OPERATOR); - Map<OperatorConf, Operator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size()); + Map<OperatorConf, GenericOperator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size()); // add all operators first for (Map.Entry<String, OperatorConf> nodeConfEntry : operators.entrySet()) { OperatorConf nodeConf = nodeConfEntry.getValue(); if (!WILDCARD.equals(nodeConf.id)) { - Class<? extends Operator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), Operator.class); + Class<? extends GenericOperator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), GenericOperator.class); String optJson = nodeConf.getProperties().get(nodeClass.getName()); - Operator nd = null; + GenericOperator operator = null; try { if (optJson != null) { // if there is a special key which is the class name, it means the operator is serialized in json format ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer(); - nd = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass); - dag.addOperator(nodeConfEntry.getKey(), nd); + operator = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass); + addOperator(dag, nodeConfEntry.getKey(), operator); } else { - nd = dag.addOperator(nodeConfEntry.getKey(), nodeClass); + operator = addOperator(dag, nodeConfEntry.getKey(), nodeClass); } - setOperatorProperties(nd, nodeConf.getProperties()); + setOperatorProperties(operator, nodeConf.getProperties()); } catch (IOException e) { throw new IllegalArgumentException("Error setting operator properties " + e.getMessage(), e); } - nodeMap.put(nodeConf, nd); + nodeMap.put(nodeConf, operator); } } @@ -2157,7 +2159,7 @@ public class LogicalPlanConfiguration portName = e.getKey(); } } - Operator sourceDecl = nodeMap.get(streamConf.sourceNode); + GenericOperator sourceDecl = nodeMap.get(streamConf.sourceNode); Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor(); Operators.describe(sourceDecl, sourcePortMap); sd.setSource(sourcePortMap.outputPorts.get(portName).component); @@ -2174,7 +2176,7 @@ public class LogicalPlanConfiguration portName = e.getKey(); } } - Operator targetDecl = nodeMap.get(targetNode); + GenericOperator targetDecl = nodeMap.get(targetNode); Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor(); Operators.describe(targetDecl, targetPortMap); sd.addSink(targetPortMap.inputPorts.get(portName).component); @@ -2187,6 +2189,27 @@ public class LogicalPlanConfiguration } + private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator) + { + if (operator instanceof Module) { + dag.addModule(name, (Module)operator); + } else if (operator instanceof Operator) { + dag.addOperator(name, (Operator)operator); + } + return operator; + } + + + private GenericOperator addOperator(LogicalPlan dag, String name, Class<?> clazz) + { + if (Module.class.isAssignableFrom(clazz)) { + return dag.addModule(name, (Class<Module>)clazz); + } else if (Operator.class.isAssignableFrom(clazz)) { + return dag.addOperator(name, (Class<Operator>)clazz); + } + return null; + } + /** * Populate the logical plan from the streaming application definition and configuration. * Configuration is resolved based on application alias, if any. @@ -2323,12 +2346,7 @@ public class LogicalPlanConfiguration private PropertyArgs getPropertyArgs(OperatorMeta om) { - return new PropertyArgs(om.getName(), om.getOperator().getClass().getName()); - } - - private PropertyArgs getPropertyArgs(ModuleMeta mm) - { - return new PropertyArgs(mm.getName(), mm.getModule().getClass().getName()); + return new PropertyArgs(om.getName(), om.getGenericOperator().getClass().getName()); } /** @@ -2361,7 +2379,7 @@ public class LogicalPlanConfiguration * @param properties * @return Operator */ - public static Operator setOperatorProperties(Operator operator, Map<String, String> properties) + public static GenericOperator setOperatorProperties(GenericOperator operator, Map<String, String> properties) { try { // populate custom opProps @@ -2372,26 +2390,6 @@ public class LogicalPlanConfiguration } } - /** - * Generic helper function to inject properties on the object. - * - * @param obj - * @param properties - * @param <T> - * @return - */ - public static <T> T setObjectProperties(T obj, Map<String, String> properties) - { - try { - BeanUtils.populate(obj, properties); - return obj; - } catch (IllegalAccessException e) { - throw new IllegalArgumentException("Error setting operator properties", e); - } catch (InvocationTargetException e) { - throw new IllegalArgumentException("Error setting operator properties", e); - } - } - public static StreamingApplication setApplicationProperties(StreamingApplication application, Map<String, String> properties) { try { @@ -2421,7 +2419,7 @@ public class LogicalPlanConfiguration for (OperatorMeta ow : dag.getAllOperators()) { List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR); Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, applicationName); - setOperatorProperties(ow.getOperator(), opProps); + setOperatorProperties(ow.getGenericOperator(), opProps); } } @@ -2502,7 +2500,7 @@ public class LogicalPlanConfiguration for (final ModuleMeta mw : dag.getAllModules()) { List<OperatorConf> opConfs = getMatchingChildConf(appConfs, mw.getName(), StramElement.OPERATOR); Map<String, String> opProps = getProperties(getPropertyArgs(mw), opConfs, appName); - setObjectProperties(mw.getModule(), opProps); + setOperatorProperties(mw.getGenericOperator(), opProps); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java index 5da7383..7bb4c39 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java @@ -22,6 +22,7 @@ import java.lang.reflect.Field; import java.util.LinkedHashMap; import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG.GenericOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; @@ -80,7 +81,7 @@ public abstract class Operators } } - public static void describe(Operator operator, OperatorDescriptor descriptor) + public static void describe(GenericOperator operator, OperatorDescriptor descriptor) { for (Class<?> c = operator.getClass(); c != Object.class; c = c.getSuperclass()) { Field[] fields = c.getDeclaredFields(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index 52be922..f09a53e 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -828,7 +828,7 @@ public class StramWebServices if (logicalModule == null) { throw new NotFoundException(); } - operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule()); + operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getOperator()); } else { operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java index 97c015e..1966678 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java @@ -118,7 +118,7 @@ public class ModuleAppTest static class TestModule implements Module { - public transient ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<Integer>(); + public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>(); public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>(); @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java index d5af67b..97a375f 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java @@ -18,16 +18,22 @@ */ package com.datatorrent.stram.plan.logical.module; +import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Random; +import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Attribute; @@ -48,7 +54,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; public class TestModuleExpansion { - static class DummyInputOperator extends BaseOperator implements InputOperator + public static class DummyInputOperator extends BaseOperator implements InputOperator { private int inputOperatorProp = 0; @@ -72,7 +78,7 @@ public class TestModuleExpansion } } - static class DummyOperator extends BaseOperator + public static class DummyOperator extends BaseOperator { private int operatorProp = 0; @@ -104,7 +110,7 @@ public class TestModuleExpansion } } - static class TestPartitioner implements Partitioner<DummyOperator>, Serializable + public static class TestPartitioner implements Partitioner<DummyOperator>, Serializable { @Override public Collection<Partition<DummyOperator>> definePartitions(Collection<Partition<DummyOperator>> partitions, PartitioningContext context) @@ -121,7 +127,7 @@ public class TestModuleExpansion } } - static class Level1Module implements Module + public static class Level1Module implements Module { private int level1ModuleProp = 0; @@ -184,7 +190,7 @@ public class TestModuleExpansion } } - static class Level2ModuleA implements Module + public static class Level2ModuleA implements Module { private int level2ModuleAProp1 = 0; private int level2ModuleAProp2 = 0; @@ -253,7 +259,7 @@ public class TestModuleExpansion } } - static class Level2ModuleB implements Module + public static class Level2ModuleB implements Module { private int level2ModuleBProp1 = 0; private int level2ModuleBProp2 = 0; @@ -321,7 +327,7 @@ public class TestModuleExpansion } } - static class Level3Module implements Module + public static class Level3Module implements Module { public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>(); @@ -344,7 +350,7 @@ public class TestModuleExpansion } } - static class NestedModuleApp implements StreamingApplication + public static class NestedModuleApp implements StreamingApplication { @Override public void populateDAG(DAG dag, Configuration conf) @@ -564,10 +570,6 @@ public class TestModuleExpansion Assert.assertTrue(moduleNames.contains("Me")); Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size()); - // correct module meta is returned by getMeta call. - LogicalPlan.ModuleMeta m = dag.getModuleMeta("Ma"); - Assert.assertEquals("Name of module is Ma", m.getName(), "Ma"); - } private static String componentName(String... names) @@ -671,4 +673,51 @@ public class TestModuleExpansion Assert.assertEquals("Locality is " + locality, meta.getLocality(), locality); } + @Test + public void testLoadFromPropertiesFile() throws IOException + { + Properties props = new Properties(); + String resourcePath = "/testModuleTopology.properties"; + InputStream is = this.getClass().getResourceAsStream(resourcePath); + if (is == null) { + throw new RuntimeException("Could not load " + resourcePath); + } + props.load(is); + LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false)) + .addFromProperties(props, null); + + LogicalPlan dag = new LogicalPlan(); + pb.populateDAG(dag); + pb.prepareDAG(dag, null, "testApplication"); + dag.validate(); + validateTopLevelOperators(dag); + validateTopLevelStreams(dag); + validatePublicMethods(dag); + } + + @Test + public void testLoadFromJson() throws Exception + { + String resourcePath = "/testModuleTopology.json"; + InputStream is = this.getClass().getResourceAsStream(resourcePath); + if (is == null) { + throw new RuntimeException("Could not load " + resourcePath); + } + StringWriter writer = new StringWriter(); + + IOUtils.copy(is, writer); + JSONObject json = new JSONObject(writer.toString()); + + Configuration conf = new Configuration(false); + conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf"); + + LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf); + LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson"); + planConf.prepareDAG(dag, null, "testApplication"); + dag.validate(); + validateTopLevelOperators(dag); + validateTopLevelStreams(dag); + validatePublicMethods(dag); + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/resources/testModuleTopology.json ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/testModuleTopology.json b/engine/src/test/resources/testModuleTopology.json new file mode 100644 index 0000000..8b2087a --- /dev/null +++ b/engine/src/test/resources/testModuleTopology.json @@ -0,0 +1,141 @@ +{ + "operators": [ + { + "name": "O1", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator", + "properties": { + "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator": { + "inputOperatorProp": "1" + } + } + }, + { + "name": "O2", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator", + "properties": { + "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator": { + "operatorProp": "2" + } + } + }, + { + "name": "Ma", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA", + "properties": { + "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA": { + "level2ModuleAProp1": "11", + "level2ModuleAProp2": "12", + "level2ModuleAProp3": "13" + } + } + }, + { + "name": "Mb", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB", + "properties": { + "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB": { + "level2ModuleBProp1": "21", + "level2ModuleBProp2": "22", + "level2ModuleBProp3": "23" + } + } + }, + { + "name": "Mc", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA", + "properties": { + "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA": { + "level2ModuleAProp1": "31", + "level2ModuleAProp2": "32", + "level2ModuleAProp3": "33" + } + } + }, + { + "name": "Md", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB", + "properties": { + "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB": { + "level2ModuleBProp1": "41", + "level2ModuleBProp2": "42", + "level2ModuleBProp3": "43" + } + } + }, + { + "name": "Me", + "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module" + } + ], + "streams": [ + { + "name": "O1_O2", + "source": { + "operatorName": "O1", + "portName": "out" + }, + "sinks": [ + { + "operatorName": "O2", + "portName": "in" + }, + { + "operatorName": "Me", + "portName": "mIn" + } + ] + }, + { + "name": "O2_Ma", + "source": { + "operatorName": "O2", + "portName": "out1" + }, + "sinks": [ + { + "operatorName": "Ma", + "portName": "mIn" + } + ] + }, + { + "name": "Ma_Mb", + "source": { + "operatorName": "Ma", + "portName": "mOut1" + }, + "sinks": [ + { + "operatorName": "Mb", + "portName": "mIn" + } + ] + }, + { + "name": "Ma_Md", + "source": { + "operatorName": "Ma", + "portName": "mOut2" + }, + "sinks": [ + { + "operatorName": "Md", + "portName": "mIn" + } + ] + }, + { + "name": "Mb_Mc", + "source": { + "operatorName": "Mb", + "portName": "mOut2" + }, + "sinks": [ + { + "operatorName": "Mc", + "portName": "mIn" + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/resources/testModuleTopology.properties ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/testModuleTopology.properties b/engine/src/test/resources/testModuleTopology.properties new file mode 100644 index 0000000..0679e26 --- /dev/null +++ b/engine/src/test/resources/testModuleTopology.properties @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# test for defining topology as property file +dt.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator +dt.operator.O1.inputOperatorProp=1 + +dt.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator +dt.operator.O2.operatorProp=2 + +dt.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA +dt.operator.Ma.level2ModuleAProp1=11 +dt.operator.Ma.level2ModuleAProp2=12 +dt.operator.Ma.level2ModuleAProp3=13 + +dt.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB +dt.operator.Mb.level2ModuleBProp1=21 +dt.operator.Mb.level2ModuleBProp2=22 +dt.operator.Mb.level2ModuleBProp3=23 + +dt.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA +dt.operator.Mc.level2ModuleAProp1=31 +dt.operator.Mc.level2ModuleAProp2=32 +dt.operator.Mc.level2ModuleAProp3=33 + +dt.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB +dt.operator.Md.level2ModuleBProp1=41 +dt.operator.Md.level2ModuleBProp2=42 +dt.operator.Md.level2ModuleBProp3=43 + +dt.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module + +dt.stream.O1_O2.source=O1.out +dt.stream.O1_O2.sinks=O2.in,Me.mIn + +dt.stream.O2_Ma.source=O2.out1 +dt.stream.O2_Ma.sinks=Ma.mIn + +dt.stream.Ma_Mb.source=Ma.mOut1 +dt.stream.Ma_Mb.sinks=Mb.mIn + +dt.stream.Ma_Md.source=Ma.mOut2 +dt.stream.Ma_Md.sinks=Md.mIn + +dt.stream.Mb_Mc.source=Mb.mOut2 +dt.stream.Mb_Mc.sinks=Mc.mIn