Currently /org.apache.beam.runners.flink.FlinkPipelineOptions/ does not
have a way to configure externalized checkpoints. Is that something in
the road map for FlinkRunner?
Thanks,
Jins George
On 03/23/2017 10:27 AM, Aljoscha Krettek wrote:
For this you would use externalised checkpoints:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
Unfortunately, the doc is still a bit sparse but it’s basically a
combination of savepoints and checkpoints. Checkpoints are not cleaned
up when a job fails and you can restore from them as you would from a
savepoint.
Best,
Aljoscha
On 22 Mar 2017, at 21:25, Jins George <jins.geo...@aeris.net
<mailto:jins.geo...@aeris.net>> wrote:
Thanks Aljoscha for the clarification. Savepoints works fine in case
of controlled stop and restart. In case of a failure( say the entire
job failed due node crash or application software bug) is there a way
to resume from the checkpoint on restarting the application ?
Checkpoint location is configured with HDFS.
Thanks,
Jins George
On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
As I mentioned before, when running a Flink Job and simply
cancelling it all state about that job is discarded (with some
exceptions, such as externalised checkpoints). If you want the state
of a Job to survive a cancellation you have to perform a savepoint
[1] and then when restarting the Job you have to specify a savepoint
from which you want to restore.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID
<mailto:rang...@google.com.INVALID>> wrote:
Expanding a bit more on what Dan wrote:
- In Dataflow, there are two modes of restarting a job : regular stop
and then start & an *update*. The checkpoint is carried over only
in the
case of update.
- Update is the only to keep 'exactly-once' semantics.
- If the requirements are not very strict, you can enable offset
commits
in Kafka itself. KafkaIO lets you configure this. Here the
pipeline would
start reading from approximately where it left off in the
previous run.
- When a offset commits are enabled, KafkaIO could this by
implementing 'finalize()' API on KafkaCheckpointMark [1].
- This is runner independent.
- The compromise is that this might skip a few records or read
a few
old records when the pipeline is restarted.
- This does not override 'resume from checkpoint' support when
runner
provides KafkaCheckpointMark. Externally committed offsets are
used only
when KafkaIO's own CheckpointMark is not available.
[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org>
wrote:
[We should keep user list involved if that's where the discussion
originally was :)]
Jins George's original question was a good one. The right way to
resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and
not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed)
work. If
a user disables checkpointing, then they are explicitly opting
into "redo
all work" on restart.
--> If checkpointing is enabled but the KafkaCheckpointMark is not
being
provided, then I'm inclined to agree with Amit that there may
simply be a
bug in the FlinkRunner. (+aljoscha)
For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it
restarts with no
checkpoint this will automatically go find the latest offset. That
would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.
Dan
On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com>
wrote:
In SparkRunner, the default checkpoint storage is
TmpCheckpointDirFactory.
Can it restore during job restart? --Not test the runner in
streaming for
some time.
Regarding to data-completeness, I would use at-most-once when few
data
missing(mostly tasknode failure) is tolerated, compared to the
performance
cost introduced by 'state'/'checkpoint'.
On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com>
wrote:
On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com>
wrote:
Move discuss to dev-list
Savepoint in Flink, also checkpoint in Spark, should be good
enough to
handle this case.
When people don't enable these features, for example only need
at-most-once
The Spark runner forces checkpointing on any streaming (Beam)
application,
mostly because it uses mapWithState for reading from
UnboundedSource and
updateStateByKey form GroupByKey - so by design, Spark runner is
at-least-once. Generally, I always thought that applications that
require
at-most-once are more focused on processing time only, as they
only care
about whatever get's ingested into the pipeline at a specific
time and
don't care (up to the point of losing data) about correctness.
I would be happy to hear more about your use case.
semantic, each unbounded IO should try its best to restore from
last
offset, although CheckpointMark is null. Any ideas?
Mingmin
On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org>
wrote:
hey,
The native Beam UnboundedSource API supports resuming from
checkpoint
--
that specifically happens here
<
https://github.com/apache/beam/blob/master/sdks/java/io/kafk
a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
when
the KafkaCheckpointMark is non-null.
The FlinkRunner should be providing the KafkaCheckpointMark
from the
most
recent savepoint upon restore.
There shouldn't be any "special" Flink runner support needed,
nor is
the
State API involved.
Dan
On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
j...@nanthrax.net
wrote:
Would not it be Flink runner specific ?
Maybe the State API could do the same in a runner agnostic way
(just
thinking loud) ?
Regards
JB
On 03/21/2017 04:56 PM, Mingmin Xu wrote:
From KafkaIO itself, looks like it either
start_from_beginning or
start_from_latest. It's designed to leverage
`UnboundedSource.CheckpointMark`
during initialization, but so far I don't see it's provided by
runners.
At the
moment Flink savepoints is a good option, created a
JIRA(BEAM-1775
<https://issues.apache.org/jira/browse/BEAM-1775>) to handle it
in
KafkaIO.
Mingmin
On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <
aljos...@apache.org
<mailto:aljos...@apache.org>> wrote:
Hi,
Are you using Flink savepoints [1] when restoring your
application?
If you
use this the Kafka offset should be stored in state and it
should
restart
from the correct position.
Best,
Aljoscha
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/
setup/savepoints.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.3
/setup/savepoints.html>
On 21 Mar 2017, at 01:50, Jins George <
jins.geo...@aeris.net
<mailto:jins.geo...@aeris.net>> wrote:
Hello,
I am writing a Beam pipeline(streaming) with Flink runner to
consume data
from Kafka and apply some transformations and persist to
Hbase.
If I restart the application ( due to failure/manual
restart),
consumer
does not resume from the offset where it was prior to
restart. It
always
resume from the latest offset.
If I enable Flink checkpionting with hdfs state back-end,
system
appears
to be resuming from the earliest offset
Is there a recommended way to resume from the offset where
it
was
stopped ?
Thanks,
Jins George
--
----
Mingmin
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
--
----
Mingmin
--
----
Mingmin