I agree. Also create https://issues.apache.org/jira/browse/FLINK-34704 for
tracking and further discussion.


Best,
Zakelly

On Fri, Mar 15, 2024 at 2:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Posting this to dev as well...
>
> Thanks Zakelly,
> Sounds like a solution could be to add a new different version of yield
> that would actually yield to the checkpoint barrier too. That way operator
> implementations could decide whether any state modification may or may not
> have happened and can optionally allow checkpoint to be taken in the
> "middle of record  processing".
>
> Gyula
>
> On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Processing checkpoint halfway through `processElement` is problematic.
>> The current element will not be included in the input in-flight data, and
>> we cannot assume it has taken effect on the state by user code. So the best
>> way is to treat `processElement` as an 'atomic' operation. I guess that's
>> why the priority of the cp barrier is set low.
>> However, the AsyncWaitOperator is a special case where we know the
>> element blocked at `addToWorkQueue` has not started triggering the
>> userFunction. Thus I'd suggest putting the element in the queue when the cp
>> barrier comes, and taking a snapshot of the whole queue afterwards. The
>> problem will be solved. But this approach also involves some code
>> modifications on the mailbox executor.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Thank you for the detailed analysis Zakelly.
>>>
>>> I think we should consider whether yield should process checkpoint
>>> barriers because this puts quite a serious limitation on the unaligned
>>> checkpoints in these cases.
>>> Do you know what is the reason behind the current priority setting? Is
>>> there a problem with processing the barrier here?
>>>
>>> Gyula
>>>
>>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan <zakelly....@gmail.com>
>>> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> Well I tried your example in local mini-cluster, and it seems the
>>>> source can take checkpoints but it will block in the following
>>>> AsyncWaitOperator. IIUC, the unaligned checkpoint barrier should wait until
>>>> the current `processElement` finishes its execution. In your example, the
>>>> element queue of `AsyncWaitOperator` will end up full and `processElement`
>>>> will be blocked at `addToWorkQueue`. Even though it will call
>>>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>>>> unprocessed since the priority of the barrier is -1, lower than the one
>>>> `yield()` should handle. I verified this using single-step debugging.
>>>>
>>>> And if one element could finish its async io, the cp barrier can be
>>>> processed afterwards. For example:
>>>> ```
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>>>> env.getConfig().setParallelism(1);
>>>> AsyncDataStream.orderedWait(
>>>>                 env.fromSequence(Long.MIN_VALUE,
>>>> Long.MAX_VALUE).shuffle(),
>>>>                 new AsyncFunction<Long, Long>() {
>>>>                     boolean first = true;
>>>>                     @Override
>>>>                     public void asyncInvoke(Long aLong,
>>>> ResultFuture<Long> resultFuture) {
>>>>                         if (first) {
>>>>
>>>> Executors.newSingleThreadExecutor().execute(() -> {
>>>>                                 try {
>>>>                                     Thread.sleep(20000); // process
>>>> after 20s, only for the first one.
>>>>                                 } catch (Throwable e) {}
>>>>                                 LOG.info("Complete one");
>>>>
>>>> resultFuture.complete(Collections.singleton(1L));
>>>>                             });
>>>>                             first = false;
>>>>                         }
>>>>                     }
>>>>                 },
>>>>                 24,
>>>>                 TimeUnit.HOURS,
>>>>                 1)
>>>>         .print();
>>>> ```
>>>> The checkpoint 1 can be normally finished after the "Complete one" log
>>>> print.
>>>>
>>>> I guess the users have no means to solve this problem, we might
>>>> optimize this later.
>>>>
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey all!
>>>>>
>>>>> I encountered a strange and unexpected behaviour when trying to use
>>>>> unaligned checkpoints with AsyncIO.
>>>>>
>>>>> If the async operation queue is full and backpressures the pipeline
>>>>> completely, then unaligned checkpoints cannot be completed. To me this
>>>>> sounds counterintuitive because one of the benefits of the AsyncIO would 
>>>>> be
>>>>> that we can simply checkpoint the queue and not have to wait for the
>>>>> completion.
>>>>>
>>>>> To repro you can simply run:
>>>>>
>>>>> AsyncDataStream.orderedWait(
>>>>>     env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>>>     new AsyncFunction<Long, Long>() {
>>>>>         @Override
>>>>>         public void asyncInvoke(Long aLong, ResultFuture<Long>
>>>>> resultFuture) {}
>>>>>     },
>>>>>     24,
>>>>>     TimeUnit.HOURS,
>>>>>     1)
>>>>>     .print();
>>>>>
>>>>> This pipeline will completely backpressure the source and checkpoints
>>>>> do not progress even though they are unaligned. Already the source cannot
>>>>> take a checkpoint it seems which for me is surprising because this is 
>>>>> using
>>>>> the new source interface.
>>>>>
>>>>> Does anyone know why this happens and if there may be a solution?
>>>>>
>>>>> Thanks
>>>>> Gyula
>>>>>
>>>>

Reply via email to