From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> This is an incremental update against the outq patch. I can send an all-in-one patch against the current master branch; I hope this incremental form makes the review easier.
I wanted to address all issues raised in the previous mail. The "no progress" state is hopefully properly addressed. I added `tmp_buf' which is used if the output buffer is exhausted. Should the additional buffer be used, it is treated as LZMA_DATA_ERROR if the block decoder returns LZMA_OK. I think bad-1-lzma2-10.xz tried this. Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> --- src/liblzma/common/stream_decoder_mt.c | 128 +++++++++++++++---------- 1 file changed, 78 insertions(+), 50 deletions(-) diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c index 8e361c38f27a2..bbe22c461e52f 100644 --- a/src/liblzma/common/stream_decoder_mt.c +++ b/src/liblzma/common/stream_decoder_mt.c @@ -45,7 +45,6 @@ struct worker_thread { /// Bytes consumed of ->in (worker) size_t in_pos; - lzma_ret thread_error; worker_state state; /// Pointer to the main structure is needed when putting this @@ -98,6 +97,7 @@ struct lzma_stream_coder { /// Current thread compressed data is written to struct worker_thread *thr_write; + lzma_ret thread_error; lzma_outq outq; @@ -142,6 +142,23 @@ static void thr_do_partial_update(void *thr_ptr) mythread_mutex_unlock(&thr->mutex); } +static void worker_set_error(struct worker_thread *thr, lzma_ret err_code) +{ + mythread_mutex_lock(&thr->mutex); + if (thr->state == THR_RUN) + thr->state = THR_IDLE; + mythread_mutex_unlock(&thr->mutex); + + mythread_mutex_lock(&thr->coder->mutex); + if (thr->coder->thread_error == LZMA_OK) + thr->coder->thread_error = err_code; + + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + mythread_cond_signal(&thr->coder->cond); + mythread_mutex_unlock(&thr->coder->mutex); +} + /// Use smaller chunks so cancellation attempts don't block for long #define CHUNK_SIZE 16384 static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) @@ -149,6 +166,8 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) struct worker_thread *thr = thr_ptr; 
size_t in_filled; size_t out_pos; + unsigned char tmp_buf; + size_t tmp_buf_pos = 0; lzma_ret ret; next_loop_lock: @@ -176,12 +195,10 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) goto next_loop_unlocked; } else if (thr->state != THR_RUN) { thr->state = THR_IDLE; - thr->thread_error = LZMA_PROG_ERROR; mythread_mutex_unlock(&thr->mutex); - mythread_mutex_lock(&thr->coder->mutex); - mythread_cond_signal(&thr->coder->cond); - mythread_mutex_unlock(&thr->coder->mutex); + worker_set_error(thr, LZMA_PROG_ERROR); + goto next_loop_lock; } @@ -199,60 +216,74 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) out_pos = thr->secret_progress; - ret = thr->block_decoder.code(thr->block_decoder.coder, - thr->allocator, - thr->in, &thr->in_pos, in_filled, - thr->outbuf->buf, &out_pos, thr->outbuf->allocated, - LZMA_RUN); - if (ret == LZMA_OK || ret == LZMA_STREAM_END) { + // Check if it attempts to write more than written in the header. + if (out_pos == thr->outbuf->allocated) { + + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + thr->in, &thr->in_pos, in_filled, + &tmp_buf, &tmp_buf_pos, 1, + LZMA_RUN); + } else { + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + thr->in, &thr->in_pos, in_filled, + thr->outbuf->buf, &out_pos, thr->outbuf->allocated, + LZMA_RUN); + } + if (ret == LZMA_OK) { bool partial_update; mythread_mutex_lock(&thr->mutex); - if (thr->in_pos == thr->in_block_size) { - if (thr->state == THR_RUN) - thr->state = THR_IDLE; - } partial_update = thr->partial_update; mythread_mutex_unlock(&thr->mutex); - if ((partial_update && (out_pos != thr->outbuf->pos)) || - out_pos == thr->outbuf->allocated) { + if (partial_update && (out_pos != thr->outbuf->pos)) { mythread_mutex_lock(&thr->coder->mutex); thr->outbuf->pos = out_pos; - if (partial_update) - mythread_cond_signal(&thr->coder->cond); - - if (out_pos == thr->outbuf->allocated) { - - thr->outbuf->unpadded_size = 
lzma_block_unpadded_size(&thr->block_options); - thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; - thr->outbuf->finished = true; - thr->outbuf = NULL; - - thr->next = thr->coder->threads_free; - thr->coder->threads_free = thr; - - mythread_mutex_unlock(&thr->coder->mutex); - goto next_loop_lock; - } - + mythread_cond_signal(&thr->coder->cond); mythread_mutex_unlock(&thr->coder->mutex); } thr->secret_progress = out_pos; + + /* + * If the input buffer has been fully consumed and we made no + * progress then something is wrong. + */ + if (thr->in_pos == thr->in_block_size || tmp_buf_pos != 0) { + tmp_buf_pos = 0; + worker_set_error(thr, LZMA_DATA_ERROR); + } + goto next_loop_lock; - } else { + } else if (ret == LZMA_STREAM_END) { mythread_mutex_lock(&thr->mutex); - thr->state = THR_IDLE; - thr->thread_error = ret; + if (thr->state == THR_RUN) + thr->state = THR_IDLE; mythread_mutex_unlock(&thr->mutex); mythread_mutex_lock(&thr->coder->mutex); + + thr->outbuf->pos = out_pos; + thr->outbuf->unpadded_size = lzma_block_unpadded_size(&thr->block_options); + thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; + thr->outbuf->finished = true; + thr->outbuf->pos = out_pos; + thr->outbuf = NULL; + + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + mythread_cond_signal(&thr->coder->cond); mythread_mutex_unlock(&thr->coder->mutex); + goto next_loop_lock; + } else { + worker_set_error(thr, ret); + goto next_loop_lock; } return MYTHREAD_RET_VALUE; @@ -309,18 +340,9 @@ static lzma_ret stream_decode_in(struct lzma_stream_coder *coder, struct worker_thread *thr = coder->thr_write; size_t old_filled; size_t cur_in_infilled; - lzma_ret ret; mythread_mutex_lock(&thr->mutex); - if (thr->state == THR_IDLE) { - ret = thr->thread_error; - if (ret != LZMA_OK) { - mythread_mutex_unlock(&thr->mutex); - return ret; - } - } - old_filled = thr->in_filled; mythread_mutex_unlock(&thr->mutex); cur_in_infilled = old_filled; @@ -360,7 
+382,6 @@ static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder, goto error_cond; thr->state = THR_IDLE; - thr->thread_error = LZMA_OK; thr->allocator = allocator; thr->coder = coder; thr->block_decoder = LZMA_NEXT_CODER_INIT; @@ -481,6 +502,13 @@ static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder, lzma_vli uncompressed_size; mythread_mutex_lock(&coder->mutex); + + ret = coder->thread_error; + if (ret != LZMA_OK) { + mythread_mutex_unlock(&coder->mutex); + return ret; + } + if (!lzma_outq_is_readable(&coder->outq)) { mythread_mutex_unlock(&coder->mutex); return LZMA_OK; @@ -810,9 +838,8 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, coder->memusage += coder->exp_block_size; thr->in_size = thr->in_block_size; } - if (thr->outbuf == coder->outq.head) - lzma_outq_enable_partial_output(&coder->outq, - thr_do_partial_update); + lzma_outq_enable_partial_output(&coder->outq, + thr_do_partial_update); } coder->sequence = SEQ_BLOCK; @@ -1060,6 +1087,7 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, coder->timeout = options->timeout; coder->sequence = SEQ_STREAM_HEADER; + coder->thread_error = LZMA_OK; coder->memlimit = my_max(1, options->memlimit); coder->memusage = LZMA_MEMUSAGE_BASE; -- 2.30.0