[
https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068633#comment-15068633
]
ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------
Github user davidyan74 commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/185#discussion_r48290536
--- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
@@ -940,17 +942,17 @@ private void updateStreamMappings(PMapping m)
PTOperator slidingUnifier =
StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT),
slidingWindowCount);
StreamMapping.addInput(slidingUnifier, sourceOut, null);
- input = new PTInput(ipm.getKey().getPortName(),
ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
+ input = new PTInput(ipm.getKey().getPortName(),
ipm.getValue(), oper, null, slidingUnifier.outputs.get(0),
ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
}
else {
- input = new PTInput(ipm.getKey().getPortName(),
ipm.getValue(), oper, null, sourceOut);
+ input = new PTInput(ipm.getKey().getPortName(),
ipm.getValue(), oper, null, sourceOut,
ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
}
oper.inputs.add(input);
}
}
}
- } else {
+ } else if (sourceMapping != null) {
--- End diff --
Fixed
> Iterative processing support
> ----------------------------
>
> Key: APEXCORE-60
> URL: https://issues.apache.org/jira/browse/APEXCORE-60
> Project: Apache Apex Core
> Issue Type: New Feature
> Reporter: David Yan
> Assignee: David Yan
>
> We would like to support iterative processing by introducing cycles in the
> graph (currently a DAG, but no longer one once iterative processing is supported).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat}
> C has two separate backward streams to A and B. The input ports of A and B
> that C connects to will have a special attribute specifying how many window IDs
> ahead the incoming windows should be treated as, and A and B will be responsible
> for supplying the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have
> C generate the initial data on its output ports to A and B.
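
To make the first idea in the description concrete, here is a minimal, self-contained Java sketch of an input port that treats incoming windows as a fixed number of window IDs ahead and that is seeded with initial data by the receiving operator. Everything in it (`FeedbackWindowSketch`, `FeedbackPort`, `windowsAhead`, `receive`, `tuplesFor`) is a hypothetical name chosen for illustration and is not part of the Apex API or of this pull request; it only models the window-ID bookkeeping, not the engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FeedbackWindowSketch {

    /** Hypothetical stand-in for an input port of A or B that is fed by a backward stream from C. */
    static final class FeedbackPort {
        // Assumed attribute: how many window IDs ahead incoming windows are treated as.
        private final int windowsAhead;
        private final Map<Long, List<String>> tuplesByWindow = new TreeMap<>();

        FeedbackPort(int windowsAhead, long firstWindowId, List<String> initialData) {
            if (windowsAhead < 1) {
                throw new IllegalArgumentException("windowsAhead must be at least 1");
            }
            this.windowsAhead = windowsAhead;
            // No upstream window can map onto the first windowsAhead windows,
            // so the receiving operator supplies the initial data for them.
            for (int i = 0; i < windowsAhead; i++) {
                tuplesByWindow.put(firstWindowId + i, new ArrayList<>(initialData));
            }
        }

        /** Accepts a tuple that C emitted in emittedWindowId and files it under the shifted window. */
        void receive(long emittedWindowId, String tuple) {
            tuplesByWindow
                .computeIfAbsent(emittedWindowId + windowsAhead, k -> new ArrayList<>())
                .add(tuple);
        }

        /** Returns the tuples the operator sees on this port while processing windowId. */
        List<String> tuplesFor(long windowId) {
            List<String> tuples = tuplesByWindow.get(windowId);
            return tuples == null ? Collections.<String>emptyList() : tuples;
        }
    }

    public static void main(String[] args) {
        // B's feedback port: C's output is treated as one window ahead, and B seeds window 100.
        FeedbackPort port = new FeedbackPort(1, 100L, Arrays.asList("seed"));
        port.receive(100L, "result-of-100"); // emitted by C in window 100
        System.out.println("window 100: " + port.tuplesFor(100L));
        System.out.println("window 101: " + port.tuplesFor(101L));
    }
}
```

The second idea in the description would move the same shift to the sending side: C would stamp its output with the advanced window ID and emit the initial data itself, so the receiving ports would need no special attribute.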