Signed-off-by: Stephan Holljes <klaxa1...@googlemail.com> --- Makefile | 7 ++- ffserver.c | 150 +++++++++++++++++++++++++++++++++++++++------------ fileserver.c | 97 +++++++++++++++++++++++++++++++++ fileserver.h | 63 ++++++++++++++++++++++ 4 files changed, 282 insertions(+), 35 deletions(-) create mode 100644 fileserver.c create mode 100644 fileserver.h
diff --git a/Makefile b/Makefile index 83bc4e0..18f3ac3 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,8 @@ LUA_FLAGS = $(shell pkg-config --libs --cflags lua5.3) CFLAGS=-fsanitize=address -fsanitize=undefined # LAV_FLAGS = -L/usr/local/lib -lavcodec -lavformat -lavutil -ffserver: segment.o publisher.o lavfhttpd.o configreader.o ffserver.c - cc -g -Wall $(CFLAGS) $(LAV_FLAGS) $(LUA_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o configreader.o ffserver.c +ffserver: segment.o publisher.o fileserver.o lavfhttpd.o configreader.o ffserver.c + cc -g -Wall $(CFLAGS) $(LAV_FLAGS) $(LUA_FLAGS) -lpthread -o ffserver segment.o publisher.o fileserver.o lavfhttpd.o configreader.o ffserver.c segment.o: segment.c segment.h cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c segment.c @@ -13,6 +13,9 @@ segment.o: segment.c segment.h publisher.o: publisher.c publisher.h cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c publisher.c +fileserver.o: fileserver.c fileserver.h + cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c fileserver.c + lavfhttpd.o: lavfhttpd.c httpd.h cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c lavfhttpd.c diff --git a/ffserver.c b/ffserver.c index 4f42f74..91ad29a 100644 --- a/ffserver.c +++ b/ffserver.c @@ -37,9 +37,11 @@ #include <libavutil/opt.h> #include <libavformat/avformat.h> #include <libavcodec/avcodec.h> +#include <libavutil/file.h> #include "segment.h" #include "publisher.h" +#include "fileserver.h" #include "httpd.h" #include "configreader.h" @@ -64,6 +66,7 @@ struct AcceptInfo { struct HTTPDInterface *httpd; AVFormatContext **ifmt_ctxs; struct HTTPDConfig *config; + struct FileserverContext *fs; int nb_pub; /** number of publishers (streams) equal to number of ifmt_ctx */ }; @@ -448,8 +451,8 @@ void *accept_thread(void *arg) struct AcceptInfo *info = (struct AcceptInfo*) arg; struct FFServerInfo *ffinfo = NULL; struct PublisherContext *pub; - char status[4096]; - char *stream_name; + char status[4096], requested_file[1024], 
sanitized_file[1024]; + char *stream_name, *resource; struct HTTPClient *client = NULL; void *server = NULL; AVIOContext *client_ctx = NULL; @@ -493,6 +496,8 @@ void *accept_thread(void *arg) pub = NULL; ifmt_ctx = NULL; + resource = client->resource; + snprintf(requested_file, 1024, "%s", resource); for (i = 0; i < config->nb_streams; i++) { stream_name = info->pubs[i]->stream_name; // skip leading '/' ---v @@ -515,7 +520,8 @@ void *accept_thread(void *arg) } } - if (!pub || !ifmt_ctx) { + + if ((!pub || !ifmt_ctx) && !requested_file[0]) { av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n", client->resource ? client->resource : "(null)"); reply_code = 404; @@ -534,13 +540,6 @@ void *accept_thread(void *arg) continue; } - avio_buffer = av_malloc(AV_BUFSIZE); - if (!avio_buffer) { - av_log(client_ctx, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(pub); - info->httpd->close(server, client); - continue; - } ffinfo = av_malloc(sizeof(*ffinfo)); if (!ffinfo) { av_log(client_ctx, AV_LOG_ERROR, "Could not allocate FFServerInfo struct.\n"); @@ -551,6 +550,30 @@ void *accept_thread(void *arg) ffinfo->httpd = info->httpd; ffinfo->client = client; ffinfo->server = server; + + + // try to serve file + if (requested_file[0]) { + snprintf(sanitized_file, 1024, "%s", requested_file); + resource = requested_file; + while(resource && *resource == '/') { + resource++; + } + + snprintf(sanitized_file, 1024, "%s", resource); + fileserver_schedule(info->fs, ffinfo, sanitized_file); + continue; + } + + avio_buffer = av_malloc(AV_BUFSIZE); + if (!avio_buffer) { + av_log(client_ctx, AV_LOG_ERROR, "Could not allocate output format context.\n"); + publisher_cancel_reserve(pub); + info->httpd->close(server, client); + continue; + } + + client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL); if (!client_ctx) { av_log(NULL, AV_LOG_ERROR, "Could not allocate output format 
context.\n"); @@ -676,16 +699,52 @@ void *write_thread(void *arg) return NULL; }
+/* Worker: every 500 ms sends each scheduled file to its client; exits once fs->shutdown is set. */
+void *fileserver_thread(void *arg)
+{
+    struct FileserverContext *fs = arg;
+    int i, clients_served;
+    if (!fs) /* run_server() hands over its fs pointer, which starts out NULL */
+        return NULL;
+    for (;;) {
+        usleep(500000);
+        clients_served = 0;
+        for (i = 0; i < MAX_CLIENTS; i++) {
+            struct FileserverClient *c = &fs->clients[i];
+            pthread_mutex_lock(&c->client_lock);
+            if (c->buf) { /* a mapped file is pending in this slot */
+                c->ffinfo->httpd->write(c->ffinfo->server, c->ffinfo->client, c->buf, c->size);
+                av_file_unmap(c->buf, c->size);
+                c->ffinfo->httpd->close(c->ffinfo->server, c->ffinfo->client);
+                c->buf = NULL;
+                c->size = 0;
+                av_freep(&c->ffinfo); /* a NULL ffinfo marks the slot as free again */
+                clients_served++;
+            }
+            pthread_mutex_unlock(&c->client_lock);
+        }
+        av_log(NULL, AV_LOG_DEBUG, "Checking clients, fileserver-thread %s: %d/%d served\n", fs->server_name, clients_served, MAX_CLIENTS);
+        if (fs->shutdown) {
+            av_log(NULL, AV_LOG_INFO, "shutting down\n");
+            break;
+        }
+    }
+    return NULL;
+}
+
 void *run_server(void *arg) { struct AcceptInfo ainfo; struct ReadInfo *rinfos; struct WriteInfo **winfos_p; struct HTTPDConfig *config = (struct HTTPDConfig*) arg; struct PublisherContext **pubs; + struct FileserverContext *fs = NULL; AVFormatContext **ifmt_ctxs; int ret, i, stream_index; + int stream_formats[FMT_NB] = { 0 }; pthread_t *r_threads; pthread_t **w_threads_p; + pthread_t fs_thread = 0; pubs = av_mallocz_array(config->nb_streams, sizeof(struct PublisherContext*)); if (!pubs) { @@ -718,6 +777,7 @@ void *run_server(void *arg) { av_log_set_level(AV_LOG_INFO); ainfo.pubs = pubs; + ainfo.fs = fs; ainfo.ifmt_ctxs = ifmt_ctxs; ainfo.nb_pub = config->nb_streams; ainfo.httpd = &lavfhttpd; @@ -751,56 +811,75 @@ void *run_server(void *arg) { struct WriteInfo *winfos = NULL; pthread_t *w_threads = NULL; pthread_t r_thread; + int stream_formats[FMT_NB] = { 0 }; rinfo.input_uri = config->streams[stream_index].input_uri; rinfo.server_name = config->server_name; + for (i = 0; i < 
config->streams[stream_index].nb_formats; i++) + stream_formats[config->streams[stream_index].formats[i]] = 1; + if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) { av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n"); continue; } + ifmt_ctxs[stream_index] = ifmt_ctx; + if (stream_formats[FMT_MATROSKA]) + publisher_init(&pub, config->streams[stream_index].stream_name); - publisher_init(&pub, config->streams[stream_index].stream_name); pubs[stream_index] = pub; rinfo.ifmt_ctx = ifmt_ctx; rinfo.pub = pub; rinfos[stream_index] = rinfo; - - w_threads = av_mallocz_array(pub->nb_threads, sizeof(pthread_t)); - if (!w_threads) { - av_log(NULL, AV_LOG_ERROR, "Could not allocate write thread handles.\n"); - continue; - } - winfos = av_mallocz_array(pub->nb_threads, sizeof(struct WriteInfo)); - if (!winfos) { - av_log(NULL, AV_LOG_ERROR, "Could not allocate write infos.\n"); - continue; - } - w_threads_p[stream_index] = w_threads; - winfos_p[stream_index] = winfos; - - for (i = 0; i < pub->nb_threads; i++) { - winfos[i].pub = pub; - winfos[i].thread_id = i; - ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]); + if (stream_formats[FMT_MATROSKA]) { + w_threads = av_mallocz_array(pub->nb_threads, sizeof(pthread_t)); + if (!w_threads) { + av_log(NULL, AV_LOG_ERROR, "Could not allocate write thread handles.\n"); + continue; + } + winfos = av_mallocz_array(pub->nb_threads, sizeof(struct WriteInfo)); + if (!winfos) { + av_log(NULL, AV_LOG_ERROR, "Could not allocate write infos.\n"); + continue; + } + w_threads_p[stream_index] = w_threads; + winfos_p[stream_index] = winfos; + + for (i = 0; i < pub->nb_threads; i++) { + winfos[i].pub = pub; + winfos[i].thread_id = i; + ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]); + if (ret != 0) { + pub->shutdown = 1; + w_threads[i] = 0; + goto end; + } + } + w_threads_p[stream_index] = w_threads; + ret = pthread_create(&r_thread, NULL, 
read_thread, &rinfos[stream_index]); if (ret != 0) { pub->shutdown = 1; + r_thread = 0; goto end; } + r_threads[stream_index] = r_thread; } - w_threads_p[stream_index] = w_threads; - ret = pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]); + + } + if (stream_formats[FMT_HLS] || stream_formats[FMT_DASH]) { + ret = pthread_create(&fs_thread, NULL, fileserver_thread, fs); if (ret != 0) { - pub->shutdown = 1; + fs->shutdown = 1; + fs_thread = 0; goto end; } - r_threads[stream_index] = r_thread; } + //pthread_create(&a_thread, NULL, accept_thread, &ainfo); accept_thread(&ainfo); @@ -823,6 +902,11 @@ end: publisher_free(pubs[stream_index]); } + if (fs_thread) { + fs->shutdown = 1; + pthread_join(fs_thread, NULL); + fileserver_free(fs); + } error_cleanup: av_free(rinfos); av_free(winfos_p);
diff --git a/fileserver.c b/fileserver.c
new file mode 100644
index 0000000..a3460e3
--- /dev/null
+++ b/fileserver.c
@@ -0,0 +1,128 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "fileserver.h"
+#include "httpd.h"
+
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <libavutil/error.h>
+#include <libavutil/file.h>
+#include <libavutil/log.h>
+#include <libavutil/mem.h>
+
+/** Close the client's connection and release its FFServerInfo. */
+static void fileserver_drop(struct FFServerInfo *ffinfo)
+{
+    ffinfo->httpd->close(ffinfo->server, ffinfo->client);
+    av_free(ffinfo);
+}
+
+/*
+ * Takes ownership of ffinfo: it is either stored in a client slot for the
+ * fileserver thread to serve, or the client is closed and ffinfo freed here.
+ */
+void fileserver_schedule(struct FileserverContext *fs, struct FFServerInfo *ffinfo, const char *filename)
+{
+    int i, ret;
+    char final_filename[1024];
+    struct FileserverClient *fsc = NULL;
+
+    /* filename comes from the HTTP request: refuse ".." so it cannot leave
+     * the served directory, and refuse paths that do not fit the buffer. */
+    ret = -1;
+    if (fs && filename && !strstr(filename, ".."))
+        ret = snprintf(final_filename, sizeof(final_filename), "%s/%s", fs->server_name, filename);
+    if (ret < 0 || ret >= (int) sizeof(final_filename)) {
+        av_log(NULL, AV_LOG_WARNING, "Refusing to serve file: %s\n", filename ? filename : "(null)");
+        fileserver_drop(ffinfo);
+        return;
+    }
+
+    for (i = 0; i < MAX_CLIENTS && !fsc; i++) {
+        pthread_mutex_lock(&fs->clients[i].client_lock);
+        if (!fs->clients[i].ffinfo)
+            fsc = &fs->clients[i]; /* free slot: stays locked while it is filled */
+        else
+            pthread_mutex_unlock(&fs->clients[i].client_lock);
+    }
+    if (!fsc) {
+        av_log(NULL, AV_LOG_WARNING, "Could not find free client slot.\n");
+        fileserver_drop(ffinfo);
+        return;
+    }
+
+    ret = av_file_map(final_filename, &fsc->buf, &fsc->size, 0, NULL);
+    if (ret < 0 || !fsc->buf) {
+        /* The fileserver thread only serves slots with a buffer, so a failed
+         * map or an empty file (NULL buffer) must be dropped right away. */
+        av_log(NULL, AV_LOG_ERROR, "Could not read file: %s: %s.\n", final_filename,
+               ret < 0 ? av_err2str(ret) : "file is empty");
+        fileserver_drop(ffinfo);
+        fsc->buf = NULL;
+        fsc->size = 0;
+    } else {
+        fsc->ffinfo = ffinfo;
+        av_log(NULL, AV_LOG_INFO, "Scheduled serving file: %s\n", final_filename);
+    }
+    pthread_mutex_unlock(&fsc->client_lock);
+}
+
+void fileserver_init(struct FileserverContext **fs_p, const char *server_name)
+{
+    struct FileserverContext *fs = av_mallocz(sizeof(*fs));
+    int i;
+    *fs_p = NULL;
+
+    if (!fs) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate fileserver context.\n");
+        return;
+    }
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        fs->clients[i].ffinfo = NULL;
+        fs->clients[i].buf = NULL;
+        fs->clients[i].size = 0;
+        pthread_mutex_init(&fs->clients[i].client_lock, NULL);
+    }
+
+    fs->nb_threads = 1;
+    fs->server_name = server_name; /* the pointer is stored, the string is not copied */
+    fs->shutdown = 0;
+    *fs_p = fs;
+}
+
+void fileserver_free(struct FileserverContext *fs)
+{
+    int i;
+
+    if (!fs)
+        return;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct FileserverClient *c = &fs->clients[i];
+        pthread_mutex_lock(&c->client_lock);
+        av_free(c->ffinfo);
+        if (c->buf)
+            av_file_unmap(c->buf, c->size); /* buf came from av_file_map(), not av_malloc() */
+        pthread_mutex_unlock(&c->client_lock);
+        pthread_mutex_destroy(&c->client_lock);
+    }
+    av_free(fs);
+}
diff --git a/fileserver.h b/fileserver.h
new file mode 100644
index 0000000..6e449c0
--- /dev/null
+++ b/fileserver.h
@@ -0,0 +1,68 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FILESERVER_H
+#define FILESERVER_H
+
+#define MAX_CLIENTS 16
+
+#include "httpd.h"
+#include <pthread.h>
+#include <sys/types.h>
+
+/** One client slot; it is free while ffinfo is NULL. */
+struct FileserverClient {
+    struct FFServerInfo *ffinfo;  /**< connection to answer, owned by the slot */
+    pthread_mutex_t client_lock;  /**< held while the slot is inspected or changed */
+    unsigned char *buf;           /**< file contents mapped with av_file_map() */
+    size_t size;                  /**< size of buf in bytes */
+};
+
+/** State shared between fileserver_schedule() and the fileserver thread. */
+struct FileserverContext {
+    struct FileserverClient clients[MAX_CLIENTS];
+    int nb_threads;
+    int shutdown;             /**< set to non-zero to make the fileserver thread exit */
+    const char *server_name;  /**< directory prefix of served files; not copied */
+};
+
+/**
+ * Schedule file for sending to a client. It is added to a list of clients.
+ * Ownership of ffinfo passes to the fileserver; if the file cannot be
+ * scheduled the client is closed and ffinfo is freed.
+ * @param fs pointer to the FileserverContext to schedule to
+ * @param ffinfo pointer to an FFServerInfo struct containing information for serving the file
+ * @param filename filename of the file to serve, relative to the server name directory
+ */
+void fileserver_schedule(struct FileserverContext *fs, struct FFServerInfo *ffinfo, const char *filename);
+
+/**
+ * Initialize a FileserverContext
+ * @param fs_p pointer to a pointer where the FileserverContext will be allocated,
+ *             set to NULL if the allocation fails
+ * @param server_name the name of the server
+ */
+void fileserver_init(struct FileserverContext **fs_p, const char *server_name);
+
+/**
+ * Free a FileserverContext
+ * @param fs context to free, may be NULL
+ */
+void fileserver_free(struct FileserverContext *fs);
+
+#endif
-- 
2.18.0

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
http://ffmpeg.org/mailman/listinfo/ffmpeg-devel