PR #23030 opened by Niklas Haas (haasn)
URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/23030
Patch URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/23030.patch

This is a departure from the conventional idea of decoders always
outputting data as fast as possible. Instead, this allows decoders to be
throttled in the same way filter graphs can be.

This comes into play when e.g. a demuxer is feeding into two decoders,
but only one of the two decoders is actually currently needed (e.g. due
to A/V misalignment). In that case, what typically happens is that the
unneeded decoder also decodes all frames, and then piles them up on the
"buffersrc" filter's downstream link (growing indefinitely).

Another issue this solves manifests when e.g. a single demuxer is
feeding many decoders that all try to feed frames to the same filter
graph. In this case, all decoders run as fast as possible, leading to
lock contention on the filter graph input queue. This results in (again)
many frames piling up on the buffersrc (or downstream filters) for the
unneeded inputs that are not actually the bottleneck, while the input
that's actually undersatisfied can end up starved for CPU time, possibly
for long enough to exhaust memory limits.

The normal rate limiting fails to apply in this scenario because all
decoders share a single demuxer, and are hence rate-limited only by the
demuxer speed; whereas the demuxer is not choked because from the PoV of
the scheduler, the filter graph is simply not getting enough frames.

In a more general sense, there's a philosophical argument to be made
here. Since a decoder is typically also a decompressor, it produces more
data than it consumes. So, in a sense, it's also acting like a type of
producer, in the same way that a filter graph can produce more output
than input.

Solve all of these issues by allowing decoders to be output-choked,
which gives the scheduler control over when decoders are allowed to
output frames.

This does mean we have to add some sort of internal packet queue,
because the decoder thread may need to continue *accepting* upstream
packets from the demuxer (or else we risk stalling the demuxer), but
defer the actual decoding by placing them inside an internal "overflow"
queue. This effectively simulates a sort of "filter graph"-type
semantics but for the decoder queue.

This overflow logic is fairly self-contained inside `sch_dec_receive`,
though it is quite nontrivial. I have added as much documentation as is
hopefully needed to understand the logic.

Importantly, we cannot simply remove the limit on the decoder input
thread queue, because the demuxer relies on backpressure from the
decoder to rate limit itself. (Note that demuxers may only be active if
there is at least one downstream decoder that is also active, so we
always have at least one decoder providing backpressure.)

Supersedes: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21241

From 48ceadb3095f3fc5a6d9928cbb353e5c2b259482 Mon Sep 17 00:00:00 2001
From: Niklas Haas <[email protected]>
Date: Tue, 5 May 2026 15:27:05 +0200
Subject: [PATCH 1/4] fftools/ffmpeg_sched: use macros for schedule_update_locked() loops

Instead of awkwardly looping over the type, just split this up into
multiple loops. The reduction in complexity seems worth the loss in
conciseness to me, and more importantly, this allows us to easily add
more waiter types.

Sponsored-by: nxtedition AB
Signed-off-by: Niklas Haas <[email protected]>
---
 fftools/ffmpeg_sched.c | 64 +++++++++++++++++++++++++-----------------
 1 file changed, 38 insertions(+), 26 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index dc5800d094..b01cf8b6ad 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1424,12 +1424,17 @@ static void schedule_update_locked(Scheduler *sch)
     atomic_store(&sch->last_dts, progressing_dts(sch, 0));
 
     // initialize our internal state
-    for (unsigned type = 0; type < 2; type++)
-        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
-            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
-            w->choked_prev = atomic_load(&w->choked);
-            w->choked_next = 1;
-        }
+#define RESET_WAITER(field) \
+    do { \
+        for (unsigned i = 0; i < sch->nb_##field; i++) { \
+            SchWaiter *w = &sch->field[i].waiter; \
+            w->choked_prev = atomic_load(&w->choked); \
+            w->choked_next = 1; \
+        } \
+    } while (0)
+
+    RESET_WAITER(demux);
+    RESET_WAITER(filters);
 
     // figure out the sources that are allowed to proceed
     for (unsigned i = 0; i < sch->nb_mux; i++) {
@@ -1466,28 +1471,35 @@ static void schedule_update_locked(Scheduler *sch)
     }
 
     // make sure to unchoke at least one source, if still available
-    for (unsigned type = 0; !have_unchoked && type < 2; type++)
-        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
-            int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
-            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
-            if (!exited) {
-                w->choked_next = 0;
-                have_unchoked = 1;
-                break;
-            }
-        }
+#define UNCHOKE_ONCE(field) \
+    do { \
+        for (unsigned i = 0; !have_unchoked && i < sch->nb_##field; i++) { \
+            SchWaiter *w = &sch->field[i].waiter; \
+            if (!sch->field[i].task_exited) { \
+                w->choked_next = 0; \
+                have_unchoked = 1; \
+                break; \
+            } \
+        } \
+    } while (0)
 
-    for (unsigned type = 0; type < 2; type++) {
-        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
-            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
-            if (w->choked_prev != w->choked_next) {
-                waiter_set(w, w->choked_next);
-                if (!type)
-                    choke_demux(sch, i, w->choked_next);
-            }
-        }
-    }
+    UNCHOKE_ONCE(demux);
+    UNCHOKE_ONCE(filters);
 
+#define UPDATE_WAITER(field) \
+    do { \
+        for (unsigned i = 0; i < sch->nb_##field; i++) { \
+            SchWaiter *w = &sch->field[i].waiter; \
+            if (w->choked_prev != w->choked_next) { \
+                waiter_set(w, w->choked_next); \
+                if (offsetof(Scheduler, field) == offsetof(Scheduler, demux)) \
+                    choke_demux(sch, i, w->choked_next); \
+            } \
+        } \
+    } while (0)
+
+    UPDATE_WAITER(demux);
+    UPDATE_WAITER(filters);
 }
 
 enum {
-- 
2.52.0

From 300470d1ee808b32e3c4f868e46e13593fdbd240 Mon Sep 17 00:00:00 2001
From: Niklas Haas <[email protected]>
Date: Fri, 19 Dec 2025 15:59:23 +0100
Subject: [PATCH 2/4] fftools/ffmpeg_sched: fix sch_stop() and schedule_update_locked() race

schedule_update_locked() is supposed to be a no-op when `sch->terminate`
is 1. However, there is a TOCTOU error here, where a different thread
may currently be executing schedule_update_locked(), having successfully
passed the sch->terminate check but not yet having updated the choke
status.

This does not matter for the current code, but will matter with the
following commit, where it creates the theoretical possibility of a race
where sch_stop() is trying to choke the demuxers (and unchoke the
decoders) while schedule_update_locked() is simultaneously trying to
choke the decoders, leading to a deadlock if the last decoder is left
choked and unable to propagate EOF downstream.

The cleanest solution is to just take the scheduler lock while updating
the choke status here. This ensures that any other
schedule_update_locked() calls will have completed.

Sponsored-by: nxtedition AB
Signed-off-by: Niklas Haas <[email protected]>
---
 fftools/ffmpeg_sched.c | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index b01cf8b6ad..ac95ece794 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -2756,6 +2756,10 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
 
     atomic_store(&sch->terminate, 1);
 
+    // Ensure no other thread is currently in schedule_update_locked while
+    // we are choking all demuxers
+    pthread_mutex_lock(&sch->schedule_lock);
+
     for (unsigned type = 0; type < 2; type++)
         for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
             SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
@@ -2764,6 +2768,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
                 choke_demux(sch, i, 0); // unfreeze to allow draining
         }
 
+    pthread_mutex_unlock(&sch->schedule_lock);
+
     for (unsigned i = 0; i < sch->nb_demux; i++) {
         SchDemux *d = &sch->demux[i];
 
-- 
2.52.0

From 4028080e968c3602b8e8b873b50ac4940e64575f Mon Sep 17 00:00:00 2001
From: Niklas Haas <[email protected]>
Date: Tue, 5 May 2026 15:11:41 +0200
Subject: [PATCH 3/4] fftools/ffmpeg_sched: allow choosing nodes to unchoke

This level of granularity will help for the upcoming patch.

Sponsored-by: nxtedition AB
Signed-off-by: Niklas Haas <[email protected]>
---
 fftools/ffmpeg_sched.c | 30 ++++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index ac95ece794..201f708a9e 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1302,7 +1302,14 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_
     return 0;
 }
 
-static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
+enum {
+    UNCHOKE_DEMUX  = (1 << 0),
+    UNCHOKE_FILTER = (1 << 1),
+
+    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER,
+};
+
+static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags);
 
 // Unchoke any filter graphs that are downstream of this node, to prevent it
 // from getting stuck trying to push data to a full queue
@@ -1331,8 +1338,8 @@ static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
                 fg->waiter.choked_next = 0;
             } else {
                 // ensure that this filter graph is not stuck waiting for
-                // input from a different upstream demuxer
-                unchoke_for_stream(sch, fg->inputs[fg->best_input].src);
+                // input from a different upstream source
+                unchoke_for_stream(sch, fg->inputs[fg->best_input].src, UNCHOKE_ALL);
             }
             break;
         default:
@@ -1341,7 +1348,7 @@ static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
     }
 }
 
-static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
+static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags)
 {
     while (1) {
         SchFilterGraph *fg;
@@ -1352,9 +1359,11 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
             demux = &sch->demux[src.idx];
             if (demux->waiter.choked_next == 0)
                 return; // prevent infinite loop
-            demux->waiter.choked_next = 0;
-            for (int i = 0; i < demux->nb_streams; i++)
-                unchoke_downstream(sch, demux->streams[i].dst);
+            if (flags & UNCHOKE_DEMUX) {
+                demux->waiter.choked_next = 0;
+                for (int i = 0; i < demux->nb_streams; i++)
+                    unchoke_downstream(sch, demux->streams[i].dst);
+            }
             return;
         case SCH_NODE_TYPE_DEC:
             src = sch->dec[src.idx].src;
@@ -1367,7 +1376,8 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
             // the filtergraph contains internal sources and
             // requested to be scheduled directly
             if (fg->best_input == fg->nb_inputs) {
-                fg->waiter.choked_next = 0;
+                if (flags & UNCHOKE_FILTER)
+                    fg->waiter.choked_next = 0;
                 return;
             }
             src = fg->inputs[fg->best_input].src;
@@ -1453,7 +1463,7 @@ static void schedule_update_locked(Scheduler *sch)
                 continue;
 
             // resolve the source to unchoke
-            unchoke_for_stream(sch, ms->src);
+            unchoke_for_stream(sch, ms->src, UNCHOKE_ALL);
             have_unchoked = 1;
         }
     }
@@ -1466,7 +1476,7 @@ static void schedule_update_locked(Scheduler *sch)
         for (unsigned j = 0; j < fg->nb_inputs; j++) {
             SchFilterIn *fi = &fg->inputs[j];
             if (fi->receive_finished && !fi->send_finished)
-                unchoke_for_stream(sch, fi->src);
+                unchoke_for_stream(sch, fi->src, UNCHOKE_ALL);
         }
     }
 
-- 
2.52.0

From b83dcbb590358eb74d624f8741547dfb33f36926 Mon Sep 17 00:00:00 2001
From: Niklas Haas <[email protected]>
Date: Fri, 19 Dec 2025 15:17:53 +0100
Subject: [PATCH 4/4] fftools/ffmpeg_sched: allow throttling decoder outputs

This is a departure from the conventional idea of decoders always
outputting data as fast as possible. Instead, this allows decoders to be
throttled in the same way filter graphs can be.

This comes into play when e.g. a demuxer is feeding into two decoders,
but only one of the two decoders is actually currently needed (e.g. due
to A/V misalignment). In that case, what typically happens is that the
unneeded decoder also decodes all frames, and then piles them up on the
"buffersrc" filter's downstream link (growing indefinitely).

Another issue this solves manifests when e.g. a single demuxer is
feeding many decoders that all try to feed frames to the same filter
graph.
In this case, all decoders run as fast as possible, leading to lock
contention on the filter graph input queue. This results in (again) many
frames piling up on the buffersrc (or downstream filters) for the
unneeded inputs that are not actually the bottleneck, while the input
that's actually undersatisfied can end up starved for CPU time, possibly
for long enough to exhaust memory limits.

The normal rate limiting fails to apply in this scenario because all
decoders share a single demuxer, and are hence rate-limited only by the
demuxer speed; whereas the demuxer is not choked because from the PoV of
the scheduler, the filter graph is simply not getting enough frames.

In a more general sense, there's a philosophical argument to be made
here. Since a decoder is typically also a decompressor, it produces more
data than it consumes. So, in a sense, it's also acting like a type of
producer, in the same way that a filter graph can produce more output
than input.

Solve all of these issues by allowing decoders to be output-choked,
which gives the scheduler control over when decoders are allowed to
output frames.

This does mean we have to add some sort of internal packet queue,
because the decoder thread may need to continue *accepting* upstream
packets from the demuxer (or else we risk stalling the demuxer), but
defer the actual decoding by placing them inside an internal "overflow"
queue. This effectively simulates a sort of "filter graph"-type
semantics but for the decoder queue.

This overflow logic is fairly self-contained inside `sch_dec_receive`,
though it is quite nontrivial. I have added as much documentation as is
hopefully needed to understand the logic.

Importantly, we cannot simply remove the limit on the decoder input
thread queue, because the demuxer relies on backpressure from the
decoder to rate limit itself.
(Note that demuxers may only be active if there is at least one
downstream decoder that is also active, so we always have at least one
decoder providing backpressure.)

Sponsored-by: nxtedition AB
Signed-off-by: Niklas Haas <[email protected]>
---
 fftools/ffmpeg_sched.c | 68 +++++++++++++++++++++++++++++++++++-------
 1 file changed, 58 insertions(+), 10 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 201f708a9e..70a939b267 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -32,6 +32,7 @@
 #include "libavcodec/packet.h"
 
 #include "libavutil/avassert.h"
+#include "libavutil/container_fifo.h"
 #include "libavutil/error.h"
 #include "libavutil/fifo.h"
 #include "libavutil/frame.h"
@@ -86,6 +87,7 @@ typedef struct SchDec {
     unsigned         nb_outputs;
 
     SchTask             task;
+    SchWaiter           waiter;
     // Queue for receiving input packets, one stream.
     ThreadQueue        *queue;
 
@@ -95,6 +97,9 @@ typedef struct SchDec {
 
     // temporary storage used by sch_dec_send()
     AVFrame            *send_frame;
+
+    // internal queue of undecoded packets used by sch_dec_receive()
+    AVContainerFifo    *overflow;
 } SchDec;
 
 typedef struct SchSyncQueue {
@@ -548,6 +553,7 @@ void sch_free(Scheduler **psch)
         tq_free(&dec->queue);
 
         av_thread_message_queue_free(&dec->queue_end_ts);
+        av_container_fifo_free(&dec->overflow);
 
         for (unsigned j = 0; j < dec->nb_outputs; j++) {
             SchDecOutput *o = &dec->outputs[j];
@@ -559,6 +565,8 @@ void sch_free(Scheduler **psch)
         av_freep(&dec->outputs);
 
         av_frame_free(&dec->send_frame);
+
+        waiter_uninit(&dec->waiter);
     }
     av_freep(&sch->dec);
 
@@ -809,6 +817,14 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
             return ret;
     }
 
+    dec->overflow = av_container_fifo_alloc_avpacket(0);
+    if (!dec->overflow)
+        return AVERROR(ENOMEM);
+
+    ret = waiter_init(&dec->waiter);
+    if (ret < 0)
+        return ret;
+
     return idx;
 }
 
@@ -1305,8 +1321,9 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_
 enum {
     UNCHOKE_DEMUX  = (1 << 0),
     UNCHOKE_FILTER = (1 << 1),
+    UNCHOKE_DECODE = (1 << 2),
 
-    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER,
+    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER | UNCHOKE_DECODE,
 };
 
 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags);
@@ -1321,8 +1338,10 @@ static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
         switch (dst->type) {
         case SCH_NODE_TYPE_DEC:
             dec = &sch->dec[dst->idx];
-            for (int i = 0; i < dec->nb_outputs; i++)
-                unchoke_downstream(sch, dec->outputs[i].dst);
+            if (!dec->waiter.choked) {
+                for (int i = 0; i < dec->nb_outputs; i++)
+                    unchoke_downstream(sch, dec->outputs[i].dst);
+            }
             break;
         case SCH_NODE_TYPE_ENC:
             enc = &sch->enc[dst->idx];
@@ -1353,6 +1372,7 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags)
     while (1) {
         SchFilterGraph *fg;
         SchDemux *demux;
+        SchDec *dec;
         switch (src.type) {
         case SCH_NODE_TYPE_DEMUX:
             // fed directly by a demuxer (i.e. not through a filtergraph)
@@ -1366,7 +1386,11 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags)
             }
             return;
         case SCH_NODE_TYPE_DEC:
-            src = sch->dec[src.idx].src;
+            dec = &sch->dec[src.idx];
+            if (!(flags & UNCHOKE_DECODE))
+                return;
+            dec->waiter.choked_next = 0;
+            src = dec->src;
             continue;
         case SCH_NODE_TYPE_ENC:
             src = sch->enc[src.idx].src;
@@ -1445,6 +1469,7 @@ static void schedule_update_locked(Scheduler *sch)
 
     RESET_WAITER(demux);
     RESET_WAITER(filters);
+    RESET_WAITER(dec);
 
     // figure out the sources that are allowed to proceed
     for (unsigned i = 0; i < sch->nb_mux; i++) {
@@ -1455,11 +1480,12 @@ static void schedule_update_locked(Scheduler *sch)
 
             // unblock sources for output streams that are not finished
             // and not too far ahead of the trailing stream
-            if (ms->source_finished)
+            if ((dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) ||
+                ms->source_finished) {
+                // still allow decoders to drain
+                unchoke_for_stream(sch, ms->src, UNCHOKE_DECODE);
                 continue;
-            if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
-                continue;
-            if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
+            } else if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
                 continue;
 
             // resolve the source to unchoke
@@ -1510,6 +1536,7 @@ static void schedule_update_locked(Scheduler *sch)
 
     UPDATE_WAITER(demux);
     UPDATE_WAITER(filters);
+    UPDATE_WAITER(dec);
 }
 
 enum {
@@ -2312,6 +2339,12 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
     av_assert0(dec_idx < sch->nb_dec);
     dec = &sch->dec[dec_idx];
 
+retry:
+    // Pull a packet from the overflow FIFO while unchoked or expecting EOF ts
+    if (av_container_fifo_can_read(dec->overflow) &&
+        (!atomic_load(&dec->waiter.choked) || dec->expect_end_ts))
+        return av_container_fifo_read(dec->overflow, pkt, 0);
+
     // the decoder should have given us post-flush end timestamp in pkt
     if (dec->expect_end_ts) {
         Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
@@ -2325,11 +2358,26 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
     ret = tq_receive(dec->queue, &dummy, pkt);
     av_assert0(dummy <= 0);
 
+    // drain packets from overflow queue before returning EOF
+    if (ret == AVERROR_EOF && av_container_fifo_can_read(dec->overflow))
+        return av_container_fifo_read(dec->overflow, pkt, 0);
+    else if (ret < 0)
+        return ret;
+
     // got a flush packet, on the next call to this function the decoder
-    // will give us post-flush end timestamp
-    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
+    // should give us post-flush end timestamp (after draining overflow fifo)
+    if (!pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
         dec->expect_end_ts = 1;
 
+    // we got a packet, but we're currently choked or have existing overflow
+    // packets; so push it to the FIFO first
+    if (atomic_load(&dec->waiter.choked) || av_container_fifo_can_read(dec->overflow)) {
+        ret = av_container_fifo_write(dec->overflow, pkt, 0);
+        if (ret < 0)
+            return ret;
+        goto retry;
+    }
+
     return ret;
 }
 
-- 
2.52.0
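
As a reading aid for the overflow logic in patch 4/4, here is a minimal standalone model of the control flow in `sch_dec_receive`. It is a sketch under stated assumptions, not the FFmpeg code: `ToyDecoder`, `PacketQueue`, `toy_dec_receive` and the `RECV_*` codes are hypothetical names, packets are plain ints, the queues are small fixed-size rings, and an empty input queue returns `RECV_AGAIN` where the real decoder thread would block in `tq_receive()`. The `expect_end_ts` handling is left out entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAP 16

/* Hypothetical return codes for this model only. */
enum { RECV_OK = 0, RECV_EOF = -1, RECV_AGAIN = -2, RECV_NOMEM = -3 };

/* Fixed-size ring buffer of int "packets". */
typedef struct PacketQueue {
    int    pkt[QUEUE_CAP];
    size_t head, count;
} PacketQueue;

static bool queue_push(PacketQueue *q, int pkt)
{
    if (q->count == QUEUE_CAP)
        return false;
    q->pkt[(q->head + q->count) % QUEUE_CAP] = pkt;
    q->count++;
    return true;
}

static bool queue_pop(PacketQueue *q, int *pkt)
{
    if (!q->count)
        return false;
    *pkt = q->pkt[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    return true;
}

typedef struct ToyDecoder {
    PacketQueue input;     /* stands in for the bounded decoder thread queue */
    PacketQueue overflow;  /* stands in for the internal overflow FIFO */
    bool        input_eof; /* upstream has finished sending packets */
    bool        choked;    /* set by the scheduler to throttle output */
} ToyDecoder;

/* Fetch the next packet to decode, deferring packets while choked. */
static int toy_dec_receive(ToyDecoder *d, int *pkt)
{
    assert(d && pkt);
    for (;;) {
        /* 1. Deferred packets go out first, but only while unchoked. */
        if (d->overflow.count > 0 && !d->choked) {
            queue_pop(&d->overflow, pkt);
            return RECV_OK;
        }

        /* 2. Keep accepting input so the upstream queue does not fill up. */
        if (!queue_pop(&d->input, pkt)) {
            if (!d->input_eof)
                return RECV_AGAIN; /* the real code would block here */
            /* At EOF, drain whatever is still deferred, choked or not. */
            return queue_pop(&d->overflow, pkt) ? RECV_OK : RECV_EOF;
        }

        /* 3. Pass through only if unchoked and nothing is deferred;
         *    otherwise defer the packet so ordering is preserved. */
        if (!d->choked && d->overflow.count == 0)
            return RECV_OK;
        if (!queue_push(&d->overflow, *pkt))
            return RECV_NOMEM;
    }
}
```

In this model a choked decoder with three queued packets moves all three into `overflow` and reports `RECV_AGAIN`; once unchoked, the packets come back in their original order, and at EOF any remaining deferred packets are drained even while choked.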
