Hi fanrui,

> How to identify legacySource?

Legacy sources always use the SourceStreamTask class, and
SourceStreamTask is used only for legacy sources. But I'm not sure how best
to enable/disable the overdraft there. Adding a `disableOverdraft()` call in
SourceStreamTask would be better than relying on the `getAvailableFuture()`
call (isn't it used for the back pressure metric anyway?). Ideally we should
enable/disable it in the constructors, but that might be tricky.
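
To make the `disableOverdraft()` idea concrete, here is a rough sketch. It
is a toy model under my own assumptions, not the real LocalBufferPool: the
class name `OverdraftPoolSketch` and the methods `tryRequestBuffer()` and
`recycleBuffer()` are hypothetical, and only `disableOverdraft()` corresponds
to the call suggested above. Availability ignores the overdraft, and the
extra buffers are handed out only once the ordinary ones are exhausted:

```java
/** Toy model of a buffer pool with a bounded overdraft (hypothetical, not Flink code). */
final class OverdraftPoolSketch {
    private final int poolSize;      // ordinary buffers owned by this pool
    private int maxOverdraftBuffers; // extra buffers allowed once the pool is exhausted
    private int usedBuffers;         // ordinary + overdraft buffers currently handed out

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers) {
        if (poolSize < 1 || maxOverdraftBuffers < 0) {
            throw new IllegalArgumentException("poolSize >= 1 and maxOverdraftBuffers >= 0 required");
        }
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Meant for tasks that never check availability, e.g. a legacy source task. */
    void disableOverdraft() {
        maxOverdraftBuffers = 0;
    }

    /** Availability ignores the overdraft: one ordinary buffer must still be free. */
    boolean isAvailable() {
        return usedBuffers + 1 <= poolSize;
    }

    /** Returns true if a buffer was granted, false if the caller would have to block. */
    boolean tryRequestBuffer() {
        if (usedBuffers < poolSize + maxOverdraftBuffers) {
            usedBuffers++;
            return true;
        }
        return false;
    }

    /** Gives one buffer back to the pool. */
    void recycleBuffer() {
        if (usedBuffers > 0) {
            usedBuffers--;
        }
    }

    public static void main(String[] args) {
        OverdraftPoolSketch pool = new OverdraftPoolSketch(2, 1);
        pool.tryRequestBuffer();
        pool.tryRequestBuffer();
        System.out.println("available after 2 requests: " + pool.isAvailable());
        System.out.println("overdraft request granted: " + pool.tryRequestBuffer());
    }
}
```

A SourceStreamTask-like caller would invoke `disableOverdraft()` once during
setup, so a legacy source that never checks availability blocks at the
ordinary pool size instead of draining the overdraft as well.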

> I prefer it to be between 5 and 10

I would vote for a smaller value because of the FLINK-13203 deadlock.

Piotrek



On Thu, 5 May 2022 at 11:49, rui fan <1996fan...@gmail.com> wrote:

> Hi,
>
> Thanks a lot for your discussion.
>
> After several discussions, I think it's clear now. I updated the
> "Proposed Changes" section of FLIP-227[1]. If I missed something,
> please add it to the FLIP, or mention it in the mail and I will
> add it to the FLIP. If everything is OK, I will create a
> new JIRA for the first task, and use FLINK-26762[2] as the
> second task.
>
> About the legacy source, do we set maxOverdraftBuffersPerGate=0
> directly? How to identify legacySource? Or could we add an
> overdraftEnabled flag to LocalBufferPool? The default value would
> be false. If getAvailableFuture is called, set
> overdraftEnabled=true.
> That indicates whether something checks isAvailable elsewhere.
> It might be more general and cover more cases.
>
> Also, I think the default value of 'max-overdraft-buffers-per-gate'
> needs to be confirmed. I prefer it to be between 5 and 10. What
> do you think?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> [2] https://issues.apache.org/jira/browse/FLINK-26762
>
> Thanks
> fanrui
>
> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi again,
>>
>> After sleeping on this, if both versions (reserve and overdraft) have
>> the same complexity, I would also prefer the overdraft.
>>
>> > `Integer.MAX_VALUE` as default value was my idea as well but now, as
>> > Dawid mentioned, I think it is dangerous since it is too implicit for
>> > the user and if the user submits one more job for the same TaskManager
>>
>> As I mentioned, it's not only an issue with multiple jobs. The same
>> problem can happen with different subtasks from the same job, potentially
>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be
>> in favour of Integer.MAX_VALUE to be the default value, but as it is, I
>> think we should indeed play on the safe side and limit it.
>>
>> > I still don't understand how the "reserve" implementation should be limited.
>> > I mean if we have X buffers in total and the user sets overdraft equal
>> > to X we obviously can not reserve all buffers, but how many we are
>> > allowed to reserve? Should it be a different configuration like
>> > percentegeForReservedBuffers?
>>
>> The reserve could be defined as a percentage, or as a fixed number of
>> buffers. But yes. In normal operation a subtask would not use the reserve,
>> because if numberOfAvailableBuffers < reserve, the output would not be
>> available. Only in the flatMap/timers/huge records case could the reserve
>> be used.
>>
>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
>> > LocalBufferPool never be available? Can't process data?
>>
>> Of course we would need to make sure that never happens. So the reserve
>> should be < total buffer size.
>>
>> > 2. If the overdraft buffer use the extra buffers, when the downstream
>> > task inputBuffer is insufficient, it should fail to start the job, and
>> > then restart? When the InputBuffer is initialized, it will apply for
>> > enough buffers, right?
>>
>> The failover for when downstream cannot allocate buffers is already
>> implemented in FLINK-14872 [2]. There is a timeout for how long the task
>> waits for buffer allocation. However, this doesn't prevent many
>> (potentially infinitely many) deadlock/restart cycles. IMO the proper
>> solution for [1] would be 2b described in the ticket:
>>
>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This
>> > is a simplified version of 2a, without tracking the tasks sink-to-source.
>>
>> But that's a pre-existing problem and I don't think we have to solve it
>> before implementing overdraft. I think we would need to solve it only
>> before setting Integer.MAX_VALUE as the default for the overdraft. For the
>> same reason, I would hesitate to set the overdraft to anything more than a
>> couple of buffers by default.
>>
>> > Actually, I totally agree that we don't need a lot of buffers for
>> > overdraft
>>
>> and
>>
>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>> > When we finish this feature and users start using it, we can discuss
>> > this again if users report it as an issue.
>>
>> +1
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13203
>> [2] https://issues.apache.org/jira/browse/FLINK-14872
>>
>> On Thu, 5 May 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I still have some questions.
>>>
>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
>>> LocalBufferPool never be available? Can't process data?
>>> 2. If the overdraft buffer use the extra buffers, when the downstream
>>> task inputBuffer is insufficient, it should fail to start the job, and
>>> then restart? When the InputBuffer is initialized, it will apply for
>>> enough buffers, right?
>>>
>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>>> When we finish this feature and users start using it, we can discuss
>>> this again if users report it as an issue.
>>>
>>> Thanks
>>> fanrui
>>>
>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I have not replied in the thread yet, but I was following the discussion.
>>>>
>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it
>>>> the idea to distinguish between inside flatMap & outside would be fairly
>>>> simple, but maybe slightly indirect. The checkAvailability would remain
>>>> unchanged and it is checked always between separate invocations of the
>>>> UDF. Therefore the overdraft buffers would not apply there. However once
>>>> the pool says it is available, it means it has at least an initial
>>>> buffer. So any additional request without checking for availability can
>>>> be considered to be inside of processing a single record. This does not
>>>> hold just for the LegacySource as I don't think it actually checks for
>>>> the availability of buffers in the LocalBufferPool.
>>>>
>>>> In the offline chat with Anton, we also discussed if we need a limit of
>>>> the number of buffers we could overdraft (or in other words if the limit
>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
>>>> on the safe side and have it limited. The pool of network buffers is
>>>> shared for the entire TaskManager, so it means it can be shared even
>>>> across tasks of separate jobs. However, I might be just unnecessarily
>>>> cautious here.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>>>> > Hi,
>>>> >
>>>> > Thanks for the answers.
>>>> >
>>>> >> we may still need to discuss whether the
>>>> >> overdraft/reserve/spare should use extra buffers or buffers
>>>> >> in (exclusive + floating buffers)?
>>>> > and
>>>> >
>>>> >> These things resolve the different problems (at least as I see that).
>>>> >> The current hardcoded "1"  says that we switch "availability" to
>>>> >> "unavailability" when one more buffer is left(actually a little less
>>>> >> than one buffer since we write the last piece of data to this last
>>>> >> buffer). The overdraft feature doesn't change this logic we still want
>>>> >> to switch to "unavailability" in such a way but if we are already in
>>>> >> "unavailability" and we want more buffers then we can take "overdraft
>>>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>>>> >> understand when we should switch to "unavailability"
>>>> > Ok, I see. So it seems to me that both of you have in mind to keep the
>>>> > buffer pools as they are right now, but if we are in the middle of
>>>> > processing a record, we can request extra overdraft buffers on top of
>>>> > those? This is a different way to implement the overdraft from what I
>>>> > had in mind. I was thinking about something like keeping the
>>>> > "overdraft", or more precisely a buffer "reserve", in the buffer pool.
>>>> > I think my version would be easier to implement, because it is just
>>>> > fiddling with the min/max buffers calculation and a slightly modified
>>>> > `checkAvailability()` logic.
>>>> >
>>>> > On the other hand, what you have in mind would better utilise the
>>>> > available memory, right? It would require more code changes (how would
>>>> > we know when we are allowed to request the overdraft?). However, in
>>>> > this case, I would be tempted to set the number of overdraft buffers by
>>>> > default to `Integer.MAX_VALUE`, and let the system request as many
>>>> > buffers as necessary. The only downside that I can think of (apart from
>>>> > higher complexity) would be a higher chance of hitting a known/unsolved
>>>> > deadlock [1] in a scenario:
>>>> > - downstream task hasn't yet started
>>>> > - upstream task requests overdraft and uses all available memory
>>>> >   segments from the global pool
>>>> > - upstream task is blocked, because downstream task hasn't started yet
>>>> >   and can not consume any data
>>>> > - downstream task tries to start, but can not, as there are no
>>>> >   available buffers
>>>> >
>>>> >> BTW, for watermark, the number of buffers it needs is
>>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>>>> >> the watermark won't block in requestMemory.
>>>> > and
>>>> >
>>>> >> the best overdraft size will be equal to parallelism.
>>>> > That's a lot of buffers. I don't think we need that many for
>>>> > broadcasting watermarks. Watermarks are small, and remember that every
>>>> > subpartition has some partially filled/empty WIP buffer, so the vast
>>>> > majority of subpartitions will not need to request a new buffer.
>>>> >
>>>> > Best,
>>>> > Piotrek
>>>> >
>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>>>> >
>>>> > On Tue, 3 May 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >>
>>>> >>   >> Do you mean to ignore it while processing records, but keep using
>>>> >> `maxBuffersPerChannel` when calculating the availability of the output?
>>>> >>
>>>> >>
>>>> >> Yes, it is correct.
>>>> >>
>>>> >>
>>>> >>   >> Would it be a big issue if we changed it to check if at least
>>>> >> "overdraft number of buffers are available", where "overdraft number" is
>>>> >> configurable, instead of the currently hardcoded value of "1"?
>>>> >>
>>>> >>
>>>> >> These things resolve the different problems (at least as I see that).
>>>> >> The current hardcoded "1"  says that we switch "availability" to
>>>> >> "unavailability" when one more buffer is left(actually a little less
>>>> >> than one buffer since we write the last piece of data to this last
>>>> >> buffer). The overdraft feature doesn't change this logic we still want
>>>> >> to switch to "unavailability" in such a way but if we are already in
>>>> >> "unavailability" and we want more buffers then we can take "overdraft
>>>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>>>> >> understand when we should switch to "unavailability"
>>>> >>
>>>> >>
>>>> >> -- About "reserve" vs "overdraft"
>>>> >>
>>>> >> As Fanrui mentioned above, perhaps the best overdraft size will be
>>>> >> equal to parallelism. Also, users can set any value they want. So even
>>>> >> if parallelism is small (~5) but the user's flatMap produces a lot of
>>>> >> data, the user can set 10 or even more. That almost doubles the max
>>>> >> buffers, and it will be impossible to reserve them. At least we need to
>>>> >> figure out how to protect against such cases (a limit for the
>>>> >> overdraft?). So actually it looks even more difficult than increasing
>>>> >> the maximum buffers.
>>>> >>
>>>> >> I want to emphasize that overdraft buffers are a soft configuration,
>>>> >> which means the pool takes as many buffers as the global buffer pool
>>>> >> has available (maybe zero), but no more than this configured value. It
>>>> >> is also important to note that probably not many subtasks in a
>>>> >> TaskManager will be using this feature, so we don't actually need a lot
>>>> >> of available buffers for every subtask. (Here, I mean that if we have
>>>> >> only one window/flatMap operator and many other operators, then one
>>>> >> TaskManager will have many ordinary subtasks which don't actually need
>>>> >> overdraft and several subtasks that need this feature.) But in the case
>>>> >> of reservation, we will reserve some buffers for all operators even if
>>>> >> they don't really need them.
>>>> >>
>>>> >>
>>>> >> -- Legacy source problem
>>>> >>
>>>> >> If we still want to change max buffers then it is a problem for
>>>> >> LegacySources (since every subtask of the source will always use this
>>>> >> overdraft). But right now, I think that we can force 0 overdraft
>>>> >> buffers for legacy subtasks in the configuration during execution (if
>>>> >> it is not too late to change the configuration at that point).
>>>> >>
>>>> >>
>>>> >> 03.05.2022 14:11, rui fan wrote:
>>>> >>> Hi
>>>> >>>
>>>> >>> Thanks for Martijn Visser and Piotrek's feedback.  I agree with
>>>> >>> ignoring the legacy source, since it would affect our design. Users
>>>> >>> should use the new Source API as much as possible.
>>>> >>>
>>>> >>> Hi Piotrek, we may still need to discuss whether the
>>>> >>> overdraft/reserve/spare should use extra buffers or buffers
>>>> >>> in (exclusive + floating buffers)? They have some differences.
>>>> >>>
>>>> >>> If it uses extra buffers:
>>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1
>>>> >>>    <= currentPoolSize) and all subpartitions don't reach the
>>>> >>> maxBuffersPerChannel.
>>>> >>>
>>>> >>> If it uses the buffers in (exclusive + floating buffers):
>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions
>>>> >>> don't reach the maxBuffersPerChannel.
>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), the
>>>> >>> usedBuffers will be small. That is, the LocalBufferPool will
>>>> >>> easily become unavailable. For throughput, if users turn up the
>>>> >>> overdraft buffers, they need to turn up exclusive or floating
>>>> >>> buffers. It also affects the InputChannel, and it is unfriendly
>>>> >>> to users.
>>>> >>>
>>>> >>> So I prefer the overdraft to use extra buffers.
>>>> >>>
>>>> >>>
>>>> >>> BTW, for watermark, the number of buffers it needs is
>>>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>>>> >>> the watermark won't block in requestMemory. But it has
>>>> >>> 2 problems:
>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses
>>>> >>> (exclusive + floating buffers),  there will be fewer buffers
>>>> >>> available. Throughput may be affected.
>>>> >>> 2. The numberOfSubpartitions is different for each Task.
>>>> >>> So if users want to cover watermark using this feature,
>>>> >>> they don't know how to set the overdraftBuffers more
>>>> >>> reasonably. And if the parallelism is changed, users still
>>>> >>> need to change overdraftBuffers. It is unfriendly to users.
>>>> >>>
>>>> >>> So I propose we support overdraftBuffers=-1, which means
>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
>>>> >>> in the constructor of LocalBufferPool.
>>>> >>>
>>>> >>> Please correct me if I'm wrong.
>>>> >>>
>>>> >>> Thanks
>>>> >>> fanrui
>>>> >>>
>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>> >>>> Hi fanrui,
>>>> >>>>
>>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive
>>>> >>>>> buffers * parallelism + floating buffers)? The LocalBufferPool will
>>>> >>>>> be available when (usedBuffers+overdraftBuffers <=
>>>> >>>>> exclusiveBuffers*parallelism+floatingBuffers)
>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum number
>>>> >>>> of the required buffers, just as we did when we were implementing the
>>>> >>>> non-blocking outputs and adding availability logic to
>>>> >>>> LocalBufferPool. Back then we added "+ 1" to the minimum number of
>>>> >>>> buffers. Currently this logic is located in
>>>> >>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>>>> >>>>
>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>>>> >>>> For performance reasons, we always require at least one buffer per
>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we
>>>> >>>> require 5 overdraft buffers for output to be available, we need to
>>>> >>>> have them on top of those "one buffer per sub-partition". So the
>>>> >>>> logic should be changed to:
>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions +
>>>> >>>>> numOverdraftBuffers;
>>>> >>>>
>>>> >>>> Regarding increasing the number of max buffers, I'm not sure. It
>>>> >>>> should not be needed as long as "overdraft << max number of buffers",
>>>> >>>> because all buffers on the outputs are shared across all
>>>> >>>> sub-partitions. If we have 5 overdraft buffers, and parallelism of
>>>> >>>> 100, it doesn't matter in the grand scheme of things if we make the
>>>> >>>> output available if at least one single buffer is available or at
>>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So effects
>>>> >>>> of increasing the overdraft from 1 to for example 5 should be
>>>> >>>> negligible. For small parallelism, like 5, increasing overdraft from
>>>> >>>> 1 to 5 still increases the overdraft by only about 25%. So maybe we
>>>> >>>> can keep the max as it is?
>>>> >>>>
>>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer
>>>> >>>> reserve" or "spare buffers"? And document it as "number of buffers
>>>> >>>> kept in reserve in case of flatMap/firing timers/huge records"?
>>>> >>>>
>>>> >>>> What do you think Fanrui, Anton?
>>>> >>>>
>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new
>>>> >>>> features, as long as we don't break the existing deployments too much.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Piotrek
>>>> >>>>
>>>> >>>> On Tue, 3 May 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
>>>> >>>>> Hi everyone,
>>>> >>>>>
>>>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, we
>>>> >>>>> should not focus too much on improving/adding capabilities for
>>>> >>>>> legacy sources. We want to persuade and push users to use the new
>>>> >>>>> Source API. Yes, this means that there's work required by the end
>>>> >>>>> users to port any custom source to the new interface. The benefits
>>>> >>>>> of the new Source API should outweigh this. Anything that we build
>>>> >>>>> to support multiple interfaces means adding more complexity and more
>>>> >>>>> possibilities for bugs. Let's try to make our lives a little bit
>>>> >>>>> easier.
>>>> >>>>>
>>>> >>>>> Best regards,
>>>> >>>>>
>>>> >>>>> Martijn Visser
>>>> >>>>> https://twitter.com/MartijnVisser82
>>>> >>>>> https://github.com/MartijnVisser
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>>>> >>>>>
>>>> >>>>>> Hi Piotrek
>>>> >>>>>>
>>>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>>>> >>>>>>> output?
>>>> >>>>>> I think yes, and I'd ask Anton Kalashnikov to help double check.
>>>> >>>>>>
>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>>>> >>>>>>> problem that legacy sources would be ignoring it? Note that we
>>>> >>>>>>> already have effectively hardcoded a single overdraft buffer.
>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>>>> >>>>>>> buffer available and this works the same for all tasks (including
>>>> >>>>>>> legacy source tasks). Would it be a big issue if we changed it to
>>>> >>>>>>> check if at least "overdraft number of buffers are available",
>>>> >>>>>>> where "overdraft number" is configurable, instead of the currently
>>>> >>>>>>> hardcoded value of "1"?
>>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive
>>>> >>>>>> buffers * parallelism + floating buffers)? The LocalBufferPool will
>>>> >>>>>> be available when (usedBuffers+overdraftBuffers <=
>>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers)
>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>>>> >>>>>>
>>>> >>>>>> If yes, I think it can solve the problem of legacy source. There may
>>>> >>>>>> be some impact. If overdraftBuffers is large and only one buffer is
>>>> >>>>>> used to process a single record, not all of (exclusive buffers *
>>>> >>>>>> parallelism + floating buffers) can be used. It may only be possible
>>>> >>>>>> to use (exclusive buffers * parallelism + floating buffers -
>>>> >>>>>> overdraft buffers + 1). For throughput, if the Flink user turns up
>>>> >>>>>> the overdraft buffers, they also need to turn up exclusive or
>>>> >>>>>> floating buffers. And it also affects the InputChannel.
>>>> >>>>>>
>>>> >>>>>> If not, I don't think it can solve the problem of legacy source. The
>>>> >>>>>> legacy source doesn't check isAvailable. If there are extra buffers,
>>>> >>>>>> the legacy source will use them up until it blocks in requestMemory.
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> Thanks
>>>> >>>>>> fanrui
>>>> >>>>>>
>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org>
>>>> >>>>>> wrote:
>>>> >>>>>>
>>>> >>>>>>> Hi,
>>>> >>>>>>>
>>>> >>>>>>> +1 for the general proposal from my side. It would be a nice
>>>> >>>>>>> workaround for the flatMaps, WindowOperators and large records
>>>> >>>>>>> issues with unaligned checkpoints.
>>>> >>>>>>>
>>>> >>>>>>>> The first task is about ignoring max buffers per channel. This
>>>> >>>>>>>> means if we request a memory segment from LocalBufferPool and the
>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore
>>>> >>>>>>>> that and continue to allocate buffer while LocalBufferPool has
>>>> >>>>>>>> it(it is actually not an overdraft).
>>>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>>>> >>>>>>> output?
>>>> >>>>>>>> The second task is about the real overdraft. I am pretty convinced
>>>> >>>>>>>> now that we, unfortunately, need configuration for limitation of
>>>> >>>>>>>> overdraft number(because it is not ok if one subtask allocates all
>>>> >>>>>>>> buffers of one TaskManager considering that several different jobs
>>>> >>>>>>>> can be submitted on this TaskManager). So idea is to have
>>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per
>>>> >>>>>>>> LocalBufferPool).
>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is
>>>> >>>>>>>> reached, LocalBufferPool can request additionally from
>>>> >>>>>>>> NetworkBufferPool up to maxOverdraftBuffersPerPartition buffers.
>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>>>> >>>>>>> problem that legacy sources would be ignoring it? Note that we
>>>> >>>>>>> already have effectively hardcoded a single overdraft buffer.
>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>>>> >>>>>>> buffer available and this works the same for all tasks (including
>>>> >>>>>>> legacy source tasks). Would it be a big issue if we changed it to
>>>> >>>>>>> check if at least "overdraft number of buffers are available",
>>>> >>>>>>> where "overdraft number" is configurable, instead of the currently
>>>> >>>>>>> hardcoded value of "1"?
>>>> >>>>>>>
>>>> >>>>>>> Best,
>>>> >>>>>>> Piotrek
>>>> >>>>>>>
>>>> >>>>>>> On Fri, 29 Apr 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>>>> >>>>>>>
>>>> >>>>>>>> Let me add some information about the LegacySource.
>>>> >>>>>>>>
>>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource,
>>>> >>>>>>>> could we add an enableOverdraft flag to LocalBufferPool?
>>>> >>>>>>>> The default value would be false. If getAvailableFuture is called,
>>>> >>>>>>>> set enableOverdraft=true. It indicates whether there are
>>>> >>>>>>>> isAvailable checks elsewhere.
>>>> >>>>>>>>
>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if
>>>> >>>>>>>> I'm wrong.
>>>> >>>>>>>>
>>>> >>>>>>>> Thanks
>>>> >>>>>>>> fanrui
>>>> >>>>>>>>
>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
>>>> >>>>>>>>> Hi,
>>>> >>>>>>>>>
>>>> >>>>>>>>> Thanks for your quick response.
>>>> >>>>>>>>>
>>>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need to
>>>> >>>>>>>>> discuss the default value in PR.
>>>> >>>>>>>>>
>>>> >>>>>>>>> For the legacy source, you are right. It's difficult for general
>>>> >>>>>>>>> implementation.
>>>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in
>>>> >>>>>>>>> SourceFunction.SourceContext. And call it in our common
>>>> >>>>>>>>> LegacySource, e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs
>>>> >>>>>>>>> consume kafka, so fixing FlinkKafkaConsumer solved most of our
>>>> >>>>>>>>> problems.
>>>> >>>>>>>>>
>>>> >>>>>>>>> Core code:
>>>> >>>>>>>>> ```
>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>>>> >>>>>>>>>     // Nothing to wait for: no writer, unaligned checkpoints disabled,
>>>> >>>>>>>>>     // or the writer already has a free buffer.
>>>> >>>>>>>>>     if (recordWriter == null
>>>> >>>>>>>>>             || !configuration.getBoolean(
>>>> >>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>>>> >>>>>>>>>             || recordWriter.isAvailable()) {
>>>> >>>>>>>>>         return;
>>>> >>>>>>>>>     }
>>>> >>>>>>>>>
>>>> >>>>>>>>>     // Block the source thread until the writer becomes available again.
>>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>>>> >>>>>>>>>     try {
>>>> >>>>>>>>>         resumeFuture.get();
>>>> >>>>>>>>>     } catch (Throwable ignored) {
>>>> >>>>>>>>>         // Best effort: any failure while waiting is ignored here.
>>>> >>>>>>>>>     }
>>>> >>>>>>>>> }
>>>> >>>>>>>>> ```
>>>> >>>>>>>>>
>>>> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
>>>> >>>>>>>>> before synchronized (checkpointLock) and collects records.
>>>> >>>>>>>>> Please let me know if there is a better solution.
>>>> >>>>>>>>>
>>>> >>>>>>>>> Thanks
>>>> >>>>>>>>> fanrui
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com>
>>>> >>>>>>>>> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>>> Hi.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two
>>>> >>>>>>>>>>       commits in a PR?
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Perhaps a separate ticket will be better since this task has
>>>> >>>>>>>>>> fewer questions, but we should find a solution for LegacySource
>>>> >>>>>>>>>> first.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> --  2. For the first task, if the flink user disables the
>>>> >>>>>>>>>>       Unaligned Checkpoint, do we ignore max buffers per channel?
>>>> >>>>>>>>>>       Because the overdraft isn't useful for the Aligned
>>>> >>>>>>>>>>       Checkpoint, it still needs to wait for downstream Task to
>>>> >>>>>>>>>>       consume.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I
>>>> >>>>>>>>>> understand, the overdraft may not be really helpful for AC, but it
>>>> >>>>>>>>>> doesn't make it worse either.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> --  3. For the second task
>>>> >>>>>>>>>>       - The default value of maxOverdraftBuffersPerPartition may
>>>> >>>>>>>>>>         also need to be discussed.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I think it should be a pretty small value or even 0, since it is
>>>> >>>>>>>>>> a kind of optimization and users should understand what they are
>>>> >>>>>>>>>> doing (especially if we implement the first task).
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> --      - If the user disables the Unaligned Checkpoint, can we
>>>> >>>>>>>>>>          set the maxOverdraftBuffersPerPartition=0? Because the
>>>> >>>>>>>>>>          overdraft isn't useful for the Aligned Checkpoint.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> The same answer as above: if the overdraft doesn't cause a
>>>> >>>>>>>>>> degradation for the Aligned Checkpoint, I don't think that we
>>>> >>>>>>>>>> should differentiate between AC and UC.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> --  4. For the legacy source
>>>> >>>>>>>>>>       - If enabling the Unaligned Checkpoint, it uses up to
>>>> >>>>>>>>>>         maxOverdraftBuffersPerPartition buffers.
>>>> >>>>>>>>>>       - If disabling the UC, it doesn't use the overdraft buffer.
>>>> >>>>>>>>>>       - Do you think it's ok?
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all,
>>>> >>>>>>>>>> since it can lead to undesirable results, especially if the limit
>>>> >>>>>>>>>> is high. At least, as I understand, it will always work in
>>>> >>>>>>>>>> overdraft mode and it will borrow maxOverdraftBuffersPerPartition
>>>> >>>>>>>>>> buffers from the global pool, which can lead to degradation of
>>>> >>>>>>>>>> other subtasks on the same TaskManager.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic for
>>>> >>>>>>>>>>          LegacySource in our internal version. It works well.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I don't really understand how it is possible in the general case,
>>>> >>>>>>>>>> considering that each user has their own implementation of
>>>> >>>>>>>>>> LegacySourceOperator.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> --   5. For the benchmark, do you have any suggestions? I
>>>> >>>>>>>>>>       submitted the PR [1].
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 29.04.2022 14:14, rui fan wrote:
>>>> >>>>>>>>>>> Hi,
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two PRs or two
>>>> >>>>>>>>>>>       commits in a PR?
>>>> >>>>>>>>>>>       2. For the first task, if the flink user disables the
>>>> >>>>>>>>>>>       Unaligned Checkpoint, do we ignore max buffers per channel?
>>>> >>>>>>>>>>>       Because the overdraft isn't useful for the Aligned
>>>> >>>>>>>>>>>       Checkpoint, it still needs to wait for downstream Task to
>>>> >>>>>>>>>>>       consume.
>>>> >>>>>>>>>>>       3. For the second task
>>>> >>>>>>>>>>>          - The default value of maxOverdraftBuffersPerPartition
>>>> >>>>>>>>>>>          may also need to be discussed.
>>>> >>>>>>>>>>>          - If the user disables the Unaligned Checkpoint, can we
>>>> >>>>>>>>>>>          set the maxOverdraftBuffersPerPartition=0? Because the
>>>> >>>>>>>>>>>          overdraft isn't useful for the Aligned Checkpoint.
>>>> >>>>>>>>>>>       4. For the legacy source
>>>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it uses up to
>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>>>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>>>> >>>>>>>>>>>          buffer.
>>>> >>>>>>>>>>>          - Do you think it's ok?
>>>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic for
>>>> >>>>>>>>>>>          LegacySource in our internal version. It works well.
>>>> >>>>>>>>>>>       5. For the benchmark, do you have any suggestions? I
>>>> >>>>>>>>>>>       submitted the PR [1].
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Thanks
>>>> >>>>>>>>>>> fanrui
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <
>>>> >>>>>>> kaa....@yandex.com
>>>> >>>>>>>>>>> wrote:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>> Hi,
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are some
>>>> >>>>>>>>>>>> conclusions:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel. This means
>>>> >>>>>>>>>>>> that if we request a memory segment from LocalBufferPool and the
>>>> >>>>>>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore that
>>>> >>>>>>>>>>>> and continue to allocate buffers while LocalBufferPool has any (this is
>>>> >>>>>>>>>>>> actually not an overdraft).
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty convinced now
>>>> >>>>>>>>>>>> that we, unfortunately, need a configuration option to limit the number
>>>> >>>>>>>>>>>> of overdraft buffers (because it is not ok if one subtask allocates all
>>>> >>>>>>>>>>>> buffers of one TaskManager, considering that several different jobs can
>>>> >>>>>>>>>>>> be submitted on this TaskManager). So the idea is to have
>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>>>> >>>>>>>>>>>> LocalBufferPool). In this case, when the limit of buffers in
>>>> >>>>>>>>>>>> LocalBufferPool is reached, LocalBufferPool can additionally request
>>>> >>>>>>>>>>>> from NetworkBufferPool up to
>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
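
A rough sketch of the second task as described above, in case it helps the discussion. This is a single-threaded toy model, not Flink code: the classes ToyGlobalPool and ToyLocalPool and all of their methods are hypothetical names made up for illustration, and the real LocalBufferPool / NetworkBufferPool are considerably more involved. It only shows the idea that a local pool may borrow a bounded number of extra buffers from the global pool once its regular limit is reached.

```java
/** Toy stand-in for the global pool; hypothetical, not Flink's NetworkBufferPool. */
final class ToyGlobalPool {
    private int free;

    ToyGlobalPool(int free) {
        this.free = free;
    }

    /** Hands out one buffer if any is left. */
    boolean tryTake() {
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }

    void giveBack() {
        free++;
    }

    int free() {
        return free;
    }
}

/** Toy stand-in for a local pool with a bounded overdraft; hypothetical, not Flink's LocalBufferPool. */
final class ToyLocalPool {
    private final ToyGlobalPool global;
    private final int maxBuffers;          // regular limit of this pool
    private final int maxOverdraftBuffers; // extra buffers allowed beyond the regular limit
    private int regularInUse;
    private int overdraftInUse;

    ToyLocalPool(ToyGlobalPool global, int maxBuffers, int maxOverdraftBuffers) {
        this.global = global;
        this.maxBuffers = maxBuffers;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Returns true if a buffer was granted, either regularly or as overdraft. */
    boolean requestBuffer() {
        if (regularInUse < maxBuffers) {
            if (global.tryTake()) {
                regularInUse++;
                return true;
            }
            return false;
        }
        // Regular limit reached: borrow from the global pool, but only up to the overdraft cap.
        if (overdraftInUse < maxOverdraftBuffers && global.tryTake()) {
            overdraftInUse++;
            return true;
        }
        return false;
    }

    /** Overdraft buffers are returned first, so the pool leaves overdraft mode as early as possible. */
    void recycleBuffer() {
        if (overdraftInUse > 0) {
            overdraftInUse--;
        } else if (regularInUse > 0) {
            regularInUse--;
        } else {
            return; // nothing to recycle
        }
        global.giveBack();
    }

    /** The pool reports "available" only while it is below its regular limit. */
    boolean isAvailable() {
        return regularInUse < maxBuffers;
    }
}
```

With maxOverdraftBuffers=0 this toy degenerates to the current behaviour, which is also why a caller that never checks isAvailable() (the LegacySource concern) can consume at most the overdraft cap and not the whole global pool.
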
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since it actually
>>>> >>>>>>>>>>>> works as an unlimited flatmap and it will always work in overdraft mode,
>>>> >>>>>>>>>>>> which is not the goal. So we still need to think about that.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 29.04.2022 11:11, rui fan wrote:
>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number of overdraft
>>>> >>>>>>>>>>>>> segments that each LocalBufferPool can apply for, right?
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> I prefer to hard-code maxOverdraftBuffers so that we don't add a new
>>>> >>>>>>>>>>>>> configuration. And I hope to hear more from the community.
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Best wishes
>>>> >>>>>>>>>>>>> fanrui
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <
>>>> >>>>> 1996fan...@gmail.com>
>>>> >>>>>>>>>> wrote:
>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally right.
>>>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
>>>> >>>>>>>>>>>>>> overdraft buffer, it won't need the new buffer configuration. Flink users
>>>> >>>>>>>>>>>>>> can turn up the maxBuffersNumber to control the overdraft buffer size.
>>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we should limit the
>>>> >>>>>>>>>>>>>> maximum number of overdraft segments that each LocalBufferPool
>>>> >>>>>>>>>>>>>> can apply for.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Why do we limit it?
>>>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable` while
>>>> >>>>>>>>>>>>>> processing records, such as LegacySource. I have mentioned it in
>>>> >>>>>>>>>>>>>> FLINK-26759 [1]. I'm not sure if there are other cases.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> If we don't add the limit, the LegacySource will use up all remaining
>>>> >>>>>>>>>>>>>> memory in the NetworkBufferPool when the backpressure is severe.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> How to limit it?
>>>> >>>>>>>>>>>>>> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
>>>> >>>>>>>>>>>>>> in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
>>>> >>>>>>>>>>>>>> for safety, and it should be enough for most Flink jobs. Or we can set
>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
>>>> >>>>>>>>>>>>>> some jobs of low parallelism.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint, we can set
>>>> >>>>>>>>>>>>>> maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
>>>> >>>>>>>>>>>>>> the overdraft isn't useful for the Aligned Checkpoint.
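
To make the hard-coded rule proposed above concrete, here is a minimal sketch. The class OverdraftDefaults and its method are hypothetical and exist only for illustration; the rule itself (0 without Unaligned Checkpoint, otherwise Math.max(numberOfSubpartitions, 10)) is taken from the proposal in this mail, not from any released Flink version.

```java
/** Hypothetical helper that encodes the hard-coded overdraft limit proposed in this mail. */
final class OverdraftDefaults {

    private OverdraftDefaults() {}

    /**
     * Returns 0 when Unaligned Checkpoint is disabled (the overdraft is not useful there),
     * otherwise at least 10 so that low-parallelism jobs still get some overdraft.
     */
    static int maxOverdraftBuffers(boolean unalignedCheckpointEnabled, int numberOfSubpartitions) {
        if (!unalignedCheckpointEnabled) {
            return 0;
        }
        return Math.max(numberOfSubpartitions, 10);
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(false, 50)); // UC disabled
        System.out.println(maxOverdraftBuffers(true, 4));   // low parallelism
        System.out.println(maxOverdraftBuffers(true, 200)); // high parallelism
    }
}
```
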
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Best wishes
>>>> >>>>>>>>>>>>>> fanrui
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <
>>>> >>>>>>>>>> kaa....@yandex.com>
>>>> >>>>>>>>>>>>>> wrote:
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Hi fanrui,
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it should help in
>>>> >>>>>>>>>>>>>>> the cases described above. Here are my thoughts about configuration:
>>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it, right now we
>>>> >>>>>>>>>>>>>>> have the following calculation.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> maxBuffersNumber(per TaskManager) = Network memory(calculated via
>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.fraction, taskmanager.memory.network.min,
>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) /
>>>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> requiredBuffersNumber(per TaskManager) = (exclusive buffers *
>>>> >>>>>>>>>>>>>>> parallelism + floating buffers) * subtasks number in TaskManager
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> buffersInUseNumber = actual number of buffers in use at the current
>>>> >>>>>>>>>>>>>>> moment(always <= requiredBuffersNumber)
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Ideally requiredBuffersNumber should be equal to maxBuffersNumber, which
>>>> >>>>>>>>>>>>>>> allows Flink to work predictably. But if requiredBuffersNumber >
>>>> >>>>>>>>>>>>>>> maxBuffersNumber, sometimes it is also fine (but not good), since not all
>>>> >>>>>>>>>>>>>>> required buffers are really mandatory (e.g. it is ok if Flink cannot
>>>> >>>>>>>>>>>>>>> allocate floating buffers).
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I understand it,
>>>> >>>>>>>>>>>>>>> Flink just never uses these leftover buffers (maxBuffersNumber -
>>>> >>>>>>>>>>>>>>> requiredBuffersNumber), which I propose to use. (We can actually use
>>>> >>>>>>>>>>>>>>> even the difference 'requiredBuffersNumber - buffersInUseNumber', since
>>>> >>>>>>>>>>>>>>> if one TaskManager contains several operators, including 'window', these
>>>> >>>>>>>>>>>>>>> can temporarily borrow buffers from the global pool.)
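
For readers who want to plug numbers into the calculation above, here is a small worked sketch. NetworkBufferMath is a hypothetical helper, not a Flink class, and the inputs (128 MiB of network memory, 32 KiB segments, 2 exclusive buffers, parallelism 100, 8 floating buffers, 4 subtasks) are example values chosen only for illustration. It simply evaluates the formulas as written in this mail.

```java
/** Hypothetical helper that evaluates the buffer formulas from this mail; not a Flink API. */
final class NetworkBufferMath {

    private NetworkBufferMath() {}

    /** maxBuffersNumber: network memory divided by the segment size. */
    static long maxBuffers(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber: (exclusive buffers * parallelism + floating buffers) * subtasks. */
    static long requiredBuffers(int exclusiveBuffers, int parallelism, int floatingBuffers, int subtasks) {
        return ((long) exclusiveBuffers * parallelism + floatingBuffers) * subtasks;
    }

    /** Leftover buffers (maxBuffersNumber - requiredBuffersNumber), or 0 if there are none. */
    static long leftoverBuffers(long maxBuffers, long requiredBuffers) {
        return Math.max(0L, maxBuffers - requiredBuffers);
    }

    public static void main(String[] args) {
        long max = maxBuffers(128L * 1024 * 1024, 32L * 1024); // 128 MiB / 32 KiB = 4096
        long required = requiredBuffers(2, 100, 8, 4);         // (2 * 100 + 8) * 4 = 832
        long leftover = leftoverBuffers(max, required);        // 4096 - 832 = 3264
        System.out.println("max=" + max + ", required=" + required + ", leftover=" + leftover);
    }
}
```

In this example the leftover buffers are the ones the proposal would make usable as overdraft without any new configuration option.
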
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to requesting buffers
>>>> >>>>>>>>>>>>>>> while processing a single record; switching to unavailability between
>>>> >>>>>>>>>>>>>>> records should stay the same as we have it now):
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is reached,
>>>> >>>>>>>>>>>>>>> then just ignore this limitation and allocate this buffer from any
>>>> >>>>>>>>>>>>>>> place (from LocalBufferPool if it still has something, otherwise from
>>>> >>>>>>>>>>>>>>> NetworkBufferPool)
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then temporarily allocate from
>>>> >>>>>>>>>>>>>>> NetworkBufferPool while it has something to allocate
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but I like it
>>>> >>>>>>>>>>>>>>> since, on the one hand, it works from scratch without any
>>>> >>>>>>>>>>>>>>> configuration, and on the other hand, it can be configured by changing
>>>> >>>>>>>>>>>>>>> the proportion of maxBuffersNumber and requiredBuffersNumber.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> The last thing that I want to say: I don't really want to implement a new
>>>> >>>>>>>>>>>>>>> configuration, since even now it is not clear how to correctly configure
>>>> >>>>>>>>>>>>>>> network buffers with the existing configuration and I don't want to
>>>> >>>>>>>>>>>>>>> complicate it, especially if it will be possible to resolve the problem
>>>> >>>>>>>>>>>>>>> automatically (as described above).
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers correct?
>>>> >>>>>>>>>>>>>>> --
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Best regards,
>>>> >>>>>>>>>>>>>>> Anton Kalashnikov
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
>>>> >>>>>>>>>>>>>>>> Hi everyone,
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
>>>> >>>>>>>>>>>>>>>> effectively solves the problem of checkpoint timeout or slow
>>>> >>>>>>>>>>>>>>>> checkpoint when backpressure is severe.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> We found that UC(Unaligned Checkpoint) does not work well when the
>>>> >>>>>>>>>>>>>>>> back pressure is severe and multiple output buffers are required to
>>>> >>>>>>>>>>>>>>>> process a single record. FLINK-14396 [2] also mentioned this issue
>>>> >>>>>>>>>>>>>>>> before. So we propose the overdraft buffer to solve it.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> I created FLINK-26762[3] and FLIP-227[4] to detail the overdraft
>>>> >>>>>>>>>>>>>>>> buffer mechanism. After discussing with Anton Kalashnikov, there are
>>>> >>>>>>>>>>>>>>>> still some points to discuss:
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>>       * There are already a lot of buffer-related configurations. Do we
>>>> >>>>>>>>>>>>>>>>         need to add a new configuration for the overdraft buffer?
>>>> >>>>>>>>>>>>>>>>       * Where should the overdraft buffer use memory?
>>>> >>>>>>>>>>>>>>>>       * If the overdraft-buffer uses the memory remaining in the
>>>> >>>>>>>>>>>>>>>>         NetworkBufferPool, no new configuration needs to be added.
>>>> >>>>>>>>>>>>>>>>       * If adding a new configuration:
>>>> >>>>>>>>>>>>>>>>           o Should we set the overdraft-memory-size at the TM level or the
>>>> >>>>>>>>>>>>>>>>             Task level?
>>>> >>>>>>>>>>>>>>>>           o Or set overdraft-buffers to indicate the number of
>>>> >>>>>>>>>>>>>>>>             memory-segments that can be overdrawn.
>>>> >>>>>>>>>>>>>>>>           o What is the default value? How to set sensible defaults?
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it using
>>>> >>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at Task level,
>>>> >>>>>>>>>>>>>>>> and the default value is 10. That is: each LocalBufferPool can overdraw
>>>> >>>>>>>>>>>>>>>> up to 10 memory-segments.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>>>>>>> fanrui
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>>>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>>>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>>>> >>>>>>>>>>>> --
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Best regards,
>>>> >>>>>>>>>>>> Anton Kalashnikov
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>> --
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Best regards,
>>>> >>>>>>>>>> Anton Kalashnikov
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >> --
>>>> >>
>>>> >> Best regards,
>>>> >> Anton Kalashnikov
>>>> >>
>>>> >>
>>>>
>>>
