As I mentioned before, when you run a Flink job and simply cancel it, all 
state for that job is discarded (with some exceptions, such as externalised 
checkpoints). If you want the state of a job to survive a cancellation, you have 
to take a savepoint [1] and then, when restarting the job, specify the 
savepoint from which you want to restore.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
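
To make the resume order discussed in the quoted thread below concrete, here
is a minimal Java sketch. It is not KafkaIO or Flink code: the class
ResumeOffsetSketch, the Fallback enum and the startOffset method are
hypothetical names invented for illustration, and offsets are modelled as
plain longs. It only shows the precedence described in the thread: a
checkpoint mark restored by the runner (for Flink, from a savepoint) wins,
externally committed Kafka offsets are consulted only when no checkpoint mark
is available, and otherwise the source falls back to its configured start
position.

```java
import java.util.Optional;

// Hypothetical illustration only; none of these names exist in Beam or Flink.
final class ResumeOffsetSketch {

    // Assumed stand-in for the source's configured start position.
    enum Fallback { EARLIEST, LATEST }

    // Picks the offset to resume from, in the precedence order from the thread:
    //   1. offset carried by a restored checkpoint mark, if the runner provides one
    //   2. offset committed externally in Kafka, if offset commits were enabled
    //   3. the configured fallback (earliest or latest available offset)
    static long startOffset(Optional<Long> checkpointedOffset,
                            Optional<Long> committedOffset,
                            Fallback fallback,
                            long earliestOffset,
                            long latestOffset) {
        if (checkpointedOffset.isPresent()) {
            return checkpointedOffset.get();
        }
        if (committedOffset.isPresent()) {
            return committedOffset.get();
        }
        return fallback == Fallback.EARLIEST ? earliestOffset : latestOffset;
    }

    public static void main(String[] args) {
        // Restored from a savepoint: the checkpointed offset is used.
        System.out.println(startOffset(
            Optional.of(120L), Optional.of(100L), Fallback.LATEST, 0L, 500L));
        // Plain cancel and restart with offset commits enabled: approximate resume.
        System.out.println(startOffset(
            Optional.empty(), Optional.of(100L), Fallback.LATEST, 0L, 500L));
        // Plain cancel and restart with nothing stored: configured fallback.
        System.out.println(startOffset(
            Optional.empty(), Optional.empty(), Fallback.LATEST, 0L, 500L));
    }
}
```

The third case corresponds to what was originally reported: without a
savepoint and without committed offsets, a source configured to read from the
latest offset has nothing else to go on.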

> On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID> wrote:
> 
> Expanding a bit more on what Dan wrote:
> 
>   - In Dataflow, there are two modes of restarting a job : regular stop
>   and then start & an *update*. The checkpoint is carried over only in the
>   case of update.
>   - Update is the only way to keep 'exactly-once' semantics.
>   - If the requirements are not very strict, you can enable offset commits
>   in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
>   start reading from approximately where it left off in the previous run.
>      - When offset commits are enabled, KafkaIO could do this by
>      implementing the 'finalize()' API on KafkaCheckpointMark [1].
>      - This is runner independent.
>      - The compromise is that this might skip a few records or read a few
>      old records when the pipeline is restarted.
>      - This does not override 'resume from checkpoint' support when runner
>      provides KafkaCheckpointMark. Externally committed offsets are used only
>      when KafkaIO's own CheckpointMark is not available.
> 
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
> 
> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:
> 
>> [We should keep user list involved if that's where the discussion
>> originally was :)]
>> 
>> Jins George's original question was a good one. The right way to resume
>> from the previous offset here is what we're already doing – use the
>> KafkaCheckpointMark. In Beam, the runner maintains the state and not the
>> external system. Beam runners are responsible for maintaining the
>> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
>> a user disables checkpointing, then they are explicitly opting into "redo
>> all work" on restart.
>> 
>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
>> provided, then I'm inclined to agree with Amit that there may simply be a
>> bug in the FlinkRunner. (+aljoscha)
>> 
>> For what Mingmin Xu asked about: presumably if the Kafka source is
>> initially configured to "read from latest offset", when it restarts with no
>> checkpoint this will automatically go find the latest offset. That would
>> mimic at-most-once semantics in a buggy runner that did not provide
>> checkpointing.
>> 
>> Dan
>> 
>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>> 
>>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
>>> Can it restore during job restart? (I have not tested the runner in
>>> streaming for some time.)
>>> 
>>> Regarding data completeness, I would use at-most-once when losing a small
>>> amount of data (mostly from task node failures) is tolerable, compared to
>>> the performance cost introduced by 'state'/'checkpoint'.
>>> 
>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>> 
>>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
>>>> 
>>>>> Moving the discussion to the dev list.
>>>>> 
>>>>> Savepoints in Flink, and likewise checkpoints in Spark, should be good
>>>>> enough to handle this case.
>>>>> 
>>>>> When people don't enable these features, for example when they only need
>>>>> at-most-once
>>>>> 
>>>> The Spark runner forces checkpointing on any streaming (Beam) application,
>>>> mostly because it uses mapWithState for reading from UnboundedSource and
>>>> updateStateByKey for GroupByKey - so by design, the Spark runner is
>>>> at-least-once. Generally, I always thought that applications that require
>>>> at-most-once are more focused on processing time only, as they only care
>>>> about whatever gets ingested into the pipeline at a specific time and
>>>> don't care (up to the point of losing data) about correctness.
>>>> I would be happy to hear more about your use case.
>>>> 
>>>>> semantics, each unbounded IO should try its best to restore from the last
>>>>> offset, even if the CheckpointMark is null. Any ideas?
>>>>> 
>>>>> Mingmin
>>>>> 
>>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>> 
>>>>>> hey,
>>>>>> 
>>>>>> The native Beam UnboundedSource API supports resuming from checkpoint --
>>>>>> that specifically happens here
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>>>>>> when the KafkaCheckpointMark is non-null.
>>>>>> 
>>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the most
>>>>>> recent savepoint upon restore.
>>>>>> 
>>>>>> There shouldn't be any "special" Flink runner support needed, nor is the
>>>>>> State API involved.
>>>>>> 
>>>>>> Dan
>>>>>> 
>>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>> 
>>>>>>> Wouldn't it be Flink runner specific?
>>>>>>> 
>>>>>>> Maybe the State API could do the same in a runner-agnostic way (just
>>>>>>> thinking out loud)?
>>>>>>> 
>>>>>>> Regards
>>>>>>> JB
>>>>>>> 
>>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>> 
>>>>>>>> From KafkaIO itself, it looks like it is either start_from_beginning or
>>>>>>>> start_from_latest. It's designed to leverage
>>>>>>>> `UnboundedSource.CheckpointMark` during initialization, but so far I
>>>>>>>> don't see it being provided by runners. At the moment Flink savepoints
>>>>>>>> are a good option; I created a JIRA (BEAM-1775
>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in
>>>>>>>> KafkaIO.
>>>>>>>> 
>>>>>>>> Mingmin
>>>>>>>> 
>>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>> 
>>>>>>>>    Hi,
>>>>>>>>    Are you using Flink savepoints [1] when restoring your application?
>>>>>>>>    If you use this the Kafka offset should be stored in state and it
>>>>>>>>    should restart from the correct position.
>>>>>>>> 
>>>>>>>>    Best,
>>>>>>>>    Aljoscha
>>>>>>>> 
>>>>>>>>    [1]
>>>>>>>>    https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <jins.geo...@aeris.net> wrote:
>>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> 
>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to
>>>>>>>>> consume data from Kafka, apply some transformations, and persist to
>>>>>>>>> HBase.
>>>>>>>>> 
>>>>>>>>> If I restart the application (due to failure or manual restart), the
>>>>>>>>> consumer does not resume from the offset where it was prior to the
>>>>>>>>> restart. It always resumes from the latest offset.
>>>>>>>>> 
>>>>>>>>> If I enable Flink checkpointing with the HDFS state back-end, the
>>>>>>>>> system appears to resume from the earliest offset.
>>>>>>>>> 
>>>>>>>>> Is there a recommended way to resume from the offset where it was
>>>>>>>>> stopped?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Jins George
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> ----
>>>>>>>> Mingmin
>>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> ----
>>>>> Mingmin
>>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> ----
>>> Mingmin
>>> 
>> 
>> 
