From: Clément Bœsch <clem...@stupeflix.com> --- tests/api/Makefile | 1 + tests/api/api-threadmessage-test.c | 202 +++++++++++++++++++++++++++++++++++++ tests/fate/api.mak | 6 ++ 3 files changed, 209 insertions(+) create mode 100644 tests/api/api-threadmessage-test.c
diff --git a/tests/api/Makefile b/tests/api/Makefile index c48c34a..3556a9b 100644 --- a/tests/api/Makefile +++ b/tests/api/Makefile @@ -3,6 +3,7 @@ APITESTPROGS-$(call DEMDEC, H264, H264) += api-h264 APITESTPROGS-yes += api-seek APITESTPROGS-yes += api-codec-param APITESTPROGS-$(call DEMDEC, H263, H263) += api-band +APITESTPROGS-yes += api-threadmessage APITESTPROGS += $(APITESTPROGS-yes) APITESTOBJS := $(APITESTOBJS:%=$(APITESTSDIR)%) $(APITESTPROGS:%=$(APITESTSDIR)/%-test.o) diff --git a/tests/api/api-threadmessage-test.c b/tests/api/api-threadmessage-test.c new file mode 100644 index 0000000..abf0154 --- /dev/null +++ b/tests/api/api-threadmessage-test.c @@ -0,0 +1,202 @@ +/* + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +/** + * Thread message API test + */ + +#include <pthread.h> + +#include "libavutil/frame.h" +#include "libavutil/avstring.h" +#include "libavutil/threadmessage.h" + +struct tdata { + int id; + pthread_t tid; + int workload; + AVThreadMessageQueue *queue; +}; + +static void free_frame(void *arg) +{ + AVFrame *frame = arg; + av_frame_free(&frame); +} + +/* Frame producing thread. Will flush the queue half way just to be a jerk */ +static void *worker_thread(void *arg) +{ + int i, ret; + struct tdata *td = arg; + + av_log(NULL, AV_LOG_INFO, "worker #%d: workload=%d\n", td->id, td->workload); + for (i = 0; i < td->workload; i++) { + if (i == td->workload/2) { + av_log(NULL, AV_LOG_INFO, "worker #%d: flushing the queue\n", td->id); + av_thread_message_flush(td->queue); + } else { + char *val; + AVDictionary *meta = NULL; + AVFrame *frame = av_frame_alloc(); + if (!frame) { + ret = AVERROR(ENOMEM); + break; + } + + /* we add some metadata to identify the frames */ + val = av_asprintf("frame from worker %d", td->id); + if (!val) { + av_frame_free(&frame); + ret = AVERROR(ENOMEM); + break; + } + ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); + if (ret < 0) { + av_frame_free(&frame); + break; + } + av_frame_set_metadata(frame, meta); + + /* allocate a real frame in order to simulate "real" work */ + frame->format = AV_PIX_FMT_RGBA; + frame->width = 320; + frame->height = 240; + ret = av_frame_get_buffer(frame, 32); + if (ret < 0) { + av_frame_free(&frame); + break; + } + + /* push the frame in the common queue */ + av_log(NULL, AV_LOG_INFO, "worker #%d: sending my work (%p), %d left\n", + td->id, frame, td->workload - i - 1); + ret = av_thread_message_queue_send(td->queue, &frame, 0); + if (ret < 0) { + av_frame_free(&frame); + break; + } + } + } + av_log(NULL, AV_LOG_INFO, "worker #%d: my work is done here (%s)\n", + td->id, av_err2str(ret)); + av_thread_message_queue_set_err_recv(td->queue, ret < 0 ? ret : AVERROR_EOF); + return NULL; +} + +static int consume_queue(AVThreadMessageQueue *q, int n) +{ + int i, ret = 0; + + for (i = 0; i < n; i++) { + AVFrame *frame; + AVDictionary *meta; + AVDictionaryEntry *e; + + ret = av_thread_message_queue_recv(q, &frame, 0); + if (ret < 0) + break; + meta = av_frame_get_metadata(frame); + e = av_dict_get(meta, "sig", NULL, 0); + av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, frame); + av_frame_free(&frame); + } + av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); + av_thread_message_queue_set_err_send(q, ret < 0 ? ret : AVERROR_EOF); + return ret; +} + +int main(int ac, char **av) +{ + int i, ret = 0; + int nb_workers, max_queue_size; + int worker_min_production, worker_max_production; + int stop_production; + struct tdata *workers; + AVThreadMessageQueue *queue = NULL; + + if (ac != 6) { + av_log(NULL, AV_LOG_ERROR, "%s <nb_workers> <max_queue_size> " + "<worker_min_production> <worker_max_production> " + "<stop_production>\n", av[0]); + return 1; + } + + nb_workers = atoi(av[1]); + max_queue_size = atoi(av[2]); + worker_min_production = atoi(av[3]); + worker_max_production = atoi(av[4]); + stop_production = atoi(av[5]); + + av_log(NULL, AV_LOG_INFO, "%d workers with producing range [%d;%d] on a queue of size %d, " + "will stop after %d is globally produced\n", + nb_workers, worker_min_production, worker_max_production, + max_queue_size, stop_production); + + workers = av_mallocz_array(nb_workers, sizeof(*workers)); + if (!workers) { + ret = AVERROR(ENOMEM); + goto end; + } + + ret = av_thread_message_queue_alloc2(&queue, max_queue_size, sizeof(AVFrame*), free_frame); + if (ret < 0) + goto end; + + for (i = 0; i < nb_workers; i++) { + struct tdata *td = &workers[i]; + + td->id = i; + td->queue = queue; + td->workload = worker_max_production == worker_min_production ? worker_max_production + : rand() % (worker_max_production - worker_min_production) + worker_min_production; + + ret = pthread_create(&td->tid, NULL, worker_thread, td); + if (ret) { + const int err = AVERROR(ret); + av_log(NULL, AV_LOG_ERROR, "Unable to start worker thread: %s\n", av_err2str(err)); + break; + } + } + + av_log(NULL, AV_LOG_INFO, "All workers spawned, start consuming\n"); + ret = consume_queue(queue, stop_production); + + for (i = 0; i < nb_workers; i++) { + struct tdata *td = &workers[i]; + + ret = pthread_join(td->tid, NULL); + if (ret) { + const int err = AVERROR(ret); + av_log(NULL, AV_LOG_ERROR, "Unable to join worker thread: %s\n", av_err2str(err)); + break; + } + } + +end: + av_thread_message_queue_free(&queue); + av_freep(&workers); + + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); + return 1; + } + return 0; +} diff --git a/tests/fate/api.mak b/tests/fate/api.mak index 325f64a..1c51dd3 100644 --- a/tests/fate/api.mak +++ b/tests/fate/api.mak @@ -28,6 +28,12 @@ FATE_API_SAMPLES_LIBAVFORMAT-yes += fate-api-jpeg-codec-param fate-api-jpeg-codec-param: $(APITESTSDIR)/api-codec-param-test$(EXESUF) fate-api-jpeg-codec-param: CMD = run $(APITESTSDIR)/api-codec-param-test $(TARGET_SAMPLES)/exif/image_small.jpg +FATE_API-$(CONFIG_AVUTIL) += fate-api-threadmessage +fate-api-threadmessage: $(APITESTSDIR)/api-threadmessage-test$(EXESUF) +fate-api-threadmessage: CMD = run $(APITESTSDIR)/api-threadmessage-test 5 10 50 100 300 +fate-api-threadmessage: CMP = null +fate-api-threadmessage: REF = /dev/null + FATE_API_SAMPLES-$(CONFIG_AVFORMAT) += $(FATE_API_SAMPLES_LIBAVFORMAT-yes) ifdef SAMPLES -- 2.6.2 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org http://ffmpeg.org/mailman/listinfo/ffmpeg-devel