From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>

This patch is against the MT decoder. It makes the decoder use the outq
instead of the self-cooked one-buffer-per-thread scheme, to illustrate
what had to be done. It appears to work.

The numbers changed from
  xz -tv
   100 %         10,2 GiB / 40,0 GiB = 0,255   958 MiB/s       0:42
  xz -dv | openssl sha1
   100 %         10,2 GiB / 40,0 GiB = 0,255   813 MiB/s       0:50

to
  xz -tv
   100 %         10,2 GiB / 40,0 GiB = 0,255   1,1 GiB/s       0:36
  xz -dv | openssl sha1
   100 %         10,2 GiB / 40,0 GiB = 0,255   914 MiB/s       0:44

Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>
---
 src/liblzma/common/outqueue.h          |   9 +
 src/liblzma/common/stream_decoder_mt.c | 277 +++++++++++++------------
 2 files changed, 148 insertions(+), 138 deletions(-)

diff --git a/src/liblzma/common/outqueue.h b/src/liblzma/common/outqueue.h
index 355e0ced2cfc3..de630ee855c6b 100644
--- a/src/liblzma/common/outqueue.h
+++ b/src/liblzma/common/outqueue.h
@@ -203,6 +203,15 @@ lzma_outq_has_buf(const lzma_outq *outq)
        return outq->bufs_in_use < outq->bufs_limit;
 }
 
+/// \brief      Test if there is at least one preallocated buffer free
+///
+/// Returns true when fewer buffers are in use than have been allocated.
+///
+static inline bool
+lzma_outq_has_buf_prealloc(const lzma_outq *outq)
+{
+       return outq->bufs_in_use < outq->bufs_allocated;
+}
 
 /// \brief      Test if the queue is completely empty
 static inline bool
diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c
index 1bfd2279c176b..8e361c38f27a2 100644
--- a/src/liblzma/common/stream_decoder_mt.c
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -14,6 +14,7 @@
 #include "block_decoder.h"
 #include "stream_decoder.h"
 #include "index.h"
+#include "outqueue.h"
 
 #include <stdio.h>
 
@@ -33,16 +34,6 @@ typedef enum {
 
 } worker_state;
 
