It enables concurrency across multiple simple filter graphs, which brings an improvement of about 4%~20% in some 1:N scenarios with CPU or GPU acceleration.
Below are some test cases and comparisons for reference. (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz) (Software: Intel iHD driver - 16.9.00100, CentOS 7) For 1:N transcode by GPU acceleration with vaapi: ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \ -hwaccel_output_format vaapi \ -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \ -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null test results: 2 encoders 5 encoders 10 encoders Improved 6.1% 6.9% 5.5% For 1:N transcode by GPU acceleration with QSV: ./ffmpeg -hwaccel qsv -c:v h264_qsv \ -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \ -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null test results: 2 encoders 5 encoders 10 encoders Improved 6% 4% 15% For the Intel GPU acceleration case, 1 decode to N scaling, by QSV: ./ffmpeg -hwaccel qsv -c:v h264_qsv \ -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \ -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null test results: 2 scale 5 scale 10 scale Improved 12% 21% 21% For CPU-only 1 decode to N scaling: ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \ -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null test results: 2 scale 5 scale 10 scale Improved 25% 107% 148% Signed-off-by: Wang, Shaofei <shaofei.w...@intel.com> --- Passed FATE and refined the handling of the possible data race.
The patch will only effect on multiple SIMPLE filter graphs pipeline fftools/ffmpeg.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++------ fftools/ffmpeg.h | 13 +++++ 2 files changed, 169 insertions(+), 16 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 544f1a1..c0c9ca8 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -164,7 +164,13 @@ static struct termios oldtty; static int restore_tty; #endif +/* enable abr threads when there were multiple simple filter graphs*/ +static int abr_threads_enabled = 0; + #if HAVE_THREADS +pthread_mutex_t fg_config_mutex; +pthread_mutex_t ost_init_mutex; + static void free_input_threads(void); #endif @@ -509,6 +515,17 @@ static void ffmpeg_cleanup(int ret) } av_fifo_freep(&fg->inputs[j]->ist->sub2video.sub_queue); } +#if HAVE_THREADS + if (abr_threads_enabled) { + av_frame_free(&fg->inputs[j]->input_frm); + pthread_mutex_lock(&fg->inputs[j]->process_mutex); + fg->inputs[j]->waited_frm = NULL; + fg->inputs[j]->t_end = 1; + pthread_cond_signal(&fg->inputs[j]->process_cond); + pthread_mutex_unlock(&fg->inputs[j]->process_mutex); + pthread_join(fg->inputs[j]->abr_thread, NULL); + } +#endif av_buffer_unref(&fg->inputs[j]->hw_frames_ctx); av_freep(&fg->inputs[j]->name); av_freep(&fg->inputs[j]); @@ -1419,12 +1436,13 @@ static void finish_output_stream(OutputStream *ost) * * @return 0 for success, <0 for severe errors */ -static int reap_filters(int flush) +static int reap_filters(int flush, InputFilter * ifilter) { AVFrame *filtered_frame = NULL; int i; - /* Reap all buffers present in the buffer sinks */ + /* Reap all buffers present in the buffer sinks or just reap specified + * buffer which related with the filter graph who got ifilter as input*/ for (i = 0; i < nb_output_streams; i++) { OutputStream *ost = output_streams[i]; OutputFile *of = output_files[ost->file_index]; @@ -1432,13 +1450,25 @@ static int reap_filters(int flush) AVCodecContext *enc = ost->enc_ctx; int ret = 0; + if (ifilter && 
abr_threads_enabled) + if (ost != ifilter->graph->outputs[0]) + continue; + if (!ost->filter || !ost->filter->graph->graph) continue; filter = ost->filter->filter; if (!ost->initialized) { char error[1024] = ""; +#if HAVE_THREADS + if (abr_threads_enabled) + pthread_mutex_lock(&ost_init_mutex); +#endif ret = init_output_stream(ost, error, sizeof(error)); +#if HAVE_THREADS + if (abr_threads_enabled) + pthread_mutex_unlock(&ost_init_mutex); +#endif if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n", ost->file_index, ost->index, error); @@ -2179,13 +2209,22 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame) } } - ret = reap_filters(1); + ret = (HAVE_THREADS && abr_threads_enabled) ? reap_filters(1, ifilter) : reap_filters(1, NULL); + if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); return ret; } +#if HAVE_THREADS + if (abr_threads_enabled) + pthread_mutex_lock(&fg_config_mutex); +#endif ret = configure_filtergraph(fg); +#if HAVE_THREADS + if (abr_threads_enabled) + pthread_mutex_unlock(&fg_config_mutex); +#endif if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error reinitializing filters!\n"); return ret; @@ -2252,29 +2291,98 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke return 0; } +#if HAVE_THREADS +static void *filter_pipeline(void *arg) +{ + InputFilter *fl = arg; + AVFrame *frm; + int ret; + while(1) { + pthread_mutex_lock(&fl->process_mutex); + while (fl->waited_frm == NULL && !fl->t_end) + pthread_cond_wait(&fl->process_cond, &fl->process_mutex); + pthread_mutex_unlock(&fl->process_mutex); + + if (fl->t_end) break; + + frm = fl->waited_frm; + pthread_mutex_lock(&fl->ifilter_mutex); + ret = ifilter_send_frame(fl, frm); + pthread_mutex_unlock(&fl->ifilter_mutex); + if (ret == AVERROR_EOF) + ret = 0; + else if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Failed to inject frame into filter network: %s\n", 
av_err2str(ret)); + } else { + ret = reap_filters(1, fl); + } + fl->t_error = ret; + + pthread_mutex_lock(&fl->finish_mutex); + pthread_cond_signal(&fl->finish_cond); + fl->waited_frm = NULL; + pthread_mutex_unlock(&fl->finish_mutex); + } + fl->waited_frm = NULL; + pthread_mutex_lock(&fl->finish_mutex); + pthread_cond_signal(&fl->finish_cond); + pthread_mutex_unlock(&fl->finish_mutex); + return fl; +} +#endif + static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) { - int i, ret; + int i, ret = 0; AVFrame *f; av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ + for (i = 0; i < ist->nb_filters; i++) { if (i < ist->nb_filters - 1) { - f = ist->filter_frame; + f = (HAVE_THREADS && abr_threads_enabled) ? ist->filters[i]->input_frm : ist->filter_frame; ret = av_frame_ref(f, decoded_frame); if (ret < 0) break; } else f = decoded_frame; - ret = ifilter_send_frame(ist->filters[i], f); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; + if (!HAVE_THREADS || !abr_threads_enabled) { + ret = ifilter_send_frame(ist->filters[i], f); + if (ret == AVERROR_EOF) + ret = 0; /* ignore */ + if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Failed to inject frame into filter network: %s\n", av_err2str(ret)); + break; + } + } +#if HAVE_THREADS + if (abr_threads_enabled) { + pthread_mutex_lock(&ist->filters[i]->process_mutex); + ist->filters[i]->waited_frm = f; + pthread_cond_signal(&ist->filters[i]->process_cond); + pthread_mutex_unlock(&ist->filters[i]->process_mutex); + } +#endif + } +#if HAVE_THREADS + if (abr_threads_enabled && ret >= 0) { + for (i = 0; i < ist->nb_filters; i++) { + pthread_mutex_lock(&ist->filters[i]->finish_mutex); + while(ist->filters[i]->waited_frm != NULL) + pthread_cond_wait(&ist->filters[i]->finish_cond, + &ist->filters[i]->finish_mutex); + pthread_mutex_unlock(&ist->filters[i]->finish_mutex); + } + for 
(i = 0; i < ist->nb_filters; i++) { + if (ist->filters[i]->t_error < 0) { + ret = ist->filters[i]->t_error; + break; + } } } +#endif return ret; } @@ -2334,7 +2442,6 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output, (AVRational){1, avctx->sample_rate}); ist->nb_samples = decoded_frame->nb_samples; err = send_frame_to_filters(ist, decoded_frame); - av_frame_unref(ist->filter_frame); av_frame_unref(decoded_frame); return err < 0 ? err : ret; @@ -3680,6 +3787,8 @@ static int transcode_init(void) break; ofilter->ost->source_index = k; } + if (i >= 1 && filtergraph_is_simple(fg)) + abr_threads_enabled = 1; } /* init framerate emulation */ @@ -3737,6 +3846,37 @@ static int transcode_init(void) } } +#if HAVE_THREADS + if (abr_threads_enabled) { + for (i = 0; i < nb_input_streams; i++) { + ist = input_streams[i]; + for (j = 0; j < ist->nb_filters; j++) { + pthread_mutex_init(&ist->filters[j]->process_mutex, NULL); + pthread_mutex_init(&ist->filters[j]->finish_mutex, NULL); + pthread_cond_init(&ist->filters[j]->process_cond, NULL); + pthread_cond_init(&ist->filters[j]->finish_cond, NULL); + pthread_mutex_init(&ist->filters[j]->ifilter_mutex, NULL); + if (i == 0) { + pthread_mutex_init(&fg_config_mutex, NULL); + pthread_mutex_init(&ost_init_mutex, NULL); + } + ist->filters[j]->t_end = 0; + ist->filters[j]->t_error = 0; + ist->filters[j]->input_frm = av_frame_alloc(); + if (!ist->filters[j]->input_frm) + return AVERROR(ENOMEM); + + if ((ret = pthread_create(&ist->filters[j]->abr_thread, NULL, filter_pipeline, + ist->filters[j]))) { + av_log(NULL, AV_LOG_ERROR, + "abr pipeline pthread_create failed.\n"); + return AVERROR(ret); + } + } + } + } +#endif + dump_format: /* dump the stream mapping */ av_log(NULL, AV_LOG_INFO, "Stream mapping:\n"); @@ -4537,10 +4677,10 @@ static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist) *best_ist = NULL; ret = avfilter_graph_request_oldest(graph->graph); if (ret >= 0) - return reap_filters(0); + 
return reap_filters(0, NULL); if (ret == AVERROR_EOF) { - ret = reap_filters(1); + ret = reap_filters(1, NULL); for (i = 0; i < graph->nb_outputs; i++) close_output_stream(graph->outputs[i]->ost); return ret; @@ -4642,7 +4782,7 @@ static int transcode_step(void) if (ret < 0) return ret == AVERROR_EOF ? 0 : ret; - return reap_filters(0); + return (HAVE_THREADS && abr_threads_enabled) ? ret : reap_filters(0, NULL); } /* diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index eb1eaf6..a0f11d3 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -253,6 +253,19 @@ typedef struct InputFilter { AVBufferRef *hw_frames_ctx; + // for abr pipeline + AVFrame *waited_frm; + AVFrame *input_frm; +#if HAVE_THREADS + pthread_t abr_thread; + pthread_cond_t process_cond; + pthread_cond_t finish_cond; + pthread_mutex_t process_mutex; + pthread_mutex_t finish_mutex; + pthread_mutex_t ifilter_mutex; +#endif + int t_end; + int t_error; int eof; } InputFilter; -- 1.8.3.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel