On 11/23/15, 8:44 AM, "Chandni Singh" <[email protected]> wrote:
>I think in the API, there is windowId in the checkpointed callback for >cases when checkpointing could happen within application windows. > >IMHO backward compatibility is broken. For 3 years since the platform was >created the semantics of checkpointed were >beginWindow(x) -> endwindow -> checkpointWindow(x) when checkpointing was >aligned with application window boundaries. > :-) I did not write it that semantics. Looks like someone made that assumption along the way. Fix the code which assumes that. ‹ Chetan >Please not that aligning Checkpointing with Application Window boundary is >the DEFAUL behavior and I think most of us agree that aligning the >checkpointing with application window boundaries is what we see in every >use case. > >Code was written with that assumption by committers of Apex (not even by >people who are new to Apex). It is broken by by a change introduced couple >of months back (August 2015). > >Moreover, what do we achieve by NOT guaranteeing that semantics- delegate >the complexity to operators to handle it. Even a simple scenario that I >mentioned in my first mail, is very complicated. > >I think this is a big issue and we need to fix it. > >Chandni > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <[email protected]> >wrote: > >> I too find Gaurav's argument cogent: endWindow() does not take a >> windowId parameter >> leading to a natural guarantee that it matches the immediately >> preceding beginWindow(). >> >> Both committed() and checkpointed() take that parameter which suggests >>it >> may >> lag behind the current window. The comments on those methods say nothing >> that can be be interpreted as a guarantee that the windowId will match >>the >> window just processed. So, from the viewpoint of "original intent" -- >> a phrase that >> has a long and storied history in jurisprudence >> (https://en.wikipedia.org/wiki/Original_intent) -- >> it seems that any code that assumed a guarantee when none existed must >> be regarded >> as erroneous. >> >> Having said that, we are all aware that in software it is not at all >> unusual to preserve behavior >> in the interests of backward compatibility, regardless of many other >> reasons for that behavior >> to be considered objectionable. So, if the cost of fixing all the code >> that makes that >> assumption is too high, we should seriously consider reverting it. In >> this context, the >> guarantee that checkpointed() simply means that the operator state has >> been serialized >> seems adequate. >> >> We have a roughly analogous situation with respect to OS write() calls: >> When the call returns, we _do not_ have an assurance that the data has >> gone to disk. >> All we know is that the OS has copied the data to its buffers for >> flushing to disk. If we >> want to know when the actual write to disk is done, we need to use a >> different call -- fsync(). >> >> Ram >> >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni >><[email protected]> >> wrote: >> > I think the original intent for checkpointed callback was that it can >>be >> > called anytime after checkpoint and not necessarily immediately after >>the >> > window prior of checkpoint. As Gaurav mentioned the API bears it out. >> > Furthermore the callback has to be called within the lifecycle >>methods of >> > the operator so it will be called inside a window so you anyway have >>to >> > deal with new data anyway before the callback is called. Even though >> > checkpointed is not committed shouldn't it at least guarantee that the >> > operator is recoverable to that state in which case it should be >>called >> > after the save to HDFS actually finishes. >> > >> > Thanks >> > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <[email protected]> >> > wrote: >> > >> >> IMO, I don¹t think there is any backward incompatibility wrt >> Checkpointing >> >> call back semantics because >> >> >> >> 1. The checkpointed call is only made once the operator state is >> preserved. >> >> 2. The window ids being passed to checkpointed are in increasing >>order. >> >> 3. The window ids being passed are still the same ids as were passed >> >> earlier. >> >> 4. The sequence is still begingWindow()->endWindow()->checkpointed(). >> >> >> >> Thanks >> >> - Gaurav >> >> >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <[email protected]> >> >> wrote: >> >> > >> >> > If the requirement is that the order is always >> >> begingWindow()->endWindow()->checkpointed(), why to pass windowId in >>the >> >> checkpointed() call back? >> >> > >> >> > Thanks >> >> > - Gaurav >> >> > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh >><[email protected] >> >> <mailto:[email protected]>> wrote: >> >> >> >> >> >> FYI, >> >> >> >> >> >> HDHTWriter implementation is dependent on the older semantics and >> seems >> >> to >> >> >> be broken now. >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x) >> >> >> In the checkpointed implementation, it copies certain state >> (transient) >> >> and >> >> >> transfers it to a checkpointedWriteCache with respect to window >>'x'. >> >> >> >> >> >> With Async checkpointing it, the state that is transferred is much >> more >> >> >> recent than window 'x'. >> >> >> >> >> >> Chandni >> >> >> >> >> >> >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh < >> >> [email protected] <mailto:[email protected]>> >> >> >> wrote: >> >> >> >> >> >>> Agreed. Thomas's solution fixes the backward incompatibility. I >> think >> >> we >> >> >>> really need to fix this. >> >> >>> >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas < >> [email protected] >> >> <mailto:[email protected]>> >> >> >>> wrote: >> >> >>> >> >> >>>> Gaurav, >> >> >>>> >> >> >>>> I think if the state copy fails then STRAM should roll back the >> >> operator >> >> >>>> to >> >> >>>> a checkpoint that is further back than the last checkpoint. If >>you >> are >> >> >>>> saying that you want to preserve the semantic that checkpointed >>is >> >> only >> >> >>>> called after a checkpoint is completed, I would argue that that >> >> guarantee >> >> >>>> is already pointless in the current implementation since it is >> always >> >> >>>> possible for an operator to be rolled back to a checkpoint >>before >> it's >> >> >>>> last >> >> >>>> completed checkpoint. So, it is already currently possible for >>some >> >> >>>> database or file operation performed after a completed >>checkpoint >> to >> >> be >> >> >>>> redone after a failure. Because of this I think Thomas's >>solution >> >> makes >> >> >>>> the >> >> >>>> most sense. Thomas's solution would also address Chandni's >>original >> >> point >> >> >>>> that the semantics for the checkpointed call back have been >> violated. >> >> >>>> There >> >> >>>> are operators in our libraries that have depended on the >> >> beginWindow(x), >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is now >> broken. >> >> We >> >> >>>> should probably fix that. >> >> >>>> >> >> >>>> Tim >> >> >>>> >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta < >> >> [email protected] <mailto:[email protected]>> >> >> >>>> wrote: >> >> >>>> >> >> >>>>> Thomas, >> >> >>>>> >> >> >>>>> This was done to preserve checkpointing semantics that is to >>tell >> the >> >> >>>>> operator that its state is preserved. Say if database is >>updated >> or >> >> >>>> files >> >> >>>>> are moved in checkpointed call but the state copy fails, how to >> >> address >> >> >>>>> such scenarios? >> >> >>>>> >> >> >>>>> Thanks >> >> >>>>> - Gaurav >> >> >>>>> >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise < >> [email protected] >> >> <mailto:[email protected]>> >> >> >>>>> wrote: >> >> >>>>>> >> >> >>>>>> Alternatively I would ask why the checkpointed callback needs >>to >> >> wait >> >> >>>>> until >> >> >>>>>> the data was copied to HDFS instead upon completion of the >>state >> >> >>>>>> serialization. >> >> >>>>>> >> >> >>>>>> Thomas >> >> >>>>>> >> >> >>>>>> >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh < >> >> >>>> [email protected] <mailto:[email protected]>> >> >> >>>>>> wrote: >> >> >>>>>> >> >> >>>>>>> Gaurav, >> >> >>>>>>> >> >> >>>>>>> My question is about why Async was made the default when it >> changed >> >> >>>> the >> >> >>>>>>> semantics of operator callbacks. Your response doesn't answer >> that. >> >> >>>>>>> >> >> >>>>>>> In a way we broke backward compatibility. >> >> >>>>>>> >> >> >>>>>>> Chandni >> >> >>>>>>> >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta < >> >> >>>> [email protected] <mailto:[email protected]>> >> >> >>>>>>> wrote: >> >> >>>>>>> >> >> >>>>>>>> The idea behind Async checkpointing is to unblock operator >> while >> >> the >> >> >>>>>>> state >> >> >>>>>>>> is getting transferred to HDFS. >> >> >>>>>>>> Just to clarify that this beginWindow (x) -> endWindow(x) -> >> >> >>>>> checkpointed >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is slow >>or >> for >> >> >>>> some >> >> >>>>>>>> other reason transferring the state to HDFS is slow this >> sequence >> >> >>>> may >> >> >>>>> not >> >> >>>>>>>> hold true. >> >> >>>>>>>> >> >> >>>>>>>> Can your use case be addressed by >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 < >> >> https://malhar.atlassian.net/browse/APEX-78> < >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 < >> >> https://malhar.atlassian.net/browse/APEX-78>>? >> >> >>>>>>>> >> >> >>>>>>>> Thanks >> >> >>>>>>>> - Gaurav >> >> >>>>>>>> >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh < >> >> >>>> [email protected] <mailto:[email protected]>> >> >> >>>>>>>> wrote: >> >> >>>>>>>>> >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in >> >> CheckpointPoint >> >> >>>>>>>>> listener is called for a previous window, that is, >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1 ) >> >> >>>>>>>>> >> >> >>>>>>>>> This feature was newly introduced. With synchronous >> >> checkpointing, >> >> >>>> the >> >> >>>>>>>>> behavior was always >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x) >> >> >>>>>>>>> >> >> >>>>>>>>> A lot of operators were written before asynchronous >> checkpointing >> >> >>>> was >> >> >>>>>>>>> introduced and few of them can rely on the sequencing >> guaranteed >> >> by >> >> >>>>>>>>> synchronous checkpointing. >> >> >>>>>>>>> >> >> >>>>>>>>> So why was Async Checkpointed made default? >> >> >>>>>>>>> >> >> >>>>>>>>> With how Async checkpoint is today, the complexity to >>handle >> >> >>>> transient >> >> >>>>>>>>> state in checkpointed callback falls on every operator. For >> eg, >> >> >>>> lets >> >> >>>>>>> say >> >> >>>>>>>>> earlier I had a transient map which I cleared every time >>the >> >> >>>>>>> checkpointed >> >> >>>>>>>>> was called, with async checkpointing this simple task will >>be >> a >> >> lot >> >> >>>>>>> more >> >> >>>>>>>>> complicated. >> >> >>>>>>>>> >> >> >>>>>>>>> I think Async checkpointing broke the semantics of operator >> >> >>>> callbacks >> >> >>>>>>> and >> >> >>>>>>>>> should NOT be the default. >> >> >>>>>>>> >> >> >>>>>>>> >> >> >>>>>>> >> >> >>>>> >> >> >>>>> >> >> >>>> >> >> >>> >> >> >>> >> >> > >> >> >> >> >>
