[We should keep user list involved if that's where the discussion
originally was :)]

Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
a user disables checkpointing, then they are explicitly opting into "redo
all work" on restart.

--> If checkpointing is enabled but the KafkaCheckpointMark is not being
provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)

For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with no
checkpoint this will automatically go find the latest offset. That would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.

Dan

On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:

> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
> Can it restore during job restart? --Not test the runner in streaming for
> some time.
>
> Regarding to data-completeness, I would use at-most-once when few data
> missing(mostly tasknode failure) is tolerated, compared to the performance
> cost introduced by 'state'/'checkpoint'.
>
> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> > On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
> >
> > > Move discuss to dev-list
> > >
> > > Savepoint in Flink, also checkpoint in Spark, should be good enough to
> > > handle this case.
> > >
> > > When people don't enable these features, for example only need
> > at-most-once
> > >
> > The Spark runner forces checkpointing on any streaming (Beam)
> application,
> > mostly because it uses mapWithState for reading from UnboundedSource and
> > updateStateByKey form GroupByKey - so by design, Spark runner is
> > at-least-once. Generally, I always thought that applications that require
> > at-most-once are more focused on processing time only, as they only care
> > about whatever get's ingested into the pipeline at a specific time and
> > don't care (up to the point of losing data) about correctness.
> > I would be happy to hear more about your use case.
> >
> > > semantic, each unbounded IO should try its best to restore from last
> > > offset, although CheckpointMark is null. Any ideas?
> > >
> > > Mingmin
> > >
> > > On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org>
> > wrote:
> > >
> > > > hey,
> > > >
> > > > The native Beam UnboundedSource API supports resuming from checkpoint
> > --
> > > > that specifically happens here
> > > > <
> > > https://github.com/apache/beam/blob/master/sdks/java/io/kafk
> > a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
> > > when
> > > > the KafkaCheckpointMark is non-null.
> > > >
> > > > The FlinkRunner should be providing the KafkaCheckpointMark from the
> > most
> > > > recent savepoint upon restore.
> > > >
> > > > There shouldn't be any "special" Flink runner support needed, nor is
> > the
> > > > State API involved.
> > > >
> > > > Dan
> > > >
> > > > On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
> j...@nanthrax.net
> > >
> > > > wrote:
> > > >
> > > >> Would not it be Flink runner specific ?
> > > >>
> > > >> Maybe the State API could do the same in a runner agnostic way (just
> > > >> thinking loud) ?
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
> > > >>
> > > >>> From KafkaIO itself, looks like it either start_from_beginning or
> > > >>> start_from_latest. It's designed to leverage
> > > >>> `UnboundedSource.CheckpointMark`
> > > >>> during initialization, but so far I don't see it's provided by
> > runners.
> > > >>> At the
> > > >>> moment Flink savepoints is a good option, created a JIRA(BEAM-1775
> > > >>> <https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it
> in
> > > >>> KafkaIO.
> > > >>>
> > > >>> Mingmin
> > > >>>
> > > >>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <
> > aljos...@apache.org
> > > >>> <mailto:aljos...@apache.org>> wrote:
> > > >>>
> > > >>>     Hi,
> > > >>>     Are you using Flink savepoints [1] when restoring your
> > application?
> > > >>> If you
> > > >>>     use this the Kafka offset should be stored in state and it
> should
> > > >>> restart
> > > >>>     from the correct position.
> > > >>>
> > > >>>     Best,
> > > >>>     Aljoscha
> > > >>>
> > > >>>     [1]
> > > >>>     https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> > > >>> setup/savepoints.html
> > > >>>     <https://ci.apache.org/projects/flink/flink-docs-release-1.3
> > > >>> /setup/savepoints.html>
> > > >>>     > On 21 Mar 2017, at 01:50, Jins George <jins.geo...@aeris.net
> > > >>>     <mailto:jins.geo...@aeris.net>> wrote:
> > > >>>     >
> > > >>>     > Hello,
> > > >>>     >
> > > >>>     > I am writing a Beam pipeline(streaming) with Flink runner to
> > > >>> consume data
> > > >>>     from Kafka and apply some transformations and persist to Hbase.
> > > >>>     >
> > > >>>     > If I restart the application ( due to failure/manual
> restart),
> > > >>> consumer
> > > >>>     does not resume from the offset where it was prior to restart.
> It
> > > >>> always
> > > >>>     resume from the latest offset.
> > > >>>     >
> > > >>>     > If I enable Flink checkpionting with hdfs state back-end,
> > system
> > > >>> appears
> > > >>>     to be resuming from the earliest offset
> > > >>>     >
> > > >>>     > Is there a recommended way to resume from the offset where it
> > was
> > > >>> stopped ?
> > > >>>     >
> > > >>>     > Thanks,
> > > >>>     > Jins George
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> ----
> > > >>> Mingmin
> > > >>>
> > > >>
> > > >> --
> > > >> Jean-Baptiste Onofré
> > > >> jbono...@apache.org
> > > >> http://blog.nanthrax.net
> > > >> Talend - http://www.talend.com
> > > >>
> > > >
> > > >
> > >
> > >
> > > --
> > > ----
> > > Mingmin
> > >
> >
>
>
>
> --
> ----
> Mingmin
>

Reply via email to