Hi

this breaks tests (and breaks in the sense of previously repeatable out -> non 
repeatable)

./ffmpeg -y -i ~/tickets/2414/wmv2decerror-short.asf -vframes 30 -bitexact -f 
framecrc -

end of the output changes each time:
0,         34,         34,        1,   115200, 0xa46b5581
0,         35,         35,        1,   115200, 0x43cf4578
1,      28670,      28670,     7168,    14336, 0x7b551f3b
1,      35838,      35838,     7168,    14336, 0x8eab5055
1,      43006,      43006,     6144,    12288, 0x4d208b18


0,         34,         34,        1,   115200, 0xa46b5581
0,         35,         35,        1,   115200, 0x43cf4578
1,      28670,      28670,     7168,    14336, 0x7b551f3b
1,      35838,      35838,     7168,    14336, 0x8eab5055

0,         33,         33,        1,   115200, 0xbf1e6a72
0,         34,         34,        1,   115200, 0xa46b5581
0,         35,         35,        1,   115200, 0x43cf4578
1,      28670,      28670,     7168,    14336, 0x7b551f3b


I understand this likely is a bug that latently exists prior to this
commit. But still a command line even with -bitexact should not behave that way

its also not great for the user to have random amounts of extra
stuff in a stream

thx

On Sat, May 23, 2026 at 11:41:45AM +0300, Niklas Haas via ffmpeg-cvslog wrote:
> This is an automated email from the git hooks/post-receive script.
> 
> Git pushed a commit to branch master
> in repository ffmpeg.
> 
> commit 03dfac563018e6e8b81e331ebae0732d8edfe754
> Author:     Niklas Haas <[email protected]>
> AuthorDate: Fri Dec 19 15:17:53 2025 +0100
> Commit:     Niklas Haas <[email protected]>
> CommitDate: Sat May 23 08:41:12 2026 +0000
> 
>     fftools/ffmpeg_sched: allow throttling decoder outputs
>     
>     This is a departure from the conventional idea of decoders always 
> outputting
>     data as fast as possible. Instead, this allows decoders to be throttled 
> in the
>     same way filter graphs can be.
>     
>     This comes into play when e.g. a demuxer is feeding into two decoders, but
>     only one of the two decoders is actually currently needed (e.g. due to
>     A/V misalignment). In that case, what typically happens is that the 
> unneeded
>     decoder alse decodes all frames, and then piles them up on the "buffersrc"
>     filter's downstream link (growing indefinitely).
>     
>     Another issue this solves manifests when e.g. a single demuxer is feeding 
> many
>     decoders that all try to feed frames to the same filter graph. In this 
> case,
>     all decoders run as fast as posssible, leading to lock contention on the
>     filter graph input queue; resulting in (again) many frames piling up on 
> the
>     buffersrc (or downstream filters) for the unneeded inputs that are not 
> actually
>     the bottleneck, while the input that's actually undersatisfied can end up
>     starved for CPU time, possibly for long enough to exhaust memory limits. 
> The
>     normal rate limiting fails to apply in this scenario because all decoders 
> share
>     a single demuxer, and are hence rate-limited only by the demuxer speed; 
> whereas
>     the demuxer is not choked because from the PoV of the scheduler, the 
> filter
>     graph is simply not getting enough frames.
>     
>     In a more general sense, there's a philosophical argument to be made here.
>     Since a decoder is typically also a decompressor, it produces more data 
> than
>     it consumes. So, it a sense, it's acting like a type of producer also - in
>     the same way that a filter graph can produce more input that outputs.
>     
>     Solve all of these issues by allowing decoders to be output-choked, which
>     gives the scheduler control over when decoders are allowed to output 
> frames.
>     This does mean we have to add some sort of internal packet queue, because 
> the
>     decoder thread may need to continue *accepting* upstream packets from the
>     demuxer (or else we risk stalling the demuxer), but defer the actual 
> decoding
>     by placing them inside an internal "overflow" queue.
>     
>     This effectively simulates a sort of "filter graph"-type semantics but
>     for the decoder queue.
>     
>     This overflow logic is fairly self-contained inside `sch_dec_receive`, 
> though
>     it is quite nontrivial. I have added as much documentation as is hopefully
>     needed to understand the logic.
>     
>     Importantly, we cannot simply unlimit the decoder input thread queue 
> because
>     the demuxer relies on backpressure from the decoder to rate limit itself. 
> (Note
>     that demuxers may only be active if there is at least one downstream 
> decoder
>     that is alse active, so we always have at least one decoder providing
>     backpressure)
>     
>     Sponsored-by: nxtedition AB
>     Signed-off-by: Niklas Haas <[email protected]>
> ---
>  fftools/ffmpeg_sched.c | 70 
> +++++++++++++++++++++++++++++++++++++++++++++-----
>  1 file changed, 63 insertions(+), 7 deletions(-)
> 
> diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
> index dddd1d2c67..8ec3bdaf80 100644
> --- a/fftools/ffmpeg_sched.c
> +++ b/fftools/ffmpeg_sched.c
> @@ -32,6 +32,7 @@
>  #include "libavcodec/packet.h"
>  
>  #include "libavutil/avassert.h"
> +#include "libavutil/container_fifo.h"
>  #include "libavutil/error.h"
>  #include "libavutil/fifo.h"
>  #include "libavutil/frame.h"
> @@ -86,6 +87,7 @@ typedef struct SchDec {
>      unsigned         nb_outputs;
>  
>      SchTask             task;
> +    SchWaiter           waiter;
>      // Queue for receiving input packets, one stream.
>      ThreadQueue        *queue;
>  
> @@ -95,6 +97,9 @@ typedef struct SchDec {
>  
>      // temporary storage used by sch_dec_send()
>      AVFrame            *send_frame;
> +
> +    // internal queue of undecoded packets used by sch_dec_receive()
> +    AVContainerFifo    *overflow;
>  } SchDec;
>  
>  typedef struct SchSyncQueue {
> @@ -548,6 +553,7 @@ void sch_free(Scheduler **psch)
>          tq_free(&dec->queue);
>  
>          av_thread_message_queue_free(&dec->queue_end_ts);
> +        av_container_fifo_free(&dec->overflow);
>  
>          for (unsigned j = 0; j < dec->nb_outputs; j++) {
>              SchDecOutput *o = &dec->outputs[j];
> @@ -559,6 +565,8 @@ void sch_free(Scheduler **psch)
>          av_freep(&dec->outputs);
>  
>          av_frame_free(&dec->send_frame);
> +
> +        waiter_uninit(&dec->waiter);
>      }
>      av_freep(&sch->dec);
>  
> @@ -809,6 +817,14 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void 
> *ctx, int send_end_ts)
>              return ret;
>      }
>  
> +    dec->overflow = av_container_fifo_alloc_avpacket(0);
> +    if (!dec->overflow)
> +        return AVERROR(ENOMEM);
> +
> +    ret = waiter_init(&dec->waiter);
> +    if (ret < 0)
> +        return ret;
> +
>      return idx;
>  }
>  
> @@ -1305,8 +1321,9 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned 
> mux_idx, unsigned stream_
>  enum {
>      UNCHOKE_DEMUX  = (1 << 0),
>      UNCHOKE_FILTER = (1 << 1),
> +    UNCHOKE_DECODE = (1 << 2),
>  
> -    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER,
> +    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER | UNCHOKE_DECODE,
>  };
>  
>  static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags);
> @@ -1321,8 +1338,10 @@ static void unchoke_downstream(Scheduler *sch, 
> SchedulerNode *dst)
>      switch (dst->type) {
>      case SCH_NODE_TYPE_DEC:
>          dec = &sch->dec[dst->idx];
> -        for (int i = 0; i < dec->nb_outputs; i++)
> -            unchoke_downstream(sch, dec->outputs[i].dst);
> +        if (!dec->waiter.choked_next) {
> +            for (int i = 0; i < dec->nb_outputs; i++)
> +                unchoke_downstream(sch, dec->outputs[i].dst);
> +        }
>          break;
>      case SCH_NODE_TYPE_ENC:
>          enc = &sch->enc[dst->idx];
> @@ -1353,6 +1372,7 @@ static void unchoke_for_stream(Scheduler *sch, 
> SchedulerNode src, int flags)
>      while (1) {
>          SchFilterGraph *fg;
>          SchDemux *demux;
> +        SchDec *dec;
>          switch (src.type) {
>          case SCH_NODE_TYPE_DEMUX:
>              // fed directly by a demuxer (i.e. not through a filtergraph)
> @@ -1366,7 +1386,11 @@ static void unchoke_for_stream(Scheduler *sch, 
> SchedulerNode src, int flags)
>              }
>              return;
>          case SCH_NODE_TYPE_DEC:
> -            src = sch->dec[src.idx].src;
> +            dec = &sch->dec[src.idx];
> +            if (!(flags & UNCHOKE_DECODE))
> +                return;
> +            dec->waiter.choked_next = 0;
> +            src = dec->src;
>              continue;
>          case SCH_NODE_TYPE_ENC:
>              src = sch->enc[src.idx].src;
> @@ -1445,6 +1469,7 @@ static void schedule_update_locked(Scheduler *sch)
>  
>      RESET_WAITER(demux);
>      RESET_WAITER(filters);
> +    RESET_WAITER(dec);
>  
>      // figure out the sources that are allowed to proceed
>      for (unsigned i = 0; i < sch->nb_mux; i++) {
> @@ -1455,8 +1480,11 @@ static void schedule_update_locked(Scheduler *sch)
>  
>              // unblock sources for output streams that are not finished
>              // and not too far ahead of the trailing stream
> -            if (ms->source_finished)
> +            if (ms->source_finished) {
> +                // still allow decoders to drain
> +                unchoke_for_stream(sch, ms->src, UNCHOKE_DECODE);
>                  continue;
> +            }
>              if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
>                  continue;
>              if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= 
> SCHEDULE_TOLERANCE)
> @@ -1510,6 +1538,7 @@ static void schedule_update_locked(Scheduler *sch)
>  
>      UPDATE_WAITER(demux);
>      UPDATE_WAITER(filters);
> +    UPDATE_WAITER(dec);
>  }
>  
>  enum {
> @@ -2312,6 +2341,12 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, 
> AVPacket *pkt)
>      av_assert0(dec_idx < sch->nb_dec);
>      dec = &sch->dec[dec_idx];
>  
> +retry:
> +    // Pull a packet from the overflow FIFO while unchoked or expecting EOF 
> ts
> +    if (av_container_fifo_can_read(dec->overflow) &&
> +        (!atomic_load(&dec->waiter.choked) || dec->expect_end_ts))
> +        return av_container_fifo_read(dec->overflow, pkt, 0);
> +
>      // the decoder should have given us post-flush end timestamp in pkt
>      if (dec->expect_end_ts) {
>          Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
> @@ -2325,11 +2360,29 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, 
> AVPacket *pkt)
>      ret = tq_receive(dec->queue, &dummy, pkt, 0);
>      av_assert0(dummy <= 0);
>  
> +    // drain packets from overflow queue before returning EOF
> +    if (ret == AVERROR_EOF && av_container_fifo_can_read(dec->overflow)) {
> +        int terminate = waiter_wait(sch, &dec->waiter);
> +        if (terminate)
> +            return ret;
> +        return av_container_fifo_read(dec->overflow, pkt, 0);
> +    } else if (ret < 0)
> +        return ret;
> +
>      // got a flush packet, on the next call to this function the decoder
> -    // will give us post-flush end timestamp
> -    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
> +    // should give us post-flush end timestamp (after draining overflow fifo)
> +    if (!pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
>          dec->expect_end_ts = 1;
>  
> +    // we got a packet, but we're currently choked or have existing overflow
> +    // packets; so push it to the FIFO first
> +    if (atomic_load(&dec->waiter.choked) || 
> av_container_fifo_can_read(dec->overflow)) {
> +        ret = av_container_fifo_write(dec->overflow, pkt, 0);
> +        if (ret < 0)
> +            return ret;
> +        goto retry;
> +    }
> +
>      return ret;
>  }
>  
> @@ -2785,6 +2838,9 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
>                  choke_demux(sch, i, 0); // unfreeze to allow draining
>          }
>  
> +    for (unsigned i = 0; i < sch->nb_dec; i++)
> +        waiter_set(&sch->dec[i].waiter, 0); // unfreeze to allow draining
> +
>      pthread_mutex_unlock(&sch->schedule_lock);
>  
>      for (unsigned i = 0; i < sch->nb_demux; i++) {
> 
> _______________________________________________
> ffmpeg-cvslog mailing list -- [email protected]
> To unsubscribe send an email to [email protected]
> 

-- 
Michael     GnuPG fingerprint: 9FF2128B147EF6730BADF133611EC787040B0FAB

Never trust a computer, one day, it may think you are the virus. -- Compn

Attachment: signature.asc
Description: PGP signature

_______________________________________________
ffmpeg-devel mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to