[ 
https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15091235#comment-15091235
 ] 

ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------

Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r49281400
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
           long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
           maxCheckpoint = currentWindowId;
         }
    +    ctx.visited.add(operator);
     
         // DFS downstream operators
    -    for (PTOperator.PTOutput out : operator.getOutputs()) {
    -      for (PTOperator.PTInput sink : out.sinks) {
    -        PTOperator sinkOperator = sink.target;
    -        if (!ctx.visited.contains(sinkOperator)) {
    -          // downstream traversal
    -          updateRecoveryCheckpoints(sinkOperator, ctx);
    -        }
    -        // recovery window id cannot move backwards
    -        // when dynamically adding new operators
    -        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
    -          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
    -        }
    +    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
    +      addVisited(operator, ctx);
    +    } else {
    --- End diff --
    
    @davidyan74 I have been working on making the recovery checkpoints the
    same in the case of iteration or idempotent operators (APEXCORE-279).
    Please wait for that pull request.
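
The removed lines in the diff above walk downstream operators depth-first and take the minimum of their recovery checkpoints, with a visited set deciding whether to descend. Once the graph has cycles, that visited set is also what makes the walk terminate. A minimal sketch of that idea follows; `Node`, `checkpoint`, and `minReachableCheckpoint` are hypothetical names for illustration, not the Apex `PTOperator` API, and the sketch leaves out the "recovery window id cannot move backwards" condition and the `DelayOperator` branch from the diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CheckpointDfs {
  /** Hypothetical stand-in for a physical operator; not Apex's PTOperator. */
  static final class Node {
    final String name;
    final long checkpoint;                       // latest checkpointed window id
    final List<Node> sinks = new ArrayList<>();  // downstream operators

    Node(String name, long checkpoint) {
      this.name = name;
      this.checkpoint = checkpoint;
    }
  }

  /** Smallest checkpoint reachable from start by following downstream edges. */
  static long minReachableCheckpoint(Node start, Set<Node> visited) {
    // Mark before descending so that an edge looping back to start is skipped.
    visited.add(start);
    long min = start.checkpoint;
    for (Node sink : start.sinks) {
      if (!visited.contains(sink)) {
        min = Math.min(min, minReachableCheckpoint(sink, visited));
      }
    }
    return min;
  }

  public static void main(String[] args) {
    // Graph from the issue description: A -> B -> C -> D, with C feeding back to A and B.
    Node a = new Node("A", 12);
    Node b = new Node("B", 10);
    Node c = new Node("C", 7);
    Node d = new Node("D", 9);
    a.sinks.add(b);
    b.sinks.add(c);
    c.sinks.add(d);
    c.sinks.add(a); // backward stream
    c.sinks.add(b); // backward stream
    System.out.println(minReachableCheckpoint(a, new HashSet<>())); // prints 7
  }
}
```

Adding the operator to the visited set before recursing, as the `+    ctx.visited.add(operator);` line in the diff does, is the part that matters for cyclic graphs; marking it only after the loop would recurse forever on the C -> A edge.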


> Iterative processing support
> ----------------------------
>
>                 Key: APEXCORE-60
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-60
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>              Labels: roadmap
>             Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the 
> graph (currently known as a DAG, which it will no longer be once we support 
> iterative processing).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat} 
> C has two separate backward streams to A and B.  The input ports of A and B 
> that C connects to will have a special attribute specifying how many window 
> IDs ahead the incoming windows should be treated as being, and A and B will 
> be responsible for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have 
> C generate the initial data on its output ports to A and B.
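
The first idea in the description can be sketched as follows. This is an illustration under stated assumptions, not the design that was implemented: window IDs are treated as a plain increasing counter (real Apex window IDs are encoded differently), and `effectiveWindowId`, `backwardInput`, `windowsAhead`, and `initialData` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BackwardStreamSketch {
  /** Window id that data emitted by C in sourceWindowId is treated as on the backward port. */
  static long effectiveWindowId(long sourceWindowId, int windowsAhead) {
    return sourceWindowId + windowsAhead;
  }

  /**
   * Data seen on a backward input port for a given window.
   * For the first windowsAhead windows, C has produced nothing yet, so the
   * receiving operator supplies the initial data itself.
   */
  static List<String> backwardInput(long windowId, long firstWindowId, int windowsAhead,
      Map<Long, List<String>> outputOfC, List<String> initialData) {
    if (windowId < firstWindowId + windowsAhead) {
      return initialData;
    }
    // Otherwise, read what C emitted windowsAhead windows earlier.
    return outputOfC.getOrDefault(windowId - windowsAhead, Collections.emptyList());
  }

  public static void main(String[] args) {
    Map<Long, List<String>> outputOfC = new HashMap<>();
    outputOfC.put(1L, Arrays.asList("c1"));
    List<String> initial = Arrays.asList("seed");

    System.out.println(effectiveWindowId(1, 1));                      // prints 2
    System.out.println(backwardInput(1, 1, 1, outputOfC, initial));   // prints [seed]
    System.out.println(backwardInput(2, 1, 1, outputOfC, initial));   // prints [c1]
  }
}
```

The second idea in the description moves the same shift to the other end of the stream: C would apply the offset on its output ports and emit the initial data itself, so A and B would need no special attribute.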



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
