[ https://issues.apache.org/jira/browse/FLINK-6791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-6791:
---------------------------------
    Affects Version/s: 1.4.0

> Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6791
>                 URL: https://issues.apache.org/jira/browse/FLINK-6791
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0, 1.2.1, 1.4.0
>            Reporter: Nico Kruber
>
> If the `MemoryStateBackend` is used as the checkpoint stream back-end of, e.g., the
> `RocksDBStateBackend`, it blocks further checkpoint/savepoint creation once the
> checkpoint data reaches the back-end's maximum state size. In that case, an error
> message is logged at the TaskManager, but the savepoint/checkpoint never completes;
> although the job keeps running, no further checkpoints are made.
> The following example should reproduce the issue:
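> {code:java}
> import java.util.Collections;
> import java.util.List;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>
> public class FailingSnapshots {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // RocksDB holds the working state; checkpoint data is written through a
>         // MemoryStateBackend (max state size 1000 MB, synchronous snapshots).
>         // The second RocksDBStateBackend argument disables incremental checkpoints.
>         env.setStateBackend(new RocksDBStateBackend(
>                 new MemoryStateBackend(1000 * 1024 * 1024, false), false));
>         env.enableCheckpointing(100L); // trigger a checkpoint every 100 ms
>
>         final long numKeys = 100_000L;
>
>         // Endless source cycling through numKeys distinct keys.
>         DataStreamSource<Tuple1<Long>> source1 =
>                 env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
>                     private volatile boolean running = true;
>
>                     @Override
>                     public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
>                         long counter = 0;
>                         while (running) {
>                             synchronized (ctx.getCheckpointLock()) {
>                                 ctx.collect(Tuple1.of(counter % numKeys));
>                                 counter++;
>                             }
>                             Thread.yield();
>                         }
>                     }
>
>                     @Override
>                     public void cancel() {
>                         running = false;
>                     }
>                 });
>
>         // Keyed map storing a 100-element list per key, so the total state grows with numKeys.
>         source1.keyBy(0)
>                 .map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
>                     private transient ValueState<List<Long>> val;
>
>                     @Override
>                     public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
>                         val.update(Collections.nCopies(100, value.f0));
>                         return value;
>                     }
>
>                     @Override
>                     public void open(final Configuration parameters) throws Exception {
>                         ValueStateDescriptor<List<Long>> descriptor =
>                                 new ValueStateDescriptor<>(
>                                         "data", // the state name
>                                         TypeInformation.of(new TypeHint<List<Long>>() {}) // type information
>                                 );
>                         val = getRuntimeContext().getState(descriptor);
>                     }
>                 }).uid("identity-map-with-state")
>                 .addSink(new DiscardingSink<Tuple1<Long>>());
>
>         env.execute("failingsnapshots");
>     }
> }
> {code}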
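The failure mode in this report hinges on the memory-backed checkpoint stream enforcing a hard size limit. The following is a minimal sketch of that behavior, assuming a simplified stream that buffers snapshot bytes on the heap and rejects any write that would push it past `maxStateSize`. The class `BoundedMemoryCheckpointStream` and its error message are hypothetical illustrations, not Flink's actual implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical, simplified model of a memory-backed checkpoint stream:
 * snapshot bytes are buffered on the heap, and a write fails once the
 * buffered size would exceed maxStateSize. Not Flink's actual class.
 */
class BoundedMemoryCheckpointStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;

    BoundedMemoryCheckpointStream(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    @Override
    public void write(int b) throws IOException {
        checkSize(1);
        buffer.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        checkSize(len);
        buffer.write(b, off, len);
    }

    /** Rejects the write before any bytes are buffered if the limit would be exceeded. */
    private void checkSize(int additional) throws IOException {
        long newSize = (long) buffer.size() + additional;
        if (newSize > maxStateSize) {
            throw new IOException("checkpoint state size " + newSize
                    + " exceeds maxStateSize " + maxStateSize);
        }
    }

    /** Number of bytes buffered so far. */
    int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BoundedMemoryCheckpointStream out = new BoundedMemoryCheckpointStream(1024);
        try {
            out.write(new byte[1000]); // fits: 1000 <= 1024
            out.write(new byte[100]);  // 1100 > 1024, throws IOException
            System.out.println("snapshot complete");
        } catch (IOException e) {
            // In the reported bug, an error of this kind is only logged at the
            // TaskManager, while the checkpoint itself never completes.
            System.out.println("snapshot failed: " + e.getMessage());
        }
    }
}
```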



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)