You're right, it's currently not possible. I created a Jira issue to track the problem: https://issues.apache.org/jira/browse/BEAM-1812
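For anyone following along: on the Flink side, externalized checkpoints also need a target directory configured; the path below is an example, not a required value:

```yaml
# flink-conf.yaml – where externalized checkpoint metadata is written
state.checkpoints.dir: hdfs:///flink/ext-checkpoints
```

The retention behaviour (keep or delete on cancellation) is then chosen per job via `CheckpointConfig#enableExternalizedCheckpoints`, which is the setting the FlinkRunner would need to forward.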
It shouldn't be too hard to add this since it boils down to forwarding some configuration settings.

Best,
Aljoscha

> On 24 Mar 2017, at 23:27, Jins George <jins.geo...@aeris.net> wrote:
>
> Currently /org.apache.beam.runners.flink.FlinkPipelineOptions/ does not have a way to configure externalized checkpoints. Is that something on the road map for FlinkRunner?
>
> Thanks,
> Jins George
>
> On 03/23/2017 10:27 AM, Aljoscha Krettek wrote:
>> For this you would use externalised checkpoints:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
>>
>> Unfortunately, the doc is still a bit sparse, but it's basically a combination of savepoints and checkpoints: checkpoints are not cleaned up when a job fails, and you can restore from them as you would from a savepoint.
>>
>> Best,
>> Aljoscha
>>
>>> On 22 Mar 2017, at 21:25, Jins George <jins.geo...@aeris.net> wrote:
>>>
>>> Thanks Aljoscha for the clarification. Savepoints work fine in the case of a controlled stop and restart. In the case of a failure (say the entire job failed due to a node crash or an application software bug), is there a way to resume from the checkpoint on restarting the application? The checkpoint location is configured with HDFS.
>>>
>>> Thanks,
>>> Jins George
>>>
>>> On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
>>>> As I mentioned before, when running a Flink job and simply cancelling it, all state about that job is discarded (with some exceptions, such as externalised checkpoints). If you want the state of a job to survive a cancellation you have to perform a savepoint [1], and then when restarting the job you have to specify a savepoint from which you want to restore.
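For reference, the savepoint round-trip described above is driven from the Flink CLI; the job id and paths below are placeholders:

```shell
# Take a savepoint of the running job (find the job id with `bin/flink list`)
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# Resume a new run of the (Beam) job from that savepoint
bin/flink run -s hdfs:///flink/savepoints/savepoint-<id> your-pipeline.jar
```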
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>
>>>>> On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID> wrote:
>>>>>
>>>>> Expanding a bit more on what Dan wrote:
>>>>>
>>>>> - In Dataflow, there are two modes of restarting a job: a regular stop-and-start, and an *update*. The checkpoint is carried over only in the case of an update.
>>>>> - An update is the only way to keep 'exactly-once' semantics.
>>>>> - If the requirements are not very strict, you can enable offset commits in Kafka itself. KafkaIO lets you configure this. Here the pipeline would start reading from approximately where it left off in the previous run.
>>>>>   - When offset commits are enabled, KafkaIO could do this by implementing the 'finalize()' API on KafkaCheckpointMark [1].
>>>>>   - This is runner independent.
>>>>>   - The compromise is that this might skip a few records or read a few old records when the pipeline is restarted.
>>>>>   - This does not override 'resume from checkpoint' support when the runner provides a KafkaCheckpointMark. Externally committed offsets are used only when KafkaIO's own CheckpointMark is not available.
>>>>>
>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
>>>>>
>>>>> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>>
>>>>>> [We should keep the user list involved if that's where the discussion originally was :)]
>>>>>>
>>>>>> Jins George's original question was a good one. The right way to resume from the previous offset here is what we're already doing – use the KafkaCheckpointMark. In Beam, the runner maintains the state, not the external system.
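Raghu's suggestion above (enabling offset commits in Kafka itself) is plain consumer configuration. A rough sketch – method names are those of the KafkaIO of this era and may differ between Beam versions, and the broker, topic, and group names are invented for illustration:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch only: lets Kafka track offsets in a consumer group so that a
// restarted pipeline resumes roughly where it left off, even when no
// KafkaCheckpointMark is available from the runner.
KafkaIO.Read<byte[], byte[]> read = KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers("broker1:9092")                       // example broker
    .withTopics(Collections.singletonList("events"))            // example topic
    .updateConsumerProperties(ImmutableMap.<String, Object>of(
        ConsumerConfig.GROUP_ID_CONFIG, "my-beam-pipeline",     // example group
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true));
```

As Raghu notes, this trades exactness for simplicity: a few records may be re-read or skipped on restart.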
Beam runners are responsible for maintaining the checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If a user disables checkpointing, then they are explicitly opting into "redo all work" on restart.
>>>>>>
>>>>>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being provided, then I'm inclined to agree with Amit that there may simply be a bug in the FlinkRunner. (+aljoscha)
>>>>>>
>>>>>> For what Mingmin Xu asked about: presumably if the Kafka source is initially configured to "read from latest offset", then when it restarts with no checkpoint it will automatically go find the latest offset. That would mimic at-most-once semantics in a buggy runner that did not provide checkpointing.
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>>>>>>
>>>>>>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory. Can it restore during a job restart? -- I haven't tested the runner in streaming for some time.
>>>>>>>
>>>>>>> Regarding data completeness, I would use at-most-once when a little data loss (mostly on task-node failure) is tolerable, compared to the performance cost introduced by 'state'/'checkpoint'.
>>>>>>>
>>>>>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>>>>>
>>>>>>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Moving the discussion to the dev list.
>>>>>>>>>
>>>>>>>>> Savepoints in Flink, like checkpoints in Spark, should be good enough to handle this case.
>>>>>>>>>
>>>>>>>>> When people don't enable these features, for example when they only need at-most-once
>>>>>>>>
>>>>>>>> The Spark runner forces checkpointing on any streaming (Beam) application, mostly because it uses mapWithState for reading from UnboundedSource and updateStateByKey for GroupByKey – so by design, the Spark runner is at-least-once. Generally, I always thought that applications that require at-most-once are more focused on processing time only, as they only care about whatever gets ingested into the pipeline at a specific time and don't care (up to the point of losing data) about correctness.
>>>>>>>> I would be happy to hear more about your use case.
>>>>>>>>
>>>>>>>>> semantics, each unbounded IO should try its best to restore from the last offset, although the CheckpointMark is null. Any ideas?
>>>>>>>>>
>>>>>>>>> Mingmin
>>>>>>>>>
>>>>>>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> hey,
>>>>>>>>>>
>>>>>>>>>> The native Beam UnboundedSource API supports resuming from a checkpoint -- that specifically happens here
>>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>>>>>>>>>> when the KafkaCheckpointMark is non-null.
>>>>>>>>>>
>>>>>>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the most recent savepoint upon restore.
>>>>>>>>>>
>>>>>>>>>> There shouldn't be any "special" Flink runner support needed, nor is the State API involved.
>>>>>>>>>>
>>>>>>>>>> Dan
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>
>>>>>>>>>>> Wouldn't it be Flink runner specific?
>>>>>>>>>>>
>>>>>>>>>>> Maybe the State API could do the same in a runner-agnostic way (just thinking out loud)?
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>>
>>>>>>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>>>>>>
>>>>>>>>>>>> From KafkaIO itself, it looks like it either starts from the beginning or from the latest offset. It's designed to leverage `UnboundedSource.CheckpointMark` during initialization, but so far I don't see it provided by runners. At the moment Flink savepoints are a good option; I created a JIRA (BEAM-1775 <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in KafkaIO.
>>>>>>>>>>>>
>>>>>>>>>>>> Mingmin
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> Are you using Flink savepoints [1] when restoring your application? If you use this, the Kafka offset should be stored in state and it should restart from the correct position.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>>>>>
>>>>>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <jins.geo...@aeris.net> wrote:
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to consume data from Kafka, apply some transformations, and persist to HBase.
>>>>>>>>>>>>> If I restart the application (due to failure/manual restart), the consumer does not resume from the offset where it was prior to the restart. It always resumes from the latest offset.
>>>>>>>>>>>>> If I enable Flink checkpointing with the HDFS state backend, the system appears to resume from the earliest offset.
>>>>>>>>>>>>> Is there a recommended way to resume from the offset where it was stopped?
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Jins George
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> ----
>>>>>>>>>>>> Mingmin
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>> Talend - http://www.talend.com
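Summing up the thread, the resume preference order that Raghu and Dan describe (runner-restored checkpoint mark first, then offsets committed to Kafka, then the configured start position) can be sketched runner-independently. This is illustrative only – not Beam API; the class and method names are invented:

```java
import java.util.Optional;

/**
 * Illustrative only – not Beam API. Sketches the resume preference order
 * discussed in this thread.
 */
class ResumeOffset {
    static long resolve(Optional<Long> checkpointMark,
                        Optional<Long> committedOffset,
                        long configuredStart) {
        // 1. A runner-restored KafkaCheckpointMark gives exact resume.
        if (checkpointMark.isPresent()) {
            return checkpointMark.get();
        }
        // 2. Offsets committed to Kafka give approximate resume
        //    (may re-read or skip a few records).
        if (committedOffset.isPresent()) {
            return committedOffset.get();
        }
        // 3. Otherwise fall back to earliest/latest as configured.
        return configuredStart;
    }
}
```

Note that step 2 never overrides step 1, which matches Raghu's point that externally committed offsets are only a fallback when KafkaIO's own CheckpointMark is unavailable.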