Expanding a bit more on what Dan wrote:

- In Dataflow, there are two modes of restarting a job: a regular stop-and-start, and an *update*. The checkpoint is carried over only in the case of an update.
- Update is the only way to keep 'exactly-once' semantics.
- If the requirements are not very strict, you can enable offset commits in Kafka itself; KafkaIO lets you configure this. The pipeline would then start reading from approximately where it left off in the previous run.
- When offset commits are enabled, KafkaIO does this by implementing the 'finalize()' API on KafkaCheckpointMark [1].
- This is runner independent.
- The compromise is that this might skip a few records, or re-read a few old records, when the pipeline is restarted.
- This does not override 'resume from checkpoint' support when the runner provides a KafkaCheckpointMark. Externally committed offsets are used only when KafkaIO's own CheckpointMark is not available.
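The precedence in the last two points can be sketched in plain Java. This is a hypothetical model for illustration only -- `ResumeResolver`, `startReadOffset`, and the parameter names are not Beam's actual classes or API:

```java
import java.util.OptionalLong;

/**
 * Hypothetical model of the resume order described above: a runner-provided
 * checkpoint mark always wins; offsets committed to Kafka itself are only a
 * fallback; otherwise the reader uses its configured start position.
 */
public class ResumeResolver {

    /** Where the reader should start, given what survived the restart. */
    static long startReadOffset(OptionalLong checkpointMarkOffset,
                                OptionalLong committedKafkaOffset,
                                long configuredStartOffset) {
        if (checkpointMarkOffset.isPresent()) {
            // 'Resume from checkpoint': exact, runner-managed (e.g. Dataflow update).
            return checkpointMarkOffset.getAsLong();
        }
        if (committedKafkaOffset.isPresent()) {
            // Externally committed offset: approximate -- may skip or re-read a few records.
            return committedKafkaOffset.getAsLong();
        }
        // No state at all: start from the configured position (earliest/latest).
        return configuredStartOffset;
    }

    public static void main(String[] args) {
        // Checkpoint mark available: it takes precedence over the Kafka commit.
        System.out.println(startReadOffset(OptionalLong.of(42), OptionalLong.of(40), 0));
        // Only the Kafka-committed offset survived the restart.
        System.out.println(startReadOffset(OptionalLong.empty(), OptionalLong.of(40), 0));
        // Nothing survived: fall back to the configured start position.
        System.out.println(startReadOffset(OptionalLong.empty(), OptionalLong.empty(), 0));
    }
}
```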
[1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50

On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:

> [We should keep the user list involved if that's where the discussion originally was :)]
>
> Jins George's original question was a good one. The right way to resume from the previous offset here is what we're already doing -- use the KafkaCheckpointMark. In Beam, the runner maintains the state, not the external system. Beam runners are responsible for maintaining the checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If a user disables checkpointing, then they are explicitly opting into "redo all work" on restart.
>
> --> If checkpointing is enabled but the KafkaCheckpointMark is not being provided, then I'm inclined to agree with Amit that there may simply be a bug in the FlinkRunner. (+aljoscha)
>
> For what Mingmin Xu asked about: presumably if the Kafka source is initially configured to "read from latest offset", when it restarts with no checkpoint it will automatically go find the latest offset. That would mimic at-most-once semantics in a buggy runner that did not provide checkpointing.
>
> Dan
>
> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>
>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory. Can it restore during job restart? -- I haven't tested the runner in streaming for some time.
>>
>> Regarding data completeness, I would use at-most-once when a little data loss (mostly from task-node failure) is tolerated, compared to the performance cost introduced by 'state'/'checkpoint'.
>>
>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
>>>
>>>> Moving the discussion to the dev list.
>>>>
>>>> Savepoint in Flink, also checkpoint in Spark, should be good enough to handle this case.
>>>>
>>>> When people don't enable these features, for example only need at-most-once
>>>
>>> The Spark runner forces checkpointing on any streaming (Beam) application, mostly because it uses mapWithState for reading from UnboundedSource and updateStateByKey for GroupByKey -- so by design, the Spark runner is at-least-once. Generally, I always thought that applications that require at-most-once are more focused on processing time only, as they only care about whatever gets ingested into the pipeline at a specific time and don't care (up to the point of losing data) about correctness. I would be happy to hear more about your use case.
>>>
>>>> semantics, each unbounded IO should try its best to restore from the last offset, even though CheckpointMark is null. Any ideas?
>>>>
>>>> Mingmin
>>>>
>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org> wrote:
>>>>
>>>>> hey,
>>>>>
>>>>> The native Beam UnboundedSource API supports resuming from checkpoint -- that specifically happens here <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674> when the KafkaCheckpointMark is non-null.
>>>>>
>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the most recent savepoint upon restore.
>>>>>
>>>>> There shouldn't be any "special" Flink runner support needed, nor is the State API involved.
>>>>>
>>>>> Dan
>>>>>
>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>
>>>>>> Would it not be Flink runner specific?
>>>>>>
>>>>>> Maybe the State API could do the same in a runner-agnostic way (just thinking aloud)?
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>
>>>>>>> From KafkaIO itself, it looks like it either starts from the beginning or from the latest offset. It's designed to leverage `UnboundedSource.CheckpointMark` during initialization, but so far I don't see it provided by runners. At the moment Flink savepoints are a good option; I created a JIRA (BEAM-1775 <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in KafkaIO.
>>>>>>>
>>>>>>> Mingmin
>>>>>>>
>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Are you using Flink savepoints [1] when restoring your application? If you use these, the Kafka offset should be stored in state and it should restart from the correct position.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>
>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <jins.geo...@aeris.net> wrote:
>>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to consume data from Kafka, apply some transformations, and persist to HBase.
>>>>>>>>>
>>>>>>>>> If I restart the application (due to failure/manual restart), the consumer does not resume from the offset where it was prior to restart. It always resumes from the latest offset.
>>>>>>>>>
>>>>>>>>> If I enable Flink checkpointing with the HDFS state backend, the system appears to resume from the earliest offset.
>>>>>>>>>
>>>>>>>>> Is there a recommended way to resume from the offset where it was stopped?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Jins George
>>>>>>>
>>>>>>> --
>>>>>>> ----
>>>>>>> Mingmin
>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbono...@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>
>>>> --
>>>> ----
>>>> Mingmin
>>
>> --
>> ----
>> Mingmin
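For the savepoint route Aljoscha mentions, the usual Flink CLI sequence looks roughly like this. The job ID, savepoint directory, and jar name are placeholders, and exact flags can vary between Flink versions:

```shell
# Take a savepoint of the running job (job ID from `flink list`).
flink savepoint <jobId> hdfs:///flink/savepoints

# Or stop the job and take a savepoint in one step.
flink cancel -s hdfs:///flink/savepoints <jobId>

# Resume the (Beam) pipeline from that savepoint; KafkaIO's
# KafkaCheckpointMark is restored as part of the operator state.
flink run -s hdfs:///flink/savepoints/savepoint-<id> beam-pipeline.jar
```

With the FlinkRunner this is the restart mode that actually hands a non-null KafkaCheckpointMark back to KafkaIO; a plain cancel-and-resubmit starts with no checkpoint state at all.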