You’re right, it’s currently not possible. I created a Jira issue to track the 
problem: https://issues.apache.org/jira/browse/BEAM-1812 

It shouldn’t be too hard to add this since it boils down to forwarding some 
configuration settings.

Best,
Aljoscha

> On 24 Mar 2017, at 23:27, Jins George <jins.geo...@aeris.net> wrote:
> 
> Currently /org.apache.beam.runners.flink.FlinkPipelineOptions/ does not have 
> a way to configure externalized checkpoints. Is that something on the 
> roadmap for the FlinkRunner?
> 
> Thanks,
> Jins George
> 
> On 03/23/2017 10:27 AM, Aljoscha Krettek wrote:
>> For this you would use externalised checkpoints: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
>>  
>> 
>> Unfortunately, the doc is still a bit sparse but it’s basically a 
>> combination of savepoints and checkpoints. Checkpoints are not cleaned up 
>> when a job fails and you can restore from them as you would from a savepoint.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 22 Mar 2017, at 21:25, Jins George <jins.geo...@aeris.net> wrote:
>>> 
>>> 
>>> Thanks Aljoscha for the clarification. Savepoints work fine in case of a 
>>> controlled stop and restart. In case of a failure (say the entire job 
>>> failed due to a node crash or an application software bug), is there a way 
>>> to resume from the checkpoint on restarting the application? The checkpoint 
>>> location is configured with HDFS.
>>> 
>>> Thanks,
>>> Jins George
>>> 
>>> On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
>>>> As I mentioned before, when running a Flink Job and simply cancelling it, 
>>>> all state about that job is discarded (with some exceptions, such as 
>>>> externalised checkpoints). If you want the state of a Job to survive a 
>>>> cancellation you have to perform a savepoint [1] and then when restarting 
>>>> the Job you have to specify a savepoint from which you want to restore.
>>>> 
>>>> [1] 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>> 
>>>>> On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID> wrote:
>>>>> 
>>>>> Expanding a bit more on what Dan wrote:
>>>>> 
>>>>>  - In Dataflow, there are two modes of restarting a job : regular stop
>>>>>  and then start & an *update*. The checkpoint is carried over only in the
>>>>>  case of update.
>>>>>  - Update is the only way to keep 'exactly-once' semantics.
>>>>>  - If the requirements are not very strict, you can enable offset commits
>>>>>  in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
>>>>>  start reading from approximately where it left off in the previous run.
>>>>>     - When offset commits are enabled, KafkaIO could do this by
>>>>>     implementing the 'finalize()' API on KafkaCheckpointMark [1].
>>>>>     - This is runner independent.
>>>>>     - The compromise is that this might skip a few records or read a few
>>>>>     old records when the pipeline is restarted.
>>>>>     - This does not override 'resume from checkpoint' support when runner
>>>>>     provides KafkaCheckpointMark. Externally committed offsets are used
>>>>>     only when KafkaIO's own CheckpointMark is not available.
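
The precedence described in the list above can be sketched in plain Java. This is only an illustration under stated assumptions: StartOffsetResolver, Fallback, and resolve are hypothetical names made up for this sketch and are not part of KafkaIO or Beam. It shows a runner-provided checkpoint offset taking priority, an offset committed to Kafka being used only when no checkpoint mark is available, and the configured earliest/latest default applying otherwise.

```java
import java.util.OptionalLong;

/** Hypothetical helper (not a KafkaIO class): models the start-offset precedence. */
final class StartOffsetResolver {

    /** Configured default when neither a checkpoint nor a committed offset exists. */
    enum Fallback { EARLIEST, LATEST }

    private StartOffsetResolver() {}

    /**
     * Picks the offset at which a reader would start.
     *
     * @param checkpointOffset next offset from the runner-provided checkpoint mark, if any
     * @param committedOffset  offset committed to Kafka itself, if offset commits are enabled
     * @param fallback         configured default for a start without any saved position
     * @param earliest         earliest offset available in the partition
     * @param latest           latest offset in the partition
     */
    static long resolve(OptionalLong checkpointOffset, OptionalLong committedOffset,
                        Fallback fallback, long earliest, long latest) {
        // 1. A checkpoint mark restored by the runner always wins.
        if (checkpointOffset.isPresent()) {
            return checkpointOffset.getAsLong();
        }
        // 2. Externally committed offsets are used only without a checkpoint mark.
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        // 3. Otherwise fall back to the configured default position.
        return fallback == Fallback.EARLIEST ? earliest : latest;
    }
}
```

With a checkpoint offset of 120 and a committed offset of 100, the sketch starts at 120; with only the committed offset it starts at 100, which is where the "might skip a few records or read a few old records" compromise comes from, since the committed offset only approximates the last processed position.
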
>>>>> 
>>>>> [1]:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
>>>>> 
>>>>> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>> 
>>>>>> [We should keep user list involved if that's where the discussion
>>>>>> originally was :)]
>>>>>> 
>>>>>> Jins George's original question was a good one. The right way to resume
>>>>>> from the previous offset here is what we're already doing – use the
>>>>>> KafkaCheckpointMark. In Beam, the runner maintains the state and not the
>>>>>> external system. Beam runners are responsible for maintaining the
>>>>>> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work.
>>>>>> If a user disables checkpointing, then they are explicitly opting into
>>>>>> "redo all work" on restart.
>>>>>> 
>>>>>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
>>>>>> provided, then I'm inclined to agree with Amit that there may simply be a
>>>>>> bug in the FlinkRunner. (+aljoscha)
>>>>>> 
>>>>>> For what Mingmin Xu asked about: presumably if the Kafka source is
>>>>>> initially configured to "read from latest offset", when it restarts with
>>>>>> no checkpoint this will automatically go find the latest offset. That would
>>>>>> mimic at-most-once semantics in a buggy runner that did not provide
>>>>>> checkpointing.
>>>>>> 
>>>>>> Dan
>>>>>> 
>>>>>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>>>>>> 
>>>>>>> In SparkRunner, the default checkpoint storage is
>>>>>>> TmpCheckpointDirFactory. Can it restore during a job restart? (I have not
>>>>>>> tested the runner in streaming for some time.)
>>>>>>> 
>>>>>>> Regarding data completeness, I would use at-most-once when losing a
>>>>>>> small amount of data (mostly from task-node failures) is tolerable,
>>>>>>> compared to the performance cost introduced by 'state'/'checkpoint'.
>>>>>>> 
>>>>>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Moving the discussion to the dev list.
>>>>>>>>> 
>>>>>>>>> Savepoint in Flink, also checkpoint in Spark, should be good enough to
>>>>>>>>> handle this case.
>>>>>>>>> 
>>>>>>>>> When people don't enable these features, for example only need
>>>>>>>>> at-most-once
>>>>>>>> 
>>>>>>>> The Spark runner forces checkpointing on any streaming (Beam)
>>>>>>>> application, mostly because it uses mapWithState for reading from
>>>>>>>> UnboundedSource and updateStateByKey for GroupByKey - so by design, the
>>>>>>>> Spark runner is at-least-once. Generally, I always thought that
>>>>>>>> applications that require at-most-once are more focused on processing
>>>>>>>> time only, as they only care about whatever gets ingested into the
>>>>>>>> pipeline at a specific time and don't care (up to the point of losing
>>>>>>>> data) about correctness.
>>>>>>>> I would be happy to hear more about your use case.
>>>>>>>> 
>>>>>>>>> semantic, each unbounded IO should try its best to restore from last
>>>>>>>>> offset, although CheckpointMark is null. Any ideas?
>>>>>>>>> 
>>>>>>>>> Mingmin
>>>>>>>>> 
>>>>>>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>>>>>>> hey,
>>>>>>>>>> 
>>>>>>>>>> The native Beam UnboundedSource API supports resuming from checkpoint --
>>>>>>>>>> that specifically happens here
>>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>>>>>>>>>> when the KafkaCheckpointMark is non-null.
>>>>>>>>>> 
>>>>>>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the most
>>>>>>>>>> recent savepoint upon restore.
>>>>>>>>>> 
>>>>>>>>>> There shouldn't be any "special" Flink runner support needed, nor is the
>>>>>>>>>> State API involved.
>>>>>>>>>> 
>>>>>>>>>> Dan
>>>>>>>>>> 
>>>>>>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Wouldn't it be Flink runner specific?
>>>>>>>>>>> 
>>>>>>>>>>> Maybe the State API could do the same in a runner-agnostic way (just
>>>>>>>>>>> thinking out loud)?
>>>>>>>>>>> 
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>> 
>>>>>>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> From KafkaIO itself, it looks like it either does start_from_beginning
>>>>>>>>>>>> or start_from_latest. It's designed to leverage
>>>>>>>>>>>> `UnboundedSource.CheckpointMark` during initialization, but so far I
>>>>>>>>>>>> don't see it being provided by runners. At the moment Flink savepoints
>>>>>>>>>>>> are a good option; I created a JIRA (BEAM-1775
>>>>>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in
>>>>>>>>>>>> KafkaIO.
>>>>>>>>>>>> 
>>>>>>>>>>>> Mingmin
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>   Hi,
>>>>>>>>>>>>   Are you using Flink savepoints [1] when restoring your application?
>>>>>>>>>>>>   If you use this, the Kafka offset should be stored in state and it
>>>>>>>>>>>>   should restart from the correct position.
>>>>>>>>>>>> 
>>>>>>>>>>>>   Best,
>>>>>>>>>>>>   Aljoscha
>>>>>>>>>>>> 
>>>>>>>>>>>>   [1]
>>>>>>>>>>>>   https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <jins.geo...@aeris.net> wrote:
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to
>>>>>>>>>>>>> consume data from Kafka, apply some transformations, and persist to
>>>>>>>>>>>>> HBase.
>>>>>>>>>>>>> If I restart the application (due to failure/manual restart), the
>>>>>>>>>>>>> consumer does not resume from the offset where it was prior to the
>>>>>>>>>>>>> restart. It always resumes from the latest offset.
>>>>>>>>>>>>> If I enable Flink checkpointing with the HDFS state back-end, the
>>>>>>>>>>>>> system appears to be resuming from the earliest offset.
>>>>>>>>>>>>> Is there a recommended way to resume from the offset where it was
>>>>>>>>>>>>> stopped?
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Jins George
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> --
>>>>>>>>>>>> ----
>>>>>>>>>>>> Mingmin
>>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> ----
>>>>>>>>> Mingmin
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> ----
>>>>>>> Mingmin
>>>>>>> 
>>>>>> 
>>> 
>> 
> 
