From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>

This is an incremental update against the outq patch. I can send an
all-in-one patch against the current master branch, I hope this makes
the review easier.

I wanted to address all issues raised in the previous mail. The "no
progress" state is hopefully properly addressed. I added `tmp_buf' which
is used if the output buffer is exhausted. Should the additional buffer
be used then it is considered as LZMA_DATA_ERROR. if the block decoder
returns with LZMA_OK. I think bad-1-lzma2-10.xz tried this.

Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>
---
 src/liblzma/common/stream_decoder_mt.c | 128 +++++++++++++++----------
 1 file changed, 78 insertions(+), 50 deletions(-)

diff --git a/src/liblzma/common/stream_decoder_mt.c 
b/src/liblzma/common/stream_decoder_mt.c
index 8e361c38f27a2..bbe22c461e52f 100644
--- a/src/liblzma/common/stream_decoder_mt.c
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -45,7 +45,6 @@ struct worker_thread {
        /// Bytes consumed of ->in (worker)
        size_t in_pos;
 
-       lzma_ret thread_error;
        worker_state state;
 
        /// Pointer to the main structure is needed when putting this
@@ -98,6 +97,7 @@ struct lzma_stream_coder {
 
        /// Current thread compressed data is written to
        struct worker_thread *thr_write;
+       lzma_ret thread_error;
 
        lzma_outq outq;
 
@@ -142,6 +142,23 @@ static void thr_do_partial_update(void *thr_ptr)
        mythread_mutex_unlock(&thr->mutex);
 }
 
+static void worker_set_error(struct worker_thread *thr, lzma_ret err_code)
+{
+       mythread_mutex_lock(&thr->mutex);
+       if (thr->state == THR_RUN)
+               thr->state = THR_IDLE;
+       mythread_mutex_unlock(&thr->mutex);
+
+       mythread_mutex_lock(&thr->coder->mutex);
+       if (thr->coder->thread_error == LZMA_OK)
+               thr->coder->thread_error = err_code;
+
+       thr->next = thr->coder->threads_free;
+       thr->coder->threads_free = thr;
+       mythread_cond_signal(&thr->coder->cond);
+       mythread_mutex_unlock(&thr->coder->mutex);
+}
+
 /// Use smaller chunks so cancellation attempts don't block for long
 #define CHUNK_SIZE     16384
 static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
@@ -149,6 +166,8 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
        struct worker_thread *thr = thr_ptr;
        size_t in_filled;
        size_t out_pos;
+       unsigned char tmp_buf;
+       size_t tmp_buf_pos = 0;
        lzma_ret ret;
 
 next_loop_lock:
@@ -176,12 +195,10 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
                goto next_loop_unlocked;
        } else if (thr->state != THR_RUN) {
                thr->state = THR_IDLE;
-               thr->thread_error = LZMA_PROG_ERROR;
                mythread_mutex_unlock(&thr->mutex);
 
-               mythread_mutex_lock(&thr->coder->mutex);
-               mythread_cond_signal(&thr->coder->cond);
-               mythread_mutex_unlock(&thr->coder->mutex);
+               worker_set_error(thr, LZMA_PROG_ERROR);
+
                goto next_loop_lock;
        }
 
@@ -199,60 +216,74 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
 
        out_pos = thr->secret_progress;
 
