As I mentioned before, when you cancel a running Flink job, all state for that job is discarded (with some exceptions, such as externalised checkpoints). If you want a job's state to survive cancellation, you have to take a savepoint [1] and then, when restarting the job, specify the savepoint from which you want to restore.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
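One way to set this up from the Beam side is to run on the FlinkRunner with a checkpointing interval, so Flink also snapshots KafkaIO's CheckpointMark periodically for failure recovery; the savepoint for a planned restart is then taken and restored with the Flink CLI as described in [1]. A minimal sketch, assuming the FlinkPipelineOptions and KafkaIO method names shown below (the broker, topic, and 60-second interval are placeholders):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ResumableKafkaPipeline {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(true);
        // Periodic checkpoints make Flink snapshot KafkaIO's CheckpointMark, so a
        // savepoint (or recovery after a failure) can restore the Kafka offsets.
        options.setCheckpointingInterval(60_000L);  // placeholder: checkpoint every 60s

        Pipeline p = Pipeline.create(options);
        p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker-1:9092")   // placeholder broker
            .withTopic("events")                     // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
        // ... the transformations and HBase sink from the original pipeline go here ...
        p.run();
      }
    }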
> On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID> wrote:
>
> Expanding a bit more on what Dan wrote:
>
>    - In Dataflow, there are two modes of restarting a job: a regular stop
>    and then start, and an *update*. The checkpoint is carried over only in
>    the case of an update.
>    - Update is the only way to keep 'exactly-once' semantics.
>    - If the requirements are not very strict, you can enable offset commits
>    in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
>    start reading from approximately where it left off in the previous run.
>       - When offset commits are enabled, KafkaIO could do this by
>       implementing the 'finalize()' API on KafkaCheckpointMark [1].
>       - This is runner independent.
>       - The compromise is that this might skip a few records or read a few
>       old records when the pipeline is restarted.
>       - This does not override 'resume from checkpoint' support when the
>       runner provides a KafkaCheckpointMark. Externally committed offsets
>       are used only when KafkaIO's own CheckpointMark is not available.
>
> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
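Raghu's suggestion of enabling offset commits in Kafka itself corresponds to KafkaIO's consumer configuration. A rough sketch, assuming KafkaIO's updateConsumerProperties and deserializer-based builder methods (broker, topic, and group id are placeholders); the consumer group's committed offsets then give the approximate restart position whenever no Beam CheckpointMark is available, with the skip/re-read caveat noted above:

    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaOffsetCommitConfig {
      // Approximate resume via Kafka's own consumer-group offset commits:
      // no Beam checkpoint is required, but a few records may be skipped or
      // re-read after a restart.
      static KafkaIO.Read<String, String> read() {
        return KafkaIO.<String, String>read()
            .withBootstrapServers("broker-1:9092")   // placeholder broker
            .withTopic("events")                     // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // The committed offsets of this consumer group become the restart
            // position whenever KafkaIO's own CheckpointMark is not available.
            .updateConsumerProperties(ImmutableMap.<String, Object>of(
                "group.id", "my-beam-pipeline",      // placeholder group id
                "enable.auto.commit", true));
      }
    }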
> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:
>
>> [We should keep the user list involved if that's where the discussion
>> originally was :)]
>>
>> Jins George's original question was a good one. The right way to resume
>> from the previous offset here is what we're already doing – use the
>> KafkaCheckpointMark. In Beam, the runner maintains the state and not the
>> external system. Beam runners are responsible for maintaining the
>> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work.
>> If a user disables checkpointing, then they are explicitly opting into
>> "redo all work" on restart.
>>
>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
>> provided, then I'm inclined to agree with Amit that there may simply be a
>> bug in the FlinkRunner. (+aljoscha)
>>
>> For what Mingmin Xu asked about: presumably if the Kafka source is
>> initially configured to "read from latest offset", when it restarts with
>> no checkpoint it will automatically go find the latest offset. That would
>> mimic at-most-once semantics in a buggy runner that did not provide
>> checkpointing.
>>
>> Dan
>>
>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>>
>>> In the SparkRunner, the default checkpoint storage is
>>> TmpCheckpointDirFactory. Can it restore during a job restart? I have not
>>> tested the runner in streaming for some time.
>>>
>>> Regarding data completeness, I would use at-most-once when a little
>>> missing data (mostly from task-node failures) is tolerated, compared to
>>> the performance cost introduced by 'state'/'checkpoint'.
>>>
>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
>>>>
>>>>> Moving the discussion to the dev list.
>>>>>
>>>>> Savepoints in Flink, and checkpoints in Spark, should be good enough
>>>>> to handle this case.
>>>>>
>>>>> When people don't enable these features, for example when they only
>>>>> need at-most-once
>>>>>
>>>> The Spark runner forces checkpointing on any streaming (Beam)
>>>> application, mostly because it uses mapWithState for reading from
>>>> UnboundedSource and updateStateByKey for GroupByKey - so by design, the
>>>> Spark runner is at-least-once. Generally, I always thought that
>>>> applications that require at-most-once are more focused on processing
>>>> time only, as they only care about whatever gets ingested into the
>>>> pipeline at a specific time and don't care (up to the point of losing
>>>> data) about correctness.
>>>> I would be happy to hear more about your use case.
>>>>
>>>>> semantics, each unbounded IO should try its best to restore from the
>>>>> last offset, even though the CheckpointMark is null. Any ideas?
>>>>>
>>>>> Mingmin
>>>>>
>>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>>
>>>>>> hey,
>>>>>>
>>>>>> The native Beam UnboundedSource API supports resuming from checkpoint --
>>>>>> that specifically happens here
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>>>>>> when the KafkaCheckpointMark is non-null.
>>>>>>
>>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the
>>>>>> most recent savepoint upon restore.
>>>>>>
>>>>>> There shouldn't be any "special" Flink runner support needed, nor is
>>>>>> the State API involved.
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>
>>>>>>> Wouldn't it be Flink runner specific?
>>>>>>>
>>>>>>> Maybe the State API could do the same in a runner-agnostic way (just
>>>>>>> thinking out loud)?
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>>
>>>>>>>> From KafkaIO itself, it looks like it can either start_from_beginning
>>>>>>>> or start_from_latest. It's designed to leverage
>>>>>>>> `UnboundedSource.CheckpointMark` during initialization, but so far I
>>>>>>>> don't see it being provided by runners. At the moment Flink savepoints
>>>>>>>> are a good option; I created a JIRA (BEAM-1775
>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in
>>>>>>>> KafkaIO.
>>>>>>>>
>>>>>>>> Mingmin
>>>>>>>>
>>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Are you using Flink savepoints [1] when restoring your application?
>>>>>>>> If you use them, the Kafka offsets should be stored in state and the
>>>>>>>> job should restart from the correct position.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>
>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <jins.geo...@aeris.net> wrote:
>>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to
>>>>>>>>> consume data from Kafka, apply some transformations, and persist to
>>>>>>>>> HBase.
>>>>>>>>>
>>>>>>>>> If I restart the application (due to a failure/manual restart), the
>>>>>>>>> consumer does not resume from the offset where it was prior to the
>>>>>>>>> restart. It always resumes from the latest offset.
>>>>>>>>>
>>>>>>>>> If I enable Flink checkpointing with the HDFS state back-end, the
>>>>>>>>> system appears to resume from the earliest offset.
>>>>>>>>>
>>>>>>>>> Is there a recommended way to resume from the offset where it was
>>>>>>>>> stopped?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Jins George
>>>>>>>>
>>>>>>>> --
>>>>>>>> ----
>>>>>>>> Mingmin
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
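For reference, the UnboundedSource contract Dan describes above works roughly as follows: the runner hands the restored CheckpointMark back into createReader(), and a null mark means the source starts from its configured position. A simplified, hypothetical sketch; the class and method below are invented for illustration, and only the UnboundedSource/UnboundedReader/CheckpointMark calls are actual Beam API:

    import java.io.IOException;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    public class CheckpointedReadSketch {
      // Hypothetical helper, invented for illustration: how a runner drives an
      // UnboundedSource depending on whether it has a restored CheckpointMark.
      static <T, MarkT extends UnboundedSource.CheckpointMark> void readOnce(
          UnboundedSource<T, MarkT> source,
          PipelineOptions options,
          @Nullable MarkT restoredMark) throws IOException {
        // A non-null mark (e.g. a KafkaCheckpointMark decoded from a Flink
        // savepoint) makes the source resume from the stored offsets; null means
        // "start from the configured position", e.g. the latest Kafka offset.
        UnboundedSource.UnboundedReader<T> reader = source.createReader(options, restoredMark);
        if (reader.start()) {
          T record = reader.getCurrent();  // hand the record to the pipeline...
        }
        // At a checkpoint barrier the runner snapshots the current mark...
        UnboundedSource.CheckpointMark mark = reader.getCheckpointMark();
        // ...and finalizes it once the checkpoint is durable. For KafkaIO this is
        // where offsets could be committed back to Kafka (Raghu's point above).
        mark.finalizeCheckpoint();
        reader.close();
      }
    }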