Github user PramodSSImmaneni commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/168#discussion_r45093254
--- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
@@ -600,6 +605,19 @@ public void activate()
CHECKPOINT_WINDOW_COUNT = 1;
}
+ int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
+ if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+ int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
+ if (chkOffset != 0) {
+ EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
+ } else {
+ EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
+ }
+ } else {
+ EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
+ }
--- End diff --
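The hunk rounds the DAG-level checkpoint window count up to the nearest multiple of the operator's own CHECKPOINT_WINDOW_COUNT and forces it to 1 in EXACTLY_ONCE mode. Below is a standalone sketch of that arithmetic, not the engine code. The class and method names are hypothetical, and the enum is a local stand-in for the engine's ProcessingMode.

```java
public class EffectiveCheckpointSketch
{
  // Local stand-in for the engine's ProcessingMode (assumption: only these values matter here).
  enum ProcessingMode { AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE }

  // Hypothetical helper mirroring the hunk: round dagCount up to the next multiple of
  // operatorCount, except in EXACTLY_ONCE mode, where every window is checkpointed.
  static int effectiveCheckpointWindowCount(ProcessingMode mode, int dagCount, int operatorCount)
  {
    if (operatorCount <= 0) {
      throw new IllegalArgumentException("operatorCount must be positive");
    }
    if (mode == ProcessingMode.EXACTLY_ONCE) {
      return 1;
    }
    int offset = dagCount % operatorCount;
    return offset == 0 ? dagCount : dagCount + operatorCount - offset;
  }

  public static void main(String[] args)
  {
    System.out.println(effectiveCheckpointWindowCount(ProcessingMode.AT_LEAST_ONCE, 60, 7));  // 63
    System.out.println(effectiveCheckpointWindowCount(ProcessingMode.AT_LEAST_ONCE, 60, 10)); // 60
    System.out.println(effectiveCheckpointWindowCount(ProcessingMode.EXACTLY_ONCE, 60, 7));   // 1
  }
}
```

Rounding up instead of down keeps the effective interval at least as long as the DAG-level setting. It also guarantees that the interval is divisible by the operator's count.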
@ilooner Normally windowsFromCheckpoint does not have to be saved, because the
operator is going to recover at a checkpoint. However, I think we may also have
to save a few other things, such as checkpointWindowCount. We are saying that
we will checkpoint at a window divisible by the operator checkpoint window
count, and that count should be measured from the beginning of the application,
not from the most recent restart. I will look at this comprehensively.
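The following sketch shows one way to keep the "divisible by the checkpoint window count" rule anchored to the application start rather than to a restart. It assumes a hypothetical counter of streaming windows since application start that is saved with the operator state and restored on recovery instead of being reset to 0. The class, fields, and methods are illustrative only and are not part of the engine.

```java
public class WindowCounterSketch
{
  // Hypothetical: streaming windows completed since the application (not this container) started.
  // Restored from checkpointed state on recovery instead of being reset to 0.
  private long windowsSinceAppStart;
  private final int effectiveCheckpointWindowCount;

  public WindowCounterSketch(long restoredWindowsSinceAppStart, int effectiveCheckpointWindowCount)
  {
    if (effectiveCheckpointWindowCount <= 0) {
      throw new IllegalArgumentException("checkpoint window count must be positive");
    }
    this.windowsSinceAppStart = restoredWindowsSinceAppStart;
    this.effectiveCheckpointWindowCount = effectiveCheckpointWindowCount;
  }

  // Windows completed since the last checkpoint boundary, counted from application start.
  public long windowsFromCheckpoint()
  {
    return windowsSinceAppStart % effectiveCheckpointWindowCount;
  }

  // Advance at the end of a streaming window; true if this window closes on a checkpoint boundary.
  public boolean endWindow()
  {
    windowsSinceAppStart++;
    return windowsFromCheckpoint() == 0;
  }

  public static void main(String[] args)
  {
    // Recover from state saved after window 126 (2 x 63) and run 130 more windows.
    WindowCounterSketch counter = new WindowCounterSketch(126, 63);
    int checkpoints = 0;
    for (int i = 0; i < 130; i++) {
      if (counter.endWindow()) {
        checkpoints++;
      }
    }
    System.out.println(checkpoints + " " + counter.windowsFromCheckpoint()); // prints "2 4"
  }
}
```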

Regarding your second point, you could check the windows from checkpoint in
beginWindow and use that information later. Are you thinking about cases where
the application window spans multiple streaming windows? If you remember, in the
first-phase implementation of iteration support we talked about limiting it to
cases where the application window is the same as the streaming window. In the
future, when we add support for full application windows, we will need the
callbacks you are suggesting. They will tell us when the streaming window
starts, so that we can not only save those tuples but also inject them at the
right streaming window for ingestion. However, you will still need to know how
far the streaming window is from the checkpoint, and you will need a method like
this.
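Here is a minimal sketch of the beginWindow idea for an application window that spans several streaming windows. It assumes a hypothetical hook that passes the number of streaming windows since the last checkpoint into beginWindow. It also assumes that checkpoints fall on application window boundaries. None of these names are engine or operator API.

```java
public class ApplicationWindowOffsetSketch
{
  // Hypothetical: number of streaming windows per application window.
  private final int applicationWindowCount;
  // Captured at beginWindow so later processing in the same window can use it.
  private long windowsFromCheckpointAtBegin;

  public ApplicationWindowOffsetSketch(int applicationWindowCount)
  {
    if (applicationWindowCount <= 0) {
      throw new IllegalArgumentException("application window count must be positive");
    }
    this.applicationWindowCount = applicationWindowCount;
  }

  // Hypothetical hook: the engine reports how far this streaming window is from the checkpoint.
  public void beginWindow(long windowsFromCheckpoint)
  {
    windowsFromCheckpointAtBegin = windowsFromCheckpoint;
  }

  // Position of the current streaming window inside its application window (0 = first),
  // valid only under the assumption that checkpoints align with application window boundaries.
  public long offsetInApplicationWindow()
  {
    return windowsFromCheckpointAtBegin % applicationWindowCount;
  }

  // True when saved tuples could be injected at the start of an application window.
  public boolean startsApplicationWindow()
  {
    return offsetInApplicationWindow() == 0;
  }
}
```

With an application window count of 1 (the first-phase restriction), every streaming window starts an application window. The offset only becomes informative once application windows span multiple streaming windows.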