Hi fanrui,

> How to identify legacySource?
Legacy sources always use the SourceStreamTask class, and SourceStreamTask is used only for legacy sources. But I'm not sure how to enable/disable that. Adding a `disableOverdraft()` call in SourceStreamTask would be better than relying on the `getAvailableFuture()` call (isn't that used for the back pressure metric anyway?). Ideally we should enable/disable it in the constructors, but that might be tricky.

> I prefer it to be between 5 and 10

I would vote for a smaller value because of FLINK-13203.

Piotrek

Thu, 5 May 2022 at 11:49 rui fan <1996fan...@gmail.com> wrote:

> Hi,
>
> Thanks a lot for your discussion.
>
> After several discussions, I think it's clear now. I updated the
> "Proposed Changes" of FLIP-227[1]. If I have something
> missing, please help to add it to FLIP, or add it in the mail
> and I can add it to FLIP. If everything is OK, I will create a
> new JIRA for the first task, and use FLINK-26762[2] as the
> second task.
>
> About the legacy source, do we set maxOverdraftBuffersPerGate=0
> directly? How to identify legacySource? Or could we add
> the overdraftEnabled in LocalBufferPool? The default value
> is false. If the getAvailableFuture is called, change
> overdraftEnabled=true.
> It indicates whether there are checks isAvailable elsewhere.
> It might be more general, it can cover more cases.
>
> Also, I think the default value of 'max-overdraft-buffers-per-gate'
> needs to be confirmed. I prefer it to be between 5 and 10. How
> do you think?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> [2] https://issues.apache.org/jira/browse/FLINK-26762
>
> Thanks
> fanrui
>
> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi again,
>>
>> After sleeping over this, if both versions (reserve and overdraft) have
>> the same complexity, I would also prefer the overdraft.
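[Editor's note] The reserve-vs-overdraft distinction discussed in this thread can be sketched as a toy model in plain Java. All class and method names here are invented for illustration; this is not Flink's actual `LocalBufferPool` API.

```java
// Toy model of the two designs discussed in this thread. Illustrative only:
// class and method names are invented, this is not Flink's LocalBufferPool.

/** "Reserve" design: availability flips early, keeping a reserve inside the pool. */
class ReservePool {
    final int poolSize; // total buffers owned by this local pool
    final int reserve;  // buffers held back for flatMap/timers/huge records
    int used;

    ReservePool(int poolSize, int reserve) {
        this.poolSize = poolSize;
        this.reserve = reserve;
    }

    /** Checked between records: unavailable once only the reserve is left. */
    boolean isAvailable() {
        return poolSize - used > reserve;
    }

    /** Mid-record requests may dip into the reserve, but never past poolSize. */
    boolean tryRequestDuringRecord() {
        if (used >= poolSize) {
            return false;
        }
        used++;
        return true;
    }
}

/** "Overdraft" design: availability logic unchanged, extra buffers come on top. */
class OverdraftPool {
    final int poolSize;
    final int maxOverdraft; // the proposed per-gate overdraft limit
    int used;

    OverdraftPool(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    /** Same "one buffer left" switch as today. */
    boolean isAvailable() {
        return used < poolSize;
    }

    /** Mid-record requests may borrow up to maxOverdraft extra global buffers. */
    boolean tryRequestDuringRecord() {
        if (used >= poolSize + maxOverdraft) {
            return false;
        }
        used++;
        return true;
    }
}
```

With equal pool sizes, the reserve variant flips to unavailable earlier and shrinks the buffers usable in normal operation, while the overdraft variant keeps today's availability behaviour and only borrows extra global memory in the rare mid-record case, which matches why the overdraft version is preferred above.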
>> >> > `Integer.MAX_VALUE` as default value was my idea as well but now, as >> > Dawid mentioned, I think it is dangerous since it is too implicit for >> > the user and if the user submits one more job for the same TaskManger >> >> As I mentioned, it's not only an issue with multiple jobs. The same >> problem can happen with different subtasks from the same job, potentially >> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be >> in favour of Integer.MAX_VALUE to be the default value, but as it is, I >> think we should indeed play on the safe side and limit it. >> >> > I still don't understand how should be limited "reserve" implementation. >> > I mean if we have X buffers in total and the user sets overdraft equal >> > to X we obviously can not reserve all buffers, but how many we are >> > allowed to reserve? Should it be a different configuration like >> > percentegeForReservedBuffers? >> >> The reserve could be defined as percentage, or as a fixed number of >> buffers. But yes. In normal operation subtask would not use the reserve, as >> if numberOfAvailableBuffers < reserve, the output would be not available. >> Only in the flatMap/timers/huge records case the reserve could be used. >> >> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, will >> LocalBufferPool never be available? Can't process data? >> >> Of course we would need to make sure that never happens. So the reserve >> should be < total buffer size. >> >> > 2. If the overdraft buffer use the extra buffers, when the downstream >> > task inputBuffer is insufficient, it should fail to start the job, and >> then >> > restart? When the InputBuffer is initialized, it will apply for enough >> > buffers, right? >> >> The failover if downstream can not allocate buffers is already >> implemented FLINK-14872 [2]. There is a timeout for how long the task is >> waiting for buffer allocation. 
However this doesn't prevent many >> (potentially infinitely many) deadlock/restarts cycles. IMO the propper >> solution for [1] would be 2b described in the ticket: >> >> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This >> is a simplified version of 2a, without tracking the tasks sink-to-source. >> >> But that's a pre-existing problem and I don't think we have to solve it >> before implementing overdraft. I think we would need to solve it only >> before setting Integer.MAX_VALUE as the default for the overdraft. Maybe I >> would hesitate setting the overdraft to anything more then a couple of >> buffers by default for the same reason. >> >> > Actually, I totally agree that we don't need a lot of buffers for >> overdraft >> >> and >> >> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. >> > When we finish this feature and after users use it, if users feedback >> > this issue we can discuss again. >> >> +1 >> >> Piotrek >> >> [1] https://issues.apache.org/jira/browse/FLINK-13203 >> [2] https://issues.apache.org/jira/browse/FLINK-14872 >> >> czw., 5 maj 2022 o 05:52 rui fan <1996fan...@gmail.com> napisał(a): >> >>> Hi everyone, >>> >>> I still have some questions. >>> >>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will >>> LocalBufferPool never be available? Can't process data? >>> 2. If the overdraft buffer use the extra buffers, when the downstream >>> task inputBuffer is insufficient, it should fail to start the job, and >>> then >>> restart? When the InputBuffer is initialized, it will apply for enough >>> buffers, right? >>> >>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. >>> When we finish this feature and after users use it, if users feedback >>> this issue we can discuss again. 
>>> >>> Thanks >>> fanrui >>> >>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org> >>> wrote: >>> >>>> Hey all, >>>> >>>> I have not replied in the thread yet, but I was following the >>>> discussion. >>>> >>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it >>>> the idea to distinguish between inside flatMap & outside would be >>>> fairly >>>> simple, but maybe slightly indirect. The checkAvailability would remain >>>> unchanged and it is checked always between separate invocations of the >>>> UDF. Therefore the overdraft buffers would not apply there. However >>>> once >>>> the pool says it is available, it means it has at least an initial >>>> buffer. So any additional request without checking for availability can >>>> be considered to be inside of processing a single record. This does not >>>> hold just for the LegacySource as I don't think it actually checks for >>>> the availability of buffers in the LocalBufferPool. >>>> >>>> In the offline chat with Anton, we also discussed if we need a limit of >>>> the number of buffers we could overdraft (or in other words if the >>>> limit >>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to >>>> stay >>>> on the safe side and have it limited. The pool of network buffers is >>>> shared for the entire TaskManager, so it means it can be shared even >>>> across tasks of separate jobs. However, I might be just unnecessarily >>>> cautious here. >>>> >>>> Best, >>>> >>>> Dawid >>>> >>>> On 04/05/2022 10:54, Piotr Nowojski wrote: >>>> > Hi, >>>> > >>>> > Thanks for the answers. >>>> > >>>> >> we may still need to discuss whether the >>>> >> overdraft/reserve/spare should use extra buffers or buffers >>>> >> in (exclusive + floating buffers)? >>>> > and >>>> > >>>> >> These things resolve the different problems (at least as I see that). 
>>>> >> The current hardcoded "1" says that we switch "availability" to >>>> >> "unavailability" when one more buffer is left(actually a little less >>>> >> than one buffer since we write the last piece of data to this last >>>> >> buffer). The overdraft feature doesn't change this logic we still >>>> want >>>> >> to switch to "unavailability" in such a way but if we are already in >>>> >> "unavailability" and we want more buffers then we can take "overdraft >>>> >> number" more. So we can not avoid this hardcoded "1" since we need to >>>> >> understand when we should switch to "unavailability" >>>> > Ok, I see. So it seems to me that both of you have in mind to keep the >>>> > buffer pools as they are right now, but if we are in the middle of >>>> > processing a record, we can request extra overdraft buffers on top of >>>> > those? This is another way to implement the overdraft to what I was >>>> > thinking. I was thinking about something like keeping the "overdraft" >>>> or >>>> > more precisely buffer "reserve" in the buffer pool. I think my version >>>> > would be easier to implement, because it is just fiddling with min/max >>>> > buffers calculation and slightly modified `checkAvailability()` logic. >>>> > >>>> > On the other hand what you have in mind would better utilise the >>>> available >>>> > memory, right? It would require more code changes (how would we know >>>> when >>>> > we are allowed to request the overdraft?). However, in this case, I >>>> would >>>> > be tempted to set the number of overdraft buffers by default to >>>> > `Integer.MAX_VALUE`, and let the system request as many buffers as >>>> > necessary. 
The only downside that I can think of (apart of higher >>>> > complexity) would be higher chance of hitting a known/unsolved >>>> deadlock [1] >>>> > in a scenario: >>>> > - downstream task hasn't yet started >>>> > - upstream task requests overdraft and uses all available memory >>>> segments >>>> > from the global pool >>>> > - upstream task is blocked, because downstream task hasn't started >>>> yet and >>>> > can not consume any data >>>> > - downstream task tries to start, but can not, as there are no >>>> available >>>> > buffers >>>> > >>>> >> BTW, for watermark, the number of buffers it needs is >>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, >>>> >> the watermark won't block in requestMemory. >>>> > and >>>> > >>>> >> the best overdraft size will be equal to parallelism. >>>> > That's a lot of buffers. I don't think we need that many for >>>> broadcasting >>>> > watermarks. Watermarks are small, and remember that every >>>> subpartition has >>>> > some partially filled/empty WIP buffer, so the vast majority of >>>> > subpartitions will not need to request a new buffer. >>>> > >>>> > Best, >>>> > Piotrek >>>> > >>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203 >>>> > >>>> > wt., 3 maj 2022 o 17:15 Anton Kalashnikov <kaa....@yandex.com> >>>> napisał(a): >>>> > >>>> >> Hi, >>>> >> >>>> >> >>>> >> >> Do you mean to ignore it while processing records, but keep >>>> using >>>> >> `maxBuffersPerChannel` when calculating the availability of the >>>> output? >>>> >> >>>> >> >>>> >> Yes, it is correct. >>>> >> >>>> >> >>>> >> >> Would it be a big issue if we changed it to check if at least >>>> >> "overdraft number of buffers are available", where "overdraft >>>> number" is >>>> >> configurable, instead of the currently hardcoded value of "1"? >>>> >> >>>> >> >>>> >> These things resolve the different problems (at least as I see that). 
>>>> >> The current hardcoded "1" says that we switch "availability" to >>>> >> "unavailability" when one more buffer is left(actually a little less >>>> >> than one buffer since we write the last piece of data to this last >>>> >> buffer). The overdraft feature doesn't change this logic we still >>>> want >>>> >> to switch to "unavailability" in such a way but if we are already in >>>> >> "unavailability" and we want more buffers then we can take "overdraft >>>> >> number" more. So we can not avoid this hardcoded "1" since we need to >>>> >> understand when we should switch to "unavailability" >>>> >> >>>> >> >>>> >> -- About "reserve" vs "overdraft" >>>> >> >>>> >> As Fanrui mentioned above, perhaps, the best overdraft size will be >>>> >> equal to parallelism. Also, the user can set any value he wants. So >>>> even >>>> >> if parallelism is small(~5) but the user's flatmap produces a lot of >>>> >> data, the user can set 10 or even more. Which almost double the max >>>> >> buffers and it will be impossible to reserve. At least we need to >>>> figure >>>> >> out how to protect from such cases (the limit for an overdraft?). So >>>> >> actually it looks even more difficult than increasing the maximum >>>> buffers. >>>> >> >>>> >> I want to emphasize that overdraft buffers are soft configuration >>>> which >>>> >> means it takes as many buffers as the global buffers pool has >>>> >> available(maybe zero) but less than this configured value. It is also >>>> >> important to notice that perhaps, not many subtasks in TaskManager >>>> will >>>> >> be using this feature so we don't actually need a lot of available >>>> >> buffers for every subtask(Here, I mean that if we have only one >>>> >> window/flatmap operator and many other operators, then one >>>> TaskManager >>>> >> will have many ordinary subtasks which don't actually need overdraft >>>> and >>>> >> several subtasks that needs this feature). 
But in case of >>>> reservation, >>>> >> we will reserve some buffers for all operators even if they don't >>>> really >>>> >> need it. >>>> >> >>>> >> >>>> >> -- Legacy source problem >>>> >> >>>> >> If we still want to change max buffers then it is problem for >>>> >> LegacySources(since every subtask of source will always use these >>>> >> overdraft). But right now, I think that we can force to set 0 >>>> overdraft >>>> >> buffers for legacy subtasks in configuration during execution(if it >>>> is >>>> >> not too late for changing configuration in this place). >>>> >> >>>> >> >>>> >> 03.05.2022 14:11, rui fan пишет: >>>> >>> Hi >>>> >>> >>>> >>> Thanks for Martijn Visser and Piotrek's feedback. I agree with >>>> >>> ignoring the legacy source, it will affect our design. User should >>>> >>> use the new Source Api as much as possible. >>>> >>> >>>> >>> Hi Piotrek, we may still need to discuss whether the >>>> >>> overdraft/reserve/spare should use extra buffers or buffers >>>> >>> in (exclusive + floating buffers)? They have some differences. >>>> >>> >>>> >>> If it uses extra buffers: >>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1 >>>> >>> <= currentPoolSize) and all subpartitions don't reach the >>>> >>> maxBuffersPerChannel. >>>> >>> >>>> >>> If it uses the buffers in (exclusive + floating buffers): >>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + >>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions >>>> >>> don't reach the maxBuffersPerChannel. >>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), the >>>> >>> usedBuffers will be small. That is the LocalBufferPool will be >>>> >>> easily unavailable. For throughput, if users turn up the >>>> >>> overdraft buffers, they need to turn up exclusive or floating >>>> >>> buffers. It also affects the InputChannel, and it's is unfriendly >>>> >>> to users. >>>> >>> >>>> >>> So I prefer the overdraft to use extra buffers. 
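[Editor's note] The two availability conditions fanrui compares above can be made concrete with a little arithmetic. This is an illustrative sketch; the method names are invented, not Flink API.

```java
// Sketch of the two availability conditions compared above. Illustrative only;
// the method names are invented, not Flink API.
class Availability {
    /** Extra-buffer variant: available while (usedBuffers + 1 <= currentPoolSize). */
    static boolean withExtraBuffers(int used, int poolSize) {
        return used + 1 <= poolSize;
    }

    /** Carve-out variant: available while (usedBuffers + overdraftBuffers <= currentPoolSize). */
    static boolean fromExistingBuffers(int used, int poolSize, int overdraft) {
        return used + overdraft <= poolSize;
    }

    /** Buffers usable in normal (between-record) operation under the carve-out variant. */
    static int usableFromExisting(int poolSize, int overdraft) {
        return poolSize - overdraft + 1;
    }
}
```

For a low-parallelism example with a pool of 12 buffers (say exclusive 2 x parallelism 2 plus 8 floating, figures assumed here) and overdraft 8, the carve-out variant leaves only `usableFromExisting(12, 8) = 5` buffers for normal operation, which is the throughput concern raised above.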
>>>> >>> >>>> >>> >>>> >>> BTW, for watermark, the number of buffers it needs is >>>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, >>>> >>> the watermark won't block in requestMemory. But it has >>>> >>> 2 problems: >>>> >>> 1. It needs more overdraft buffers. If the overdraft uses >>>> >>> (exclusive + floating buffers), there will be fewer buffers >>>> >>> available. Throughput may be affected. >>>> >>> 2. The numberOfSubpartitions is different for each Task. >>>> >>> So if users want to cover watermark using this feature, >>>> >>> they don't know how to set the overdraftBuffers more r >>>> >>> easonably. And if the parallelism is changed, users still >>>> >>> need to change overdraftBuffers. It is unfriendly to users. >>>> >>> >>>> >>> So I propose we support overdraftBuffers=-1, It means >>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions >>>> >>> in the Constructor of LocalBufferPool. >>>> >>> >>>> >>> Please correct me if I'm wrong. >>>> >>> >>>> >>> Thanks >>>> >>> fanrui >>>> >>> >>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org >>>> > >>>> >> wrote: >>>> >>>> Hi fanrui, >>>> >>>> >>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive >>>> >> buffers * >>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be >>>> available >>>> >>>> when >>>> >>>>> (usedBuffers+overdraftBuffers <= >>>> >>>> exclusiveBuffers*parallelism+floatingBuffers) >>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right? >>>> >>>> I'm not sure. Definitely we would need to adjust the minimum >>>> number of >>>> >> the >>>> >>>> required buffers, just as we did when we were implementing the non >>>> >> blocking >>>> >>>> outputs and adding availability logic to LocalBufferPool. Back >>>> then we >>>> >>>> added "+ 1" to the minimum number of buffers. 
Currently this logic >>>> is >>>> >>>> located >>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition: >>>> >>>> >>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : >>>> numSubpartitions + 1; >>>> >>>> For performance reasons, we always require at least one buffer per >>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we >>>> >> require 5 >>>> >>>> overdraft buffers for output to be available, we need to have them >>>> on >>>> >> top >>>> >>>> of those "one buffer per sub-partition". So the logic should be >>>> changed >>>> >> to: >>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : >>>> numSubpartitions + >>>> >>>> numOverdraftBuffers; >>>> >>>> >>>> >>>> Regarding increasing the number of max buffers I'm not sure. As >>>> long as >>>> >>>> "overdraft << max number of buffers", because all buffers on the >>>> outputs >>>> >>>> are shared across all sub-partitions. If we have 5 overdraft >>>> buffers, >>>> >> and >>>> >>>> parallelism of 100, it doesn't matter in the grand scheme of >>>> things if >>>> >> we >>>> >>>> make the output available if at least one single buffer is >>>> available or >>>> >> at >>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So >>>> effects of >>>> >>>> increasing the overdraft from 1 to for example 5 should be >>>> negligible. >>>> >> For >>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5 still >>>> >> increases >>>> >>>> the overdraft by only about 25%. So maybe we can keep the max as >>>> it is? >>>> >>>> >>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer >>>> >> reserve" >>>> >>>> or "spare buffers"? And document it as "number of buffers kept in >>>> >> reserve >>>> >>>> in case of flatMap/firing timers/huge records"? >>>> >>>> >>>> >>>> What do you think Fenrui, Anton? >>>> >>>> >>>> >>>> Re LegacySources. 
I agree we can kind of ignore them in the new >>>> >> features, >>>> >>>> as long as we don't brake the existing deployments too much. >>>> >>>> >>>> >>>> Best, >>>> >>>> Piotrek >>>> >>>> >>>> >>>> wt., 3 maj 2022 o 09:20 Martijn Visser <mart...@ververica.com> >>>> >> napisał(a): >>>> >>>>> Hi everyone, >>>> >>>>> >>>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, >>>> we >>>> >>>> should >>>> >>>>> not focus too much on improving/adding capabilities for legacy >>>> sources. >>>> >>>> We >>>> >>>>> want to persuade and push users to use the new Source API. Yes, >>>> this >>>> >>>> means >>>> >>>>> that there's work required by the end users to port any custom >>>> source >>>> >> to >>>> >>>>> the new interface. The benefits of the new Source API should >>>> outweigh >>>> >>>> this. >>>> >>>>> Anything that we build to support multiple interfaces means >>>> adding more >>>> >>>>> complexity and more possibilities for bugs. Let's try to make our >>>> >> lives a >>>> >>>>> little bit easier. >>>> >>>>> >>>> >>>>> Best regards, >>>> >>>>> >>>> >>>>> Martijn Visser >>>> >>>>> https://twitter.com/MartijnVisser82 >>>> >>>>> https://github.com/MartijnVisser >>>> >>>>> >>>> >>>>> >>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> >>>> wrote: >>>> >>>>> >>>> >>>>>> Hi Piotrek >>>> >>>>>> >>>> >>>>>>> Do you mean to ignore it while processing records, but keep >>>> using >>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the >>>> >>>> output? >>>> >>>>>> I think yes, and please Anton Kalashnikov to help double check. >>>> >>>>>> >>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big >>>> >>>>> problem >>>> >>>>>>> that legacy sources would be ignoring it? Note that we already >>>> have >>>> >>>>>>> effectively hardcoded a single overdraft buffer. 
>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single >>>> >>>> buffer >>>> >>>>>>> available and this works the same for all tasks (including >>>> legacy >>>> >>>>> source >>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at >>>> least >>>> >>>>>>> "overdraft number of buffers are available", where "overdraft >>>> number" >>>> >>>>> is >>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"? >>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive >>>> >>>> buffers * >>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will be >>>> available >>>> >>>>> when >>>> >>>>>> (usedBuffers+overdraftBuffers <= >>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers) >>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, >>>> right? >>>> >>>>>> >>>> >>>>>> If yes, I think it can solve the problem of legacy source. There >>>> may >>>> >> be >>>> >>>>>> some impact. If overdraftBuffers is large and only one buffer is >>>> used >>>> >>>> to >>>> >>>>>> process a single record, exclusive buffers*parallelism + floating >>>> >>>> buffers >>>> >>>>>> cannot be used. It may only be possible to use (exclusive >>>> buffers * >>>> >>>>>> parallelism >>>> >>>>>> + floating buffers - overdraft buffers + 1). For throughput, if >>>> turn >>>> >> up >>>> >>>>> the >>>> >>>>>> overdraft buffers, the flink user needs to turn up exclusive or >>>> >>>> floating >>>> >>>>>> buffers. And it also affects the InputChannel. >>>> >>>>>> >>>> >>>>>> If not, I don't think it can solve the problem of legacy source. >>>> The >>>> >>>>> legacy >>>> >>>>>> source don't check isAvailable, If there are the extra buffers, >>>> legacy >>>> >>>>>> source >>>> >>>>>> will use them up until block in requestMemory. 
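[Editor's note] The overdraftEnabled idea raised in this thread (off by default, switched on the first time the availability future is requested, which legacy sources never do) might look roughly like this. Illustrative only; the names are invented and `CompletableFuture` stands in for the pool's availability future.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the proposed overdraftEnabled detection: overdraft stays off until
// someone polls availability, which legacy sources never do. Illustrative only;
// names are invented, this is not Flink's LocalBufferPool.
class SketchBufferPool {
    private final int poolSize;
    private final int configuredOverdraft;
    private boolean overdraftEnabled = false; // default: off
    int used;

    SketchBufferPool(int poolSize, int configuredOverdraft) {
        this.poolSize = poolSize;
        this.configuredOverdraft = configuredOverdraft;
    }

    /** New-style tasks poll this; calling it shows availability is respected. */
    CompletableFuture<?> getAvailableFuture() {
        overdraftEnabled = true;
        return CompletableFuture.completedFuture(null);
    }

    int effectiveOverdraft() {
        return overdraftEnabled ? configuredOverdraft : 0;
    }

    /** Mid-record request: legacy sources get no overdraft headroom. */
    boolean tryRequestDuringRecord() {
        if (used >= poolSize + effectiveOverdraft()) {
            return false;
        }
        used++;
        return true;
    }
}
```

Under this sketch a legacy source never enables the overdraft and blocks in the request as before, while new sources enable it implicitly on their first availability check; the alternative discussed at the top of the thread is to disable it explicitly in SourceStreamTask instead.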
>>>> >>>>>> >>>> >>>>>> >>>> >>>>>> Thanks >>>> >>>>>> fanrui >>>> >>>>>> >>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski < >>>> pnowoj...@apache.org> >>>> >>>>>> wrote: >>>> >>>>>> >>>> >>>>>>> Hi, >>>> >>>>>>> >>>> >>>>>>> +1 for the general proposal from my side. It would be a nice >>>> >>>> workaround >>>> >>>>>>> flatMaps, WindowOperators and large records issues with >>>> unaligned >>>> >>>>>>> checkpoints. >>>> >>>>>>> >>>> >>>>>>>> The first task is about ignoring max buffers per channel. This >>>> >>>> means >>>> >>>>> if >>>> >>>>>>>> we request a memory segment from LocalBufferPool and the >>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just >>>> ignore >>>> >>>> that >>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has >>>> it(it is >>>> >>>>>>>> actually not a overdraft). >>>> >>>>>>> Do you mean to ignore it while processing records, but keep >>>> using >>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the >>>> >>>> output? >>>> >>>>>>>> The second task is about the real overdraft. I am pretty >>>> convinced >>>> >>>>> now >>>> >>>>>>>> that we, unfortunately, need configuration for limitation of >>>> >>>>> overdraft >>>> >>>>>>>> number(because it is not ok if one subtask allocates all >>>> buffers of >>>> >>>>> one >>>> >>>>>>>> TaskManager considering that several different jobs can be >>>> >>>> submitted >>>> >>>>> on >>>> >>>>>>>> this TaskManager). So idea is to have >>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per >>>> >>>>>> LocalBufferPool). >>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is >>>> >>>> reached, >>>> >>>>>>>> LocalBufferPool can request additionally from >>>> NetworkBufferPool up >>>> >>>> to >>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers. >>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big >>>> >>>>> problem >>>> >>>>>>> that legacy sources would be ignoring it? 
Note that we already >>>> have >>>> >>>>>>> effectively hardcoded a single overdraft buffer. >>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single >>>> >>>> buffer >>>> >>>>>>> available and this works the same for all tasks (including >>>> legacy >>>> >>>>> source >>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at >>>> least >>>> >>>>>>> "overdraft number of buffers are available", where "overdraft >>>> number" >>>> >>>>> is >>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"? >>>> >>>>>>> >>>> >>>>>>> Best, >>>> >>>>>>> Piotrek >>>> >>>>>>> >>>> >>>>>>> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> >>>> napisał(a): >>>> >>>>>>> >>>> >>>>>>>> Let me add some information about the LegacySource. >>>> >>>>>>>> >>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource. >>>> >>>>>>>> Could we add the enableOverdraft in LocalBufferPool? >>>> >>>>>>>> The default value is false. If the getAvailableFuture is >>>> called, >>>> >>>>>>>> change enableOverdraft=true. It indicates whether there are >>>> >>>>>>>> checks isAvailable elsewhere. >>>> >>>>>>>> >>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me >>>> if >>>> >>>> I'm >>>> >>>>>>> wrong. >>>> >>>>>>>> Thanks >>>> >>>>>>>> fanrui >>>> >>>>>>>> >>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com >>>> > >>>> >>>>> wrote: >>>> >>>>>>>>> Hi, >>>> >>>>>>>>> >>>> >>>>>>>>> Thanks for your quick response. >>>> >>>>>>>>> >>>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need to >>>> >>>>> discuss >>>> >>>>>>> the >>>> >>>>>>>>> default value in PR. >>>> >>>>>>>>> >>>> >>>>>>>>> For the legacy source, you are right. It's difficult for >>>> general >>>> >>>>>>>>> implementation. >>>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in >>>> >>>>>>>>> SourceFunction.SourceContext. 
And call it in our common >>>> >>>>> LegacySource, >>>> >>>>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume >>>> >>>> kafka, >>>> >>>>> so >>>> >>>>>>>>> fixing FlinkKafkaConsumer solved most of our problems. >>>> >>>>>>>>> >>>> >>>>>>>>> Core code: >>>> >>>>>>>>> ``` >>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() { >>>> >>>>>>>>> if (recordWriter == null >>>> >>>>>>>>> || >>>> >>>>>>>>> >>>> >> >>>> !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, >>>> >>>>>>>>> false) >>>> >>>>>>>>> || recordWriter.isAvailable()) { >>>> >>>>>>>>> return; >>>> >>>>>>>>> } >>>> >>>>>>>>> >>>> >>>>>>>>> CompletableFuture<?> resumeFuture = >>>> >>>>>>>> recordWriter.getAvailableFuture(); >>>> >>>>>>>>> try { >>>> >>>>>>>>> resumeFuture.get(); >>>> >>>>>>>>> } catch (Throwable ignored) { >>>> >>>>>>>>> } >>>> >>>>>>>>> } >>>> >>>>>>>>> ``` >>>> >>>>>>>>> >>>> >>>>>>>>> LegacySource calls >>>> sourceContext.ensureRecordWriterIsAvailable() >>>> >>>>>>>>> before synchronized (checkpointLock) and collects records. >>>> >>>>>>>>> Please let me know if there is a better solution. >>>> >>>>>>>>> >>>> >>>>>>>>> Thanks >>>> >>>>>>>>> fanrui >>>> >>>>>>>>> >>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov < >>>> >>>>>> kaa....@yandex.com> >>>> >>>>>>>>> wrote: >>>> >>>>>>>>> >>>> >>>>>>>>>> Hi. >>>> >>>>>>>>>> >>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two >>>> >>>>>> commits >>>> >>>>>>>> in a >>>> >>>>>>>>>> PR? >>>> >>>>>>>>>> >>>> >>>>>>>>>> Perhaps, the separated ticket will be better since this task >>>> has >>>> >>>>>> fewer >>>> >>>>>>>>>> questions but we should find a solution for LegacySource >>>> first. >>>> >>>>>>>>>> >>>> >>>>>>>>>> -- 2. For the first task, if the flink user disables the >>>> >>>>> Unaligned >>>> >>>>>>>>>> Checkpoint, do we ignore max buffers per channel? 
>>>> Because >>>> >>>> the >>>> >>>>>>>>>> overdraft >>>> >>>>>>>>>> isn't useful for the Aligned Checkpoint, it still >>>> needs to >>>> >>>>> wait >>>> >>>>>>> for >>>> >>>>>>>>>> downstream Task to consume. >>>> >>>>>>>>>> >>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I >>>> >>>>>>>> understand, >>>> >>>>>>>>>> the overdraft maybe is not really helpful for AC but it >>>> doesn't >>>> >>>>> make >>>> >>>>>>> it >>>> >>>>>>>>>> worse as well. >>>> >>>>>>>>>> >>>> >>>>>>>>>> 3. For the second task >>>> >>>>>>>>>> -- - The default value of >>>> maxOverdraftBuffersPerPartition >>>> >>>> may >>>> >>>>>>> also >>>> >>>>>>>>>> need >>>> >>>>>>>>>> to be discussed. >>>> >>>>>>>>>> >>>> >>>>>>>>>> I think it should be a pretty small value or even 0 since it >>>> >>>> kind >>>> >>>>> of >>>> >>>>>>>>>> optimization and user should understand what they >>>> do(especially >>>> >>>> if >>>> >>>>>> we >>>> >>>>>>>>>> implement the first task). >>>> >>>>>>>>>> >>>> >>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, can >>>> we >>>> >>>>> set >>>> >>>>>>> the >>>> >>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the >>>> overdraft >>>> >>>>>> isn't >>>> >>>>>>>>>> useful for >>>> >>>>>>>>>> the Aligned Checkpoint. >>>> >>>>>>>>>> >>>> >>>>>>>>>> The same answer that above, if the overdraft doesn't make >>>> >>>>>> degradation >>>> >>>>>>>> for >>>> >>>>>>>>>> the Aligned Checkpoint I don't think that we should make >>>> >>>>> difference >>>> >>>>>>>> between >>>> >>>>>>>>>> AC and UC. >>>> >>>>>>>>>> >>>> >>>>>>>>>> 4. For the legacy source >>>> >>>>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses up to >>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers. >>>> >>>>>>>>>> - If disabling the UC, it doesn't use the overdraft >>>> >>>> buffer. >>>> >>>>>>>>>> - Do you think it's ok? 
>>>> >>>>>>>>>> >>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at >>>> all >>>> >>>>> since >>>> >>>>>>> it >>>> >>>>>>>>>> can lead to undesirable results especially if the limit is >>>> high. >>>> >>>>> At >>>> >>>>>>>> least, >>>> >>>>>>>>>> as I understand, it will always work in overdraft mode and it >>>> >>>> will >>>> >>>>>>>> borrow >>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool >>>> >>>> which >>>> >>>>>> can >>>> >>>>>>>> lead >>>> >>>>>>>>>> to degradation of other subtasks on the same TaskManager. >>>> >>>>>>>>>> >>>> >>>>>>>>>> -- - Actually, we added the checkAvailable logic for >>>> >>>>>> LegacySource >>>> >>>>>>>> in >>>> >>>>>>>>>> our >>>> >>>>>>>>>> internal version. It works well. >>>> >>>>>>>>>> >>>> >>>>>>>>>> I don't really understand how it is possible for general case >>>> >>>>>>>> considering >>>> >>>>>>>>>> that each user has their own implementation of >>>> >>>>> LegacySourceOperator >>>> >>>>>>>>>> -- 5. For the benchmark, do you have any suggestions? I >>>> >>>>> submitted >>>> >>>>>>> the >>>> >>>>>>>> PR >>>> >>>>>>>>>> [1]. >>>> >>>>>>>>>> >>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon. >>>> >>>>>>>>>> >>>> >>>>>>>>>> >>>> >>>>>>>>>> 29.04.2022 14:14, rui fan пишет: >>>> >>>>>>>>>>> Hi, >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> Thanks for your feedback. I have a servel of questions. >>>> >>>>>>>>>>> >>>> >>>>>>>>>>> 1. Do you mean split this into two JIRAs or two PRs >>>> or two >>>> >>>>>>> commits >>>> >>>>>>>>>> in a >>>> >>>>>>>>>>> PR? >>>> >>>>>>>>>>> 2. For the first task, if the flink user disables the >>>> >>>>>> Unaligned >>>> >>>>>>>>>>> Checkpoint, do we ignore max buffers per channel? >>>> Because >>>> >>>>> the >>>> >>>>>>>>>> overdraft >>>> >>>>>>>>>>> isn't useful for the Aligned Checkpoint, it still >>>> needs to >>>> >>>>>> wait >>>> >>>>>>>> for >>>> >>>>>>>>>>> downstream Task to consume. >>>> >>>>>>>>>>> 3. 
>>>> For the second task
>>>>    - The default value of maxOverdraftBuffersPerPartition may also
>>>>      need to be discussed.
>>>>    - If the user disables the Unaligned Checkpoint, can we set
>>>>      maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
>>>>      useful for the Aligned Checkpoint.
>>>> 4. For the legacy source
>>>>    - If enabling the Unaligned Checkpoint, it uses up to
>>>>      maxOverdraftBuffersPerPartition buffers.
>>>>    - If disabling the UC, it doesn't use the overdraft buffer.
>>>>    - Do you think it's ok?
>>>>    - Actually, we added the checkAvailable logic for LegacySource in
>>>>      our internal version. It works well.
>>>> 5. For the benchmark, do you have any suggestions? I submitted the
>>>>    PR [1].
>>>>
>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>>>>
>>>> Thanks
>>>> fanrui
>>>>
>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We discussed this a little with Dawid Wysakowicz. Here are some
>>>>> conclusions:
>>>>>
>>>>> First of all, let's split this into two tasks.
>>>>>
>>>>> The first task is about ignoring max buffers per channel. This means
>>>>> that if we request a memory segment from the LocalBufferPool and
>>>>> maxBuffersPerChannel is reached for this channel, we just ignore
>>>>> that and continue to allocate buffers while the LocalBufferPool has
>>>>> them (it is actually not an overdraft).
>>>>>
>>>>> The second task is about the real overdraft. I am pretty convinced
>>>>> now that we, unfortunately, need a configuration to limit the
>>>>> overdraft number (because it is not ok if one subtask allocates all
>>>>> buffers of one TaskManager, considering that several different jobs
>>>>> can be submitted to this TaskManager). So the idea is to have
>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>>>>> LocalBufferPool). In this case, when the limit of buffers in the
>>>>> LocalBufferPool is reached, the LocalBufferPool can request up to
>>>>> maxOverdraftBuffersPerPartition additional buffers from the
>>>>> NetworkBufferPool.
>>>>>
>>>>> But it is still not clear how to handle LegacySource, since it
>>>>> actually works as an unlimited flatmap and will always work in
>>>>> overdraft mode, which is not the target. So we still need to think
>>>>> about that.
>>>>>
>>>>> 29.04.2022 11:11, rui fan wrote:
>>>>>> Hi Anton Kalashnikov,
>>>>>>
>>>>>> I think you agree that we should limit the maximum number of
>>>>>> overdraft segments that each LocalBufferPool can apply for, right?
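The bounded per-LocalBufferPool overdraft being discussed here can be sketched roughly as follows. This is a minimal Python model, not Flink's actual Java implementation; the class and field names (`max_overdraft_buffers`, `pool_size`, etc.) are invented for illustration:

```python
class NetworkBufferPool:
    """Stand-in for the global pool shared by all subtasks on a TaskManager."""
    def __init__(self, total_segments):
        self.available = total_segments

    def request(self):
        if self.available == 0:
            return None
        self.available -= 1
        return "segment"


class LocalBufferPool:
    """Per-subtask pool with a size limit plus a bounded overdraft."""
    def __init__(self, network_pool, pool_size, max_overdraft_buffers):
        self.network_pool = network_pool
        self.pool_size = pool_size
        self.max_overdraft_buffers = max_overdraft_buffers
        self.requested = 0  # buffers taken within the normal limit
        self.overdraft = 0  # buffers taken beyond the limit

    def request_buffer(self):
        # Normal path: stay within the pool's own limit.
        if self.requested < self.pool_size:
            seg = self.network_pool.request()
            if seg is not None:
                self.requested += 1
                return seg
        # Overdraft path: borrow a bounded number of extra segments from
        # the global pool instead of blocking in the middle of a record.
        if self.overdraft < self.max_overdraft_buffers:
            seg = self.network_pool.request()
            if seg is not None:
                self.overdraft += 1
                return seg
        return None  # caller must wait for availability


network = NetworkBufferPool(total_segments=20)
pool = LocalBufferPool(network, pool_size=4, max_overdraft_buffers=2)
grabbed = [pool.request_buffer() for _ in range(10)]
# 4 normal + 2 overdraft requests succeed; the remaining 4 are refused.
print(sum(s is not None for s in grabbed))  # → 6
```

The point of the cap is visible in the last lines: without `max_overdraft_buffers`, a subtask that never checks availability (like a legacy source under backpressure) would drain the shared pool entirely.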
>>>>>> I prefer to hard-code the maxOverdraftBuffers so that we don't add
>>>>>> a new configuration option. And I hope to hear more from the
>>>>>> community.
>>>>>>
>>>>>> Best wishes
>>>>>> fanrui
>>>>>>
>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com>
>>>>>> wrote:
>>>>>>> Hi Anton Kalashnikov,
>>>>>>>
>>>>>>> Thanks for your very clear reply, I think you are totally right.
>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
>>>>>>> overdraft buffer, so no new buffer configuration is needed. Flink
>>>>>>> users can turn up the maxBuffersNumber to control the overdraft
>>>>>>> buffer size.
>>>>>>>
>>>>>>> Also, I'd like to add some information. For safety, we should
>>>>>>> limit the maximum number of overdraft segments that each
>>>>>>> LocalBufferPool can apply for.
>>>>>>>
>>>>>>> Why do we limit it?
>>>>>>> Some operators don't check `recordWriter.isAvailable` while
>>>>>>> processing records, such as LegacySource. I have mentioned it in
>>>>>>> FLINK-26759 [1]. I'm not sure if there are other cases. If we
>>>>>>> don't add the limitation, the LegacySource will use up all the
>>>>>>> remaining memory in the NetworkBufferPool when the backpressure
>>>>>>> is severe.
>>>>>>>
>>>>>>> How to limit it?
>>>>>>> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
>>>>>>> in the constructor of LocalBufferPool. The maxOverdraftBuffers is
>>>>>>> just for safety, and it should be enough for most Flink jobs. Or
>>>>>>> we can set `maxOverdraftBuffers=Math.max(numberOfSubpartitions,
>>>>>>> 10)` to handle some jobs of low parallelism.
>>>>>>>
>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint, we can
>>>>>>> set maxOverdraftBuffers=0 in the constructor of LocalBufferPool,
>>>>>>> because the overdraft isn't useful for the Aligned Checkpoint.
>>>>>>>
>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>>>>>>>
>>>>>>> Best wishes
>>>>>>> fanrui
>>>>>>>
>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov
>>>>>>> <kaa....@yandex.com> wrote:
>>>>>>>
>>>>>>>> Hi fanrui,
>>>>>>>>
>>>>>>>> Thanks for creating the FLIP.
>>>>>>>>
>>>>>>>> In general, I think the overdraft is a good idea and it should
>>>>>>>> help in the cases described above. Here are my thoughts about
>>>>>>>> the configuration. Please correct me if I am wrong, but as I
>>>>>>>> understand it, right now we have the following calculation.
>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory (calculated
>>>>>>>> via taskmanager.memory.network.fraction,
>>>>>>>> taskmanager.memory.network.min, taskmanager.memory.network.max
>>>>>>>> and the total memory size) / taskmanager.memory.segment-size.
>>>>>>>>
>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive buffers *
>>>>>>>> parallelism + floating buffers) * number of subtasks in the
>>>>>>>> TaskManager
>>>>>>>>
>>>>>>>> buffersInUseNumber = the real number of buffers in use at the
>>>>>>>> current moment (always <= requiredBuffersNumber)
>>>>>>>>
>>>>>>>> Ideally, requiredBuffersNumber should be equal to
>>>>>>>> maxBuffersNumber, which allows Flink to work predictably. But if
>>>>>>>> requiredBuffersNumber > maxBuffersNumber, sometimes that is also
>>>>>>>> fine (though not good), since not all required buffers are really
>>>>>>>> mandatory (e.g. it is ok if Flink cannot allocate the floating
>>>>>>>> buffers).
>>>>>>>>
>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I understand
>>>>>>>> it, Flink just never uses these leftover buffers
>>>>>>>> (maxBuffersNumber - requiredBuffersNumber), which is what I
>>>>>>>> propose to use. (We can actually even use the difference
>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
>>>>>>>> TaskManager may contain several operators, including 'window',
>>>>>>>> which can temporarily borrow buffers from the global pool.)
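For concreteness, the calculation above with illustrative numbers. The parallelism, buffer counts, and network memory size below are made up for the example; only the 32 KiB segment size matches Flink's default `taskmanager.memory.segment-size`:

```python
# Worked example of the buffer accounting described above.
segment_size = 32 * 1024            # taskmanager.memory.segment-size (32 KiB)
network_memory = 128 * 1024 * 1024  # result of the fraction/min/max settings

# Per-TaskManager ceiling on network buffers.
max_buffers_number = network_memory // segment_size

exclusive_buffers = 2   # per input channel (illustrative)
parallelism = 4         # upstream parallelism -> channels per subtask
floating_buffers = 8    # per gate (illustrative)
subtasks_in_tm = 3      # subtasks scheduled on this TaskManager

required_buffers_number = (
    (exclusive_buffers * parallelism + floating_buffers) * subtasks_in_tm
)

# The "leftover" buffers Anton proposes to use for the overdraft.
leftover = max_buffers_number - required_buffers_number
print(max_buffers_number, required_buffers_number, leftover)  # → 4096 48 4048
```

With these numbers the pool is far larger than what is required, so the leftover is plentiful; the interesting regime for the overdraft discussion is when the two values are close and leftover approaches zero.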
>>>>>>>> My proposal, more specifically (it relates only to requesting
>>>>>>>> buffers while processing a single record; switching to
>>>>>>>> unavailability between records should stay the same as we have
>>>>>>>> it now):
>>>>>>>>
>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is
>>>>>>>> reached, then just ignore this limitation and allocate the
>>>>>>>> buffer from any place (from the LocalBufferPool if it still has
>>>>>>>> something, otherwise from the NetworkBufferPool).
>>>>>>>>
>>>>>>>> * If the LocalBufferPool exceeds its limit, then temporarily
>>>>>>>> allocate from the NetworkBufferPool while it has something to
>>>>>>>> allocate.
>>>>>>>>
>>>>>>>> Maybe I missed something and this solution won't work, but I
>>>>>>>> like it since, on the one hand, it works from scratch without
>>>>>>>> any configuration and, on the other hand, it can be configured
>>>>>>>> by changing the proportion of maxBuffersNumber and
>>>>>>>> requiredBuffersNumber.
>>>>>>>>
>>>>>>>> The last thing I want to say is that I don't really want to
>>>>>>>> implement a new configuration option, since even now it is not
>>>>>>>> clear how to correctly configure network buffers with the
>>>>>>>> existing configuration, and I don't want to complicate it,
>>>>>>>> especially if it is possible to resolve the problem
>>>>>>>> automatically (as described above).
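The two allocation rules above could be modeled like this. This is a toy Python sketch under the assumption that a mid-record request simply bypasses the per-channel cap; the names (`Pool`, `request_buffer_during_record`) are invented and do not match Flink's real internals:

```python
class Pool:
    """Minimal stand-in for LocalBufferPool / NetworkBufferPool."""
    def __init__(self, segments):
        self.segments = segments

    def has_free_segment(self):
        return self.segments > 0

    def take_segment(self):
        self.segments -= 1
        return "segment"


def request_buffer_during_record(local_pool, network_pool):
    # Rule 1: while processing a single record, a reached
    # maxBuffersPerChannel is simply ignored; take from the local pool
    # if it still has anything.
    if local_pool.has_free_segment():
        return local_pool.take_segment()
    # Rule 2: once the LocalBufferPool itself is exhausted, temporarily
    # borrow from the global NetworkBufferPool while it has free
    # segments.
    if network_pool.has_free_segment():
        return network_pool.take_segment()
    return None  # truly out of memory: block until buffers are recycled


local, network = Pool(1), Pool(2)
results = [request_buffer_during_record(local, network) for _ in range(4)]
print([r is not None for r in results])  # → [True, True, True, False]
```

Note how no configuration knob appears anywhere: the effective overdraft capacity is just whatever the global pool happens to have left, which is exactly the property Anton highlights (and the reason a runaway legacy source is problematic under this scheme).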
>>>>>>>> So is my understanding about network memory/buffers correct?
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Anton Kalashnikov
>>>>>>>>
>>>>>>>> 27.04.2022 07:46, rui fan wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink.
>>>>>>>>> It effectively solves the problem of checkpoint timeouts or
>>>>>>>>> slow checkpoints when backpressure is severe.
>>>>>>>>>
>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well when
>>>>>>>>> the back pressure is severe and multiple output buffers are
>>>>>>>>> required to process a single record. FLINK-14396 [2] also
>>>>>>>>> mentioned this issue before. So we propose the overdraft buffer
>>>>>>>>> to solve it.
>>>>>>>>>
>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the
>>>>>>>>> overdraft buffer mechanism. After discussing with Anton
>>>>>>>>> Kalashnikov, there are still some points to discuss:
>>>>>>>>>
>>>>>>>>> * There are already a lot of buffer-related configurations. Do
>>>>>>>>>   we need to add a new configuration for the overdraft buffer?
>>>>>>>>> * Where should the overdraft buffer take its memory from?
>>>>>>>>>   * If the overdraft buffer uses the memory remaining in the
>>>>>>>>>     NetworkBufferPool, no new configuration needs to be added.
>>>>>>>>>   * If adding a new configuration:
>>>>>>>>>     * Should we set the overdraft-memory-size at the TM level
>>>>>>>>>       or the Task level?
>>>>>>>>>     * Or set overdraft-buffers to indicate the number of
>>>>>>>>>       memory-segments that can be overdrawn?
>>>>>>>>>     * What is the default value? How to set sensible defaults?
>>>>>>>>>
>>>>>>>>> Currently, I implemented a POC [5] and verified it using
>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at the
>>>>>>>>> Task level, and the default value is 10. That is: each
>>>>>>>>> LocalBufferPool can overdraw up to 10 memory-segments.
>>>>>>>>>
>>>>>>>>> Looking forward to your feedback!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> fanrui
>>>>>>>>>
>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54