-struct out_buffer {
-       uint8_t *out;
-       /// Size of ->out
-       size_t out_block_size;
-       /// Bytes written to ->out (worker)
-       size_t out_pos;
-       /// Bytes consumed of ->out (coordinator)
-       size_t out_filled;
-};
-
 struct worker_thread {
        uint8_t *in;
        /// Size of ->in
@@ -63,7 +54,9 @@ struct worker_thread {
        /// The allocator is set by the main thread.
        const lzma_allocator *allocator;
 
-       struct out_buffer out;
+       lzma_outbuf *outbuf;
+       bool partial_update;
+       size_t secret_progress;
 
        lzma_next_coder block_decoder;
        lzma_block block_options;
@@ -102,12 +95,12 @@ struct lzma_stream_coder {
        /// are created only when actually needed.
        struct worker_thread *threads_free;
        /// Current thread decompressed is read from
-       struct worker_thread *thr_read;
-       /// Last read thread, used for ->next assignment
-       struct worker_thread *thr_read_last;
+
        /// Current thread compressed data is written to
        struct worker_thread *thr_write;
 
+       lzma_outq outq;
+
        /// Memory usage limit
        uint64_t memlimit;
        /// Amount of memory actually needed (only an estimate)
@@ -140,6 +133,15 @@ struct lzma_stream_coder {
        uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX];
 };
 
+static void thr_do_partial_update(void *thr_ptr)
+{
+       struct worker_thread *thr = thr_ptr;
+
+       mythread_mutex_lock(&thr->mutex);
+       thr->partial_update = true;
+       mythread_mutex_unlock(&thr->mutex);
+}
+
 /// Use smaller chunks so cancellation attempts don't block for long
 #define CHUNK_SIZE     16384
 static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
@@ -148,7 +150,6 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
        size_t in_filled;
        size_t out_pos;
        lzma_ret ret;
-       struct out_buffer *out;
 
 next_loop_lock:
 
@@ -162,7 +163,7 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
                mythread_mutex_unlock(&thr->mutex);
 
                lzma_free(thr->in, thr->allocator);
-               lzma_free(thr->out.out, thr->allocator);
+
                lzma_next_end(&thr->block_decoder, thr->allocator);
 
                mythread_mutex_destroy(&thr->mutex);
@@ -190,35 +191,57 @@ static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
                mythread_cond_wait(&thr->cond, &thr->mutex);
                goto next_loop_unlocked;
        }
-       out = &thr->out;
+
        mythread_mutex_unlock(&thr->mutex);
 
        if ((in_filled - thr->in_pos) > CHUNK_SIZE)
                in_filled = thr->in_pos + CHUNK_SIZE;
 
-       out_pos = out->out_pos;
+       out_pos = thr->secret_progress;
+
        ret = thr->block_decoder.code(thr->block_decoder.coder,
                                      thr->allocator,
                                      thr->in, &thr->in_pos, in_filled,
-                                     out->out, &out_pos, out->out_block_size,
+                                     thr->outbuf->buf, &out_pos, thr->outbuf->allocated,
                                      LZMA_RUN);
        if (ret == LZMA_OK || ret == LZMA_STREAM_END) {
+               bool partial_update;
 
+               mythread_mutex_lock(&thr->mutex);
                if (thr->in_pos == thr->in_block_size) {
-                       mythread_mutex_lock(&thr->mutex);
                        if (thr->state == THR_RUN)
                                thr->state = THR_IDLE;
-                       mythread_mutex_unlock(&thr->mutex);
                }
+               partial_update = thr->partial_update;
+               mythread_mutex_unlock(&thr->mutex);
+
+               if ((partial_update && (out_pos != thr->outbuf->pos)) ||
+                   out_pos == thr->outbuf->allocated) {
 
-               if (out_pos != out->out_pos) {
                        mythread_mutex_lock(&thr->coder->mutex);
-                       out->out_pos = out_pos;
-                       if (thr->coder->thr_read == thr) {
+                       thr->outbuf->pos = out_pos;
+
+                       if (partial_update)
                                mythread_cond_signal(&thr->coder->cond);
+
+                       if (out_pos == thr->outbuf->allocated) {
+
+                               thr->outbuf->unpadded_size = lzma_block_unpadded_size(&thr->block_options);
+                               thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
+                               thr->outbuf->finished = true;
+                               thr->outbuf = NULL;
+
+                               thr->next = thr->coder->threads_free;
+                               thr->coder->threads_free = thr;
+
+                               mythread_mutex_unlock(&thr->coder->mutex);
+                               goto next_loop_lock;
                        }
+
                        mythread_mutex_unlock(&thr->coder->mutex);
                }
+
+               thr->secret_progress = out_pos;
                goto next_loop_lock;
        } else {
 
@@ -273,6 +296,7 @@ static void stream_decoder_mt_end(void *coder_ptr,
        struct lzma_stream_coder *coder = coder_ptr;
 
        threads_end(coder, allocator);
+       lzma_outq_end(&coder->outq, allocator);
        lzma_index_hash_end(coder->index_hash, allocator);
        lzma_free(coder, allocator);
 }
@@ -363,35 +387,43 @@ static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder,
 static lzma_ret get_thread(struct lzma_stream_coder *coder,
                           const lzma_allocator *allocator)
 {
+       if (!lzma_outq_has_buf(&coder->outq))
+               return LZMA_OK;
+
+       if (!lzma_outq_has_buf_prealloc(&coder->outq) &&
+           coder->threads_initialized &&
+           coder->exp_filter_size && coder->exp_block_size) {
+               size_t exp;
+
+               // It is assumed that the archive consists of multiple
+               // blocks sharing the same filter and block settings.
+               // The in-block is extended over time to fit the current
+               // block. For accounting the worst case is assumed, that
+               // is compressed size = uncompressed size.
+               exp = coder->exp_filter_size;
+               exp += coder->exp_block_size;
+               exp += coder->exp_block_size;
+
+               exp += coder->memusage;
+               exp += coder->outq.memusage;
+
+               if (exp > coder->memlimit)
+                       return LZMA_OK;
+       }
+
        // If there is a free structure on the stack, use it.
+       mythread_mutex_lock(&coder->mutex);
        if (coder->threads_free != NULL) {
                coder->thr_write = coder->threads_free;
                coder->threads_free = coder->threads_free->next;
        }
+       mythread_mutex_unlock(&coder->mutex);
 
        if (coder->thr_write == NULL) {
                // If there are no uninitialized structures left, return.
                if (coder->threads_initialized == coder->threads_max)
                        return LZMA_OK;
 
-               if (coder->exp_filter_size && coder->exp_block_size) {
-                       size_t exp;
-
-                       // It is assumed that the archive consists of multiple
-                       // blocks sharing the same filter and block settings.
-                       // The in-block is extended over time to fit the current
-                       // block. For accouting the wrost case is assumed, that
-                       // is compressed size = uncompressed size.
-                       exp = coder->exp_filter_size;
-                       exp += coder->exp_block_size;
-                       exp += coder->exp_block_size;
-
-                       if (coder->memusage + exp > coder->memlimit) {
-                               coder->threads_max = coder->threads_initialized;
-                               return LZMA_OK;
-                       }
-               }
-
                // Initialize a new thread.
                return_if_error(initialize_new_thread(coder, allocator));
        }
@@ -402,9 +434,8 @@ static lzma_ret get_thread(struct lzma_stream_coder *coder,
        coder->thr_write->in_block_size = 0;
        coder->thr_write->in_filled = 0;
        coder->thr_write->in_pos = 0;
-
-       coder->thr_write->out.out_pos = 0;
-       coder->thr_write->out.out_filled = 0;
+       coder->thr_write->partial_update = false;
+       coder->thr_write->secret_progress = 0;
 
        memset(&coder->thr_write->block_options, 0, sizeof(lzma_block));
        coder->thr_write->state = THR_RUN;
@@ -417,101 +448,65 @@ static lzma_ret alloc_out_buffer(struct 
lzma_stream_coder *coder,
                                 const lzma_allocator *allocator)
 {
        struct worker_thread *thr;
-       struct out_buffer *buf;
        size_t uncomp_size;
+       lzma_ret ret;
 
        thr = coder->thr_write;
-       buf = &thr->out;
+
        uncomp_size = thr->block_options.uncompressed_size;
 
-       if (buf->out) {
-               if (buf->out_block_size == uncomp_size)
-                       goto recycle_old;
+       ret = lzma_outq_prealloc_buf(&coder->outq, allocator,
+                                    uncomp_size);
+       if (ret != LZMA_OK)
+               return ret;
 
-               coder->memusage -= buf->out_block_size;
-               buf->out_block_size = 0;
-               lzma_free(buf->out, allocator);
-       }
-       buf->out = lzma_alloc(uncomp_size, allocator);
-       if (!buf->out)
-               return LZMA_MEM_ERROR;
-       coder->memusage += uncomp_size;
-       buf->out_block_size = uncomp_size;
+       /* coder->memusage += uncomp_size; */
        if (coder->exp_block_size < uncomp_size)
                coder->exp_block_size = uncomp_size;
 
-recycle_old:
-       buf->out_pos = 0;
-       buf->out_filled = 0;
-
-       mythread_mutex_lock(&coder->mutex);
-       if (!coder->thr_read) {
-               coder->thr_read = thr;
-               coder->thr_read_last = thr;
-       } else {
-               coder->thr_read_last->next = thr;
-               coder->thr_read_last = thr;
-       }
-       mythread_mutex_unlock(&coder->mutex);
+       thr->outbuf = lzma_outq_get_buf(&coder->outq, thr);
        return LZMA_OK;
 }
 
 static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder,
-                            uint8_t *restrict out, size_t *restrict out_pos,
-                            size_t out_size)
+                                const lzma_allocator *allocator,
+                                uint8_t *restrict out,
+                                size_t *restrict out_pos,
+                                size_t out_size)
 {
-       struct out_buffer *out_buff;
-       size_t out_buff_size;
-
-       if (!coder->thr_read)
-               return LZMA_OK;
-
-       out_buff = &coder->thr_read->out;
+       lzma_ret ret;
 
        do {
-               // block fully consumed
-               if (out_buff->out_filled == out_buff->out_block_size) {
-                       struct worker_thread *thr_new;
-                       struct worker_thread *thr_old;
-                       lzma_ret ret;
-
-                       ret = lzma_index_hash_append(coder->index_hash,
-                                                    
lzma_block_unpadded_size(&coder->thr_read->block_options),
-                                                    
coder->thr_read->block_options.uncompressed_size);
-                       if (ret != LZMA_OK)
-                               return ret;
-
-                       mythread_mutex_lock(&coder->mutex);
-                       thr_old = coder->thr_read;
-                       thr_new = thr_old->next;
-                       thr_old->next = NULL;
-
-                       if (coder->threads_free)
-                               thr_old->next = coder->threads_free;
-                       coder->threads_free = thr_old;
-                       coder->thr_read = thr_new;
+               lzma_vli unpadded_size;
+               lzma_vli uncompressed_size;
 
+               mythread_mutex_lock(&coder->mutex);
+               if (!lzma_outq_is_readable(&coder->outq)) {
                        mythread_mutex_unlock(&coder->mutex);
-
-                       if (!thr_new)
-                               return LZMA_OK;
-
-                       out_buff = &thr_new->out;
+                       return LZMA_OK;
                }
 
-               // whatever is done has been consumed
-               if (out_buff->out_pos == out_buff->out_filled)
-                       return LZMA_OK;
+               ret = lzma_outq_read(&coder->outq, allocator,
+                                    out, out_pos, out_size,
+                                    &unpadded_size,
+                                    &uncompressed_size);
+               mythread_mutex_unlock(&coder->mutex);
+
+               // block fully consumed
+               if (ret == LZMA_STREAM_END) {
+
+                       ret = lzma_index_hash_append(coder->index_hash,
+                                                    unpadded_size,
+                                                    uncompressed_size);
+                       if (ret != LZMA_OK)
+                               return ret;
+                       lzma_outq_enable_partial_output(&coder->outq,
+                                                       thr_do_partial_update);
+               }
 
                if (*out_pos == out_size)
                        return LZMA_OK;
 
-               mythread_mutex_lock(&coder->mutex);
-               out_buff_size = out_buff->out_pos;
-               mythread_mutex_unlock(&coder->mutex);
-
-               lzma_bufcpy(out_buff->out, &out_buff->out_filled, out_buff_size,
-                           out, out_pos, out_size);
        } while (1);
 }
 
@@ -630,7 +625,8 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
               thr = coder->thr_write;
               if (!thr) {
 seq_blk_hdr_again:
-                      ret = try_copy_decoded(coder, out, out_pos, out_size);
+                      ret = try_copy_decoded(coder, allocator, out, out_pos,
+                                             out_size);
                       if (ret != LZMA_OK)
                               return ret;
 
@@ -645,10 +641,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
                                       return LZMA_OK;
 
                               mythread_mutex_lock(&coder->mutex);
-                              if (coder->thr_read->out.out_pos ==
-                                  coder->thr_read->out.out_filled) {
-                                      ret = wait_cond_progress(coder);
-                              }
+                              if (!lzma_outq_is_readable(&coder->outq))
+                                  ret = wait_cond_progress(coder);
+
                               mythread_mutex_unlock(&coder->mutex);
 
                               if (ret != LZMA_OK)
@@ -815,6 +810,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
                                       coder->memusage += coder->exp_block_size;
                               thr->in_size = thr->in_block_size;
                       }
+                      if (thr->outbuf == coder->outq.head)
+                              lzma_outq_enable_partial_output(&coder->outq,
+                                                              thr_do_partial_update);
               }
 
               coder->sequence = SEQ_BLOCK;
@@ -844,7 +842,7 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
                       break;
               }
 
-              ret = try_copy_decoded(coder, out, out_pos, out_size);
+              ret = try_copy_decoded(coder, allocator, out, out_pos, out_size);
               if (ret != LZMA_OK)
                       return ret;
 
@@ -869,19 +867,20 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
 
                // first flush all worker threads, so the accounting of decoded
                // blocks matches index's expectation.
-               while (coder->thr_read) {
-                       ret = try_copy_decoded(coder, out, out_pos, out_size);
+               while (!lzma_outq_is_empty(&coder->outq)) {
+                       ret = try_copy_decoded(coder, allocator, out, out_pos,
+                                              out_size);
                        if (ret != LZMA_OK)
                                return ret;
 
                        if (*out_pos >= out_size)
                                return LZMA_OK;
 
-                       if (!coder->thr_read)
+                       if (lzma_outq_is_empty(&coder->outq))
                                break;
 
                        mythread_mutex_lock(&coder->mutex);
-                       if (coder->thr_read->out.out_pos == 
coder->thr_read->out.out_filled)
+                       if (!lzma_outq_is_readable(&coder->outq))
                                ret = wait_cond_progress(coder);
                        mythread_mutex_unlock(&coder->mutex);
 
@@ -889,14 +888,13 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator 
*allocator,
                                return ret;
                }
 
-                // Decode the Index and compare it to the hash calculated
-                // from the sizes of the Blocks (if any).
+               // Decode the Index and compare it to the hash calculated
+               // from the sizes of the Blocks (if any).
                ret = lzma_index_hash_decode(coder->index_hash, in, in_pos,
                                             in_size);
-                if (ret != LZMA_STREAM_END)
-                        return ret;
-
-                coder->sequence = SEQ_STREAM_FOOTER;
+               if (ret != LZMA_STREAM_END)
+                       return ret;
+               coder->sequence = SEQ_STREAM_FOOTER;
                break;
        }
 
@@ -994,11 +992,11 @@ static lzma_ret stream_decoder_mt_memconfig(void 
*coder_ptr, uint64_t *memusage,
 {
        struct lzma_stream_coder *coder = coder_ptr;
 
-       *memusage = coder->memusage;
+       *memusage = coder->memusage + coder->outq.memusage;
        *old_memlimit = coder->memlimit;
 
        if (new_memlimit != 0) {
-               if (new_memlimit < coder->memusage)
+               if (new_memlimit < coder->memusage + coder->outq.memusage)
                        return LZMA_MEMLIMIT_ERROR;
 
                coder->memlimit = new_memlimit;
@@ -1036,6 +1034,7 @@ stream_decoder_mt_init(lzma_next_coder *next, const 
lzma_allocator *allocator,
                        return LZMA_MEM_ERROR;
 
                memset(coder, 0xff, sizeof(struct lzma_stream_coder));
+               memzero(&coder->outq, sizeof(coder->outq));
 
                if (mythread_mutex_init(&coder->mutex))
                        goto err_out;
@@ -1090,17 +1089,19 @@ stream_decoder_mt_init(lzma_next_coder *next, const 
lzma_allocator *allocator,
                threads_end(coder, allocator);
        }
 
+       coder->threads_free = NULL;
+       coder->thr_write = NULL;
+
        coder->threads = lzma_alloc(options->threads * sizeof(struct 
worker_thread),
                                    allocator);
        if (coder->threads == NULL)
                goto err_out;
-       coder->threads_free = NULL;
-       coder->thr_read = NULL;
-       coder->thr_read_last = NULL;
-       coder->thr_write = NULL;
 
        coder->threads_max = options->threads;
 
+       return_if_error(lzma_outq_init(&coder->outq, allocator,
+                                      options->threads));
+
        return stream_decoder_reset(coder, allocator);
 
 err_out:
-- 
2.30.0


Reply via email to