I too find Gaurav's argument cogent: endWindow() does not take a
windowId parameter
leading to a natural guarantee that it matches the immediately
preceding beginWindow().

Both committed() and checkpointed() take that parameter which suggests it may
lag behind the current window. The comments on those methods say nothing
that can be be interpreted as a guarantee that the windowId will match the
window just processed. So, from the viewpoint of "original intent" --
a phrase that
has a long and storied history in jurisprudence
(https://en.wikipedia.org/wiki/Original_intent) --
it seems that any code that assumed a guarantee when none existed must
be regarded
as erroneous.

Having said that, we are all aware that in software it is not at all
unusual to preserve behavior
in the interests of backward compatibility, regardless of many other
reasons for that behavior
to be considered objectionable. So, if the cost of fixing all the code
that makes that
assumption is too high, we should seriously consider reverting it. In
this context, the
guarantee that checkpointed() simply means that the operator state has
been serialized
seems adequate.

We have a roughly analogous situation with respect to OS write() calls:
When the call returns, we _do not_ have an assurance that the data has
gone to disk.
All we know is that the OS has copied the data to its buffers for
flushing to disk. If we
want to know when the actual write to disk is done, we need to use a
different call -- fsync().

Ram

On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni <[email protected]> wrote:
> I think the original intent for checkpointed callback was that it can be
> called anytime after checkpoint and not necessarily immediately after the
> window prior of checkpoint. As Gaurav mentioned the API bears it out.
> Furthermore the callback has to be called within the lifecycle methods of
> the operator so it will be called inside a window so you anyway have to
> deal with new data anyway before the callback is called. Even though
> checkpointed is not committed shouldn't it at least guarantee that the
> operator is recoverable to that state in which case it should be called
> after the save to HDFS actually finishes.
>
> Thanks
>
> On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <[email protected]>
> wrote:
>
>> IMO, I don’t think there is any backward incompatibility wrt Checkpointing
>> call back semantics because
>>
>> 1. The checkpointed call is only made once the operator state is preserved.
>> 2. The window ids being passed to checkpointed are in increasing order.
>> 3. The window ids being passed are still the same ids as were passed
>> earlier.
>> 4. The sequence is still begingWindow()->endWindow()->checkpointed().
>>
>> Thanks
>> - Gaurav
>>
>> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <[email protected]>
>> wrote:
>> >
>> > If the requirement is that the order is always
>> begingWindow()->endWindow()->checkpointed(), why to pass windowId in the
>> checkpointed() call back?
>> >
>> > Thanks
>> > - Gaurav
>> >
>> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh <[email protected]
>> <mailto:[email protected]>> wrote:
>> >>
>> >> FYI,
>> >>
>> >> HDHTWriter implementation is dependent on the older semantics and seems
>> to
>> >> be broken now.
>> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
>> >> In the checkpointed implementation, it copies certain state (transient)
>> and
>> >> transfers it to a checkpointedWriteCache with respect to window 'x'.
>> >>
>> >> With Async checkpointing it, the state that is transferred is much more
>> >> recent than window 'x'.
>> >>
>> >> Chandni
>> >>
>> >>
>> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
>> [email protected] <mailto:[email protected]>>
>> >> wrote:
>> >>
>> >>> Agreed. Thomas's solution fixes the backward incompatibility. I think
>> we
>> >>> really need to fix this.
>> >>>
>> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <[email protected]
>> <mailto:[email protected]>>
>> >>> wrote:
>> >>>
>> >>>> Gaurav,
>> >>>>
>> >>>> I think if the state copy fails then STRAM should roll back the
>> operator
>> >>>> to
>> >>>> a checkpoint that is further back than the last checkpoint. If you are
>> >>>> saying that you want to preserve the semantic that checkpointed is
>> only
>> >>>> called after a checkpoint is completed, I would argue that that
>> guarantee
>> >>>> is already pointless in the current implementation since it is always
>> >>>> possible for an operator to be rolled back to a checkpoint before it's
>> >>>> last
>> >>>> completed checkpoint. So, it is already currently possible for some
>> >>>> database or file operation performed after a completed checkpoint to
>> be
>> >>>> redone after a failure. Because of this I think Thomas's solution
>> makes
>> >>>> the
>> >>>> most sense. Thomas's solution would also address Chandni's original
>> point
>> >>>> that the semantics for the checkpointed call back have been violated.
>> >>>> There
>> >>>> are operators in our libraries that have depended on the
>> beginWindow(x),
>> >>>> endWindow(x), and checkpointed(x) call sequence, which is now broken.
>> We
>> >>>> should probably fix that.
>> >>>>
>> >>>> Tim
>> >>>>
>> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
>> [email protected] <mailto:[email protected]>>
>> >>>> wrote:
>> >>>>
>> >>>>> Thomas,
>> >>>>>
>> >>>>> This was done to preserve checkpointing semantics that is to tell the
>> >>>>> operator that its state is preserved. Say if database is updated or
>> >>>> files
>> >>>>> are moved in checkpointed call but the state copy fails, how to
>> address
>> >>>>> such scenarios?
>> >>>>>
>> >>>>> Thanks
>> >>>>> - Gaurav
>> >>>>>
>> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <[email protected]
>> <mailto:[email protected]>>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Alternatively I would ask why the checkpointed callback needs to
>> wait
>> >>>>> until
>> >>>>>> the data was copied to HDFS instead upon completion of the state
>> >>>>>> serialization.
>> >>>>>>
>> >>>>>> Thomas
>> >>>>>>
>> >>>>>>
>> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
>> >>>> [email protected] <mailto:[email protected]>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Gaurav,
>> >>>>>>>
>> >>>>>>> My question is about why Async was made the default when it changed
>> >>>> the
>> >>>>>>> semantics of operator callbacks. Your response doesn't answer that.
>> >>>>>>>
>> >>>>>>> In a way we broke backward compatibility.
>> >>>>>>>
>> >>>>>>> Chandni
>> >>>>>>>
>> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
>> >>>> [email protected] <mailto:[email protected]>>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> The idea behind Async checkpointing is to unblock operator while
>> the
>> >>>>>>> state
>> >>>>>>>> is getting transferred to HDFS.
>> >>>>>>>> Just to clarify that this beginWindow (x) -> endWindow(x) ->
>> >>>>> checkpointed
>> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is slow or for
>> >>>> some
>> >>>>>>>> other reason transferring the state to HDFS is slow this sequence
>> >>>> may
>> >>>>> not
>> >>>>>>>> hold true.
>> >>>>>>>>
>> >>>>>>>> Can your use case be addressed by
>> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
>> https://malhar.atlassian.net/browse/APEX-78> <
>> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
>> https://malhar.atlassian.net/browse/APEX-78>>?
>> >>>>>>>>
>> >>>>>>>> Thanks
>> >>>>>>>> - Gaurav
>> >>>>>>>>
>> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
>> >>>> [email protected] <mailto:[email protected]>>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> With Async checkpointing the checkpoint callback in
>> CheckpointPoint
>> >>>>>>>>> listener is called for a previous window, that is,
>> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1 )
>> >>>>>>>>>
>> >>>>>>>>> This feature was newly introduced. With synchronous
>> checkpointing,
>> >>>> the
>> >>>>>>>>> behavior was always
>> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
>> >>>>>>>>>
>> >>>>>>>>> A lot of operators were written before asynchronous checkpointing
>> >>>> was
>> >>>>>>>>> introduced and few of them can rely on the sequencing guaranteed
>> by
>> >>>>>>>>> synchronous checkpointing.
>> >>>>>>>>>
>> >>>>>>>>> So why was Async Checkpointed made default?
>> >>>>>>>>>
>> >>>>>>>>> With how Async checkpoint is today, the complexity to handle
>> >>>> transient
>> >>>>>>>>> state in checkpointed callback falls on every operator. For eg,
>> >>>> lets
>> >>>>>>> say
>> >>>>>>>>> earlier I had a transient map which I cleared every time the
>> >>>>>>> checkpointed
>> >>>>>>>>> was called, with async checkpointing this simple task will be a
>> lot
>> >>>>>>> more
>> >>>>>>>>> complicated.
>> >>>>>>>>>
>> >>>>>>>>> I think Async checkpointing broke the semantics of operator
>> >>>> callbacks
>> >>>>>>> and
>> >>>>>>>>> should NOT be the default.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >
>>
>>

Reply via email to