[
https://issues.apache.org/jira/browse/APEXCORE-107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15265409#comment-15265409
]
ASF GitHub Bot commented on APEXCORE-107:
-----------------------------------------
Github user tweise commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/313#discussion_r61673039
--- Diff:
engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
@@ -778,32 +799,162 @@ public void
resetStreamPersistanceOnSinkRemoval(InputPortMeta sinkBeingRemoved)
}
}
+ public class DagNodeMeta implements Serializable
+ {
+ protected final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams
= new LinkedHashMap<>();
+ protected final LinkedHashMap<OutputPortMeta, StreamMeta>
outputStreams = new LinkedHashMap<>();
+ @NotNull
+ protected final String name;
+ protected transient Vertex node;
+
+ /*
+ * Used for OIO validation,
+ * value null => node not visited yet
+ * other value => represents the root oio node for this node
+ */
+ protected transient Integer oioRoot = null;
+ public DagNodeMeta(String name, Vertex node)
+ {
+ this.name = name;
+ this.node = node;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public Vertex getNode()
+ {
+ return node;
+ }
+
+ private class PortMapping implements Operators.OperatorDescriptor
+ {
+ private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap =
new HashMap<>();
+ private final Map<Operator.OutputPort<?>, OutputPortMeta> outPortMap
= new HashMap<>();
+ private final Map<String, Object> portNameMap = new HashMap<>();
+
+ @Override
+ public void addInputPort(InputPort<?> portObject, Field field,
InputPortFieldAnnotation portAnnotation, AppData.QueryPort adqAnnotation)
+ {
+ if (!DagNodeMeta.this.inputStreams.isEmpty()) {
+ for (Map.Entry<LogicalPlan.InputPortMeta,
LogicalPlan.StreamMeta> e : DagNodeMeta.this.inputStreams.entrySet()) {
+ LogicalPlan.InputPortMeta pm = e.getKey();
+ if (pm.operatorMeta == DagNodeMeta.this &&
pm.fieldName.equals(field.getName())) {
+ //LOG.debug("Found existing port meta for: " + field);
+ inPortMap.put(portObject, pm);
+ markInputPortIfHidden(pm.getPortName(), pm,
field.getDeclaringClass());
+ return;
+ }
+ }
+ }
+ InputPortMeta metaPort = new InputPortMeta();
+ metaPort.operatorMeta = DagNodeMeta.this;
+ metaPort.fieldName = field.getName();
+ metaPort.portAnnotation = portAnnotation;
+ metaPort.adqAnnotation = adqAnnotation;
+ inPortMap.put(portObject, metaPort);
+ markInputPortIfHidden(metaPort.getPortName(), metaPort,
field.getDeclaringClass());
+ }
+
+ @Override
+ public void addOutputPort(OutputPort<?> portObject, Field field,
OutputPortFieldAnnotation portAnnotation, AppData.ResultPort adrAnnotation)
+ {
+ if (!DagNodeMeta.this.outputStreams.isEmpty()) {
+ for (Map.Entry<LogicalPlan.OutputPortMeta,
LogicalPlan.StreamMeta> e : DagNodeMeta.this.outputStreams.entrySet()) {
+ LogicalPlan.OutputPortMeta pm = e.getKey();
+ if (pm.operatorMeta == DagNodeMeta.this &&
pm.fieldName.equals(field.getName())) {
+ //LOG.debug("Found existing port meta for: " + field);
+ outPortMap.put(portObject, pm);
+ markOutputPortIfHidden(pm.getPortName(), pm,
field.getDeclaringClass());
+ return;
+ }
+ }
+ }
+ OutputPortMeta metaPort = new OutputPortMeta();
+ metaPort.operatorMeta = DagNodeMeta.this;
+ metaPort.fieldName = field.getName();
+ metaPort.portAnnotation = portAnnotation;
+ metaPort.adrAnnotation = adrAnnotation;
+ outPortMap.put(portObject, metaPort);
+ markOutputPortIfHidden(metaPort.getPortName(), metaPort,
field.getDeclaringClass());
+ }
+
+ private void markOutputPortIfHidden(String portName, OutputPortMeta
portMeta, Class<?> declaringClass)
+ {
+ if (!portNameMap.containsKey(portName)) {
+ portNameMap.put(portName, portMeta);
+ } else {
+ // make the port optional
+ portMeta.classDeclaringHiddenPort = declaringClass;
+ }
+
+ }
+
+ private void markInputPortIfHidden(String portName, InputPortMeta
portMeta, Class<?> declaringClass)
+ {
+ if (!portNameMap.containsKey(portName)) {
+ portNameMap.put(portName, portMeta);
+ } else {
+ // make the port optional
+ portMeta.classDeclaringHiddenPort = declaringClass;
+ }
+ }
+ }
+
+ /**
+ * Ports objects are transient, we keep a lazy initialized mapping
+ */
+ private transient PortMapping portMapping = null;
+
+ protected PortMapping getPortMapping()
+ {
+ if (this.portMapping == null) {
+ this.portMapping = new PortMapping();
+ Operators.describe(this.getNode(), portMapping);
+ }
+ return portMapping;
+ }
+
+ public OutputPortMeta getMeta(Operator.OutputPort<?> port)
+ {
+ return getPortMapping().outPortMap.get(port);
+ }
+
+ public InputPortMeta getMeta(Operator.InputPort<?> port)
+ {
+ return getPortMapping().inPortMap.get(port);
+ }
+
+ public Map<InputPortMeta, StreamMeta> getInputStreams()
+ {
+ return this.inputStreams;
+ }
+
+ public Map<OutputPortMeta, StreamMeta> getOutputStreams()
+ {
+ return this.outputStreams;
+ }
+
+ }
+
/**
* Operator meta object.
*/
- public final class OperatorMeta implements DAG.OperatorMeta, Serializable
+ public class OperatorMeta extends DagNodeMeta implements
DAG.OperatorMeta, Serializable
--- End diff --
It is not clear why we need this distinction, or why some of the fields are
present here and not in "DagNodeMeta".
> Support adding module to application using property file API.
> -------------------------------------------------------------
>
> Key: APEXCORE-107
> URL: https://issues.apache.org/jira/browse/APEXCORE-107
> Project: Apache Apex Core
> Issue Type: Task
> Reporter: Tushar Gosavi
> Assignee: Tushar Gosavi
>
> Add support for adding modules to a DAG specified through a property file or
> a JSON file. A sample of the JSON format is shown below.
> {code}
> {
> "operators": [
> {
> "name": "operator1",
> "class": "com.datatorrent.lib.operator.Input",
> "properties": {
> "property1": "value1"
> }
> },
> {
> "name": "module1",
> "class": "com.datatorrent.module.Module1",
> "properties": {
> "property1": "value1"
> }
> }
> ],
> "streams": [
> {
> "name": "s1",
> "source": {
> "operatorName": "operator1",
> "portName": "output"
> },
> "sinks": [
> {
> "operatorName": "module1",
> "portName": "input"
> }
> ]
> }
> ]
> }
> {code}
> For this to work, we will need to support adding a module to the DAG through
> the property file API.
> This will allow external tools to visually construct a DAG containing
> modules.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)