[ 
https://issues.apache.org/jira/browse/APEXCORE-107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15265409#comment-15265409
 ] 

ASF GitHub Bot commented on APEXCORE-107:
-----------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/313#discussion_r61673039
  
    --- Diff: 
engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -778,32 +799,162 @@ public void 
resetStreamPersistanceOnSinkRemoval(InputPortMeta sinkBeingRemoved)
         }
       }
     
    +  public class DagNodeMeta implements Serializable
    +  {
    +    protected final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams 
= new LinkedHashMap<>();
    +    protected final LinkedHashMap<OutputPortMeta, StreamMeta> 
outputStreams = new LinkedHashMap<>();
    +    @NotNull
    +    protected final String name;
    +    protected transient Vertex node;
    +
    +    /*
    +     * Used for  OIO validation,
    +     *  value null => node not visited yet
    +     *  other value => represents the root oio node for this node
    +     */
    +    protected transient Integer oioRoot = null;
    +    public DagNodeMeta(String name, Vertex node)
    +    {
    +      this.name = name;
    +      this.node = node;
    +    }
    +
    +    public String getName()
    +    {
    +      return name;
    +    }
    +
    +    public Vertex getNode()
    +    {
    +      return node;
    +    }
    +
    +    private class PortMapping implements Operators.OperatorDescriptor
    +    {
    +      private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = 
new HashMap<>();
    +      private final Map<Operator.OutputPort<?>, OutputPortMeta> outPortMap 
= new HashMap<>();
    +      private final Map<String, Object> portNameMap = new HashMap<>();
    +
    +      @Override
    +      public void addInputPort(InputPort<?> portObject, Field field, 
InputPortFieldAnnotation portAnnotation, AppData.QueryPort adqAnnotation)
    +      {
    +        if (!DagNodeMeta.this.inputStreams.isEmpty()) {
    +          for (Map.Entry<LogicalPlan.InputPortMeta, 
LogicalPlan.StreamMeta> e : DagNodeMeta.this.inputStreams.entrySet()) {
    +            LogicalPlan.InputPortMeta pm = e.getKey();
    +            if (pm.operatorMeta == DagNodeMeta.this && 
pm.fieldName.equals(field.getName())) {
    +              //LOG.debug("Found existing port meta for: " + field);
    +              inPortMap.put(portObject, pm);
    +              markInputPortIfHidden(pm.getPortName(), pm, 
field.getDeclaringClass());
    +              return;
    +            }
    +          }
    +        }
    +        InputPortMeta metaPort = new InputPortMeta();
    +        metaPort.operatorMeta = DagNodeMeta.this;
    +        metaPort.fieldName = field.getName();
    +        metaPort.portAnnotation = portAnnotation;
    +        metaPort.adqAnnotation = adqAnnotation;
    +        inPortMap.put(portObject, metaPort);
    +        markInputPortIfHidden(metaPort.getPortName(), metaPort, 
field.getDeclaringClass());
    +      }
    +
    +      @Override
    +      public void addOutputPort(OutputPort<?> portObject, Field field, 
OutputPortFieldAnnotation portAnnotation, AppData.ResultPort adrAnnotation)
    +      {
    +        if (!DagNodeMeta.this.outputStreams.isEmpty()) {
    +          for (Map.Entry<LogicalPlan.OutputPortMeta, 
LogicalPlan.StreamMeta> e : DagNodeMeta.this.outputStreams.entrySet()) {
    +            LogicalPlan.OutputPortMeta pm = e.getKey();
    +            if (pm.operatorMeta == DagNodeMeta.this && 
pm.fieldName.equals(field.getName())) {
    +              //LOG.debug("Found existing port meta for: " + field);
    +              outPortMap.put(portObject, pm);
    +              markOutputPortIfHidden(pm.getPortName(), pm, 
field.getDeclaringClass());
    +              return;
    +            }
    +          }
    +        }
    +        OutputPortMeta metaPort = new OutputPortMeta();
    +        metaPort.operatorMeta = DagNodeMeta.this;
    +        metaPort.fieldName = field.getName();
    +        metaPort.portAnnotation = portAnnotation;
    +        metaPort.adrAnnotation = adrAnnotation;
    +        outPortMap.put(portObject, metaPort);
    +        markOutputPortIfHidden(metaPort.getPortName(), metaPort, 
field.getDeclaringClass());
    +      }
    +
    +      private void markOutputPortIfHidden(String portName, OutputPortMeta 
portMeta, Class<?> declaringClass)
    +      {
    +        if (!portNameMap.containsKey(portName)) {
    +          portNameMap.put(portName, portMeta);
    +        } else {
    +          // make the port optional
    +          portMeta.classDeclaringHiddenPort = declaringClass;
    +        }
    +
    +      }
    +
    +      private void markInputPortIfHidden(String portName, InputPortMeta 
portMeta, Class<?> declaringClass)
    +      {
    +        if (!portNameMap.containsKey(portName)) {
    +          portNameMap.put(portName, portMeta);
    +        } else {
    +          // make the port optional
    +          portMeta.classDeclaringHiddenPort = declaringClass;
    +        }
    +      }
    +    }
    +
    +    /**
    +     * Ports objects are transient, we keep a lazy initialized mapping
    +     */
    +    private transient PortMapping portMapping = null;
    +
    +    protected PortMapping getPortMapping()
    +    {
    +      if (this.portMapping == null) {
    +        this.portMapping = new PortMapping();
    +        Operators.describe(this.getNode(), portMapping);
    +      }
    +      return portMapping;
    +    }
    +
    +    public OutputPortMeta getMeta(Operator.OutputPort<?> port)
    +    {
    +      return getPortMapping().outPortMap.get(port);
    +    }
    +
    +    public InputPortMeta getMeta(Operator.InputPort<?> port)
    +    {
    +      return getPortMapping().inPortMap.get(port);
    +    }
    +
    +    public Map<InputPortMeta, StreamMeta> getInputStreams()
    +    {
    +      return this.inputStreams;
    +    }
    +
    +    public Map<OutputPortMeta, StreamMeta> getOutputStreams()
    +    {
    +      return this.outputStreams;
    +    }
    +
    +  }
    +
       /**
        * Operator meta object.
        */
    -  public final class OperatorMeta implements DAG.OperatorMeta, Serializable
    +  public class OperatorMeta extends DagNodeMeta implements 
