Currently /org.apache.beam.runners.flink.FlinkPipelineOptions/ does not have a way to configure externalized checkpoints. Is that something in the road map for FlinkRunner?

Thanks,
Jins George

On 03/23/2017 10:27 AM, Aljoscha Krettek wrote:
For this you would use externalised checkpoints: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html

Unfortunately, the doc is still a bit sparse but it’s basically a combination of savepoints and checkpoints. Checkpoints are not cleaned up when a job fails and you can restore from them as you would from a savepoint.

Best,
Aljoscha

On 22 Mar 2017, at 21:25, Jins George <jins.geo...@aeris.net <mailto:jins.geo...@aeris.net>> wrote:


Thanks Aljoscha for the clarification. Savepoints works fine in case of controlled stop and restart. In case of a failure( say the entire job failed due node crash or application software bug) is there a way to resume from the checkpoint on restarting the application ? Checkpoint location is configured with HDFS.

Thanks,
Jins George

On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
As I mentioned before, when running a Flink Job and simply cancelling it all state about that job is discarded (with some exceptions, such as externalised checkpoints). If you want the state of a Job to survive a cancellation you have to perform a savepoint [1] and then when restarting the Job you have to specify a savepoint from which you want to restore.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html

On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID <mailto:rang...@google.com.INVALID>> wrote:

Expanding a bit more on what Dan wrote:

  - In Dataflow, there are two modes of restarting a job : regular stop
and then start & an *update*. The checkpoint is carried over only in the
  case of update.
  - Update is the only to keep 'exactly-once' semantics.
- If the requirements are not very strict, you can enable offset commits in Kafka itself. KafkaIO lets you configure this. Here the pipeline would start reading from approximately where it left off in the previous run.
     - When a offset commits are enabled, KafkaIO could this by
     implementing 'finalize()' API on KafkaCheckpointMark [1].
     - This is runner independent.
- The compromise is that this might skip a few records or read a few
     old records when the pipeline is restarted.
- This does not override 'resume from checkpoint' support when runner provides KafkaCheckpointMark. Externally committed offsets are used only
     when KafkaIO's own CheckpointMark is not available.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50

On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:

[We should keep user list involved if that's where the discussion
originally was :)]

Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If a user disables checkpointing, then they are explicitly opting into "redo
all work" on restart.

--> If checkpointing is enabled but the KafkaCheckpointMark is not being provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)

For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with no checkpoint this will automatically go find the latest offset. That would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.

Dan

On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:

In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory. Can it restore during job restart? --Not test the runner in streaming for
some time.

Regarding to data-completeness, I would use at-most-once when few data missing(mostly tasknode failure) is tolerated, compared to the performance
cost introduced by 'state'/'checkpoint'.

On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:

On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:

Move discuss to dev-list

Savepoint in Flink, also checkpoint in Spark, should be good enough to
handle this case.

When people don't enable these features, for example only need
at-most-once
The Spark runner forces checkpointing on any streaming (Beam)
application,
mostly because it uses mapWithState for reading from UnboundedSource and
updateStateByKey form GroupByKey - so by design, Spark runner is
at-least-once. Generally, I always thought that applications that
require
at-most-once are more focused on processing time only, as they only care about whatever get's ingested into the pipeline at a specific time and
don't care (up to the point of losing data) about correctness.
I would be happy to hear more about your use case.

semantic, each unbounded IO should try its best to restore from last
offset, although CheckpointMark is null. Any ideas?

Mingmin

On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org>
wrote:
hey,

The native Beam UnboundedSource API supports resuming from
checkpoint
--
that specifically happens here
<
https://github.com/apache/beam/blob/master/sdks/java/io/kafk
a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
when
the KafkaCheckpointMark is non-null.

The FlinkRunner should be providing the KafkaCheckpointMark from the
most
recent savepoint upon restore.

There shouldn't be any "special" Flink runner support needed, nor is
the
State API involved.

Dan

On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
j...@nanthrax.net
wrote:

Would not it be Flink runner specific ?

Maybe the State API could do the same in a runner agnostic way
(just
thinking loud) ?

Regards
JB

On 03/21/2017 04:56 PM, Mingmin Xu wrote:

From KafkaIO itself, looks like it either start_from_beginning or
start_from_latest. It's designed to leverage
`UnboundedSource.CheckpointMark`
during initialization, but so far I don't see it's provided by
runners.
At the
moment Flink savepoints is a good option, created a JIRA(BEAM-1775
<https://issues.apache.org/jira/browse/BEAM-1775>)  to handle it
in
KafkaIO.

Mingmin

On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <
aljos...@apache.org
<mailto:aljos...@apache.org>> wrote:

   Hi,
   Are you using Flink savepoints [1] when restoring your
application?
If you
   use this the Kafka offset should be stored in state and it
should
restart
   from the correct position.

   Best,
   Aljoscha

   [1]
   https://ci.apache.org/projects/flink/flink-docs-release-1.3/
setup/savepoints.html
   <https://ci.apache.org/projects/flink/flink-docs-release-1.3
/setup/savepoints.html>
On 21 Mar 2017, at 01:50, Jins George <
jins.geo...@aeris.net
   <mailto:jins.geo...@aeris.net>> wrote:
Hello,

I am writing a Beam pipeline(streaming) with Flink runner to
consume data
   from Kafka and apply some transformations and persist to
Hbase.
If I restart the application ( due to failure/manual
restart),
consumer
   does not resume from the offset where it was prior to
restart. It
always
   resume from the latest offset.
If I enable Flink checkpionting with hdfs state back-end,
system
appears
   to be resuming from the earliest offset
Is there a recommended way to resume from the offset where
it
was
stopped ?
Thanks,
Jins George



--
----
Mingmin

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



--
----
Mingmin



--
----
Mingmin





Reply via email to