GitHub user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45093254
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -600,6 +605,19 @@ public void activate()
           CHECKPOINT_WINDOW_COUNT = 1;
         }
     
    +    int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
    +    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
    +      int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
    +      if (chkOffset != 0) {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
    +      } else {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
    +      }
    +    } else {
    +      EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
    +    }
    --- End diff --
    
    @ilooner Normally windowsFromCheckpoint does not have to be saved, because 
the operator recovers at a checkpoint. However, I think we may have to save a 
few other things as well, such as checkpointWindowCount, because we are saying 
that we checkpoint at a window divisible by the operator checkpoint window 
count, and that count should run from the beginning of the application and not 
necessarily from a restart. I will look at this comprehensively.
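
The rounding in the diff above can be tried out in isolation. What follows is a 
minimal sketch, not the Node.java code itself: the class name, the method 
effectiveCount, and its parameters are hypothetical stand-ins for the fields in 
the diff, and the sample values are made up for illustration.

```java
public class EffectiveCheckpointWindow {
    /**
     * Hypothetical helper mirroring the arithmetic in the diff.
     * dagCount      stands in for dagChkptWndwCnt (DAG-level checkpoint window count)
     * operatorCount stands in for CHECKPOINT_WINDOW_COUNT (operator-level count, >= 1)
     * exactlyOnce   stands in for PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE
     */
    public static int effectiveCount(int dagCount, int operatorCount, boolean exactlyOnce) {
        if (exactlyOnce) {
            // In the diff, EXACTLY_ONCE sets the effective count to 1.
            return 1;
        }
        int offset = dagCount % operatorCount;
        // Round the DAG-level count up to the next multiple of the operator count.
        return offset == 0 ? dagCount : dagCount + operatorCount - offset;
    }

    public static void main(String[] args) {
        System.out.println(effectiveCount(30, 4, false)); // 30 % 4 = 2, so 30 + 4 - 2 = 32
        System.out.println(effectiveCount(30, 5, false)); // already a multiple: 30
        System.out.println(effectiveCount(30, 4, true));  // exactly-once: 1
    }
}
```

Under these assumptions the result is always a multiple of the operator count 
unless the mode is exactly-once, which is the divisibility property described 
above.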
    
    Regarding your second point, you could check the windows from checkpoint at 
beginWindow and use that information later. Are you thinking about cases where 
an application window spans multiple streaming windows? In the first-phase 
implementation of iteration support we had talked about limiting it to cases 
where the application window is the same as the streaming window, if you 
remember. In the future, when we add support for full application windows, we 
will need the callbacks you are suggesting so that we know when the streaming 
window starts and can not only save those tuples but also inject them at the 
right streaming window for ingestion. However, you will still need to know how 
far the streaming window is from the checkpoint, and you will need a method 
like this.