DAG.OperatorMeta, Serializable
    --- End diff --
    
    Not clear why we need this distinction and why some of the fields are 
present here and not in "DagNodeMeta".


> Support adding module to application using property file API.
> -------------------------------------------------------------
>
>                 Key: APEXCORE-107
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-107
>             Project: Apache Apex Core
>          Issue Type: Task
>            Reporter: Tushar Gosavi
>            Assignee: Tushar Gosavi
>
> Add support for adding modules in the DAG specified through property file and 
> json file. The sample json format is specified below.
> {code}
> {
>   "operators": [
>     {
>       "name": "operator1",
>       "class": "com.datatorrent.lib.operator.Input",
>       "properties": {
>         "property1": "value1"
>       }
>     },
>     {
>       "name": "module1",
>       "class": "com.datatorrent.module.Module1",
>       "properties": {
>         "property1": "value1"
>       }
>     },
>   ],
>   "streams": [
>     {
>       "name": "s1",
>       "source": {
>         "operatorName": "operator1",
>         "portName": "output"
>       },
>       "sinks": [
>         {
>           "operatorName": "module",
>           "portName": "input"
>         }
>       ]
>     }
>   ]
> }
> {code}
> For this to work, we will need to support adding module in DAG through 
> property file API.
> This will allow external tools to visually construct the DAG containing 
> modules.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to