The way I understand this loop is that the ContinuousFileReaderOperator
only processes records in the background while the operator is idle, i.e.
while it's not receiving any records.

At the very bottom of that loop (here:
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L358C5-L358C6>),
it exits as soon as the executor is no longer idle, i.e. when there are incoming records.
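To make that reading concrete, here is a minimal single-threaded model of it: emit one record from the current split, then check at the bottom of the loop whether any mail is pending, and return if so. The mailbox is modeled as a plain `Queue<Runnable>`, and `IdleLoopSketch` / `readWhileIdle` are hypothetical names. This is a sketch of the behavior described above, not Flink's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical model of "read while the mailbox is idle"; not Flink code.
// The mailbox is a plain FIFO queue of pending actions (mails).
class IdleLoopSketch {

    // Emits records from the split and returns how many were emitted.
    // After each record it checks the mailbox; if any mail is pending
    // (e.g. an incoming record or a checkpoint barrier) it stops, leaving
    // the rest of the split in the iterator for a later call.
    static int readWhileIdle(Iterator<String> split, Queue<Runnable> mailbox, List<String> out) {
        int emitted = 0;
        while (split.hasNext()) {
            out.add(split.next());
            emitted++;
            if (!mailbox.isEmpty()) {
                break; // bottom-of-loop check: executor is no longer idle, so yield
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        Iterator<String> records = List.of("r1", "r2", "r3", "r4").iterator();
        // Wrap the records so that a mail arrives right after "r2" is read,
        // standing in for a checkpoint barrier showing up mid-split.
        Iterator<String> split = new Iterator<>() {
            @Override public boolean hasNext() { return records.hasNext(); }
            @Override public String next() {
                String r = records.next();
                if (r.equals("r2")) {
                    mailbox.add(() -> {});
                }
                return r;
            }
        };
        int n = readWhileIdle(split, mailbox, out);
        System.out.println(n + " " + out); // prints "2 [r1, r2]"
    }
}
```

Under this model, how long a pending mail waits is bounded by the cost of reading a single record, not a whole split.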

If you look here
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407>,
the operator supports checkpointable input splits, meaning it will save its
place within a file split. This would only be possible if the reader can be
interrupted in the middle of a split. I have written custom splits that do
exactly this.
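A minimal sketch of what "saving its place within a split" means, assuming a split is just a list of records and the saved state is the index of the next record. It is loosely modeled on the `getCurrentState()` / `reopen(split, state)` pair of Flink's `CheckpointableInputFormat`, but uses no Flink API; `ResumableSplitReader` and its methods are hypothetical.

```java
import java.util.List;

// Hypothetical reader that can stop in the middle of a split and resume
// from a checkpointed position. Loosely modeled on the getCurrentState() /
// reopen(split, state) idea of Flink's CheckpointableInputFormat, but it
// uses no Flink API.
class ResumableSplitReader {
    private final List<String> split; // records of one file split
    private long offset;              // next record index (a real format might store a byte offset)

    ResumableSplitReader(List<String> split) {
        this(split, 0L);
    }

    private ResumableSplitReader(List<String> split, long offset) {
        this.split = split;
        this.offset = offset;
    }

    boolean reachedEnd() {
        return offset >= split.size();
    }

    String nextRecord() {
        return split.get((int) offset++);
    }

    // Position to store in the checkpoint.
    long getCurrentState() {
        return offset;
    }

    // Recreates a reader positioned where the checkpoint left off.
    static ResumableSplitReader reopen(List<String> split, long state) {
        return new ResumableSplitReader(split, state);
    }

    public static void main(String[] args) {
        List<String> split = List.of("a", "b", "c", "d");
        ResumableSplitReader reader = new ResumableSplitReader(split);
        reader.nextRecord(); // "a"
        reader.nextRecord(); // "b"
        long checkpointed = reader.getCurrentState(); // 2
        // After a failure, restore from the checkpoint instead of re-reading the split.
        ResumableSplitReader restored = ResumableSplitReader.reopen(split, checkpointed);
        System.out.println(restored.nextRecord()); // prints "c"
    }
}
```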

Darin

On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> This is the loop (code reference:
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346>),
> where it fetches all records from the split, and only then does the
> MailboxProcessor get control to check other mail. This loop was introduced
> here:
> <https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f>.
>
> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos <darin.a...@instacart.com.invalid>
> wrote:
>
> > I thought for sure this was already the existing behavior with this
> > operator. Does it not check the mailbox executor after every record read?
> >
> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) <j...@apache.org>
> > wrote:
> >
> > > Prabhu Joseph created FLINK-33753:
> > > -------------------------------------
> > >
> > >              Summary: ContinuousFileReaderOperator consume records as mini batch
> > >                  Key: FLINK-33753
> > >                  URL: https://issues.apache.org/jira/browse/FLINK-33753
> > >              Project: Flink
> > >           Issue Type: Improvement
> > >     Affects Versions: 1.18.0
> > >             Reporter: Prabhu Joseph
> > >
> > >
> > > The ContinuousFileReaderOperator reads and collects the records from a
> > > split in a loop. If the split is large, the loop takes longer, and the
> > > mailbox executor doesn't get a chance to process the checkpoint barrier.
> > > This leads to checkpoints timing out.
> > > ContinuousFileReaderOperator could be improved to consume the records in a
> > > mini batch, similar to Hudi's StreamReadOperator
> > > (https://issues.apache.org/jira/browse/HUDI-2485).
> > >
> > >
> > >
> > > --
> > > This message was sent by Atlassian Jira
> > > (v8.20.10#820010)
> > >
> >
>
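The mini-batch idea from the FLINK-33753 description can be sketched as a scheduling pattern: read at most `batchSize` records per mail, then put the continuation at the back of the mailbox so that mail already waiting (such as a checkpoint barrier) runs between batches. This is a single-threaded model assuming a FIFO mailbox of `Runnable`s; `MiniBatchSketch`, `readBatch`, and `runMailbox` are hypothetical names, and this is not how Flink's `MailboxProcessor` or Hudi's `StreamReadOperator` are actually implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of mini-batch reading; not Flink or Hudi code.
class MiniBatchSketch {
    final Queue<Runnable> mailbox = new ArrayDeque<>(); // FIFO of pending mails
    final List<String> log = new ArrayList<>();         // records and events, in processing order

    // Reads at most batchSize records, then re-enqueues itself if the split has more.
    void readBatch(Iterator<String> split, int batchSize) {
        for (int i = 0; i < batchSize && split.hasNext(); i++) {
            log.add(split.next());
        }
        if (split.hasNext()) {
            // The continuation goes to the back of the queue, so mail that is
            // already waiting (e.g. a checkpoint barrier) runs before the next batch.
            mailbox.add(() -> readBatch(split, batchSize));
        }
    }

    // Runs mails in FIFO order until none are left.
    void runMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    public static void main(String[] args) {
        MiniBatchSketch op = new MiniBatchSketch();
        Iterator<String> split = List.of("r1", "r2", "r3", "r4", "r5").iterator();
        op.mailbox.add(() -> op.readBatch(split, 2));
        op.mailbox.add(() -> op.log.add("checkpoint"));
        op.runMailbox();
        System.out.println(op.log); // prints "[r1, r2, checkpoint, r3, r4, r5]"
    }
}
```

In this model, `batchSize` bounds how many records can be read between a barrier's arrival and its processing: smaller batches shorten that wait at the cost of more re-enqueueing.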
