This is an automated email from the git hooks/post-receive script.
Git pushed a commit to branch master
in repository ffmpeg.
The following commit(s) were added to refs/heads/master by this push:
new 27cdf70086 avfilter/dnn: implement asynchronous execution for LibTorch
backend
27cdf70086 is described below
commit 27cdf70086f1205c7fa7a93ceaf4146ef71355fc
Author: Raja-89 <[email protected]>
AuthorDate: Sun Feb 22 21:14:29 2026 +0530
Commit: Guo Yejun <[email protected]>
CommitDate: Mon Mar 2 21:35:17 2026 +0800
avfilter/dnn: implement asynchronous execution for LibTorch backend
This patch implements the DNNAsyncExecModule for the LibTorch backend,
enabling non-blocking inference through the common async infrastructure
instead of custom threading (th_async_module_submit), aligning the
backend with the TensorFlow and OpenVINO implementations.
The implementation uses ff_dnn_start_inference_async, which provides
unified async logic across all DNN backends, eliminating the need for
backend-specific threading code.
Verified with:
ffmpeg -f lavfi -i testsrc=duration=5:size=320x240:rate=30 -vf
dnn_processing=dnn_backend=torch:model=model.pt -y output.mp4
Signed-off-by: Raja Rathour <[email protected]>
---
libavfilter/dnn/dnn_backend_torch.cpp | 114 +++-------------------------------
1 file changed, 7 insertions(+), 107 deletions(-)
diff --git a/libavfilter/dnn/dnn_backend_torch.cpp
b/libavfilter/dnn/dnn_backend_torch.cpp
index d3c4966c09..99f55165f2 100644
--- a/libavfilter/dnn/dnn_backend_torch.cpp
+++ b/libavfilter/dnn/dnn_backend_torch.cpp
@@ -25,10 +25,6 @@
#include <torch/torch.h>
#include <torch/script.h>
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <atomic>
extern "C" {
#include "dnn_io_proc.h"
@@ -46,11 +42,6 @@ typedef struct THModel {
SafeQueue *request_queue;
Queue *task_queue;
Queue *lltask_queue;
- SafeQueue *pending_queue; ///< requests waiting for inference
- std::thread *worker_thread; ///< background worker thread
- std::mutex *mutex; ///< mutex for the condition variable
- std::condition_variable *cond; ///< condition variable for worker wakeup
- std::atomic<bool> worker_stop; ///< signal for thread exit
} THModel;
typedef struct THInferRequest {
@@ -129,38 +120,6 @@ static void dnn_free_model_th(DNNModel **model)
th_model = (THModel *)(*model);
- /* 1. Stop and join the worker thread if it exists */
- if (th_model->worker_thread) {
- {
- std::lock_guard<std::mutex> lock(*th_model->mutex);
- th_model->worker_stop = true;
- }
- th_model->cond->notify_all();
- th_model->worker_thread->join();
- delete th_model->worker_thread;
- th_model->worker_thread = NULL;
- }
-
- /* 2. Safely delete C++ synchronization objects */
- if (th_model->mutex) {
- delete th_model->mutex;
- th_model->mutex = NULL;
- }
- if (th_model->cond) {
- delete th_model->cond;
- th_model->cond = NULL;
- }
-
- /* 3. Clean up the pending queue */
- if (th_model->pending_queue) {
- while (ff_safe_queue_size(th_model->pending_queue) > 0) {
- THRequestItem *item = (THRequestItem
*)ff_safe_queue_pop_front(th_model->pending_queue);
- destroy_request_item(&item);
- }
- ff_safe_queue_destroy(th_model->pending_queue);
- }
-
- /* 4. Clean up standard backend queues */
if (th_model->request_queue) {
while (ff_safe_queue_size(th_model->request_queue) != 0) {
THRequestItem *item = (THRequestItem
*)ff_safe_queue_pop_front(th_model->request_queue);
@@ -169,25 +128,11 @@ static void dnn_free_model_th(DNNModel **model)
ff_safe_queue_destroy(th_model->request_queue);
}
- if (th_model->lltask_queue) {
- while (ff_queue_size(th_model->lltask_queue) != 0) {
- LastLevelTaskItem *item = (LastLevelTaskItem
*)ff_queue_pop_front(th_model->lltask_queue);
- av_freep(&item);
- }
+ if (th_model->lltask_queue)
ff_queue_destroy(th_model->lltask_queue);
- }
-
- if (th_model->task_queue) {
- while (ff_queue_size(th_model->task_queue) != 0) {
- TaskItem *item = (TaskItem
*)ff_queue_pop_front(th_model->task_queue);
- av_frame_free(&item->in_frame);
- av_frame_free(&item->out_frame);
- av_freep(&item);
- }
+ if (th_model->task_queue)
ff_queue_destroy(th_model->task_queue);
- }
- /* 5. Final model cleanup */
if (th_model->jit_model)
delete th_model->jit_model;
@@ -370,28 +315,6 @@ err:
}
}
-static void th_worker_thread(THModel *th_model) {
- while (true) {
- THRequestItem *request = NULL;
- {
- std::unique_lock<std::mutex> lock(*th_model->mutex);
- th_model->cond->wait(lock, [&]{
- return th_model->worker_stop ||
ff_safe_queue_size(th_model->pending_queue) > 0;
- });
-
- if (th_model->worker_stop &&
ff_safe_queue_size(th_model->pending_queue) == 0)
- break;
-
- request = (THRequestItem
*)ff_safe_queue_pop_front(th_model->pending_queue);
- }
-
- if (request) {
- th_start_inference(request);
- infer_completion_callback(request);
- }
- }
-}
-
static int execute_model_th(THRequestItem *request, Queue *lltask_queue)
{
THModel *th_model = NULL;
@@ -414,16 +337,12 @@ static int execute_model_th(THRequestItem *request, Queue
*lltask_queue)
th_model = (THModel *)task->model;
ret = fill_model_input_th(th_model, request);
- if ( ret != 0) {
+ if (ret != 0) {
goto err;
}
+
if (task->async) {
- std::lock_guard<std::mutex> lock(*th_model->mutex);
- if (ff_safe_queue_push_back(th_model->pending_queue, request) < 0) {
- return AVERROR(ENOMEM);
- }
- th_model->cond->notify_one();
- return 0;
+ return ff_dnn_start_inference_async(th_model->ctx,
&request->exec_module);
} else {
// Synchronous execution path
ret = th_start_inference((void *)(request));
@@ -543,12 +462,11 @@ static DNNModel *dnn_load_model_th(DnnContext *ctx,
DNNFunctionType func_type, A
if (!item) {
goto fail;
}
- item->lltask = NULL;
item->infer_request = th_create_inference_request();
if (!item->infer_request) {
- av_log(NULL, AV_LOG_ERROR, "Failed to allocate memory for Torch
inference request\n");
goto fail;
}
+
item->exec_module.start_inference = &th_start_inference;
item->exec_module.callback = &infer_completion_callback;
item->exec_module.args = item;
@@ -559,24 +477,7 @@ static DNNModel *dnn_load_model_th(DnnContext *ctx,
DNNFunctionType func_type, A
item = NULL;
th_model->task_queue = ff_queue_create();
- if (!th_model->task_queue) {
- goto fail;
- }
-
th_model->lltask_queue = ff_queue_create();
- if (!th_model->lltask_queue) {
- goto fail;
- }
-
- th_model->pending_queue = ff_safe_queue_create();
- if (!th_model->pending_queue) {
- goto fail;
- }
-
- th_model->mutex = new std::mutex();
- th_model->cond = new std::condition_variable();
- th_model->worker_stop = false;
- th_model->worker_thread = new std::thread(th_worker_thread, th_model);
model->get_input = &get_input_th;
model->get_output = &get_output_th;
@@ -587,7 +488,6 @@ static DNNModel *dnn_load_model_th(DnnContext *ctx,
DNNFunctionType func_type, A
fail:
if (item) {
destroy_request_item(&item);
- av_freep(&item);
}
dnn_free_model_th(&model);
return NULL;
@@ -613,7 +513,7 @@ static int dnn_execute_model_th(const DNNModel *model,
DNNExecBaseParams *exec_p
return AVERROR(ENOMEM);
}
- ret = ff_dnn_fill_task(task, exec_params, th_model, 0, 1);
+ ret = ff_dnn_fill_task(task, exec_params, th_model, ctx->async, 1);
if (ret != 0) {
av_freep(&task);
av_log(ctx, AV_LOG_ERROR, "unable to fill task.\n");
_______________________________________________
ffmpeg-cvslog mailing list -- [email protected]
To unsubscribe send an email to [email protected]