[
https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15090358#comment-15090358
]
ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------
Github user davidyan74 commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/185#discussion_r49259156
--- Diff:
engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
@@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator
operator, UpdateCheckpointsCont
long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms,
this.vars.windowStartMillis,
this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
maxCheckpoint = currentWindowId;
}
+ ctx.visited.add(operator);
// DFS downstream operators
- for (PTOperator.PTOutput out : operator.getOutputs()) {
- for (PTOperator.PTInput sink : out.sinks) {
- PTOperator sinkOperator = sink.target;
- if (!ctx.visited.contains(sinkOperator)) {
- // downstream traversal
- updateRecoveryCheckpoints(sinkOperator, ctx);
- }
- // recovery window id cannot move backwards
- // when dynamically adding new operators
- if (sinkOperator.getRecoveryCheckpoint().windowId >=
operator.getRecoveryCheckpoint().windowId) {
- maxCheckpoint = Math.min(maxCheckpoint,
sinkOperator.getRecoveryCheckpoint().windowId);
- }
+ if (operator.getOperatorMeta().getOperator() instanceof
Operator.DelayOperator) {
+ addVisited(operator, ctx);
+ } else {
--- End diff --
@tweise I think I'm not doing this correctly, which would explain the
out-of-sequence tuple in the unit test. My debugging indicates that the
recovery checkpoints are out of sync across the operators that try to
recover. Can you please review this and see what I'm doing wrong?
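
For readers following the diff above, the traversal pattern can be sketched
in isolation. This is a minimal illustration under simplifying assumptions,
not the code in StreamingContainerManager: OpNode and CheckpointSketch are
hypothetical names, each operator carries a single checkpoint window ID, and
the result is just the smallest checkpoint reachable downstream. It shows
only why marking an operator as visited before descending lets a DFS
terminate once the graph contains cycles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for an operator in the physical plan; not an Apex class. */
class OpNode {
  final String name;
  final long checkpoint;                         // window ID of this operator's own checkpoint
  final List<OpNode> sinks = new ArrayList<>();  // downstream operators, may loop back

  OpNode(String name, long checkpoint) {
    this.name = name;
    this.checkpoint = checkpoint;
  }
}

class CheckpointSketch {
  /**
   * Returns the smallest checkpoint window ID reachable from op, including op itself.
   * The operator is added to visited before its sinks are explored, so a backward
   * stream that leads to an operator already on the path is skipped instead of
   * recursing forever.
   */
  static long minReachableCheckpoint(OpNode op, Set<OpNode> visited) {
    visited.add(op);
    long min = op.checkpoint;
    for (OpNode sink : op.sinks) {
      if (!visited.contains(sink)) {
        min = Math.min(min, minReachableCheckpoint(sink, visited));
      }
    }
    return min;
  }
}
```

In this toy model, operators that sit on the same loop all compute the same
value when each is queried with a fresh visited set, because each of them
can reach the others.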
> Iterative processing support
> ----------------------------
>
> Key: APEXCORE-60
> URL: https://issues.apache.org/jira/browse/APEXCORE-60
> Project: Apache Apex Core
> Issue Type: New Feature
> Reporter: David Yan
> Assignee: David Yan
> Labels: roadmap
> Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the
> graph (currently a DAG, but no longer one once cycles are allowed).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat}
> C has two separate backward streams to A and B. The input ports of A and B
> that C connects to will have a special attribute specifying how many window
> IDs ahead the incoming windows should be treated as being, and A and B will
> be responsible for supplying the initial data for those input ports.
> Another idea is to have C advance the window ID on its output ports and have
> C generate the initial data on its output ports to A and B.
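
The first idea in the description can be illustrated with a small sketch.
Everything here is an assumption made for illustration: WindowOffsetSketch,
its method names, and the windowsAhead parameter are hypothetical, and
window IDs are modeled as plain consecutive numbers rather than in Apex's
actual encoding. The sketch shows how a "windows ahead" attribute on an
input port would shift incoming window IDs, and which windows the receiving
operator would have to seed with initial data because nothing can have
looped back yet.

```java
import java.util.ArrayList;
import java.util.List;

class WindowOffsetSketch {
  /** Window ID under which the receiving port treats data emitted upstream in sourceWindowId. */
  static long effectiveWindowId(long sourceWindowId, int windowsAhead) {
    return sourceWindowId + windowsAhead;
  }

  /**
   * Windows for which no looped-back data can exist yet: the first windowsAhead
   * windows starting at firstWindowId. The receiving operator supplies initial
   * data for these.
   */
  static List<Long> windowsNeedingInitialData(long firstWindowId, int windowsAhead) {
    List<Long> result = new ArrayList<>();
    for (long w = firstWindowId; w < firstWindowId + windowsAhead; w++) {
      result.add(w);
    }
    return result;
  }
}
```

With windowsAhead set to 1, data that C emits in window 10 would be seen by
A and B as belonging to window 11, and only the very first window would
need initial data.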
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)