From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>

Changes since last post:
- Options are considered
- Memory limits are considered. The limit may still be exceeded if usage
  is close to it and the existing threads then enlarge their in-buffers.
- Blocks with no size information in the header can be decompressed.
  This happens synchronously.

Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>
---
 src/liblzma/api/lzma/container.h       |    5 +-
 src/liblzma/common/Makefile.inc        |    6 +
 src/liblzma/common/stream_decoder_mt.c | 1058 ++++++++++++++++++++++++
 src/liblzma/liblzma.map                |    1 +
 src/xz/coder.c                         |   15 +-
 5 files changed, 1080 insertions(+), 5 deletions(-)
 create mode 100644 src/liblzma/common/stream_decoder_mt.c

diff --git a/src/liblzma/api/lzma/container.h b/src/liblzma/api/lzma/container.h
index 9fbf4df06178e..de0a77b5d6482 100644
--- a/src/liblzma/api/lzma/container.h
+++ b/src/liblzma/api/lzma/container.h
@@ -173,7 +173,7 @@ typedef struct {
        uint32_t reserved_int2;
        uint32_t reserved_int3;
        uint32_t reserved_int4;
-       uint64_t reserved_int5;
+       uint64_t memlimit;
        uint64_t reserved_int6;
        uint64_t reserved_int7;
        uint64_t reserved_int8;
@@ -630,3 +630,6 @@ extern LZMA_API(lzma_ret) lzma_stream_buffer_decode(
                const uint8_t *in, size_t *in_pos, size_t in_size,
                uint8_t *out, size_t *out_pos, size_t out_size)
                lzma_nothrow lzma_attr_warn_unused_result;
+
+/**
+ * \brief       Initialize multithreaded .xz Stream decoder
+ *
+ * \param       strm        Pointer to the lzma_stream to initialize
+ * \param       options     Pointer to multithreaded decoder options
+ *
+ * \return      lzma_ret status code; the result must be checked.
+ */
+extern LZMA_API(lzma_ret) lzma_stream_decoder_mt(
+               lzma_stream *strm, const lzma_mt *options)
+               lzma_nothrow lzma_attr_warn_unused_result;
diff --git a/src/liblzma/common/Makefile.inc b/src/liblzma/common/Makefile.inc
index 0408f9a48c4db..3c140c2955475 100644
--- a/src/liblzma/common/Makefile.inc
+++ b/src/liblzma/common/Makefile.inc
@@ -78,4 +78,10 @@ liblzma_la_SOURCES += \
        common/stream_decoder.h \
        common/stream_flags_decoder.c \
        common/vli_decoder.c
+
+if COND_THREADS
+liblzma_la_SOURCES += \
+       common/stream_decoder_mt.c
+endif
+
 endif
diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c
new file mode 100644
index 0000000000000..b2f1c9ebfa607
--- /dev/null
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -0,0 +1,1058 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+/// \file       stream_decoder_mt.c
+/// \brief      Multithreaded .xz Stream decoder
+//
+//  Author:     Sebastian Andrzej Siewior
+//
+//  This file has been put into the public domain.
+//  You can do whatever you want with this file.
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include "common.h"
+#include "block_decoder.h"
+#include "outqueue.h"
+#include "stream_decoder.h"
+#include "index.h"
+
+#include <stdio.h>
+
+typedef enum {
+       /// Waiting for work; the worker sleeps on its condition variable.
+       THR_IDLE,
+
+       /// Decoding is in progress.
+       THR_RUN,
+
+       /// The main thread wants the thread to stop whatever it was doing
+       /// but not exit. The worker answers by returning to THR_IDLE.
+       THR_STOP,
+
+       /// The main thread wants the thread to exit and free its buffers.
+       THR_EXIT,
+
+} worker_state;
+
+struct out_buffer {    /* Decompressed output of one block */
+       uint8_t *out;           /* Buffer receiving the decoded block    */
+       size_t out_block_size;  /* Allocated size of ->out               */
+       size_t out_pos;         /* Bytes decoded into ->out (worker)     */
+       size_t out_filled;      /* Bytes of ->out copied out (coordinator) */
+};
+
+struct worker_thread { /* Per-thread decoder state */
+       worker_state state;     /* Read and written with ->mutex held */
+
+       uint8_t *in;            /* Compressed input of the current block */
+       size_t in_size;         /* Allocated size of ->in               */
+       size_t in_block_size;   /* Padded compressed size + Check size  */
+       size_t in_filled;       /* Bytes written to ->in (coordinator)  */
+       size_t in_pos;          /* Bytes consumed of ->in (worker)      */
+
+       struct out_buffer out;  /* Decoded output of the current block */
+
+       /// Pointer to the main structure. The worker uses it to reach the
+       /// coordinator's mutex and condition variable.
+       struct lzma_stream_coder *coder;
+
+       /* The allocator is set by the main thread. */
+       const lzma_allocator *allocator;
+       /* Filter size is used for memusage accounting */
+       size_t filter_size;
+
+       lzma_next_coder block_decoder;  /* Block decoder run by the worker */
+       lzma_block block_options;       /* Decoded Block Header fields */
+       struct worker_thread *next;     /* Link in free stack / read queue */
+
+       mythread_mutex mutex;   /* Taken around ->state and ->in_filled */
+       mythread_cond cond;     /* Wakes the worker: new state or input */
+       lzma_ret thread_error;  /* Decoder failure reported by the worker */
+
+       mythread thread_id;
+};
+
+struct lzma_stream_coder {
+       enum {
+               SEQ_STREAM_HEADER,
+               SEQ_BLOCK_HEADER,
+               SEQ_BLOCK,
+               SEQ_INDEX,
+               SEQ_STREAM_FOOTER,
+               SEQ_STREAM_PADDING,
+       } sequence;
+
+
+       /// Memory usage limit
+       uint64_t memlimit;
+       /// Sum of the in/out buffer sizes and filter sizes (an estimate)
+       uint64_t memusage;
+       size_t exp_filter_size; /* Largest filter memusage seen so far */
+       size_t exp_block_size;  /* Largest uncompressed block size seen */
+
+       lzma_index_hash *index_hash;
+
+       mythread_mutex mutex;   /* Guards the read queue and out_pos */
+       mythread_cond cond;     /* Workers signal new output or an error */
+
+       /// Array of allocated thread-specific structures
+       struct worker_thread *threads;
+
+       /// Number of structures in "threads" above. This is also the
+       /// maximum number of threads; lowered when memlimit is reached.
+       uint32_t threads_max;
+
+       /// Number of thread structures that have been initialized, and
+       /// thus the number of worker threads actually created so far.
+       uint32_t threads_initialized;
+
+       /// Stack of free threads. The coordinator pushes a thread here
+       /// once its output was copied out. This starts as empty because
+       /// threads are created only when actually needed.
+       struct worker_thread *threads_free;
+       /* Thread whose decompressed output is currently copied out */
+       struct worker_thread *thr_read;
+       /* Tail of the read queue, used for ->next assignment */
+       struct worker_thread *thr_read_last;
+       /* Thread that currently receives compressed input */
+       struct worker_thread *thr_write;
+
+       lzma_stream_flags stream_flags;
+       bool tell_no_check;
+       bool tell_unsupported_check;
+       bool tell_any_check;
+       bool ignore_check;
+       bool concatenated;
+       bool first_stream;
+       /* True if block sizes are missing and threads are not used */
+       bool direct_decomp;
+
+       size_t pos;             /* Write position in ->buffer */
+       uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX]; /* Header staging area */
+};
+
+/* Use smaller chunks so cancellation attempts don't block for long */
+#define CHUNK_SIZE     16384
+
+/// \brief      Main loop of a decoder worker thread
+///
+/// While THR_IDLE the worker sleeps on thr->cond. While THR_RUN it feeds
+/// at most CHUNK_SIZE bytes of thr->in per call to the block decoder and
+/// publishes the new output position under the coder mutex. A decoder
+/// failure is stored in thr->thread_error and the coordinator is woken.
+/// On THR_EXIT the worker frees its buffers, block decoder, mutex and
+/// condition variable and returns.
+///
+/// A request made by the main thread (THR_STOP / THR_EXIT) is never
+/// overwritten: the worker only moves itself from THR_RUN to THR_IDLE.
+///
+/// \param      thr_ptr     Pointer to this worker's struct worker_thread
+///
+/// \return     MYTHREAD_RET_VALUE once THR_EXIT has been processed.
+static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
+{
+       struct worker_thread *thr = thr_ptr;
+       struct out_buffer *out = &thr->out;
+       size_t in_filled;
+       size_t out_pos;
+       lzma_ret ret;
+
+lock_and_recheck:
+       mythread_mutex_lock(&thr->mutex);
+
+recheck_state:
+       // thr->mutex is held whenever we get here.
+       if (thr->state == THR_IDLE) {
+               mythread_cond_wait(&thr->cond, &thr->mutex);
+               goto recheck_state;
+       } else if (thr->state == THR_EXIT) {
+               mythread_mutex_unlock(&thr->mutex);
+
+               lzma_free(thr->in, thr->allocator);
+               lzma_free(thr->out.out, thr->allocator);
+               lzma_next_end(&thr->block_decoder, thr->allocator);
+
+               mythread_mutex_destroy(&thr->mutex);
+               mythread_cond_destroy(&thr->cond);
+               return MYTHREAD_RET_VALUE;
+
+       } else if (thr->state == THR_STOP) {
+               thr->state = THR_IDLE;
+
+               // threads_stop() may be sleeping on this condition variable
+               // until the state becomes THR_IDLE; without this signal it
+               // would never be woken up.
+               mythread_cond_signal(&thr->cond);
+
+               mythread_cond_wait(&thr->cond, &thr->mutex);
+               goto recheck_state;
+       } else if (thr->state != THR_RUN) {
+               // Unknown state: report it as a programming error.
+               thr->state = THR_IDLE;
+               thr->thread_error = LZMA_PROG_ERROR;
+               mythread_mutex_unlock(&thr->mutex);
+
+               mythread_mutex_lock(&thr->coder->mutex);
+               mythread_cond_signal(&thr->coder->cond);
+               mythread_mutex_unlock(&thr->coder->mutex);
+               goto lock_and_recheck;
+       }
+
+       in_filled = thr->in_filled;
+
+       // No new input yet; wait until the coordinator adds some.
+       if (in_filled == thr->in_pos) {
+               mythread_cond_wait(&thr->cond, &thr->mutex);
+               goto recheck_state;
+       }
+       mythread_mutex_unlock(&thr->mutex);
+
+       // Limit the work per iteration so state changes are noticed soon.
+       if ((in_filled - thr->in_pos) > CHUNK_SIZE)
+               in_filled = thr->in_pos + CHUNK_SIZE;
+
+       out_pos = out->out_pos;
+       ret = thr->block_decoder.code(thr->block_decoder.coder,
+                       thr->allocator,
+                       thr->in, &thr->in_pos, in_filled,
+                       out->out, &out_pos, out->out_block_size,
+                       LZMA_RUN);
+
+       if (ret == LZMA_OK || ret == LZMA_STREAM_END) {
+               if (thr->in_pos == thr->in_block_size) {
+                       // The whole block was consumed. Go idle, but keep
+                       // a THR_STOP/THR_EXIT request that arrived while
+                       // the decoder was running.
+                       mythread_mutex_lock(&thr->mutex);
+                       if (thr->state == THR_RUN)
+                               thr->state = THR_IDLE;
+                       mythread_mutex_unlock(&thr->mutex);
+               }
+
+               if (out_pos != out->out_pos) {
+                       // Publish the new output and wake the coordinator
+                       // if it is currently reading from this thread.
+                       mythread_mutex_lock(&thr->coder->mutex);
+                       out->out_pos = out_pos;
+                       if (thr->coder->thr_read == thr)
+                               mythread_cond_signal(&thr->coder->cond);
+                       mythread_mutex_unlock(&thr->coder->mutex);
+               }
+               goto lock_and_recheck;
+       }
+
+       // Decoder error: record it and stop decoding. As above, a pending
+       // THR_STOP/THR_EXIT request must survive.
+       mythread_mutex_lock(&thr->mutex);
+       if (thr->state == THR_RUN)
+               thr->state = THR_IDLE;
+       thr->thread_error = ret;
+       mythread_mutex_unlock(&thr->mutex);
+
+       mythread_mutex_lock(&thr->coder->mutex);
+       mythread_cond_signal(&thr->coder->cond);
+       mythread_mutex_unlock(&thr->coder->mutex);
+       goto lock_and_recheck;
+}
+
+/// \brief      Shut down all worker threads and free the thread array
+///
+/// Sets every created worker to THR_EXIT, wakes it, joins it and finally
+/// frees coder->threads. All pointers into the freed array are cleared so
+/// that nothing dangling is left behind.
+///
+/// \param      coder       Decoder whose workers are terminated
+/// \param      allocator   Allocator used to free coder->threads
+static void threads_end(struct lzma_stream_coder *coder,
+                       const lzma_allocator *allocator)
+{
+       for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
+               struct worker_thread *thr = &coder->threads[i];
+
+               mythread_mutex_lock(&thr->mutex);
+               thr->state = THR_EXIT;
+               mythread_cond_signal(&thr->cond);
+               mythread_mutex_unlock(&thr->mutex);
+       }
+
+       for (uint32_t i = 0; i < coder->threads_initialized; ++i)
+               mythread_join(coder->threads[i].thread_id);
+
+       coder->threads_initialized = 0;
+       lzma_free(coder->threads, allocator);
+
+       // These all pointed into the array that was just freed.
+       coder->threads = NULL;
+       coder->threads_free = NULL;
+       coder->thr_read = NULL;
+       coder->thr_read_last = NULL;
+       coder->thr_write = NULL;
+}
+
+/// \brief      Ask all created workers to stop decoding
+///
+/// Every initialized worker is switched to THR_STOP and woken up. If
+/// wait_for_threads is true, the function then blocks on each worker's
+/// condition variable until that worker's state reads THR_IDLE.
+///
+/// NOTE(review): the waiting part depends on the worker signalling its own
+/// condition variable after it went idle - verify in worker_decoder().
+///
+/// \param      coder               Decoder owning the workers
+/// \param      wait_for_threads    Also wait until every worker is idle
+static void threads_stop(struct lzma_stream_coder *coder,
+                       bool wait_for_threads)
+{
+       struct worker_thread *thr;
+       uint32_t n;
+
+       // Phase 1: post the stop request to each worker.
+       for (n = 0; n < coder->threads_initialized; ++n) {
+               thr = &coder->threads[n];
+
+               mythread_mutex_lock(&thr->mutex);
+               thr->state = THR_STOP;
+               mythread_cond_signal(&thr->cond);
+               mythread_mutex_unlock(&thr->mutex);
+       }
+
+       // Phase 2 (optional): wait for each worker to acknowledge.
+       if (wait_for_threads) {
+               for (n = 0; n < coder->threads_initialized; ++n) {
+                       thr = &coder->threads[n];
+
+                       mythread_mutex_lock(&thr->mutex);
+                       while (thr->state != THR_IDLE)
+                               mythread_cond_wait(&thr->cond, &thr->mutex);
+                       mythread_mutex_unlock(&thr->mutex);
+               }
+       }
+}
+
+/// \brief      Free everything owned by the multithreaded decoder
+///
+/// Terminates the workers, releases the Index hash and then the coder
+/// structure itself.
+///
+/// \param      coder_ptr   Pointer to the struct lzma_stream_coder
+/// \param      allocator   Allocator used for all the frees
+static void stream_decoder_mt_end(void *coder_ptr,
+                                 const lzma_allocator *allocator)
+{
+       struct lzma_stream_coder *const mt = coder_ptr;
+
+       // Workers first: they must be gone before their owner is freed.
+       threads_end(mt, allocator);
+
+       lzma_index_hash_end(mt->index_hash, allocator);
+       lzma_free(mt, allocator);
+}
+
+/// \brief      Hand compressed input to the current write thread
+///
+/// Copies as much of in[] as fits into the worker's input buffer (up to
+/// in_block_size), publishes the new fill level and wakes the worker if
+/// it had already consumed everything that was there before. Once the
+/// whole block has been handed over, the coder goes back to
+/// SEQ_BLOCK_HEADER and thr_write is cleared.
+///
+/// \param      coder       Decoder state; coder->thr_write must be set
+/// \param      in          Input buffer
+/// \param      in_pos      Read position in in[]; advanced by the copy
+/// \param      in_size     Size of in[]
+///
+/// \return     The worker's stored error if it is idle with an error,
+///             LZMA_OK otherwise.
+static lzma_ret stream_decode_in(struct lzma_stream_coder *coder,
+                                const uint8_t *restrict in,
+                                size_t *restrict in_pos,
+                                size_t in_size)
+{
+       struct worker_thread *const worker = coder->thr_write;
+
+       mythread_mutex_lock(&worker->mutex);
+
+       // An idle worker may have stopped because of a decoder error.
+       if (worker->state == THR_IDLE && worker->thread_error != LZMA_OK) {
+               const lzma_ret err = worker->thread_error;
+               mythread_mutex_unlock(&worker->mutex);
+               return err;
+       }
+
+       const size_t filled_before = worker->in_filled;
+       mythread_mutex_unlock(&worker->mutex);
+
+       // Copy without holding the lock; the worker only looks at the
+       // region below in_filled.
+       size_t filled_now = filled_before;
+       lzma_bufcpy(in, in_pos, in_size,
+                   worker->in, &filled_now, worker->in_block_size);
+
+       mythread_mutex_lock(&worker->mutex);
+       worker->in_filled = filled_now;
+
+       // The worker had used up all earlier input, so it may be asleep.
+       if (filled_before == worker->in_pos)
+               mythread_cond_signal(&worker->cond);
+
+       mythread_mutex_unlock(&worker->mutex);
+
+       // The complete compressed block has been passed to the worker.
+       if (worker->in_filled == worker->in_block_size) {
+               coder->sequence = SEQ_BLOCK_HEADER;
+               coder->thr_write = NULL;
+       }
+
+       return LZMA_OK;
+}
+
+/// \brief      Set up the next unused worker_thread slot and start it
+///
+/// The slot at index coder->threads_initialized is zeroed, gets a mutex
+/// and a condition variable and starts in THR_IDLE. On success the slot
+/// is counted as initialized and becomes coder->thr_write.
+///
+/// \param      coder       Decoder owning the thread array
+/// \param      allocator   Allocator the worker will use
+///
+/// \return     LZMA_OK on success, LZMA_MEM_ERROR if the mutex, the
+///             condition variable or the thread could not be created.
+static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder,
+                                     const lzma_allocator *allocator)
+{
+       struct worker_thread *slot = coder->threads + coder->threads_initialized;
+
+       memset(slot, 0, sizeof(*slot));
+
+       if (mythread_mutex_init(&slot->mutex))
+               return LZMA_MEM_ERROR;
+
+       if (mythread_cond_init(&slot->cond)) {
+               mythread_mutex_destroy(&slot->mutex);
+               return LZMA_MEM_ERROR;
+       }
+
+       // Everything the worker reads must be in place before it starts.
+       slot->state = THR_IDLE;
+       slot->thread_error = LZMA_OK;
+       slot->allocator = allocator;
+       slot->coder = coder;
+       slot->block_decoder = LZMA_NEXT_CODER_INIT;
+
+       if (mythread_create(&slot->thread_id, worker_decoder, slot)) {
+               mythread_cond_destroy(&slot->cond);
+               mythread_mutex_destroy(&slot->mutex);
+               return LZMA_MEM_ERROR;
+       }
+
+       ++coder->threads_initialized;
+       coder->thr_write = slot;
+
+       return LZMA_OK;
+}
+
+
+/// \brief      Pick a worker for the next block and reset it
+///
+/// A thread from the free stack is preferred. Otherwise a new one is
+/// created, unless threads_max has been reached or the expected memory
+/// need of one more thread would exceed memlimit; in that second case
+/// threads_max is lowered to the current thread count.
+///
+/// \param      coder       Decoder state
+/// \param      allocator   Allocator for a newly created thread
+///
+/// \return     LZMA_OK, with coder->thr_write left NULL when no thread is
+///             available right now, or the error from thread creation.
+static lzma_ret get_thread(struct lzma_stream_coder *coder,
+                          const lzma_allocator *allocator)
+{
+       struct worker_thread *thr;
+
+       // Reuse a free structure if there is one.
+       if (coder->threads_free != NULL) {
+               coder->thr_write = coder->threads_free;
+               coder->threads_free = coder->threads_free->next;
+       }
+
+       if (coder->thr_write == NULL) {
+               // All structures are in use already.
+               if (coder->threads_initialized == coder->threads_max)
+                       return LZMA_OK;
+
+               if (coder->exp_filter_size && coder->exp_block_size) {
+                       /*
+                        * Estimate what one more thread costs, assuming all
+                        * blocks share filter and block settings: the filter
+                        * memory, one block for the out (decompressed) buffer
+                        * and half a block for the in (compressed) buffer.
+                        * The in-buffer grows on demand, so the real limit
+                        * may still be exceeded later.
+                        */
+                       size_t expected = coder->exp_filter_size;
+                       expected += coder->exp_block_size;
+                       expected += coder->exp_block_size / 2;
+
+                       if (coder->memusage + expected > coder->memlimit) {
+                               coder->threads_max = coder->threads_initialized;
+                               return LZMA_OK;
+                       }
+               }
+
+               // Start a new thread; it becomes coder->thr_write.
+               const lzma_ret ret = initialize_new_thread(coder, allocator);
+               if (ret != LZMA_OK)
+                       return ret;
+       }
+
+       thr = coder->thr_write;
+
+       mythread_mutex_lock(&thr->mutex);
+       thr->next = NULL;
+
+       thr->in_block_size = 0;
+       thr->in_filled = 0;
+       thr->in_pos = 0;
+
+       thr->out.out_pos = 0;
+       thr->out.out_filled = 0;
+
+       memset(&thr->block_options, 0, sizeof(lzma_block));
+       thr->state = THR_RUN;
+       mythread_mutex_unlock(&thr->mutex);
+
+       return LZMA_OK;
+}
+
+static lzma_ret alloc_out_buffer(struct lzma_stream_coder *coder,
+                                const lzma_allocator *allocator)
+{
+       struct worker_thread *thr;
+       struct out_buffer *buf;
+       size_t uncomp_size;
+
+       thr = coder->thr_write;
+       buf = &thr->out;
+       uncomp_size = thr->block_options.uncompressed_size;
+
+       if (buf->out) {
+               if (buf->out_block_size == uncomp_size)
+                       goto recycle_old;
+
+               coder->memusage -= buf->out_block_size;
+               buf->out_block_size = 0;
+               lzma_free(buf->out, allocator);
+       }
+       buf->out = lzma_alloc(uncomp_size, allocator);
+       if (!buf->out)
+               return LZMA_MEM_ERROR;
+       coder->memusage += uncomp_size;
+       buf->out_block_size = uncomp_size;
+       if (coder->exp_block_size < uncomp_size)
+               coder->exp_block_size = uncomp_size;
+
+recycle_old:
+       buf->out_pos = 0;
+       buf->out_filled = 0;
+
+       mythread_mutex_lock(&coder->mutex);
+       if (!coder->thr_read) {
+               coder->thr_read = thr;
+               coder->thr_read_last = thr;
+       } else {
+               coder->thr_read_last->next = thr;
+               coder->thr_read_last = thr;
+       }
+       mythread_mutex_unlock(&coder->mutex);
+       return LZMA_OK;
+}
+
+/// \brief      Copy already decoded data to the caller without blocking
+///
+/// Output is taken from the thread at the head of the read queue. When a
+/// block has been copied out completely, its sizes are added to the Index
+/// hash, the thread is pushed onto the free stack and the next queued
+/// thread becomes the reader. The function returns as soon as out[] is
+/// full or no more decoded data is available.
+///
+/// NOTE(review): a thread is recycled as soon as its output is drained;
+/// confirm the worker cannot still be busy with the end of that block.
+///
+/// \param      coder       Decoder state
+/// \param      out         Output buffer
+/// \param      out_pos     Write position in out[]; advanced by the copy
+/// \param      out_size    Size of out[]
+///
+/// \return     LZMA_OK, or the error from lzma_index_hash_append().
+static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder,
+               uint8_t *restrict out, size_t *restrict out_pos,
+               size_t out_size)
+{
+       struct out_buffer *out_buff;
+
+       if (!coder->thr_read)
+               return LZMA_OK;
+
+       out_buff = &coder->thr_read->out;
+
+       while (true) {
+               /* block fully consumed */
+               if (out_buff->out_filled == out_buff->out_block_size) {
+                       const lzma_block *blk
+                                       = &coder->thr_read->block_options;
+                       struct worker_thread *thr_new;
+                       struct worker_thread *thr_old;
+
+                       const lzma_ret ret = lzma_index_hash_append(
+                                       coder->index_hash,
+                                       lzma_block_unpadded_size(blk),
+                                       blk->uncompressed_size);
+                       if (ret != LZMA_OK)
+                               return ret;
+
+                       // Move the finished thread to the free stack and
+                       // continue with the next one in the queue.
+                       mythread_mutex_lock(&coder->mutex);
+                       thr_old = coder->thr_read;
+                       thr_new = thr_old->next;
+                       thr_old->next = coder->threads_free;
+                       coder->threads_free = thr_old;
+                       coder->thr_read = thr_new;
+                       mythread_mutex_unlock(&coder->mutex);
+
+                       if (!thr_new)
+                               return LZMA_OK;
+
+                       out_buff = &thr_new->out;
+               }
+
+               if (*out_pos == out_size)
+                       return LZMA_OK;
+
+               // The worker updates out_pos under coder->mutex, so read
+               // it exactly once under the same lock and use that
+               // snapshot for both the check and the copy.
+               mythread_mutex_lock(&coder->mutex);
+               const size_t decoded = out_buff->out_pos;
+               mythread_mutex_unlock(&coder->mutex);
+
+               /* whatever is done has been consumed */
+               if (decoded == out_buff->out_filled)
+                       return LZMA_OK;
+
+               lzma_bufcpy(out_buff->out, &out_buff->out_filled, decoded,
+                           out, out_pos, out_size);
+       }
+}
+
+/// \brief      Number of input bytes a worker needs for one block
+///
+/// \param      coder   Decoder state; supplies the Check type of the Stream
+/// \param      size    Compressed Size from the Block Header
+///
+/// \return     vli_ceil4(size) plus the size of the Check field.
+static size_t comp_blk_size(struct lzma_stream_coder *coder, size_t size)
+{
+       const uint32_t check_size
+                       = lzma_check_size(coder->stream_flags.check);
+
+       return vli_ceil4(size) + check_size;
+}
+
+/// \brief      Prepare the coder for decoding a new Stream
+///
+/// (Re)initializes the Index hash and rewinds the state machine to the
+/// Stream Header.
+///
+/// \param      coder       Decoder state
+/// \param      allocator   Allocator for the Index hash
+///
+/// \return     LZMA_OK, or LZMA_MEM_ERROR if lzma_index_hash_init()
+///             returned NULL.
+static lzma_ret
+stream_decoder_reset(struct lzma_stream_coder *coder,
+               const lzma_allocator *allocator)
+{
+       // The Index hash is used to verify the Index.
+       lzma_index_hash *hash
+                       = lzma_index_hash_init(coder->index_hash, allocator);
+
+       coder->index_hash = hash;
+       if (hash == NULL)
+               return LZMA_MEM_ERROR;
+
+       // Start over from the Stream Header with an empty header buffer.
+       coder->pos = 0;
+       coder->sequence = SEQ_STREAM_HEADER;
+
+       return LZMA_OK;
+}
+
+static lzma_ret
+stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
+                const uint8_t *restrict in, size_t *restrict in_pos,
+                size_t in_size,
+                uint8_t *restrict out, size_t *restrict out_pos,
+                size_t out_size, lzma_action action)
+{
+       struct lzma_stream_coder *coder = coder_ptr;
+       struct worker_thread *thr;
+       lzma_ret ret;
+       size_t start_in_pos = *in_pos;
+       size_t start_out_pos = *out_pos;
+
+
+       while (true)
+       switch (coder->sequence) {
+       case SEQ_STREAM_HEADER: {
+               // Copy the Stream Header to the internal buffer.
+               lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
+                           LZMA_STREAM_HEADER_SIZE);
+
+               // Return if we didn't get the whole Stream Header yet.
+               if (coder->pos < LZMA_STREAM_HEADER_SIZE)
+                       return LZMA_OK;
+
+               coder->pos = 0;
+
+               // Decode the Stream Header.
+               ret = lzma_stream_header_decode(&coder->stream_flags,
+                                               coder->buffer);
+               if (ret != LZMA_OK)
+                       return ret == LZMA_FORMAT_ERROR && !coder->first_stream
+                               ? LZMA_DATA_ERROR : ret;
+
+               // If we are decoding concatenated Streams, and the later
+               // Streams have invalid Header Magic Bytes, we give
+               // LZMA_DATA_ERROR instead of LZMA_FORMAT_ERROR.
+               coder->first_stream = false;
+
+               // Even if we return LZMA_*_CHECK below, we want
+               // to continue from Block Header decoding.
+               coder->sequence = SEQ_BLOCK_HEADER;
+
+               // Detect if there's no integrity check or if it is
+               // unsupported if those were requested by the application.
+               if (coder->tell_no_check && coder->stream_flags.check
+                    == LZMA_CHECK_NONE)
+                       return LZMA_NO_CHECK;
+
+               if (coder->tell_unsupported_check
+                    && !lzma_check_is_supported(coder->stream_flags.check))
+                        return LZMA_UNSUPPORTED_CHECK;
+
+                if (coder->tell_any_check)
+                        return LZMA_GET_CHECK;
+                break;
+       }
+
+       case SEQ_BLOCK_HEADER: {
+              if (*in_pos >= in_size)
+                      return LZMA_OK;
+
+              thr = coder->thr_write;
+              if (!thr) {
+seq_blk_hdr_again:
+                      ret = try_copy_decoded(coder, out, out_pos, out_size);
+                      if (ret != LZMA_OK)
+                              return ret;
+
+                      ret = get_thread(coder, allocator);
+                      if (ret != LZMA_OK)
+                              return ret;
+
+                      if (!coder->thr_write) {
+
+                              /* No out buffer but making progress ? */
+                              if ((start_in_pos != *in_pos) ||
+                                   (start_out_pos != *out_pos))
+                                      return LZMA_OK;
+
+                              mythread_mutex_lock(&coder->mutex);
+                              if (coder->thr_read->out.out_pos == 
coder->thr_read->out.out_filled)
+                                      mythread_cond_wait(&coder->cond, 
&coder->mutex);
+
+                              mythread_mutex_unlock(&coder->mutex);
+                      }
+                      thr = coder->thr_write;
+                      if (!thr)
+                              goto seq_blk_hdr_again;
+              }
+
+              if (coder->pos == 0) {
+                      // Detect if it's Index.
+                      if (in[*in_pos] == 0x00) {
+                              coder->sequence = SEQ_INDEX;
+                              break;
+                      }
+
+                      // Calculate the size of the Block Header. Note that
+                      // Block Header decoder wants to see this byte too
+                      // so don't advance *in_pos.
+                      thr->block_options.header_size =
+                              lzma_block_header_size_decode(in[*in_pos]);
+              }
+
+              // Copy the Block Header to the internal buffer.
+
+              lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
+                          thr->block_options.header_size);
+
+              // Return if we didn't get the whole Block Header yet.
+              if (coder->pos < thr->block_options.header_size)
+                      return LZMA_OK;
+
+              coder->pos = 0;
+
+              // Version 1 is needed to support the .ignore_check option.
+              thr->block_options.version = 1;
+
+              // Set up a buffer to hold the filter chain. Block Header
+              // decoder will initialize all members of this array so
+              // we don't need to do it here.
+              lzma_filter filters[LZMA_FILTERS_MAX + 1];
+              thr->block_options.filters = filters;
+
+              // Copy the type of the Check so that Block Header and Block
+              // decoders see it.
+              thr->block_options.check = coder->stream_flags.check;
+
+              // Decode the Block Header.
+              ret = lzma_block_header_decode(&thr->block_options, allocator,
+                                             coder->buffer);
+              if (ret != LZMA_OK)
+                      return ret;
+
+              // If LZMA_IGNORE_CHECK was used, this flag needs to be set.
+              // It has to be set after lzma_block_header_decode() because
+              // it always resets this to false.
+              thr->block_options.ignore_check = coder->ignore_check;
+
+              if (thr->block_options.compressed_size == LZMA_VLI_UNKNOWN ||
+                  thr->block_options.uncompressed_size == LZMA_VLI_UNKNOWN) {
+
+                      /*
+                       * Happens if the previous (first) block header has sizes
+                       * encoded but one of the following block header does
+                       * not.
+                       */
+                      if (coder->threads_initialized != 1)
+                              return LZMA_PROG_ERROR;
+
+                      coder->direct_decomp = true;
+              } else {
+                      ret = alloc_out_buffer(coder, allocator);
+                      if (ret != LZMA_OK)
+                              return ret;
+
+                      thr->in_block_size = comp_blk_size(coder, 
thr->block_options.compressed_size);
+
+                      if (thr->in_size < thr->in_block_size) {
+                              coder->memusage -= thr->in_size;
+                              lzma_free(thr->in, allocator);
+
+                              thr->in = lzma_alloc(thr->in_block_size, 
allocator);
+                              if (!thr->in)
+                                      return LZMA_MEM_ERROR;
+                              thr->in_size = thr->in_block_size;
+                              coder->memusage += thr->in_size;
+                      }
+              }
+
+              // Check the memory usage limit.
+              const uint64_t memusage = lzma_raw_decoder_memusage(filters);
+
+              if (memusage == UINT64_MAX) {
+                      // One or more unknown Filter IDs.
+                      ret = LZMA_OPTIONS_ERROR;
+              } else {
+                      if (coder->exp_filter_size < memusage)
+                              coder->exp_filter_size = memusage;
+
+                      /*
+                       * The coder->memusage contains the size of in+out
+                       * buffer. Only for the first thread the check against
+                       * the filter size is made. Later it is attempted not to
+                       * create new threads if the memory limit is about to get
+                       * exceeded. Since the `in' buffer will be enlarged if
+                       * needed we may exceed the memory limit. Therefore there
+                       * is no further check for memusage to not abort work in
+                       * the middle.
+                       */
+                      if (coder->threads_initialized == 1 &&
+                          coder->memusage + memusage > coder->memlimit) {
+                              // The chain would need too much memory.
+                              ret = LZMA_MEMLIMIT_ERROR;
+                      } else {
+                              // Memory usage is OK.
+                              // Initialize the Block decoder.
+                              ret = lzma_block_decoder_init(
+                                                            
&thr->block_decoder,
+                                                            allocator,
+                                                            
&thr->block_options);
+                              if (thr->filter_size != memusage) {
+                                      coder->memusage -= thr->filter_size;
+                                      coder->memusage += memusage;
+                                      thr->filter_size = memusage;
+                              }
+                      }
+              }
+
+              // Free the allocated filter options since they are needed
+              // only to initialize the Block decoder.
+              for (size_t i = 0; i < LZMA_FILTERS_MAX; ++i)
+                      lzma_free(filters[i].options, allocator);
+              thr->block_options.filters = NULL;
+              // Check if memory usage calculation and Block enocoder
+              // initialization succeeded.
+              if (ret != LZMA_OK)
+                      return ret;
+
+              coder->sequence = SEQ_BLOCK;
+              break;
+
+       case SEQ_BLOCK:
+              thr = coder->thr_write;
+
+              /* Direct decompression if we lack sizes in block header */
+              if (coder->direct_decomp) {
+                      ret = thr->block_decoder.code(thr->block_decoder.coder,
+                                                    thr->allocator,
+                                                    in, in_pos, in_size,
+                                                    out, out_pos, out_size,
+                                                    action);
+                      if (ret != LZMA_STREAM_END)
+                              return ret;
+
+                      // Block decoded successfully. Add the new size pair to
+                      // the Index hash.
+                      ret = lzma_index_hash_append(coder->index_hash,
+                                                   
lzma_block_unpadded_size(&thr->block_options),
+                                                   
thr->block_options.uncompressed_size);
+                      if (ret != LZMA_OK)
+                              return ret;
+
+                      coder->sequence = SEQ_BLOCK_HEADER;
+                      break;
+              }
+
+              ret = try_copy_decoded(coder, out, out_pos, out_size);
+              if (ret != LZMA_OK)
+                      return ret;
+
+              ret = stream_decode_in(coder, in, in_pos, in_size);
+              if (ret != LZMA_OK) {
+                      threads_stop(coder, false);
+                      return ret;
+              }
+
+              if ((*in_pos >= in_size) || (*out_pos >= out_size))
+                      return LZMA_OK;
+
+              break;
+       }
+
+       case SEQ_INDEX: {
+                // If we don't have any input, don't call
+                // lzma_index_hash_decode() since it would return
+                // LZMA_BUF_ERROR, which we must not do here.
+                if (*in_pos >= in_size)
+                        return LZMA_OK;
+
+               /* first flush all worker threads, so the accounting of decoded
+                * blocks matches index's expectation.
+                */
+               while (coder->thr_read) {
+                       ret = try_copy_decoded(coder, out, out_pos, out_size);
+                       if (ret != LZMA_OK)
+                               return ret;
+
+                       if (*out_pos >= out_size)
+                               return LZMA_OK;
+
+                       if (!coder->thr_read)
+                               break;
+
+                       mythread_mutex_lock(&coder->mutex);
+                       if (coder->thr_read->out.out_pos == 
coder->thr_read->out.out_filled)
+                               mythread_cond_wait(&coder->cond, &coder->mutex);
+                       mythread_mutex_unlock(&coder->mutex);
+               }
+
+                // Decode the Index and compare it to the hash calculated
+                // from the sizes of the Blocks (if any).
+               ret = lzma_index_hash_decode(coder->index_hash, in, in_pos,
+                                            in_size);
+                if (ret != LZMA_STREAM_END) {
+                        return ret;
+               }
+
+                coder->sequence = SEQ_STREAM_FOOTER;
+               break;
+       }
+
+       case SEQ_STREAM_FOOTER:
+
+               lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
+                           LZMA_STREAM_HEADER_SIZE);
+
+               // Return if we didn't get the whole Stream Footer yet.
+               if (coder->pos < LZMA_STREAM_HEADER_SIZE)
+                       return LZMA_OK;
+
+               coder->pos = 0;
+               // Decode the Stream Footer. The decoder gives
+               // LZMA_FORMAT_ERROR if the magic bytes don't match,
+               // so convert that return code to LZMA_DATA_ERROR.
+               lzma_stream_flags footer_flags;
+               ret = lzma_stream_footer_decode(&footer_flags, coder->buffer);
+               if (ret != LZMA_OK)
+                       return ret == LZMA_FORMAT_ERROR
+                               ? LZMA_DATA_ERROR : ret;
+
+               // Check that Index Size stored in the Stream Footer matches
+               // the real size of the Index field.
+               if (lzma_index_hash_size(coder->index_hash)
+                   != footer_flags.backward_size)
+                       return LZMA_DATA_ERROR;
+
+               // Compare that the Stream Flags fields are identical in
+               // both Stream Header and Stream Footer.
+               ret = lzma_stream_flags_compare(&coder->stream_flags, 
&footer_flags);
+               if (ret != LZMA_OK)
+                       return ret;
+
+               if (!coder->concatenated)
+                       return LZMA_STREAM_END;
+               coder->sequence = SEQ_STREAM_PADDING;
+               break;
+
+       case SEQ_STREAM_PADDING: {
+
+               // Skip over possible Stream Padding.
+               while (true) {
+                       if (*in_pos >= in_size) {
+                               // Unless LZMA_FINISH was used, we cannot
+                               // know if there's more input coming later.
+                               if (action != LZMA_FINISH) {
+                                       return LZMA_OK;
+                               }
+
+                               // Stream Padding must be a multiple of
+                               // four bytes.
+                               return coder->pos == 0
+                                       ? LZMA_STREAM_END
+                                       : LZMA_DATA_ERROR;
+                       }
+
+                       // If the byte is not zero, it probably indicates
+                       // beginning of a new Stream (or the file is corrupt).
+                       if (in[*in_pos] != 0x00)
+                               break;
+
+                       ++*in_pos;
+                       coder->pos = (coder->pos + 1) & 3;
+               }
+
+               // Stream Padding must be a multiple of four bytes (empty
+               // Stream Padding is OK).
+               if (coder->pos != 0) {
+                       ++*in_pos;
+                       return LZMA_DATA_ERROR;
+               }
+
+               // Prepare to decode the next Stream.
+               return_if_error(stream_decoder_reset(coder, allocator));
+               break;
+       }
+
+       default:
+                assert(0);
+                return LZMA_PROG_ERROR;
+       }
+       return LZMA_PROG_ERROR;
+}
+
+// Report the integrity check type recorded in the coder's Stream Flags.
+// coder_ptr is the opaque coder handle, which is a struct lzma_stream_coder.
+static lzma_check stream_decoder_mt_get_check(const void *coder_ptr)
+{
+       return ((const struct lzma_stream_coder *)coder_ptr)
+                       ->stream_flags.check;
+}
+
+// Query the current memory usage and limit, and optionally set a new limit.
+//
+// *memusage and *old_memlimit always receive the coder's current values.
+// A new_memlimit of zero means "query only". A non-zero new_memlimit that
+// is below the current usage is rejected with LZMA_MEMLIMIT_ERROR and the
+// limit is left unchanged.
+static lzma_ret
+stream_decoder_mt_memconfig(void *coder_ptr, uint64_t *memusage,
+                           uint64_t *old_memlimit, uint64_t new_memlimit)
+{
+       struct lzma_stream_coder *self = coder_ptr;
+
+       *memusage = self->memusage;
+       *old_memlimit = self->memlimit;
+
+       // Nothing to change, the caller only wanted the current numbers.
+       if (new_memlimit == 0)
+               return LZMA_OK;
+
+       // Refuse a limit that the current usage already exceeds.
+       if (self->memusage > new_memlimit)
+               return LZMA_MEMLIMIT_ERROR;
+
+       self->memlimit = new_memlimit;
+       return LZMA_OK;
+}
+
+
+// Initialize, or re-initialize, the multithreaded Stream decoder in `next`.
+//
+// options->threads must be in the range 1..LZMA_THREADS_MAX and
+// options->flags may only contain bits from LZMA_SUPPORTED_FLAGS; otherwise
+// LZMA_OPTIONS_ERROR is returned before anything is touched.
+//
+// On first use the coder structure, its mutex and its condition variable
+// are created and the callbacks are installed in `next`. On every call the
+// per-stream state is reset and the worker array is allocated from scratch.
+//
+// Returns the result of stream_decoder_reset() on success and
+// LZMA_MEM_ERROR if an allocation or mutex/cond initialization fails.
+static lzma_ret
+stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
+                      const lzma_mt *options)
+{
+       struct lzma_stream_coder *coder;
+
+       if (options->threads == 0 || options->threads > LZMA_THREADS_MAX)
+               return LZMA_OPTIONS_ERROR;
+       if (options->flags & ~LZMA_SUPPORTED_FLAGS)
+               return LZMA_OPTIONS_ERROR;
+
+       lzma_next_coder_init(&stream_decoder_mt_init, next, allocator);
+
+       coder = next->coder;
+       if (!coder) {
+               coder = lzma_alloc(sizeof(struct lzma_stream_coder), allocator);
+               if (coder == NULL)
+                       return LZMA_MEM_ERROR;
+
+               memset(coder, 0xff, sizeof(struct lzma_stream_coder));
+
+               // The coder has not been stored in `next` yet and owns no
+               // other resources, so the two failures below release exactly
+               // what was acquired. In particular coder->threads still holds
+               // the 0xff fill pattern here and must not be passed to
+               // lzma_free().
+               if (mythread_mutex_init(&coder->mutex)) {
+                       lzma_free(coder, allocator);
+                       return LZMA_MEM_ERROR;
+               }
+
+               if (mythread_cond_init(&coder->cond)) {
+                       mythread_mutex_destroy(&coder->mutex);
+                       lzma_free(coder, allocator);
+                       return LZMA_MEM_ERROR;
+               }
+
+               next->coder = coder;
+
+               next->code = stream_decode_mt;
+               next->end = stream_decoder_mt_end;
+               next->get_check = stream_decoder_mt_get_check;
+               next->memconfig = &stream_decoder_mt_memconfig;
+
+               next->get_progress = NULL;
+
+               coder->threads = NULL;
+               coder->index_hash = NULL;
+               coder->threads_max = 0;
+               coder->threads_initialized = 0;
+       }
+
+       coder->sequence = SEQ_STREAM_HEADER;
+
+       coder->memlimit = my_max(1, options->memlimit);
+       coder->memusage = LZMA_MEMUSAGE_BASE;
+
+       coder->tell_no_check = options->flags & LZMA_TELL_NO_CHECK;
+       coder->tell_unsupported_check
+                       = options->flags & LZMA_TELL_UNSUPPORTED_CHECK;
+       coder->tell_any_check = options->flags & LZMA_TELL_ANY_CHECK;
+       coder->ignore_check = options->flags & LZMA_IGNORE_CHECK;
+       coder->concatenated = options->flags & LZMA_CONCATENATED;
+       coder->first_stream = true;
+       coder->direct_decomp = false;
+       coder->exp_filter_size = 0;
+       coder->exp_block_size = 0;
+       coder->pos = 0;
+
+       memset(&coder->stream_flags, 0, sizeof(lzma_stream_flags));
+       /* By allocating threads from scratch we can start memory-usage
+        * accounting from scratch, too. Changes in filter and block sizes may
+        * affect number of threads. We don't keep possible larger-than-needed
+        * in buffer (if the block size decreased) and have only one thread
+        * in case this stream has no block sizes (and `direct_decomp' expects
+        * no threads to keep it simple).
+        */
+       if (coder->threads_max) {
+               coder->threads_max = 0;
+               threads_end(coder, allocator);
+       }
+
+       coder->threads = lzma_alloc(
+                       options->threads * sizeof(struct worker_thread),
+                       allocator);
+       if (coder->threads == NULL) {
+               // next->coder already points at this coder and next->end is
+               // installed, so freeing the coder here would leave a dangling
+               // pointer behind. Leave it owned by `next` and only report
+               // the failure.
+               // NOTE(review): this relies on stream_decoder_mt_end() coping
+               // with threads == NULL and threads_max == 0 - confirm.
+               return LZMA_MEM_ERROR;
+       }
+       coder->threads_free = NULL;
+       coder->thr_read = NULL;
+       coder->thr_read_last = NULL;
+       coder->thr_write = NULL;
+
+       coder->threads_max = options->threads;
+
+       return stream_decoder_reset(coder, allocator);
+}
+
+// Public entry point: initialize strm as a multithreaded Stream decoder
+// configured by options. The actual setup is delegated to
+// stream_decoder_mt_init() through the lzma_next_strm_init() helper.
+// Returns LZMA_OK once the decoder is set up.
+// NOTE(review): lzma_next_strm_init() is presumably a macro that returns
+// early with the error code when initialization fails - confirm in common.h.
+extern LZMA_API(lzma_ret)
+lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options)
+{
+       lzma_next_strm_init(stream_decoder_mt_init, strm, options);
+
+       // Mark LZMA_RUN and LZMA_FINISH as the supported actions.
+       strm->internal->supported_actions[LZMA_RUN] = true;
+       strm->internal->supported_actions[LZMA_FINISH] = true;
+
+       return LZMA_OK;
+}
diff --git a/src/liblzma/liblzma.map b/src/liblzma/liblzma.map
index bad8633c3b8d2..3f34d8c2814f0 100644
--- a/src/liblzma/liblzma.map
+++ b/src/liblzma/liblzma.map
@@ -107,6 +107,7 @@ XZ_5.2 {
 XZ_5.3.1alpha {
 global:
        lzma_file_info_decoder;
+       lzma_stream_decoder_mt;
 
 local:
        *;
diff --git a/src/xz/coder.c b/src/xz/coder.c
index 85f954393d8bf..c22bf136285c5 100644
--- a/src/xz/coder.c
+++ b/src/xz/coder.c
@@ -51,7 +51,7 @@ static lzma_check check;
 /// This becomes false if the --check=CHECK option is used.
 static bool check_default = true;
 
-#if defined(HAVE_ENCODERS) && defined(MYTHREAD_ENABLED)
+#if (defined(HAVE_ENCODERS) || defined(HAVE_DECODERS)) && 
defined(MYTHREAD_ENABLED)
 static lzma_mt mt_options = {
        .flags = 0,
        .timeout = 300,
@@ -520,9 +520,16 @@ coder_init(file_pair *pair)
                        break;
 
                case FORMAT_XZ:
-                       ret = lzma_stream_decoder(&strm,
-                                       hardware_memlimit_get(
-                                               MODE_DECOMPRESS), flags);
+                       if (hardware_threads_get() > 1) {
+                               mt_options.threads = hardware_threads_get();
+                               mt_options.flags = flags;
+                               mt_options.memlimit = 
hardware_memlimit_get(MODE_DECOMPRESS);
+                               ret = lzma_stream_decoder_mt(&strm, 
&mt_options);
+                       } else {
+                               ret = lzma_stream_decoder(&strm,
+                                                 hardware_memlimit_get(
+                                                       MODE_DECOMPRESS), 
flags);
+                       }
                        break;
 
                case FORMAT_LZMA:
-- 
2.29.2


Reply via email to