Hi Marek,

That's a great question. The answer depends on whether you are using portability or the "classic" Runner:

Portability
===========

In portability, the SDF functionality includes an option for the Runner to split a given bundle such that the work remaining in the current bundle is minimized and the deferred remainder is maximized. That is ideal for checkpointing as soon as possible [1].
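For reference, here is roughly what this looks like from the pipeline side: a minimal SDF sketch against the 2.17-era Java SDK (signatures changed in later releases). It assumes the element is a local file path and just emits byte offsets for brevity. When the Runner requests a split with fraction_of_remainder = 0, tryClaim() starts returning false and the unprocessed part of the OffsetRange becomes the deferred residual:

import java.io.File;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

class EmitOffsets extends DoFn<String, Long> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String path) {
    // One restriction spanning the whole file.
    return new OffsetRange(0, new File(path).length());
  }

  @NewTracker
  public OffsetRangeTracker newTracker(OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public void process(ProcessContext c, OffsetRangeTracker tracker) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
      // If the Runner split the restriction, tryClaim() returns false
      // here and [i, to) is handed back as the residual.
      c.output(i);
    }
  }
}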

Unfortunately, this is not yet implemented in the Flink Runner. That's why you are seeing the entire split finishing before the checkpoint.

Implementing this would mean issuing the split call upon checkpointing, making sure to checkpoint the remaining work, and resuming it after the checkpoint has finished.
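To make the idea more concrete, here is a very rough sketch in plain Java. The ActiveBundleHandle interface and all names in it are hypothetical, since the real portability plumbing for this does not exist yet:

import java.util.List;

// Hypothetical handle to a bundle running in the SDK harness (NOT a real Beam API).
interface ActiveBundleHandle<T> {
  // Ask the harness to split; fraction 0 keeps a minimal primary and defers the rest.
  List<T> splitRemainder(double fractionOfRemainder);
  // Re-schedule deferred work once the checkpoint is done.
  void resume(List<T> residuals);
}

class CheckpointSplitSketch<T> {
  private final ActiveBundleHandle<T> bundle;
  private List<T> residuals;

  CheckpointSplitSketch(ActiveBundleHandle<T> bundle) {
    this.bundle = bundle;
  }

  void onCheckpointBarrier() {
    // Minimize the current bundle's work, maximize the deferred remainder.
    residuals = bundle.splitRemainder(0.0);
    // ...the residuals would be persisted in the operator's Flink state here...
  }

  void onCheckpointComplete() {
    bundle.resume(residuals);
  }
}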

Perhaps others could also chime in, in case I am missing anything here?

Classic
=======

AFAIK there is no way to split further while a split is being processed. The best option would be to create a custom UnboundedSource which produces smaller splits; by default, the number of splits equals the pipeline parallelism. Depending on your source this may or may not be trivial; see the sketch below.
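Here is a sketch of that idea, assuming the underlying splitting logic can be delegated to. SmallSplitSource, innerSplit, and SPLITS_PER_WORKER are illustrative names, not a Beam API:

import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

abstract class SmallSplitSource<T, C extends UnboundedSource.CheckpointMark>
    extends UnboundedSource<T, C> {

  // How much finer than the requested parallelism we want to split.
  private static final int SPLITS_PER_WORKER = 8;

  @Override
  public List<? extends UnboundedSource<T, C>> split(
      int desiredNumSplits, PipelineOptions options) throws Exception {
    // Runners typically ask for one split per parallel instance. Asking
    // the underlying implementation for more makes each split smaller,
    // so a checkpoint barrier waits on less in-flight work.
    return innerSplit(desiredNumSplits * SPLITS_PER_WORKER, options);
  }

  // Delegate that produces the actual splits (illustrative).
  protected abstract List<? extends UnboundedSource<T, C>> innerSplit(
      int numSplits, PipelineOptions options) throws Exception;
}

Whether the extra splits actually help depends on how your source distributes its work among them.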

Cheers,
Max

[1] https://github.com/apache/beam/blob/6266296ac037afc775735d4f08d25ffcc1a8e897/model/fn-execution/src/main/proto/beam_fn_api.proto#L421

On 07.02.20 12:18, [email protected] wrote:
Hi,
   I am using FileIO, continuously watching a folder for new files to process. The problem occurs when Flink starts reading a 200 MB file (around 3M elements) and checkpointing starts at the same time. The checkpoint never finishes until the WHOLE file is processed.

Minimal example :
https://github.com/seznam/beam/blob/simunek/failingCheckpoint/examples/java/src/main/java/org/apache/beam/examples/CheckpointFailingExample.java

My theory of what could be wrong, from my understanding:
The CheckpointMark in this case starts from Create.ofProvider and is then propagated to downstream operators, where it is queued behind all splits. That means all splits have to be read before the operator can successfully checkpoint. The problem is even bigger when there are more files: then we need to wait for all files to be processed before a checkpoint can succeed.

1. Are my assumptions correct?
2. Is there some possibility to improve the behavior of SplittableDoFn (or the subsequent reading from BoundedSource) on Flink so that the checkpoint barrier propagates better?

For now my fix is reading smaller files (30 MB) one by one, but it's not very future-proof.

Versions:
Beam 2.17
Flink 1.9

Please correct my poor understanding of checkpointing with Beam and Flink; it would be wonderful if you had some advice on what to improve or where to look.
