[
https://issues.apache.org/jira/browse/APEXCORE-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15066732#comment-15066732
]
ASF GitHub Bot commented on APEXCORE-3:
---------------------------------------
Github user tushargosavi commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/189#discussion_r48167974
--- Diff:
engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
@@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends
T> source, Operator.InputPort<? super T>... sinks)
{
StreamMeta s = addStream(id);
- s.setSource(source);
- for (Operator.InputPort<?> sink: sinks) {
- s.addSink(sink);
+ id = s.id;
+ ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>>
streamMap = ArrayListMultimap.create();
+ if (!(source instanceof ProxyOutputPort)) {
+ s.setSource(source);
+ }
+ for (Operator.InputPort<?> sink : sinks) {
+ if (source instanceof ProxyOutputPort || sink instanceof
ProxyInputPort) {
+ streamMap.put(source, sink);
+ streamLinks.put(id, streamMap);
+ } else {
+ if (s.getSource() == null) {
+ s.setSource(source);
+ }
+ s.addSink(sink);
+ }
}
return s;
}
+ /**
+ * This will be called once the Logical Dag is expanded, and the proxy
input and proxy output ports are populated with the actual ports that they
refer to
+ * This method adds sources and sinks for the StreamMeta objects which
were left empty in the addStream call.
+ */
+ public void applyStreamLinks()
+ {
+ for (String id : streamLinks.keySet()) {
+ StreamMeta s = getStream(id);
+ for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair :
streamLinks.get(id).entries()) {
+ if (s.getSource() == null) {
+ Operator.OutputPort<?> outputPort = pair.getKey();
+ while (outputPort instanceof ProxyOutputPort) {
+ outputPort = ((ProxyOutputPort<?>)outputPort).get();
+ }
+ s.setSource(outputPort);
+ }
+
+ Operator.InputPort<?> inputPort = pair.getValue();
+ while (inputPort instanceof ProxyInputPort) {
+ inputPort = ((ProxyInputPort<?>)inputPort).get();
+ }
+ s.addSink(inputPort);
+ }
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
+ {
+ LogicalPlan subDag = moduleMeta.getDag();
+ String subDAGName = moduleMeta.getName();
+ String name;
+ for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
+ name = subDAGName + MODULE_NAMESPACE_SEPARATOR +
operatorMeta.getName();
+ this.addOperator(name, operatorMeta.getOperator());
+ OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
+ operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ?
subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR +
operatorMeta.getModuleName());
+ }
+
+ for (StreamMeta streamMeta : subDag.getAllStreams()) {
+ OutputPortMeta sourceMeta = streamMeta.getSource();
+ List<InputPort<?>> ports = new LinkedList<>();
+ for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
+ ports.add(inputPortMeta.getPortObject());
+ }
+ InputPort[] inputPorts = ports.toArray(new InputPort[]{});
+
+ name = subDAGName + MODULE_NAMESPACE_SEPARATOR +
streamMeta.getName();
+ StreamMeta streamMetaNew = this.addStream(name,
sourceMeta.getPortObject(), inputPorts);
+ streamMetaNew.setModuleName(streamMeta.getModuleName() == null ?
subDAGName : subDAGName + "_" + streamMeta.getModuleName());
--- End diff --
Removed it
> Ability for an operator to populate DAG at launch time
> ------------------------------------------------------
>
> Key: APEXCORE-3
> URL: https://issues.apache.org/jira/browse/APEXCORE-3
> Project: Apache Apex Core
> Issue Type: New Feature
> Reporter: Amol Kekre
> Assignee: Vlad Rozov
> Priority: Critical
>
> Apex should have an operator API that lets the operator generate DAG during
> launch time. This will mean the following
> - Logical DAG will have one operator. This is the operator that will generate
> a DAG underneath
> - Physical plan will have the DAG generated by the operator
> - Execution plan will mimic physical plan + container location etc.
> For example lets say we have three operators in a DAG (app) A->B->C
> B during launch time generates a DAG B1->B2->B3, then the physical plan will
> be
> A->B1->B2->B3->C
> This should work irrespective of number of ports, etc. A typical flattening.
> The operators inside of B (B1, B2, B3) should have properties and attributes
> just as any. Users should be able to access these at run time and compile
> time. B itself should support properties and attributes that B1, B2, B3 can
> inherit from.
> This is a very critical feature as it will open up users to plug-in their own
> engines and still take up complete operability support from Apex engine.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)