Tim,

If the operator is such that I can discard the state accumulated from the previous checkpoint up to the current checkpoint in the checkpointed callback, then that state can be reconstructed from the incoming data on replay, and I don't need to wait until committed to remove the unwanted state. It's more memory-efficient.
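As a rough illustration of that idea, here is a self-contained sketch in plain Java. It is not actual Apex operator code: the class name `WindowedBuffer`, the `process` method, and the per-window `TreeMap` are all hypothetical, and only the `beginWindow` / `endWindow` / `checkpointed(windowId)` call names mirror the callbacks discussed in this thread. State is keyed by window id, so the checkpointed callback can drop everything up to and including the checkpointed window even when it arrives after later windows have already been processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for an operator; it does not implement any real Apex interface.
class WindowedBuffer {
    // Transient state, keyed by the window id in which it was collected.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private long currentWindowId;

    void beginWindow(long windowId) {
        currentWindowId = windowId;
    }

    void process(String tuple) {
        pending.computeIfAbsent(currentWindowId, k -> new ArrayList<>()).add(tuple);
    }

    void endWindow() {
        // Nothing to do in this sketch.
    }

    // Drop state for every window up to and including the checkpointed one.
    // Keying by window id means newer windows are untouched, even if this
    // call lags behind the window currently being processed.
    void checkpointed(long windowId) {
        pending.headMap(windowId, true).clear();
    }

    int bufferedWindows() {
        return pending.size();
    }

    public static void main(String[] args) {
        WindowedBuffer b = new WindowedBuffer();
        b.beginWindow(1); b.process("a"); b.endWindow();
        b.beginWindow(2); b.process("b"); b.endWindow();
        b.checkpointed(1); // arrives late, after window 2 has ended
        System.out.println(b.bufferedWindows()); // prints 1
    }
}
```

Whether dropping the state this early is safe depends on the operator being able to rebuild it from replayed tuples, which is the assumption stated above.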
Thanks

On Mon, Nov 23, 2015 at 11:17 AM, Timothy Farkas <[email protected]> wrote:

> Pramod,
>
> Very good point about checkpointed being called within a window boundary.
> But I still don't understand the purpose of calling checkpointed after the
> asynchronous copy to HDFS is completed. It makes checkpointed
> implementations more complex, and it does not provide any benefits. The
> checkpointed implementation logic could be called multiple times for the
> same window if the operator fails and is restored to an earlier checkpoint;
> this fact invalidates any atomicity guarantees that you are trying to
> preserve. I would like a concrete example of when it is necessary to call
> checkpointed after the asynchronous copy to HDFS. The only use case I can
> think of is if the operator reads its own checkpointed state from HDFS in
> the checkpointed callback, but this use case is not a valid use case.
>
> Tim
>
> On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <[email protected]>
> wrote:
>
> > There is a problem I see with the older way in which checkpointed was
> > called as well, which is relevant to this. It is called outside of the
> > window boundary, between end window and begin window, which I don't think
> > is correct, as the platform should not run operator business logic outside
> > of window boundaries. The operator could end up sending tuples accidentally
> > in this call, for example.
> >
> > I think we need to fix that and also not assume that the checkpointed
> > callback will be called immediately after end window of the window being
> > checkpointed.
> >
> > Thanks
> >
> > On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <[email protected]>
> > wrote:
> >
> > > :-) I did not write it that semantics. Looks like someone made that
> > > assumption along the way. Fix the code which assumes that.
> > >
> > > What do you mean?
> > > We go on fixing all the code which was written because the behavior with
> > > synchronous checkpointing was
> > > beginWindow(x) -> endWindow -> checkpointWindow(x) (when checkpointing
> > > is aligned with application window boundary)
> > >
> > > This was broken recently with async checkpointing but instead of fixing
> > > that, we go on fixing all the existing solutions.
> > > I guess I find this ridiculous.
> > >
> > > And why should we make it harder and harder to write operators?
> > > I think the majority of people who participated in this discussion do
> > > believe that we need to fix this, and I think this is critical.
> > >
> > > Chandni
> > >
> > > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > > [email protected]> wrote:
> > >
> > > > On 11/23/15, 8:44 AM, "Chandni Singh" <[email protected]> wrote:
> > > >
> > > > >I think in the API, there is windowId in the checkpointed callback for
> > > > >cases when checkpointing could happen within application windows.
> > > > >
> > > > >IMHO backward compatibility is broken. For 3 years since the platform was
> > > > >created the semantics of checkpointed were
> > > > >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing was
> > > > >aligned with application window boundaries.
> > > >
> > > > :-) I did not write it that semantics. Looks like someone made that
> > > > assumption along the way. Fix the code which assumes that.
> > > >
> > > > --
> > > > Chetan
> > > >
> > > > >Please note that aligning checkpointing with the application window
> > > > >boundary is the DEFAULT behavior and I think most of us agree that
> > > > >aligning the checkpointing with application window boundaries is what we
> > > > >see in every use case.
> > > > >
> > > > >Code was written with that assumption by committers of Apex (not even by
> > > > >people who are new to Apex).
> > > > >It was broken by a change introduced a couple
> > > > >of months back (August 2015).
> > > > >
> > > > >Moreover, what do we achieve by NOT guaranteeing those semantics? We only
> > > > >delegate the complexity to operators to handle it. Even the simple
> > > > >scenario that I mentioned in my first mail is very complicated.
> > > > >
> > > > >I think this is a big issue and we need to fix it.
> > > > >
> > > > >Chandni
> > > > >
> > > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <[email protected]>
> > > > >wrote:
> > > > >
> > > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > > >> windowId parameter, leading to a natural guarantee that it matches the
> > > > >> immediately preceding beginWindow().
> > > > >>
> > > > >> Both committed() and checkpointed() take that parameter, which suggests
> > > > >> it may lag behind the current window. The comments on those methods say
> > > > >> nothing that can be interpreted as a guarantee that the windowId will
> > > > >> match the window just processed. So, from the viewpoint of "original
> > > > >> intent" -- a phrase that has a long and storied history in jurisprudence
> > > > >> (https://en.wikipedia.org/wiki/Original_intent) -- it seems that any
> > > > >> code that assumed a guarantee when none existed must be regarded as
> > > > >> erroneous.
> > > > >>
> > > > >> Having said that, we are all aware that in software it is not at all
> > > > >> unusual to preserve behavior in the interests of backward compatibility,
> > > > >> regardless of many other reasons for that behavior to be considered
> > > > >> objectionable. So, if the cost of fixing all the code that makes that
> > > > >> assumption is too high, we should seriously consider reverting it.
> > > > >> In this context, the guarantee that checkpointed() simply means that
> > > > >> the operator state has been serialized seems adequate.
> > > > >>
> > > > >> We have a roughly analogous situation with respect to OS write() calls:
> > > > >> When the call returns, we _do not_ have an assurance that the data has
> > > > >> gone to disk. All we know is that the OS has copied the data to its
> > > > >> buffers for flushing to disk. If we want to know when the actual write
> > > > >> to disk is done, we need to use a different call -- fsync().
> > > > >>
> > > > >> Ram
> > > > >>
> > > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni <[email protected]>
> > > > >> wrote:
> > > > >> > I think the original intent for the checkpointed callback was that it
> > > > >> > can be called anytime after the checkpoint and not necessarily
> > > > >> > immediately after the window prior to the checkpoint. As Gaurav
> > > > >> > mentioned, the API bears it out. Furthermore, the callback has to be
> > > > >> > called within the lifecycle methods of the operator, so it will be
> > > > >> > called inside a window, so you have to deal with new data anyway
> > > > >> > before the callback is called. Even though checkpointed is not
> > > > >> > committed, shouldn't it at least guarantee that the operator is
> > > > >> > recoverable to that state? In that case it should be called after the
> > > > >> > save to HDFS actually finishes.
> > > > >> >
> > > > >> > Thanks
> > > > >> >
> > > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <[email protected]>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> IMO, I don't think there is any backward incompatibility wrt
> > > > >> >> checkpointing callback semantics because
> > > > >> >>
> > > > >> >> 1. The checkpointed call is only made once the operator state is
> > > > >> >>    preserved.
> > > > >> >> 2. The window ids being passed to checkpointed are in increasing
> > > > >> >>    order.
> > > > >> >> 3. The window ids being passed are still the same ids as were passed
> > > > >> >>    earlier.
> > > > >> >> 4. The sequence is still beginWindow()->endWindow()->checkpointed().
> > > > >> >>
> > > > >> >> Thanks
> > > > >> >> - Gaurav
> > > > >> >>
> > > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <[email protected]>
> > > > >> >> > wrote:
> > > > >> >> >
> > > > >> >> > If the requirement is that the order is always
> > > > >> >> > beginWindow()->endWindow()->checkpointed(), why pass windowId in
> > > > >> >> > the checkpointed() callback?
> > > > >> >> >
> > > > >> >> > Thanks
> > > > >> >> > - Gaurav
> > > > >> >> >
> > > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh <[email protected]>
> > > > >> >> >> wrote:
> > > > >> >> >>
> > > > >> >> >> FYI,
> > > > >> >> >>
> > > > >> >> >> The HDHTWriter implementation is dependent on the older semantics
> > > > >> >> >> and seems to be broken now.
> > > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > >> >> >> In the checkpointed implementation, it copies certain (transient)
> > > > >> >> >> state and transfers it to a checkpointedWriteCache with respect to
> > > > >> >> >> window 'x'.
> > > > >> >> >>
> > > > >> >> >> With async checkpointing, the state that is transferred is much
> > > > >> >> >> more recent than window 'x'.
> > > > >> >> >>
> > > > >> >> >> Chandni
> > > > >> >> >>
> > > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <[email protected]>
> > > > >> >> >> wrote:
> > > > >> >> >>
> > > > >> >> >>> Agreed. Thomas's solution fixes the backward incompatibility.
> > > > >> >> >>> I think we really need to fix this.
> > > > >> >> >>>
> > > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <[email protected]>
> > > > >> >> >>> wrote:
> > > > >> >> >>>
> > > > >> >> >>>> Gaurav,
> > > > >> >> >>>>
> > > > >> >> >>>> I think if the state copy fails then STRAM should roll back the
> > > > >> >> >>>> operator to a checkpoint that is further back than the last
> > > > >> >> >>>> checkpoint. If you are saying that you want to preserve the
> > > > >> >> >>>> semantic that checkpointed is only called after a checkpoint is
> > > > >> >> >>>> completed, I would argue that that guarantee is already pointless
> > > > >> >> >>>> in the current implementation, since it is always possible for an
> > > > >> >> >>>> operator to be rolled back to a checkpoint before its last
> > > > >> >> >>>> completed checkpoint. So, it is already currently possible for
> > > > >> >> >>>> some database or file operation performed after a completed
> > > > >> >> >>>> checkpoint to be redone after a failure. Because of this I think
> > > > >> >> >>>> Thomas's solution makes the most sense. Thomas's solution would
> > > > >> >> >>>> also address Chandni's original point that the semantics for the
> > > > >> >> >>>> checkpointed callback have been violated. There are operators in
> > > > >> >> >>>> our libraries that have depended on the beginWindow(x),
> > > > >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is now
> > > > >> >> >>>> broken. We should probably fix that.
> > > > >> >> >>>>
> > > > >> >> >>>> Tim
> > > > >> >> >>>>
> > > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <[email protected]>
> > > > >> >> >>>> wrote:
> > > > >> >> >>>>
> > > > >> >> >>>>> Thomas,
> > > > >> >> >>>>>
> > > > >> >> >>>>> This was done to preserve checkpointing semantics, that is, to
> > > > >> >> >>>>> tell the operator that its state is preserved. Say the database
> > > > >> >> >>>>> is updated or files are moved in the checkpointed call but the
> > > > >> >> >>>>> state copy fails; how do we address such scenarios?
> > > > >> >> >>>>>
> > > > >> >> >>>>> Thanks
> > > > >> >> >>>>> - Gaurav
> > > > >> >> >>>>>
> > > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <[email protected]>
> > > > >> >> >>>>>> wrote:
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> Alternatively I would ask why the checkpointed callback needs to
> > > > >> >> >>>>>> wait until the data was copied to HDFS instead of being called
> > > > >> >> >>>>>> upon completion of the state serialization.
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> Thomas
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <[email protected]>
> > > > >> >> >>>>>> wrote:
> > > > >> >> >>>>>>
> > > > >> >> >>>>>>> Gaurav,
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> My question is about why async was made the default when it
> > > > >> >> >>>>>>> changed the semantics of operator callbacks. Your response
> > > > >> >> >>>>>>> doesn't answer that.
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> Chandni
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <[email protected]>
> > > > >> >> >>>>>>> wrote:
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>>> The idea behind async checkpointing is to unblock the operator
> > > > >> >> >>>>>>>> while the state is getting transferred to HDFS.
> > > > >> >> >>>>>>>> Just to clarify: beginWindow(x) -> endWindow(x) ->
> > > > >> >> >>>>>>>> checkpointed(x-1) should be the ideal sequence, but if HDFS is
> > > > >> >> >>>>>>>> slow, or transferring the state to HDFS is slow for some other
> > > > >> >> >>>>>>>> reason, this sequence may not hold true.
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>> Can your use case be addressed by
> > > > >> >> >>>>>>>> <https://malhar.atlassian.net/browse/APEX-78>?
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>> Thanks
> > > > >> >> >>>>>>>> - Gaurav
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <[email protected]>
> > > > >> >> >>>>>>>>> wrote:
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> With async checkpointing, the checkpoint callback in
> > > > >> >> >>>>>>>>> CheckpointListener is called for a previous window, that is,
> > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed(x-1)
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> This feature was newly introduced.
> > > > >> >> >>>>>>>>> With synchronous checkpointing, the behavior was always
> > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > > > >> >> >>>>>>>>> checkpointing was introduced, and a few of them may rely on the
> > > > >> >> >>>>>>>>> sequencing guaranteed by synchronous checkpointing.
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> So why was async checkpointing made the default?
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> With how async checkpointing is today, the complexity of
> > > > >> >> >>>>>>>>> handling transient state in the checkpointed callback falls on
> > > > >> >> >>>>>>>>> every operator. For example, let's say earlier I had a
> > > > >> >> >>>>>>>>> transient map which I cleared every time checkpointed was
> > > > >> >> >>>>>>>>> called; with async checkpointing this simple task will be a lot
> > > > >> >> >>>>>>>>> more complicated.
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> I think async checkpointing broke the semantics of operator
> > > > >> >> >>>>>>>>> callbacks and should NOT be the default.
