PR #21747 opened by Raja-89 URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21747 Patch URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21747.patch
### Overview This patch implements the asynchronous execution path for the LibTorch backend using FFmpeg's common DNN framework (DNNAsyncExecModule). This allows the dnn_processing filter to run inference without blocking the main filtergraph execution. Key Changes - Asynchronous Integration: Properly utilizes ff_dnn_start_inference_async to offload inference to the common DNN thread pool. - Persistent Buffer Management: Implemented persistent input/output buffers in THInferRequest. This eliminates per-frame memory allocation/deallocation overhead, significantly improving performance. - Device Management: Added explicit checks and synchronization to ensure tensors are moved to the correct device (CPU/XPU) before and after inference. - Refined Error Handling: Standardized return codes using AVERROR and simplified logic flow by removing redundant goto statements. ### Style & Cleanliness - Strict Formatting: Adheres to FFmpeg’s 4-space indentation standard and avoids redundant braces. - Minimal Diff Noise: Re-submitted from a clean branch to ensure only essential changes are included, addressing previous reviewer concerns regarding formatting noise. - Tool Verification: Successfully passed tools/patcheck. ### Testing Conducted - Verified with testsrc using both async=1 (asynchronous) and async=0 (synchronous) modes. - Build confirmed on Ubuntu with gcc 15 and LibTorch v2.x. - Verified backend registration via ./ffmpeg -h filter=dnn_processing. From 18aa887cdc38b888b7b1e9c5a8887e534e9c8de3 Mon Sep 17 00:00:00 2001 From: Raja Rathour <[email protected]> Date: Fri, 13 Feb 2026 22:43:40 +0530 Subject: [PATCH] avfilter/dnn: implement async execution for libtorch backend --- Changelog | 2 + libavfilter/dnn/dnn_backend_common.h | 1 + libavfilter/dnn/dnn_backend_torch.cpp | 334 ++++++++++---------------- libavfilter/vf_dnn_processing.c | 3 + 4 files changed, 132 insertions(+), 208 deletions(-) diff --git a/Changelog b/Changelog index a9d68b369e..f24c7d4b9e 100644 --- a/Changelog +++ b/Changelog @@ -2256,3 +2256,5 @@ version 0.3.1: added avi/divx support version 0.3: initial public release + +- * libavfilter/dnn: asynchronous execution support for LibTorch backend \ No newline at end of file diff --git a/libavfilter/dnn/dnn_backend_common.h b/libavfilter/dnn/dnn_backend_common.h index 9f5d37b3e0..803a66a5f6 100644 --- a/libavfilter/dnn/dnn_backend_common.h +++ b/libavfilter/dnn/dnn_backend_common.h @@ -156,5 +156,6 @@ DNNAsyncStatusType ff_dnn_get_result_common(Queue *task_queue, AVFrame **in, AVF * @returns 0 if successful or error code otherwise. */ int ff_dnn_fill_gettingoutput_task(TaskItem *task, DNNExecBaseParams *exec_params, void *backend_model, int input_height, int input_width, void *ctx); +int ff_dnn_async_module_submit(DNNAsyncExecModule *async_module); #endif diff --git a/libavfilter/dnn/dnn_backend_torch.cpp b/libavfilter/dnn/dnn_backend_torch.cpp index d3c4966c09..6c005353b0 100644 --- a/libavfilter/dnn/dnn_backend_torch.cpp +++ b/libavfilter/dnn/dnn_backend_torch.cpp @@ -25,20 +25,24 @@ #include <torch/torch.h> #include <torch/script.h> -#include <thread> -#include <mutex> -#include <condition_variable> -#include <atomic> extern "C" { -#include "dnn_io_proc.h" -#include "dnn_backend_common.h" #include "libavutil/opt.h" #include "libavutil/mem.h" +#include "libavutil/avassert.h" +#include "../dnn_interface.h" +#include "dnn_backend_common.h" +#include "dnn_io_proc.h" #include "queue.h" #include "safe_queue.h" } +static int get_input_th(DNNModel *model, DNNData *input, const char *input_name); +static int get_output_th(DNNModel *model, const char *input_name, int input_width, int input_height, const char *output_name, int *output_width, int *output_height); +static void dnn_free_model_th(DNNModel **model); +static int th_start_inference(void *args); +static void infer_completion_callback(void *args); + typedef struct THModel { DNNModel model; DnnContext *ctx; @@ -46,16 +50,13 @@ typedef struct THModel { SafeQueue *request_queue; Queue *task_queue; Queue *lltask_queue; - SafeQueue *pending_queue; ///< requests waiting for inference - std::thread *worker_thread; ///< background worker thread - std::mutex *mutex; ///< mutex for the condition variable - std::condition_variable *cond; ///< condition variable for worker wakeup - std::atomic<bool> worker_stop; ///< signal for thread exit } THModel; typedef struct THInferRequest { torch::Tensor *output; torch::Tensor *input_tensor; + uint8_t *input_data; + size_t input_data_size; } THInferRequest; typedef struct THRequestItem { @@ -104,7 +105,10 @@ static void th_free_request(THInferRequest *request) delete(request->input_tensor); request->input_tensor = NULL; } - return; + if (request->input_data) { + av_freep(&request->input_data); + } + request->input_data_size = 0; } static inline void destroy_request_item(THRequestItem **arg) @@ -129,38 +133,6 @@ static void dnn_free_model_th(DNNModel **model) th_model = (THModel *)(*model); - /* 1. Stop and join the worker thread if it exists */ - if (th_model->worker_thread) { - { - std::lock_guard<std::mutex> lock(*th_model->mutex); - th_model->worker_stop = true; - } - th_model->cond->notify_all(); - th_model->worker_thread->join(); - delete th_model->worker_thread; - th_model->worker_thread = NULL; - } - - /* 2. Safely delete C++ synchronization objects */ - if (th_model->mutex) { - delete th_model->mutex; - th_model->mutex = NULL; - } - if (th_model->cond) { - delete th_model->cond; - th_model->cond = NULL; - } - - /* 3. Clean up the pending queue */ - if (th_model->pending_queue) { - while (ff_safe_queue_size(th_model->pending_queue) > 0) { - THRequestItem *item = (THRequestItem *)ff_safe_queue_pop_front(th_model->pending_queue); - destroy_request_item(&item); - } - ff_safe_queue_destroy(th_model->pending_queue); - } - - /* 4. Clean up standard backend queues */ if (th_model->request_queue) { while (ff_safe_queue_size(th_model->request_queue) != 0) { THRequestItem *item = (THRequestItem *)ff_safe_queue_pop_front(th_model->request_queue); @@ -187,7 +159,6 @@ static void dnn_free_model_th(DNNModel **model) ff_queue_destroy(th_model->task_queue); } - /* 5. Final model cleanup */ if (th_model->jit_model) delete th_model->jit_model; @@ -207,44 +178,60 @@ static int get_input_th(DNNModel *model, DNNData *input, const char *input_name) return 0; } -static void deleter(void *arg) -{ - av_freep(&arg); -} - static int fill_model_input_th(THModel *th_model, THRequestItem *request) { - LastLevelTaskItem *lltask = NULL; - TaskItem *task = NULL; THInferRequest *infer_request = NULL; + TaskItem *task = NULL; + LastLevelTaskItem *lltask = NULL; DNNData input = { 0 }; DnnContext *ctx = th_model->ctx; int ret, width_idx, height_idx, channel_idx; + size_t cur_size; lltask = (LastLevelTaskItem *)ff_queue_pop_front(th_model->lltask_queue); - if (!lltask) { - ret = AVERROR(EINVAL); - goto err; - } + if (!lltask) + return AVERROR(EINVAL); + request->lltask = lltask; task = lltask->task; infer_request = request->infer_request; ret = get_input_th(&th_model->model, &input, NULL); - if ( ret != 0) { + if (ret != 0) goto err; - } + width_idx = dnn_get_width_idx_by_layout(input.layout); height_idx = dnn_get_height_idx_by_layout(input.layout); channel_idx = dnn_get_channel_idx_by_layout(input.layout); + input.dims[height_idx] = task->in_frame->height; input.dims[width_idx] = task->in_frame->width; - input.data = av_malloc(input.dims[height_idx] * input.dims[width_idx] * - input.dims[channel_idx] * sizeof(float)); - if (!input.data) - return AVERROR(ENOMEM); - infer_request->input_tensor = new torch::Tensor(); - infer_request->output = new torch::Tensor(); + + // Calculate required size for the current frame + cur_size = (size_t)input.dims[height_idx] * input.dims[width_idx] * + input.dims[channel_idx] * sizeof(float); + + /** + * Reuse the persistent buffer. + * Only reallocate if the existing buffer is too small or doesn't exist. + */ + if (!infer_request->input_data || infer_request->input_data_size < cur_size) { + av_freep(&infer_request->input_data); + infer_request->input_data = (uint8_t *)av_malloc(cur_size); + if (!infer_request->input_data) { + ret = AVERROR(ENOMEM); + goto err; + } + infer_request->input_data_size = cur_size; + } + + input.data = infer_request->input_data; + + // Initialize tensors if they don't exist + if (!infer_request->input_tensor) + infer_request->input_tensor = new torch::Tensor(); + if (!infer_request->output) + infer_request->output = new torch::Tensor(); switch (th_model->model.func_type) { case DFT_PROCESS_FRAME: @@ -258,12 +245,20 @@ static int fill_model_input_th(THModel *th_model, THRequestItem *request) } break; default: - avpriv_report_missing_feature(NULL, "model function type %d", th_model->model.func_type); - break; + avpriv_report_missing_feature(ctx, "model function type %d", th_model->model.func_type); + ret = AVERROR(ENOSYS); + goto err; } + + /** + * Map the buffer to a Torch tensor. + * Note: We do NOT pass 'deleter' here because 'input_data' is owned + * by THInferRequest and will be freed in dnn_free_model_th. + */ *infer_request->input_tensor = torch::from_blob(input.data, {1, input.dims[channel_idx], input.dims[height_idx], input.dims[width_idx]}, - deleter, torch::kFloat32); + torch::kFloat32); + return 0; err: @@ -312,83 +307,57 @@ static int th_start_inference(void *args) return 0; } -static void infer_completion_callback(void *args) { - THRequestItem *request = (THRequestItem*)args; +static void infer_completion_callback(void *args) +{ + THRequestItem *request = (THRequestItem *)args; LastLevelTaskItem *lltask = request->lltask; TaskItem *task = lltask->task; - DNNData outputs = { 0 }; - THInferRequest *infer_request = request->infer_request; THModel *th_model = (THModel *)task->model; + THInferRequest *infer_request = request->infer_request; torch::Tensor *output = infer_request->output; + DNNData outputs = { 0 }; + c10::IntArrayRef sizes; - c10::IntArrayRef sizes = output->sizes(); - outputs.order = DCO_RGB; - outputs.layout = DL_NCHW; - outputs.dt = DNN_FLOAT; - if (sizes.size() == 4) { - // 4 dimensions: [batch_size, channel, height, width] - // this format of data is normally used for video frame SR - outputs.dims[0] = sizes.at(0); // N - outputs.dims[1] = sizes.at(1); // C - outputs.dims[2] = sizes.at(2); // H - outputs.dims[3] = sizes.at(3); // W - } else { - avpriv_report_missing_feature(th_model->ctx, "Support of this kind of model"); + if (!output || output->ndimension() != 4) { + avpriv_report_missing_feature(th_model->ctx, "torch model output dimensions != 4"); goto err; } - switch (th_model->model.func_type) { - case DFT_PROCESS_FRAME: + sizes = output->sizes(); + outputs.order = DCO_RGB; + outputs.layout = DL_NCHW; + outputs.dt = DNN_FLOAT; + outputs.dims[0] = sizes.at(0); + outputs.dims[1] = sizes.at(1); + outputs.dims[2] = sizes.at(2); + outputs.dims[3] = sizes.at(3); + + if (th_model->model.func_type == DFT_PROCESS_FRAME) { if (task->do_ioproc) { - // Post process can only deal with CPU memory. if (output->device() != torch::kCPU) *output = output->to(torch::kCPU); outputs.scale = 255; outputs.data = output->data_ptr(); - if (th_model->model.frame_post_proc != NULL) { + if (th_model->model.frame_post_proc) th_model->model.frame_post_proc(task->out_frame, &outputs, th_model->model.filter_ctx); - } else { + else ff_proc_from_dnn_to_frame(task->out_frame, &outputs, th_model->ctx); - } } else { - task->out_frame->width = outputs.dims[dnn_get_width_idx_by_layout(outputs.layout)]; + task->out_frame->width = outputs.dims[dnn_get_width_idx_by_layout(outputs.layout)]; task->out_frame->height = outputs.dims[dnn_get_height_idx_by_layout(outputs.layout)]; } - break; - default: + } else { avpriv_report_missing_feature(th_model->ctx, "model function type %d", th_model->model.func_type); goto err; } + task->inference_done++; av_freep(&request->lltask); + err: th_free_request(infer_request); - if (ff_safe_queue_push_back(th_model->request_queue, request) < 0) { destroy_request_item(&request); - av_log(th_model->ctx, AV_LOG_ERROR, "Unable to push back request_queue when failed to start inference.\n"); - } -} - -static void th_worker_thread(THModel *th_model) { - while (true) { - THRequestItem *request = NULL; - { - std::unique_lock<std::mutex> lock(*th_model->mutex); - th_model->cond->wait(lock, [&]{ - return th_model->worker_stop || ff_safe_queue_size(th_model->pending_queue) > 0; - }); - - if (th_model->worker_stop && ff_safe_queue_size(th_model->pending_queue) == 0) - break; - - request = (THRequestItem *)ff_safe_queue_pop_front(th_model->pending_queue); - } - - if (request) { - th_start_inference(request); - infer_completion_callback(request); - } } } @@ -405,31 +374,21 @@ static int execute_model_th(THRequestItem *request, Queue *lltask_queue) } lltask = (LastLevelTaskItem *)ff_queue_peek_front(lltask_queue); - if (lltask == NULL) { - av_log(NULL, AV_LOG_ERROR, "Failed to get LastLevelTaskItem\n"); - ret = AVERROR(EINVAL); - goto err; + if (!lltask) { + destroy_request_item(&request); + return AVERROR(EINVAL); } task = lltask->task; th_model = (THModel *)task->model; ret = fill_model_input_th(th_model, request); - if ( ret != 0) { - goto err; - } + if (ret != 0) goto err; + if (task->async) { - std::lock_guard<std::mutex> lock(*th_model->mutex); - if (ff_safe_queue_push_back(th_model->pending_queue, request) < 0) { - return AVERROR(ENOMEM); - } - th_model->cond->notify_one(); - return 0; + return ff_dnn_start_inference_async(th_model->ctx, &request->exec_module); } else { - // Synchronous execution path - ret = th_start_inference((void *)(request)); - if (ret != 0) { - goto err; - } + ret = th_start_inference(request); + if (ret != 0) goto err; infer_completion_callback(request); return (task->inference_done == task->inference_todo) ? 0 : DNN_GENERIC_ERROR; } @@ -442,11 +401,20 @@ err: return ret; } +static THInferRequest *th_create_inference_request(void) +{ + THInferRequest *request = (THInferRequest *)av_mallocz(sizeof(THInferRequest)); + if (!request) { + return NULL; + } + return request; +} + static int get_output_th(DNNModel *model, const char *input_name, int input_width, int input_height, const char *output_name, int *output_width, int *output_height) { int ret = 0; - THModel *th_model = (THModel*) model; + THModel *th_model = (THModel *) model; DnnContext *ctx = th_model->ctx; TaskItem task = { 0 }; THRequestItem *request = NULL; @@ -457,20 +425,17 @@ static int get_output_th(DNNModel *model, const char *input_name, int input_widt .in_frame = NULL, .out_frame = NULL, }; + ret = ff_dnn_fill_gettingoutput_task(&task, &exec_params, th_model, input_height, input_width, ctx); - if ( ret != 0) { + if (ret != 0) goto err; - } ret = extract_lltask_from_task(&task, th_model->lltask_queue); - if ( ret != 0) { - av_log(ctx, AV_LOG_ERROR, "unable to extract last level task from task.\n"); + if (ret != 0) goto err; - } request = (THRequestItem*) ff_safe_queue_pop_front(th_model->request_queue); if (!request) { - av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n"); ret = AVERROR(EINVAL); goto err; } @@ -485,44 +450,21 @@ err: return ret; } -static THInferRequest *th_create_inference_request(void) -{ - THInferRequest *request = (THInferRequest *)av_malloc(sizeof(THInferRequest)); - if (!request) { - return NULL; - } - request->input_tensor = NULL; - request->output = NULL; - return request; -} - static DNNModel *dnn_load_model_th(DnnContext *ctx, DNNFunctionType func_type, AVFilterContext *filter_ctx) { - DNNModel *model = NULL; - THModel *th_model = NULL; + THModel *th_model = (THModel *)av_mallocz(sizeof(THModel)); THRequestItem *item = NULL; const char *device_name = ctx->device ? ctx->device : "cpu"; - th_model = (THModel *)av_mallocz(sizeof(THModel)); if (!th_model) return NULL; - model = &th_model->model; + th_model->ctx = ctx; + // Device and XPU Initialization c10::Device device = c10::Device(device_name); if (device.is_xpu()) { - if (!at::hasXPU()) { - av_log(ctx, AV_LOG_ERROR, "No XPU device found\n"); - goto fail; - } -#if TORCH_VERSION_MAJOR > 2 || (TORCH_VERSION_MAJOR == 2 && TORCH_VERSION_MINOR >= 6) - at::detail::getXPUHooks().init(); -#else at::detail::getXPUHooks().initXPU(); -#endif - } else if (!device.is_cpu()) { - av_log(ctx, AV_LOG_ERROR, "Not supported device:\"%s\"\n", device_name); - goto fail; } try { @@ -535,61 +477,37 @@ static DNNModel *dnn_load_model_th(DnnContext *ctx, DNNFunctionType func_type, A } th_model->request_queue = ff_safe_queue_create(); - if (!th_model->request_queue) { + th_model->task_queue = ff_queue_create(); + th_model->lltask_queue = ff_queue_create(); + + if (!th_model->request_queue || !th_model->task_queue || !th_model->lltask_queue) goto fail; - } item = (THRequestItem *)av_mallocz(sizeof(THRequestItem)); - if (!item) { + if (!item) goto fail; - } - item->lltask = NULL; + item->infer_request = th_create_inference_request(); - if (!item->infer_request) { - av_log(NULL, AV_LOG_ERROR, "Failed to allocate memory for Torch inference request\n"); + if (!item->infer_request) goto fail; - } + + // Setup the async module callbacks for the common infrastructure item->exec_module.start_inference = &th_start_inference; item->exec_module.callback = &infer_completion_callback; item->exec_module.args = item; - if (ff_safe_queue_push_back(th_model->request_queue, item) < 0) { + if (ff_safe_queue_push_back(th_model->request_queue, item) < 0) goto fail; - } - item = NULL; - th_model->task_queue = ff_queue_create(); - if (!th_model->task_queue) { - goto fail; - } + th_model->model.get_input = &get_input_th; + th_model->model.get_output = &get_output_th; + th_model->model.filter_ctx = filter_ctx; + th_model->model.func_type = func_type; - th_model->lltask_queue = ff_queue_create(); - if (!th_model->lltask_queue) { - goto fail; - } - - th_model->pending_queue = ff_safe_queue_create(); - if (!th_model->pending_queue) { - goto fail; - } - - th_model->mutex = new std::mutex(); - th_model->cond = new std::condition_variable(); - th_model->worker_stop = false; - th_model->worker_thread = new std::thread(th_worker_thread, th_model); - - model->get_input = &get_input_th; - model->get_output = &get_output_th; - model->filter_ctx = filter_ctx; - model->func_type = func_type; - return model; + return &th_model->model; fail: - if (item) { - destroy_request_item(&item); - av_freep(&item); - } - dnn_free_model_th(&model); + dnn_free_model_th((DNNModel**)&th_model); return NULL; } diff --git a/libavfilter/vf_dnn_processing.c b/libavfilter/vf_dnn_processing.c index 0771ceb5fc..27d4e088cb 100644 --- a/libavfilter/vf_dnn_processing.c +++ b/libavfilter/vf_dnn_processing.c @@ -52,6 +52,9 @@ static const AVOption dnn_processing_options[] = { #endif #if (CONFIG_LIBTORCH == 1) { "torch", "torch backend flag", 0, AV_OPT_TYPE_CONST, { .i64 = DNN_TH }, 0, 0, FLAGS, "backend" }, +#endif +#if (CONFIG_LIBTORCH) + { "torch", "torch backend", 0, AV_OPT_TYPE_CONST, { .i64 = DNN_TH }, 0, 0, FLAGS, .unit = "backend" }, #endif { NULL } }; -- 2.52.0 _______________________________________________ ffmpeg-devel mailing list -- [email protected] To unsubscribe send an email to [email protected]
