[ https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15093250#comment-15093250 ]

ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r49414079
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
           long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
           maxCheckpoint = currentWindowId;
         }
    +    ctx.visited.add(operator);
     
         // DFS downstream operators
    -    for (PTOperator.PTOutput out : operator.getOutputs()) {
    -      for (PTOperator.PTInput sink : out.sinks) {
    -        PTOperator sinkOperator = sink.target;
    -        if (!ctx.visited.contains(sinkOperator)) {
    -          // downstream traversal
    -          updateRecoveryCheckpoints(sinkOperator, ctx);
    -        }
    -        // recovery window id cannot move backwards
    -        // when dynamically adding new operators
    -        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
    -          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
    -        }
    +    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
    +      addVisited(operator, ctx);
    +    } else {
    --- End diff --
    
    It's not working because the recovery checkpoint of the operator where the
    delay loop joins can be older than those of the downstream operators.
    Therefore, when traversing the loop, upstream checkpoints need to be taken
    into consideration, which is part of the broader solution Pramod refers to.
    I am looking into this further and would also like to clean up the
    special-case handling for the delay operator.
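
To illustrate why a loop forces upstream checkpoints into the picture, here is a minimal sketch. It is not the code in StreamingContainerManager. It assumes a deliberately simplified model in which an operator's recovery window may never be newer than that of any operator it feeds, and it uses the A/B/C/D graph from the issue description. The class name, method name, and window values are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoopCheckpointSketch {

  /**
   * Hypothetical simplified model: lower each operator's recovery window to the
   * minimum over its sinks and repeat until nothing changes. Because the graph
   * may contain a cycle, a single DFS pass is not enough; the loop below runs
   * to a fixed point instead.
   */
  static Map<String, Long> recoveryWindows(Map<String, Long> ownCheckpoint,
                                           Map<String, List<String>> downstream) {
    Map<String, Long> recovery = new HashMap<>(ownCheckpoint);
    boolean changed = true;
    while (changed) {
      changed = false;
      for (Map.Entry<String, List<String>> edge : downstream.entrySet()) {
        long current = recovery.get(edge.getKey());
        long window = current;
        for (String sink : edge.getValue()) {
          long sinkWindow = recovery.get(sink);
          window = Math.min(window, sinkWindow);
        }
        if (window < current) {
          // Values only ever decrease, so the iteration terminates.
          recovery.put(edge.getKey(), window);
          changed = true;
        }
      }
    }
    return recovery;
  }

  public static void main(String[] args) {
    // A is where the loop joins and has the oldest checkpoint (assumed values).
    Map<String, Long> own = new HashMap<>();
    own.put("A", 5L);
    own.put("B", 10L);
    own.put("C", 10L);
    own.put("D", 10L);

    // A -> B -> C -> D, plus the backward streams C -> A and C -> B.
    Map<String, List<String>> downstream = new HashMap<>();
    downstream.put("A", Arrays.asList("B"));
    downstream.put("B", Arrays.asList("C"));
    downstream.put("C", Arrays.asList("D", "A", "B"));

    System.out.println(recoveryWindows(own, downstream));
  }
}
```

In this model, A is upstream of B and C in the forward direction but also downstream of C through the backward stream, so A's older checkpoint ends up constraining every operator in the loop while D, which is outside the loop, is unaffected.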


> Iterative processing support
> ----------------------------
>
>                 Key: APEXCORE-60
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-60
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>              Labels: roadmap
>             Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the 
> graph (known as DAG now, but no longer if we support iterative processing).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat} 
> C has two separate backward streams to A and B.  The input ports of A and B 
> that C connects to will have a special attribute on how many window IDs ahead 
> the incoming windows should be treated as, and A and B will be responsible 
> for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have 
> C generate the initial data on its output ports to A and B.
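
The first idea in the description above (an input-port attribute saying how many windows ahead a backward stream is treated as) could be sketched roughly as follows. This is only an illustration: it treats window IDs as plain sequence numbers, which is a simplification, and the class name, the `windowsAhead` parameter, and the string payloads are hypothetical rather than part of any Apex API.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowOffsetSketch {

  /**
   * Maps the window in which the loop's tail operator emitted data to the
   * window in which the receiving input port treats that data as arriving.
   * windowsAhead stands in for the hypothetical port attribute.
   */
  static long effectiveWindowId(long emittedWindowId, int windowsAhead) {
    if (windowsAhead < 1) {
      throw new IllegalArgumentException("a backward stream must be at least one window ahead");
    }
    return emittedWindowId + windowsAhead;
  }

  public static void main(String[] args) {
    int windowsAhead = 1;
    Map<Long, String> feedbackByWindow = new HashMap<>();
    // Nothing has been emitted yet for the first window, so the receiving
    // operator has to supply the initial data itself.
    feedbackByWindow.put(0L, "initial data");

    for (long window = 0; window < 3; window++) {
      String feedback = feedbackByWindow.get(window);
      System.out.println("window " + window + " consumes: " + feedback);
      // Output of this window becomes feedback for a later window.
      feedbackByWindow.put(effectiveWindowId(window, windowsAhead), "output of window " + window);
    }
  }
}
```

The second idea moves the same shift to the emitting side: C would stamp its output with the advanced window ID and also produce the initial data, so A and B would need no special attribute.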



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