-       ret = thr->block_decoder.code(thr->block_decoder.coder,
-                                     thr->allocator,
-                                     thr->in, &thr->in_pos, in_filled,
-                                     thr->outbuf->buf, &out_pos, 
thr->outbuf->allocated,
-                                     LZMA_RUN);
-       if (ret == LZMA_OK || ret == LZMA_STREAM_END) {
+       // Check if it attempts to write more than written in the header.
+       if (out_pos == thr->outbuf->allocated) {
+
+               ret = thr->block_decoder.code(thr->block_decoder.coder,
+                                             thr->allocator,
+                                             thr->in, &thr->in_pos, in_filled,
+                                             &tmp_buf, &tmp_buf_pos, 1,
+                                             LZMA_RUN);
+       } else {
+               ret = thr->block_decoder.code(thr->block_decoder.coder,
+                                             thr->allocator,
+                                             thr->in, &thr->in_pos, in_filled,
+                                             thr->outbuf->buf, &out_pos, 
thr->outbuf->allocated,
+                                             LZMA_RUN);
+       }
+       if (ret == LZMA_OK) {
                bool partial_update;
 
                mythread_mutex_lock(&thr->mutex);
-               if (thr->in_pos == thr->in_block_size) {
-                       if (thr->state == THR_RUN)
-                               thr->state = THR_IDLE;
-               }
                partial_update = thr->partial_update;
                mythread_mutex_unlock(&thr->mutex);
 
-               if ((partial_update && (out_pos != thr->outbuf->pos)) ||
-                   out_pos == thr->outbuf->allocated) {
+               if (partial_update && (out_pos != thr->outbuf->pos)) {
 
                        mythread_mutex_lock(&thr->coder->mutex);
                        thr->outbuf->pos = out_pos;
 
-                       if (partial_update)
-                               mythread_cond_signal(&thr->coder->cond);
-
-                       if (out_pos == thr->outbuf->allocated) {
-
-                               thr->outbuf->unpadded_size = 
lzma_block_unpadded_size(&thr->block_options);
-                               thr->outbuf->uncompressed_size = 
thr->block_options.uncompressed_size;
-                               thr->outbuf->finished = true;
-                               thr->outbuf = NULL;
-
-                               thr->next = thr->coder->threads_free;
-                               thr->coder->threads_free = thr;
-
-                               mythread_mutex_unlock(&thr->coder->mutex);
-                               goto next_loop_lock;
-                       }
-
+                       mythread_cond_signal(&thr->coder->cond);
                        mythread_mutex_unlock(&thr->coder->mutex);
                }
 
                thr->secret_progress = out_pos;
+
+               /*
+                * If the input buffer has been fully consumed and we made no
+                * progress then something is wrong.
+                */
+               if (thr->in_pos == thr->in_block_size || tmp_buf_pos != 0) {
+                       tmp_buf_pos = 0;
+                       worker_set_error(thr, LZMA_DATA_ERROR);
+               }
+
                goto next_loop_lock;
-       } else {
+       } else if (ret == LZMA_STREAM_END) {
 
                mythread_mutex_lock(&thr->mutex);
-               thr->state = THR_IDLE;
-               thr->thread_error = ret;
+               if (thr->state == THR_RUN)
+                       thr->state = THR_IDLE;
                mythread_mutex_unlock(&thr->mutex);
 
                mythread_mutex_lock(&thr->coder->mutex);
+
+               thr->outbuf->pos = out_pos;
+               thr->outbuf->unpadded_size = 
lzma_block_unpadded_size(&thr->block_options);
+               thr->outbuf->uncompressed_size = 
thr->block_options.uncompressed_size;
+               thr->outbuf->finished = true;
+               thr->outbuf->pos = out_pos;
+               thr->outbuf = NULL;
+
+               thr->next = thr->coder->threads_free;
+               thr->coder->threads_free = thr;
+
                mythread_cond_signal(&thr->coder->cond);
                mythread_mutex_unlock(&thr->coder->mutex);
+               goto next_loop_lock;
+       } else {
+               worker_set_error(thr, ret);
+
                goto next_loop_lock;
        }
        return MYTHREAD_RET_VALUE;
@@ -309,18 +340,9 @@ static lzma_ret stream_decode_in(struct lzma_stream_coder 
*coder,
        struct worker_thread *thr = coder->thr_write;
        size_t old_filled;
        size_t cur_in_infilled;
-       lzma_ret ret;
 
        mythread_mutex_lock(&thr->mutex);
 
-       if (thr->state == THR_IDLE) {
-               ret = thr->thread_error;
-               if (ret != LZMA_OK) {
-                       mythread_mutex_unlock(&thr->mutex);
-                       return ret;
-               }
-       }
-
        old_filled = thr->in_filled;
        mythread_mutex_unlock(&thr->mutex);
        cur_in_infilled = old_filled;
@@ -360,7 +382,6 @@ static lzma_ret initialize_new_thread(struct 
lzma_stream_coder *coder,
                goto error_cond;
 
        thr->state = THR_IDLE;
-       thr->thread_error = LZMA_OK;
        thr->allocator = allocator;
        thr->coder = coder;
        thr->block_decoder = LZMA_NEXT_CODER_INIT;
@@ -481,6 +502,13 @@ static lzma_ret try_copy_decoded(struct lzma_stream_coder 
*coder,
                lzma_vli uncompressed_size;
 
                mythread_mutex_lock(&coder->mutex);
+
+               ret = coder->thread_error;
+               if (ret != LZMA_OK) {
+                       mythread_mutex_unlock(&coder->mutex);
+                       return ret;
+               }
+
                if (!lzma_outq_is_readable(&coder->outq)) {
                        mythread_mutex_unlock(&coder->mutex);
                        return LZMA_OK;
@@ -810,9 +838,8 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
                                       coder->memusage += coder->exp_block_size;
                               thr->in_size = thr->in_block_size;
                       }
-                      if (thr->outbuf == coder->outq.head)
-                              lzma_outq_enable_partial_output(&coder->outq,
-                                                              
thr_do_partial_update);
+                      lzma_outq_enable_partial_output(&coder->outq,
+                                                      thr_do_partial_update);
               }
 
               coder->sequence = SEQ_BLOCK;
@@ -1060,6 +1087,7 @@ stream_decoder_mt_init(lzma_next_coder *next, const 
lzma_allocator *allocator,
 
        coder->timeout = options->timeout;
        coder->sequence = SEQ_STREAM_HEADER;
+       coder->thread_error = LZMA_OK;
 
        coder->memlimit = my_max(1, options->memlimit);
        coder->memusage = LZMA_MEMUSAGE_BASE;
-- 
2.30.0


Reply via email to