Thanks Aljoscha for the clarification. Savepoints works fine in case of
controlled stop and restart. In case of a failure( say the entire job
failed due node crash or application software bug) is there a way to
resume from the checkpoint on restarting the application ? Checkpoint
location is configured with HDFS.
Thanks,
Jins George
On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
As I mentioned before, when running a Flink Job and simply cancelling it all
state about that job is discarded (with some exceptions, such as externalised
checkpoints). If you want the state of a Job to survive a cancellation you have
to perform a savepoint [1] and then when restarting the Job you have to specify
a savepoint from which you want to restore.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
On 22 Mar 2017, at 01:43, Raghu Angadi <rang...@google.com.INVALID> wrote:
Expanding a bit more on what Dan wrote:
- In Dataflow, there are two modes of restarting a job : regular stop
and then start & an *update*. The checkpoint is carried over only in the
case of update.
- Update is the only to keep 'exactly-once' semantics.
- If the requirements are not very strict, you can enable offset commits
in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
start reading from approximately where it left off in the previous run.
- When a offset commits are enabled, KafkaIO could this by
implementing 'finalize()' API on KafkaCheckpointMark [1].
- This is runner independent.
- The compromise is that this might skip a few records or read a few
old records when the pipeline is restarted.
- This does not override 'resume from checkpoint' support when runner
provides KafkaCheckpointMark. Externally committed offsets are used only
when KafkaIO's own CheckpointMark is not available.
[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <dhalp...@apache.org> wrote:
[We should keep user list involved if that's where the discussion
originally was :)]
Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
a user disables checkpointing, then they are explicitly opting into "redo
all work" on restart.
--> If checkpointing is enabled but the KafkaCheckpointMark is not being
provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)
For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with no
checkpoint this will automatically go find the latest offset. That would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.
Dan
On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <mingm...@gmail.com> wrote:
In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
Can it restore during job restart? --Not test the runner in streaming for
some time.
Regarding to data-completeness, I would use at-most-once when few data
missing(mostly tasknode failure) is tolerated, compared to the performance
cost introduced by 'state'/'checkpoint'.
On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <amitsel...@gmail.com> wrote:
On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <mingm...@gmail.com> wrote:
Move discuss to dev-list
Savepoint in Flink, also checkpoint in Spark, should be good enough to
handle this case.
When people don't enable these features, for example only need
at-most-once
The Spark runner forces checkpointing on any streaming (Beam)
application,
mostly because it uses mapWithState for reading from UnboundedSource and
updateStateByKey form GroupByKey - so by design, Spark runner is
at-least-once. Generally, I always thought that applications that
require
at-most-once are more focused on processing time only, as they only care
about whatever get's ingested into the pipeline at a specific time and
don't care (up to the point of losing data) about correctness.
I would be happy to hear more about your use case.
semantic, each unbounded IO should try its best to restore from last
offset, although CheckpointMark is null. Any ideas?
Mingmin
On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <dhalp...@apache.org>
wrote:
hey,
The native Beam UnboundedSource API supports resuming from
checkpoint
--
that specifically happens here
<
https://github.com/apache/beam/blob/master/sdks/java/io/kafk
a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
when
the KafkaCheckpointMark is non-null.
The FlinkRunner should be providing the KafkaCheckpointMark from the
most
recent savepoint upon restore.
There shouldn't be any "special" Flink runner support needed, nor is
the
State API involved.
Dan
On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
j...@nanthrax.net
wrote:
Would not it be Flink runner specific ?
Maybe the State API could do the same in a runner agnostic way
(just
thinking loud) ?
Regards
JB
On 03/21/2017 04:56 PM, Mingmin Xu wrote:
From KafkaIO itself, looks like it either start_from_beginning or
start_from_latest. It's designed to leverage
`UnboundedSource.CheckpointMark`
during initialization, but so far I don't see it's provided by
runners.
At the
moment Flink savepoints is a good option, created a JIRA(BEAM-1775
<https://issues.apache.org/jira/browse/BEAM-1775>) to handle it
in
KafkaIO.
Mingmin
On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <
aljos...@apache.org
<mailto:aljos...@apache.org>> wrote:
Hi,
Are you using Flink savepoints [1] when restoring your
application?
If you
use this the Kafka offset should be stored in state and it
should
restart
from the correct position.
Best,
Aljoscha
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/
setup/savepoints.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.3
/setup/savepoints.html>
On 21 Mar 2017, at 01:50, Jins George <
jins.geo...@aeris.net
<mailto:jins.geo...@aeris.net>> wrote:
Hello,
I am writing a Beam pipeline(streaming) with Flink runner to
consume data
from Kafka and apply some transformations and persist to
Hbase.
If I restart the application ( due to failure/manual
restart),
consumer
does not resume from the offset where it was prior to
restart. It
always
resume from the latest offset.
If I enable Flink checkpionting with hdfs state back-end,
system
appears
to be resuming from the earliest offset
Is there a recommended way to resume from the offset where
it
was
stopped ?
Thanks,
Jins George
--
----
Mingmin
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
--
----
Mingmin
--
----
Mingmin