On 08/15/2016 11:50 PM, Nicolas George wrote:

L'octidi 28 thermidor, an CCXXIV, sebechlebsky...@gmail.com a écrit :
[...]
+s@item recovery_wait_streamtime @var{bool}
    ^
Strange.

Sorry, that is obviously a typo. Strange thing is it had not produced any kind of warning/error.
[...]
+#define FIFO_DEFAULT_QUEUE_SIZE              60
+#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
+#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second
Not very useful, but not harmful either.

Nit: plural for seconds.

+
+typedef struct FifoContext {
+    const AVClass *class;
+    AVFormatContext *avf;
+
+    char *format;
+    AVOutputFormat *oformat;
Does not seem to be needed in the context, could be a local variable.
You're probably right, I'll check it and make it local variable if so.
+    // Check for options unrecognized by underlying muxer
+    if (format_options) {
+        AVDictionaryEntry *entry = NULL;
+        while ((entry = av_dict_get(format_options, "", entry, 
AV_DICT_IGNORE_SUFFIX)))
+            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
+        ret = AVERROR(EINVAL);
+    }
This snippet seems to pop at quite a few places, it could become a helper
function. But do not consider it necessary for now.
I'll try to keep it in mind and add helper function later.
[...]
+        ctx->last_recovery_ts = pkt->pts;
+    } else {
+        ctx->last_recovery_ts = av_gettime_relative();
In my experience, this kind of construct is slightly simpler if you compute
the next timestamp immediately instead of keeping the last one. I mean,
instead of this:

        last_ts = now();
        ...
        sleep(last_ts + delay - now());

you can write this:

        next_ts = now() + delay;
        ...
        sleep(next_ts - now());
I'll give it a try and see if it is more cleaner in context of this code.
+    }
+
+    if (fifo->max_recovery_attempts &&
+        ctx->recovery_nr >= fifo->max_recovery_attempts) {
+        av_log(avf, AV_LOG_ERROR,
+               "Maximal number of %d recovery attempts reached.\n",
+               fifo->max_recovery_attempts);
+        ret = err_no;
+    } else {
+        ret = AVERROR(EAGAIN);
+    }
+
+    return ret;
+}
+
+static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage 
*msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVPacket *pkt = &msg->pkt;
+    int64_t time_since_recovery;
+    int ret;
+
+    if (!is_recoverable(fifo, err_no)) {
+        ret = err_no;
+        goto fail;
+    }
+
+    if (ctx->header_written) {
+        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
+        ctx->header_written = 0;
+    }
+
+    if (!ctx->recovery_nr) {
+        ctx->last_recovery_ts = 0;
AV_NOPTS_VALUE? And maybe an av_assert1() when you use it to be sure it was
actually set.
I'll take a look at it. Using zero will delay the initial recovery attempt if failure happens at
the pts = 0, so treating it specially seems like a good idea.
[...]
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (fifo->overflow_flag) {
+            av_thread_message_flush(queue);
+            if (fifo->restart_with_keyframe)
+                fifo_thread_ctx.drop_until_keyframe = 1;
+            fifo->overflow_flag = 0;
+            just_flushed = 1;
+        }
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
I wonder if this whole thing could not be made simpler. This is just a
suggestion, so do not feel obligated to act on it if you lack time.

This has the feel of an out-of-band message. This is a rather useful
feature. This could be added to the thread message queue rather easily.
I am not sure if I understand this. Do you mean thread queue function which would set certain flag that the queue should be flushed and flushed the queue at certain point in time (next receive call?)?
But in fact, I think it could be made even simpler. See below where
overflow_flag is set.

+
+        if (just_flushed)
+            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
+
+        ret = av_thread_message_queue_recv(queue, &msg, 0);
[...]
+        if (ret < 0) {
+            av_thread_message_queue_set_err_send(queue, ret);
+            break;
I do not think this is necessary: the only error possible here is the one
you sent yourself using av_thread_message_queue_set_err_recv().
You're right. I'll remove it.
+        }
+    }
+
+    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
+
+    return NULL;
+}
+
+static int fifo_mux_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2;
+    int ret = 0, i;
+
+    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
+    if (ret < 0) {
+        return ret;
+    }
+
+    fifo->avf = avf2;
+
+    avf2->interrupt_callback = avf->interrupt_callback;
You have not documented the fact that the interrupt callback needs to be
thread-safe. And if users can select the fifo muxer, this becomes a problem.
I will submit another patch adding that to the API documentation. Thanks for reminding.
[...]
+    if (ret == AVERROR(EAGAIN)) {
+        uint8_t overflow_set = 0;
+
+        /* Queue is full, set fifo->overflow_flag to 1
+         * to let consumer thread know the queue should
+         * be flushed. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (!fifo->overflow_flag)
+            fifo->overflow_flag = overflow_set = 1;
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        if (overflow_set)
+            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
+        ret = 0;
+        goto fail;
Can you explain why you do not just drop the packet here and now? If you do
that and move the discard until keyframe here, you can dispense with the
overflow flag and the lock. The consumer thread becomes much simpler and can
focus on the retries themselves. Plus, the whole process becomes slightly
more efficient since packets are discarded earlier.

Also, why do you flush the whole 60 packets when it blocks? If the output is
only lagging a little, discarding a single packet could be enough. And if
not, it will discard the next packets.
The current packet is discarded here. We discussed with Marton how to drop packets and agreed that flushing the whole queue should be OK - it is done within single critical section in av_thread_queue_flush(), flushing just single packet might not be enough in some situations. The reason why is it done in worker thread and not in this call is that flushing the queue can get quite costly -> dereferencing larger number of packets can lead to many free() calls which may be costly, the idea was to move potentially costly operation to worker thread. It may be a good thing to make this configurable (allow user to set how many messages to flush on overflow) and add function to thread queue which would flush certain number of messages in single critical section.
[...]
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, 
AV_OPT_FLAG_ENCODING_PARAM},
1024 seems a bit arbitrary.
I'll change that to INT_MAX.
[...]
+#define LIBAVFORMAT_VERSION_MINOR  47
  #define LIBAVFORMAT_VERSION_MICRO 101
Reset micro when bumping minor.
I'll fix that.

Thanks for review! I'll fix the issues and resend the patch soon.

Regards,
Jan
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
http://ffmpeg.org/mailman/listinfo/ffmpeg-devel

Reply via email to