PR #21241 opened by Niklas Haas (haasn) URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21241 Patch URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21241.patch
This comes into play when e.g. a demuxer is feeding into two decoders, but only one of the two decoders is actually currently needed (e.g. due to A/V misalignment). In that case, what typically happens is that the unneeded decoder also decodes all frames, and then piles them up on the "buffersrc" filter's downstream link (growing indefinitely). In this scenario, it would be much better to have the decoder continue accepting packets, but defer actually decoding them until the output is needed. This effectively simulates a sort of "filter graph"-type semantics but for the decoder queue. However, to avoid letting the demuxer run freely, we still need to maintain backpressure on the demuxer by having it continue blocking on decoders which are actively decoding. This requires separating the choke status into an "input choke" and an "output choke". When the decoder is output-choked but not input-choked, we should continue accepting packets but store them in an internal overflow FIFO instead of decoding them right away. This allows decoders to avoid blocking demuxers for unrelated (not yet needed) streams; while also avoiding the excessive usage of memory when such decoders are feeding into filter inputs. This overflow logic is fairly self-contained inside `sch_dec_receive`, though it is quite nontrivial. I have added as much documentation as is hopefully needed to understand the logic. ## Further information This is a cleaner fix for the apad/atrim deadlock issues, while also fixing a different race condition that I observed in practice with the previous logic. 
(I will submit a fix / FATE sample for that as well, shortly) >From 2929da4320b541e01f45465d41011d216921b247 Mon Sep 17 00:00:00 2001 From: Niklas Haas <[email protected]> Date: Tue, 9 Dec 2025 15:52:49 +0100 Subject: [PATCH 1/6] Revert "fftools/ffmpeg_sched: forward demuxer choke status to dst queues" I want to change the way the choking logic works; where instead of directly choking downstream queues, we should instead schedule those filter nodes directly via waiters. This reverts commit 9d0b88feb17dabfebcb10b801045c1285fa5e4bc. --- fftools/ffmpeg_sched.c | 41 +++-------------------------------------- 1 file changed, 3 insertions(+), 38 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index c5ee6971f1..eadbcd2372 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1359,36 +1359,6 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) } } -static void choke_demux(const Scheduler *sch, int demux_id, int choked) -{ - av_assert1(demux_id < sch->nb_demux); - SchDemux *demux = &sch->demux[demux_id]; - - for (int i = 0; i < demux->nb_streams; i++) { - SchedulerNode *dst = demux->streams[i].dst; - SchFilterGraph *fg; - - switch (dst->type) { - case SCH_NODE_TYPE_DEC: - tq_choke(sch->dec[dst->idx].queue, choked); - break; - case SCH_NODE_TYPE_ENC: - tq_choke(sch->enc[dst->idx].queue, choked); - break; - case SCH_NODE_TYPE_MUX: - break; - case SCH_NODE_TYPE_FILTER_IN: - fg = &sch->filters[dst->idx]; - if (fg->nb_inputs == 1) - tq_choke(fg->queue, choked); - break; - default: - av_unreachable("Invalid destination node type?"); - break; - } - } -} - static void schedule_update_locked(Scheduler *sch) { int64_t dts; @@ -1457,16 +1427,13 @@ static void schedule_update_locked(Scheduler *sch) } } - for (unsigned type = 0; type < 2; type++) { + + for (unsigned type = 0; type < 2; type++) for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { SchWaiter *w = type ? 
&sch->filters[i].waiter : &sch->demux[i].waiter; - if (w->choked_prev != w->choked_next) { + if (w->choked_prev != w->choked_next) waiter_set(w, w->choked_next); - if (!type) - choke_demux(sch, i, w->choked_next); - } } - } } @@ -2722,8 +2689,6 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts) for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; waiter_set(w, 1); - if (type) - choke_demux(sch, i, 0); // unfreeze to allow draining } for (unsigned i = 0; i < sch->nb_demux; i++) { -- 2.49.1 >From 6884940b607ee552489ade7c00a00a0cdb463a82 Mon Sep 17 00:00:00 2001 From: Niklas Haas <[email protected]> Date: Tue, 9 Dec 2025 15:53:43 +0100 Subject: [PATCH 2/6] Revert "fftools/thread_queue: allow choking thread queues directly" This reverts commit 59a847a23718a87a2bd1e2eae893d9784ed34b0f. --- fftools/thread_queue.c | 17 ----------------- fftools/thread_queue.h | 9 --------- 2 files changed, 26 deletions(-) diff --git a/fftools/thread_queue.c b/fftools/thread_queue.c index eb33431c98..b035ffe11d 100644 --- a/fftools/thread_queue.c +++ b/fftools/thread_queue.c @@ -38,7 +38,6 @@ enum { }; struct ThreadQueue { - int choked; int *finished; unsigned int nb_streams; @@ -158,9 +157,6 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx, { unsigned int nb_finished = 0; - if (tq->choked) - return AVERROR(EAGAIN); - while (av_container_fifo_read(tq->fifo, data, 0) >= 0) { unsigned idx; int ret; @@ -234,7 +230,6 @@ void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx) * next time the consumer thread tries to read this stream it will get * an EOF and recv-finished flag will be set */ tq->finished[stream_idx] |= FINISHED_SEND; - tq->choked = 0; pthread_cond_broadcast(&tq->cond); pthread_mutex_unlock(&tq->lock); @@ -254,15 +249,3 @@ void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx) pthread_mutex_unlock(&tq->lock); } - -void tq_choke(ThreadQueue *tq, int choked) -{ 
- pthread_mutex_lock(&tq->lock); - - int prev_choked = tq->choked; - tq->choked = choked; - if (choked != prev_choked) - pthread_cond_broadcast(&tq->cond); - - pthread_mutex_unlock(&tq->lock); -} diff --git a/fftools/thread_queue.h b/fftools/thread_queue.h index ad7669f131..cc01c8a2c9 100644 --- a/fftools/thread_queue.h +++ b/fftools/thread_queue.h @@ -58,15 +58,6 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data); */ void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx); -/** - * Prevent further reads from the thread queue until it is unchoked. Threads - * attempting to read from the queue will block, similar to when the queue is - * empty. - * - * @param choked 1 to choke, 0 to unchoke - */ -void tq_choke(ThreadQueue *tq, int choked); - /** * Read the next item from the queue. * -- 2.49.1 >From bef889bf9d40442e751a33a58fa847c0d0051455 Mon Sep 17 00:00:00 2001 From: Niklas Haas <[email protected]> Date: Fri, 12 Dec 2025 11:32:01 +0100 Subject: [PATCH 3/6] fftools/ffmpeg_sched: use macros for schedule_update_locked() loops Instead of awkwardly looping over the type, just split this up into multiple loops. The loss in complexity seems worth the loss in conciseness to me, and more importantly, this allows us to easily add more waiter types. --- fftools/ffmpeg_sched.c | 56 +++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index eadbcd2372..74d0be0916 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1374,12 +1374,17 @@ static void schedule_update_locked(Scheduler *sch) atomic_store(&sch->last_dts, dts); // initialize our internal state - for (unsigned type = 0; type < 2; type++) - for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { - SchWaiter *w = type ? 
&sch->filters[i].waiter : &sch->demux[i].waiter; - w->choked_prev = atomic_load(&w->choked); - w->choked_next = 1; - } +#define RESET_WAITER(field, waiter) \ + do { \ + for (unsigned i = 0; i < sch->nb_##field; i++) { \ + SchWaiter *w = &sch->field[i].waiter; \ + w->choked_prev = atomic_load(&w->choked); \ + w->choked_next = 1; \ + } \ + } while (0) + + RESET_WAITER(demux, waiter); + RESET_WAITER(filters, waiter); // figure out the sources that are allowed to proceed for (unsigned i = 0; i < sch->nb_mux; i++) { @@ -1416,25 +1421,32 @@ static void schedule_update_locked(Scheduler *sch) } // make sure to unchoke at least one source, if still available - for (unsigned type = 0; !have_unchoked && type < 2; type++) - for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { - int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited; - SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; - if (!exited) { - w->choked_next = 0; - have_unchoked = 1; - break; - } - } +#define UNCHOKE_ONCE(field) \ + do { \ + for (unsigned i = 0; !have_unchoked && i < sch->nb_##field; i++) { \ + SchWaiter *w = &sch->field[i].waiter; \ + if (!sch->field[i].task_exited) { \ + w->choked_next = 0; \ + have_unchoked = 1; \ + break; \ + } \ + } \ + } while (0) + UNCHOKE_ONCE(demux); + UNCHOKE_ONCE(filters); - for (unsigned type = 0; type < 2; type++) - for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { - SchWaiter *w = type ? 
&sch->filters[i].waiter : &sch->demux[i].waiter; - if (w->choked_prev != w->choked_next) - waiter_set(w, w->choked_next); - } +#define UPDATE_WAITER(field, waiter) \ + do { \ + for (unsigned i = 0; i < sch->nb_##field; i++) { \ + SchWaiter *w = &sch->field[i].waiter; \ + if (w->choked_prev != w->choked_next) \ + waiter_set(w, w->choked_next); \ + } \ + } while (0) + UPDATE_WAITER(demux, waiter); + UPDATE_WAITER(filters, waiter); } enum { -- 2.49.1 >From 6cdae87937e08f2d36b0efba39de947bc0e21fd7 Mon Sep 17 00:00:00 2001 From: Niklas Haas <[email protected]> Date: Fri, 19 Dec 2025 15:59:23 +0100 Subject: [PATCH 4/6] fftools/ffmpeg_sched: fix sch_stop() and schedule_update_locked() race schedule_update_locked() is supposed to be a no-op when `sch->terminate` is 1. However, there is a TOCTOU error here, where a different thread may currently be executing schedule_update_locked(), having successfully passed the sch->terminate check but without actually updating the choke status. This does not matter for the current code, but will matter with the following commit, where it creates the theoretical possibility of a race where sch_stop() is trying to choke the demuxers (and unchoke the decoders) while schedule_update_locked() is simultaneously trying to choke the decoders, leading to a deadlock if the last decoder is left choked and unable to propagate EOF downstream. The cleanest solution is to just take the scheduler lock while updating the choke status here. This ensures that any other schedule_update_locked() calls will have completed. 
--- fftools/ffmpeg_sched.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 74d0be0916..0b0b8cf019 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -2697,12 +2697,18 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts) atomic_store(&sch->terminate, 1); + /* Ensure no other thread is currently in schedule_update_locked while + * we are choking all demuxers */ + pthread_mutex_lock(&sch->schedule_lock); + for (unsigned type = 0; type < 2; type++) for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; waiter_set(w, 1); } + pthread_mutex_unlock(&sch->schedule_lock); + for (unsigned i = 0; i < sch->nb_demux; i++) { SchDemux *d = &sch->demux[i]; -- 2.49.1 >From b04fc22904f3ffc1d1a4692a448537f3c467de69 Mon Sep 17 00:00:00 2001 From: Niklas Haas <[email protected]> Date: Fri, 19 Dec 2025 16:00:34 +0100 Subject: [PATCH 5/6] fftools/ffmpeg_sched: directly schedule decoders This is an IMHO cleaner solution to the decoder choking problem than the previous somewhat convoluted approach of choking the thread queue internally. The one downside of this compared to the previous approach is that it does not apply to encoders and muxers. But I think that was of rather limited usefulness anyway, since decoders are the main source of excess memory usage as a result of unneeded progress. (And we could fairly trivially replicate this logic onto the encoders if we really wanted to) Name this field `waiter_in` to distinguish it from the more typical pattern of waiters blocking the *output*, whereas this blocks the input. This is a distinction that will matter in the following commit. 
--- fftools/ffmpeg_sched.c | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 0b0b8cf019..cf96582be3 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -86,6 +86,7 @@ typedef struct SchDec { unsigned nb_outputs; SchTask task; + SchWaiter waiter_in; // whether allowed to receive packets // Queue for receiving input packets, one stream. ThreadQueue *queue; @@ -539,6 +540,8 @@ void sch_free(Scheduler **psch) av_freep(&dec->outputs); av_frame_free(&dec->send_frame); + + waiter_uninit(&dec->waiter_in); } av_freep(&sch->dec); @@ -789,6 +792,10 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts) return ret; } + ret = waiter_init(&dec->waiter_in); + if (ret < 0) + return ret; + return idx; } @@ -1294,6 +1301,7 @@ static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) switch (dst->type) { case SCH_NODE_TYPE_DEC: dec = &sch->dec[dst->idx]; + dec->waiter_in.choked_next = 0; for (int i = 0; i < dec->nb_outputs; i++) unchoke_downstream(sch, dec->outputs[i].dst); break; @@ -1326,6 +1334,8 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) while (1) { SchFilterGraph *fg; SchDemux *demux; + SchDec *dec; + switch (src.type) { case SCH_NODE_TYPE_DEMUX: // fed directly by a demuxer (i.e. 
not through a filtergraph) @@ -1337,7 +1347,9 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) unchoke_downstream(sch, demux->streams[i].dst); return; case SCH_NODE_TYPE_DEC: - src = sch->dec[src.idx].src; + dec = &sch->dec[src.idx]; + dec->waiter_in.choked_next = 0; + src = dec->src; continue; case SCH_NODE_TYPE_ENC: src = sch->enc[src.idx].src; @@ -1385,6 +1397,7 @@ static void schedule_update_locked(Scheduler *sch) RESET_WAITER(demux, waiter); RESET_WAITER(filters, waiter); + RESET_WAITER(dec, waiter_in); // figure out the sources that are allowed to proceed for (unsigned i = 0; i < sch->nb_mux; i++) { @@ -1447,6 +1460,7 @@ static void schedule_update_locked(Scheduler *sch) UPDATE_WAITER(demux, waiter); UPDATE_WAITER(filters, waiter); + UPDATE_WAITER(dec, waiter_in); } enum { @@ -2249,6 +2263,10 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) av_assert0(dec_idx < sch->nb_dec); dec = &sch->dec[dec_idx]; + int terminate = waiter_wait(sch, &dec->waiter_in); + if (terminate) + return AVERROR_EXIT; + // the decoder should have given us post-flush end timestamp in pkt if (dec->expect_end_ts) { Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; @@ -2707,6 +2725,9 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts) waiter_set(w, 1); } + for (unsigned i = 0; i < sch->nb_dec; i++) + waiter_set(&sch->dec[i].waiter_in, 0); // allow draining + pthread_mutex_unlock(&sch->schedule_lock); for (unsigned i = 0; i < sch->nb_demux; i++) { -- 2.49.1 >From 39ae7d5e8e9703e880285bf2b2d7b4682d38ae23 Mon Sep 17 00:00:00 2001 From: Niklas Haas <[email protected]> Date: Fri, 19 Dec 2025 15:17:53 +0100 Subject: [PATCH 6/6] fftools/ffmpeg_sched: decouple decoder input and output This comes into play when e.g. a demuxer is feeding into two decoders, but only one of the two decoders is actually currently needed (e.g. due to A/V misalignment). 
In that case, what typically happens is that the unneeded decoder also decodes all frames, and then piles them up on the "buffersrc" filter's downstream link (growing indefinitely). In this scenario, it would be much better to have the decoder continue accepting packets, but defer actually decoding them until the output is needed. This effectively simulates a sort of "filter graph"-type semantics but for the decoder queue. However, to avoid letting the demuxer run freely, we still need to maintain backpressure on the demuxer by having it continue blocking on decoders which are actively decoding. This requires separating the choke status into an "input choke" and an "output choke". When the decoder is output-choked but not input-choked, we should continue accepting packets but store them in an internal overflow FIFO instead of decoding them right away. This allows decoders to avoid blocking demuxers for unrelated (not yet needed) streams; while also avoiding the excessive usage of memory when such decoders are feeding into filter inputs. This overflow logic is fairly self-contained inside `sch_dec_receive`, though it is quite nontrivial. I have added as much documentation as is hopefully needed to understand the logic. --- fftools/ffmpeg_sched.c | 49 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index cf96582be3..448f8eacfc 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -32,6 +32,7 @@ #include "libavcodec/packet.h" #include "libavutil/avassert.h" +#include "libavutil/container_fifo.h" #include "libavutil/error.h" #include "libavutil/fifo.h" #include "libavutil/frame.h" @@ -87,6 +88,7 @@ typedef struct SchDec { SchTask task; SchWaiter waiter_in; // whether allowed to receive packets + SchWaiter waiter_out; // whether allowed to decode packets // Queue for receiving input packets, one stream. 
ThreadQueue *queue; @@ -96,6 +98,9 @@ typedef struct SchDec { // temporary storage used by sch_dec_send() AVFrame *send_frame; + + // internal queue of undecoded packets used by sch_dec_receive() + AVContainerFifo *overflow; } SchDec; typedef struct SchSyncQueue { @@ -529,6 +534,7 @@ void sch_free(Scheduler **psch) tq_free(&dec->queue); av_thread_message_queue_free(&dec->queue_end_ts); + av_container_fifo_free(&dec->overflow); for (unsigned j = 0; j < dec->nb_outputs; j++) { SchDecOutput *o = &dec->outputs[j]; @@ -542,6 +548,7 @@ void sch_free(Scheduler **psch) av_frame_free(&dec->send_frame); waiter_uninit(&dec->waiter_in); + waiter_uninit(&dec->waiter_out); } av_freep(&sch->dec); @@ -792,10 +799,18 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts) return ret; } + dec->overflow = av_container_fifo_alloc_avpacket(0); + if (!dec->overflow) + return AVERROR(ENOMEM); + ret = waiter_init(&dec->waiter_in); if (ret < 0) return ret; + ret = waiter_init(&dec->waiter_out); + if (ret < 0) + return ret; + return idx; } @@ -1302,8 +1317,10 @@ static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) case SCH_NODE_TYPE_DEC: dec = &sch->dec[dst->idx]; dec->waiter_in.choked_next = 0; - for (int i = 0; i < dec->nb_outputs; i++) - unchoke_downstream(sch, dec->outputs[i].dst); + if (!dec->waiter_out.choked_next) { + for (int i = 0; i < dec->nb_outputs; i++) + unchoke_downstream(sch, dec->outputs[i].dst); + } break; case SCH_NODE_TYPE_ENC: enc = &sch->enc[dst->idx]; @@ -1348,6 +1365,7 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) return; case SCH_NODE_TYPE_DEC: dec = &sch->dec[src.idx]; + dec->waiter_out.choked_next = 0; dec->waiter_in.choked_next = 0; src = dec->src; continue; @@ -1398,6 +1416,7 @@ static void schedule_update_locked(Scheduler *sch) RESET_WAITER(demux, waiter); RESET_WAITER(filters, waiter); RESET_WAITER(dec, waiter_in); + RESET_WAITER(dec, waiter_out); // figure out the sources that are allowed to 
proceed for (unsigned i = 0; i < sch->nb_mux; i++) { @@ -1461,6 +1480,7 @@ static void schedule_update_locked(Scheduler *sch) UPDATE_WAITER(demux, waiter); UPDATE_WAITER(filters, waiter); UPDATE_WAITER(dec, waiter_in); + UPDATE_WAITER(dec, waiter_out); } enum { @@ -2267,6 +2287,12 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) if (terminate) return AVERROR_EXIT; +retry: + // Pull a packet from the overflow FIFO while unchoked or expecting EOF ts + if (av_container_fifo_can_read(dec->overflow) && + (!atomic_load(&dec->waiter_out.choked) || dec->expect_end_ts)) + return av_container_fifo_read(dec->overflow, pkt, 0); + // the decoder should have given us post-flush end timestamp in pkt if (dec->expect_end_ts) { Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; @@ -2280,11 +2306,26 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) ret = tq_receive(dec->queue, &dummy, pkt); av_assert0(dummy <= 0); + // drain packets from overflow queue before returning EOF + if (ret == AVERROR_EOF && av_container_fifo_can_read(dec->overflow)) + return av_container_fifo_read(dec->overflow, pkt, 0); + else if (ret < 0) + return ret; + // got a flush packet, on the next call to this function the decoder - // will give us post-flush end timestamp - if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts) + // should give us post-flush end timestamp (after draining overflow fifo) + if (!pkt->data && !pkt->side_data_elems && dec->queue_end_ts) dec->expect_end_ts = 1; + // we got a packet, but we're currently choked or have existing overflow + // packets; so push it to the FIFO first + if (atomic_load(&dec->waiter_out.choked) || av_container_fifo_can_read(dec->overflow)) { + ret = av_container_fifo_write(dec->overflow, pkt, 0); + if (ret < 0) + return ret; + goto retry; + } + return ret; } -- 2.49.1 _______________________________________________ ffmpeg-devel mailing list -- [email protected] To unsubscribe send 
an email to [email protected]
