From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> This changes the MT decoder to use the outq instead of the self-cooked one-buffer-per-thread scheme, to illustrate what had to be done. It appears to work.
The numbers changed from xz -tv 100 % 10,2 GiB / 40,0 GiB = 0,255 958 MiB/s 0:42 xz -dv | openssl sha1 100 % 10,2 GiB / 40,0 GiB = 0,255 813 MiB/s 0:50 to xz -tv 100 % 10,2 GiB / 40,0 GiB = 0,255 1,1 GiB/s 0:36 xz -dv | openssl sha1 100 % 10,2 GiB / 40,0 GiB = 0,255 914 MiB/s 0:44 Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> --- src/liblzma/common/outqueue.h | 9 + src/liblzma/common/stream_decoder_mt.c | 277 +++++++++++++------------ 2 files changed, 148 insertions(+), 138 deletions(-) diff --git a/src/liblzma/common/outqueue.h b/src/liblzma/common/outqueue.h index 355e0ced2cfc3..de630ee855c6b 100644 --- a/src/liblzma/common/outqueue.h +++ b/src/liblzma/common/outqueue.h @@ -203,6 +203,15 @@ lzma_outq_has_buf(const lzma_outq *outq) return outq->bufs_in_use < outq->bufs_limit; } +/// \brief Test if there is at least one preallocated buffer free +/// +/// This returns true then a new buffer will be pre-allocated. +/// +static inline bool +lzma_outq_has_buf_prealloc(const lzma_outq *outq) +{ + return outq->bufs_in_use < outq->bufs_allocated; +} /// \brief Test if the queue is completely empty static inline bool diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c index 1bfd2279c176b..8e361c38f27a2 100644 --- a/src/liblzma/common/stream_decoder_mt.c +++ b/src/liblzma/common/stream_decoder_mt.c @@ -14,6 +14,7 @@ #include "block_decoder.h" #include "stream_decoder.h" #include "index.h" +#include "outqueue.h" #include <stdio.h> @@ -33,16 +34,6 @@ typedef enum { } worker_state; -struct out_buffer { - uint8_t *out; - /// Size of ->out - size_t out_block_size; - /// Bytes written to ->out (worker) - size_t out_pos; - /// Bytes consumed of ->out (coordinator) - size_t out_filled; -}; - struct worker_thread { uint8_t *in; /// Size of ->in @@ -63,7 +54,9 @@ struct worker_thread { /// The allocator is set by the main thread. 
const lzma_allocator *allocator; - struct out_buffer out; + lzma_outbuf *outbuf; + bool partial_update; + size_t secret_progress; lzma_next_coder block_decoder; lzma_block block_options; @@ -102,12 +95,12 @@ struct lzma_stream_coder { /// are created only when actually needed. struct worker_thread *threads_free; /// Current thread decompressed is read from - struct worker_thread *thr_read; - /// Last read thread, used for ->next assignment - struct worker_thread *thr_read_last; + /// Current thread compressed data is written to struct worker_thread *thr_write; + lzma_outq outq; + /// Memory usage limit uint64_t memlimit; /// Amount of memory actually needed (only an estimate) @@ -140,6 +133,15 @@ struct lzma_stream_coder { uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX]; }; +static void thr_do_partial_update(void *thr_ptr) +{ + struct worker_thread *thr = thr_ptr; + + mythread_mutex_lock(&thr->mutex); + thr->partial_update = true; + mythread_mutex_unlock(&thr->mutex); +} + /// Use smaller chunks so cancellation attempts don't block for long #define CHUNK_SIZE 16384 static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) @@ -148,7 +150,6 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) size_t in_filled; size_t out_pos; lzma_ret ret; - struct out_buffer *out; next_loop_lock: @@ -162,7 +163,7 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) mythread_mutex_unlock(&thr->mutex); lzma_free(thr->in, thr->allocator); - lzma_free(thr->out.out, thr->allocator); + lzma_next_end(&thr->block_decoder, thr->allocator); mythread_mutex_destroy(&thr->mutex); @@ -190,35 +191,57 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) mythread_cond_wait(&thr->cond, &thr->mutex); goto next_loop_unlocked; } - out = &thr->out; + mythread_mutex_unlock(&thr->mutex); if ((in_filled - thr->in_pos) > CHUNK_SIZE) in_filled = thr->in_pos + CHUNK_SIZE; - out_pos = out->out_pos; + out_pos = thr->secret_progress; + ret = thr->block_decoder.code(thr->block_decoder.coder, thr->allocator, 
thr->in, &thr->in_pos, in_filled, - out->out, &out_pos, out->out_block_size, + thr->outbuf->buf, &out_pos, thr->outbuf->allocated, LZMA_RUN); if (ret == LZMA_OK || ret == LZMA_STREAM_END) { + bool partial_update; + mythread_mutex_lock(&thr->mutex); if (thr->in_pos == thr->in_block_size) { - mythread_mutex_lock(&thr->mutex); if (thr->state == THR_RUN) thr->state = THR_IDLE; - mythread_mutex_unlock(&thr->mutex); } + partial_update = thr->partial_update; + mythread_mutex_unlock(&thr->mutex); + + if ((partial_update && (out_pos != thr->outbuf->pos)) || + out_pos == thr->outbuf->allocated) { - if (out_pos != out->out_pos) { mythread_mutex_lock(&thr->coder->mutex); - out->out_pos = out_pos; - if (thr->coder->thr_read == thr) { + thr->outbuf->pos = out_pos; + + if (partial_update) mythread_cond_signal(&thr->coder->cond); + + if (out_pos == thr->outbuf->allocated) { + + thr->outbuf->unpadded_size = lzma_block_unpadded_size(&thr->block_options); + thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; + thr->outbuf->finished = true; + thr->outbuf = NULL; + + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + + mythread_mutex_unlock(&thr->coder->mutex); + goto next_loop_lock; } + mythread_mutex_unlock(&thr->coder->mutex); } + + thr->secret_progress = out_pos; goto next_loop_lock; } else { @@ -273,6 +296,7 @@ static void stream_decoder_mt_end(void *coder_ptr, struct lzma_stream_coder *coder = coder_ptr; threads_end(coder, allocator); + lzma_outq_end(&coder->outq, allocator); lzma_index_hash_end(coder->index_hash, allocator); lzma_free(coder, allocator); } @@ -363,35 +387,43 @@ static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder, static lzma_ret get_thread(struct lzma_stream_coder *coder, const lzma_allocator *allocator) { + if (!lzma_outq_has_buf(&coder->outq)) + return LZMA_OK; + + if (!lzma_outq_has_buf_prealloc(&coder->outq) && + coder->threads_initialized && + coder->exp_filter_size && coder->exp_block_size) { + 
size_t exp; + + // It is assumed that the archive consists of multiple + // blocks sharing the same filter and block settings. + // The in-block is extended over time to fit the current + // block. For accouting the wrost case is assumed, that + // is compressed size = uncompressed size. + exp = coder->exp_filter_size; + exp += coder->exp_block_size; + exp += coder->exp_block_size; + + exp += coder->memusage; + exp += coder->outq.memusage; + + if (exp > coder->memlimit) + return LZMA_OK; + } + // If there is a free structure on the stack, use it. + mythread_mutex_lock(&coder->mutex); if (coder->threads_free != NULL) { coder->thr_write = coder->threads_free; coder->threads_free = coder->threads_free->next; } + mythread_mutex_unlock(&coder->mutex); if (coder->thr_write == NULL) { // If there are no uninitialized structures left, return. if (coder->threads_initialized == coder->threads_max) return LZMA_OK; - if (coder->exp_filter_size && coder->exp_block_size) { - size_t exp; - - // It is assumed that the archive consists of multiple - // blocks sharing the same filter and block settings. - // The in-block is extended over time to fit the current - // block. For accouting the wrost case is assumed, that - // is compressed size = uncompressed size. - exp = coder->exp_filter_size; - exp += coder->exp_block_size; - exp += coder->exp_block_size; - - if (coder->memusage + exp > coder->memlimit) { - coder->threads_max = coder->threads_initialized; - return LZMA_OK; - } - } - // Initialize a new thread. 
return_if_error(initialize_new_thread(coder, allocator)); } @@ -402,9 +434,8 @@ static lzma_ret get_thread(struct lzma_stream_coder *coder, coder->thr_write->in_block_size = 0; coder->thr_write->in_filled = 0; coder->thr_write->in_pos = 0; - - coder->thr_write->out.out_pos = 0; - coder->thr_write->out.out_filled = 0; + coder->thr_write->partial_update = false; + coder->thr_write->secret_progress = 0; memset(&coder->thr_write->block_options, 0, sizeof(lzma_block)); coder->thr_write->state = THR_RUN; @@ -417,101 +448,65 @@ static lzma_ret alloc_out_buffer(struct lzma_stream_coder *coder, const lzma_allocator *allocator) { struct worker_thread *thr; - struct out_buffer *buf; size_t uncomp_size; + lzma_ret ret; thr = coder->thr_write; - buf = &thr->out; + uncomp_size = thr->block_options.uncompressed_size; - if (buf->out) { - if (buf->out_block_size == uncomp_size) - goto recycle_old; + ret = lzma_outq_prealloc_buf(&coder->outq, allocator, + uncomp_size); + if (ret != LZMA_OK) + return ret; - coder->memusage -= buf->out_block_size; - buf->out_block_size = 0; - lzma_free(buf->out, allocator); - } - buf->out = lzma_alloc(uncomp_size, allocator); - if (!buf->out) - return LZMA_MEM_ERROR; - coder->memusage += uncomp_size; - buf->out_block_size = uncomp_size; + /* coder->memusage += uncomp_size; */ if (coder->exp_block_size < uncomp_size) coder->exp_block_size = uncomp_size; -recycle_old: - buf->out_pos = 0; - buf->out_filled = 0; - - mythread_mutex_lock(&coder->mutex); - if (!coder->thr_read) { - coder->thr_read = thr; - coder->thr_read_last = thr; - } else { - coder->thr_read_last->next = thr; - coder->thr_read_last = thr; - } - mythread_mutex_unlock(&coder->mutex); + thr->outbuf = lzma_outq_get_buf(&coder->outq, thr); return LZMA_OK; } static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder, - uint8_t *restrict out, size_t *restrict out_pos, - size_t out_size) + const lzma_allocator *allocator, + uint8_t *restrict out, + size_t *restrict out_pos, + size_t 
out_size) { - struct out_buffer *out_buff; - size_t out_buff_size; - - if (!coder->thr_read) - return LZMA_OK; - - out_buff = &coder->thr_read->out; + lzma_ret ret; do { - // block fully consumed - if (out_buff->out_filled == out_buff->out_block_size) { - struct worker_thread *thr_new; - struct worker_thread *thr_old; - lzma_ret ret; - - ret = lzma_index_hash_append(coder->index_hash, - lzma_block_unpadded_size(&coder->thr_read->block_options), - coder->thr_read->block_options.uncompressed_size); - if (ret != LZMA_OK) - return ret; - - mythread_mutex_lock(&coder->mutex); - thr_old = coder->thr_read; - thr_new = thr_old->next; - thr_old->next = NULL; - - if (coder->threads_free) - thr_old->next = coder->threads_free; - coder->threads_free = thr_old; - coder->thr_read = thr_new; + lzma_vli unpadded_size; + lzma_vli uncompressed_size; + mythread_mutex_lock(&coder->mutex); + if (!lzma_outq_is_readable(&coder->outq)) { mythread_mutex_unlock(&coder->mutex); - - if (!thr_new) - return LZMA_OK; - - out_buff = &thr_new->out; + return LZMA_OK; } - // whatever is done has been consumed - if (out_buff->out_pos == out_buff->out_filled) - return LZMA_OK; + ret = lzma_outq_read(&coder->outq, allocator, + out, out_pos, out_size, + &unpadded_size, + &uncompressed_size); + mythread_mutex_unlock(&coder->mutex); + + // block fully consumed + if (ret == LZMA_STREAM_END) { + + ret = lzma_index_hash_append(coder->index_hash, + unpadded_size, + uncompressed_size); + if (ret != LZMA_OK) + return ret; + lzma_outq_enable_partial_output(&coder->outq, + thr_do_partial_update); + } if (*out_pos == out_size) return LZMA_OK; - mythread_mutex_lock(&coder->mutex); - out_buff_size = out_buff->out_pos; - mythread_mutex_unlock(&coder->mutex); - - lzma_bufcpy(out_buff->out, &out_buff->out_filled, out_buff_size, - out, out_pos, out_size); } while (1); } @@ -630,7 +625,8 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, thr = coder->thr_write; if (!thr) { seq_blk_hdr_again: - ret = 
try_copy_decoded(coder, out, out_pos, out_size); + ret = try_copy_decoded(coder, allocator, out, out_pos, + out_size); if (ret != LZMA_OK) return ret; @@ -645,10 +641,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, return LZMA_OK; mythread_mutex_lock(&coder->mutex); - if (coder->thr_read->out.out_pos == - coder->thr_read->out.out_filled) { - ret = wait_cond_progress(coder); - } + if (!lzma_outq_is_readable(&coder->outq)) + ret = wait_cond_progress(coder); + mythread_mutex_unlock(&coder->mutex); if (ret != LZMA_OK) @@ -815,6 +810,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, coder->memusage += coder->exp_block_size; thr->in_size = thr->in_block_size; } + if (thr->outbuf == coder->outq.head) + lzma_outq_enable_partial_output(&coder->outq, + thr_do_partial_update); } coder->sequence = SEQ_BLOCK; @@ -844,7 +842,7 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, break; } - ret = try_copy_decoded(coder, out, out_pos, out_size); + ret = try_copy_decoded(coder, allocator, out, out_pos, out_size); if (ret != LZMA_OK) return ret; @@ -869,19 +867,20 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, // first flush all worker threads, so the accounting of decoded // blocks matches index's expectation. 
- while (coder->thr_read) { - ret = try_copy_decoded(coder, out, out_pos, out_size); + while (!lzma_outq_is_empty(&coder->outq)) { + ret = try_copy_decoded(coder, allocator, out, out_pos, + out_size); if (ret != LZMA_OK) return ret; if (*out_pos >= out_size) return LZMA_OK; - if (!coder->thr_read) + if (lzma_outq_is_empty(&coder->outq)) break; mythread_mutex_lock(&coder->mutex); - if (coder->thr_read->out.out_pos == coder->thr_read->out.out_filled) + if (!lzma_outq_is_readable(&coder->outq)) ret = wait_cond_progress(coder); mythread_mutex_unlock(&coder->mutex); @@ -889,14 +888,13 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, return ret; } - // Decode the Index and compare it to the hash calculated - // from the sizes of the Blocks (if any). + // Decode the Index and compare it to the hash calculated + // from the sizes of the Blocks (if any). ret = lzma_index_hash_decode(coder->index_hash, in, in_pos, in_size); - if (ret != LZMA_STREAM_END) - return ret; - - coder->sequence = SEQ_STREAM_FOOTER; + if (ret != LZMA_STREAM_END) + return ret; + coder->sequence = SEQ_STREAM_FOOTER; break; } @@ -994,11 +992,11 @@ static lzma_ret stream_decoder_mt_memconfig(void *coder_ptr, uint64_t *memusage, { struct lzma_stream_coder *coder = coder_ptr; - *memusage = coder->memusage; + *memusage = coder->memusage + coder->outq.memusage; *old_memlimit = coder->memlimit; if (new_memlimit != 0) { - if (new_memlimit < coder->memusage) + if (new_memlimit < coder->memusage + coder->outq.memusage) return LZMA_MEMLIMIT_ERROR; coder->memlimit = new_memlimit; @@ -1036,6 +1034,7 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, return LZMA_MEM_ERROR; memset(coder, 0xff, sizeof(struct lzma_stream_coder)); + memzero(&coder->outq, sizeof(coder->outq)); if (mythread_mutex_init(&coder->mutex)) goto err_out; @@ -1090,17 +1089,19 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, threads_end(coder, allocator); } + 
coder->threads_free = NULL; + coder->thr_write = NULL; + coder->threads = lzma_alloc(options->threads * sizeof(struct worker_thread), allocator); if (coder->threads == NULL) goto err_out; - coder->threads_free = NULL; - coder->thr_read = NULL; - coder->thr_read_last = NULL; - coder->thr_write = NULL; coder->threads_max = options->threads; + return_if_error(lzma_outq_init(&coder->outq, allocator, + options->threads)); + return stream_decoder_reset(coder, allocator); err_out: -- 2.30.0