From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>

This is WIP, the decoder appears to work based on:

|$ xz -dv < buster-pl.xz | openssl sha1
|  100 %         10,2 GiB / 40,0 GiB = 0,255   114 MiB/s       6:00
|(stdin)= 5eb4e2a3ce2253a6ec3fc86ee7ad8db0a5395959
|
|vs
|
|$ ./src/xz/.libs/xz -dv < buster-pl.xz | openssl sha1
|  100 %         10,2 GiB / 40,0 GiB = 0,255   815 MiB/s       0:50
|  (stdin)= 5eb4e2a3ce2253a6ec3fc86ee7ad8db0a5395959

All options are hard-coded (it is WIP after all).
Decoding only succeeds if every Block Header contains both the
compressed and the uncompressed size (the "cu" in the Flags column of
xz -lvv). Decoding Blocks without stored sizes is also on the to-do list,
so that users can rely on a single interface.

Threads that have finished decoding remain idle until their output
buffer has been fully consumed. Once allocated, the output buffer stays
allocated until the thread is cleaned up (it is only reallocated when a
later Block needs a different size). In the example above this saved
5 seconds compared to freeing the buffer once it had been fully consumed
and allocating it again when new data arrived.
The input buffer is allocated afresh for each Block since Block sizes
generally vary.

Parts of the MT decoder are copied from the single-threaded decoder. I
am not sure whether this is fine or whether the two should be merged
somehow.
I wrote my own output queue since the output size is known. I have no
idea whether this is a good choice or whether it would be better to use
lzma_outq instead.

Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc>
---
 src/liblzma/Makefile.am                |   2 +-
 src/liblzma/api/lzma/container.h       |   5 +
 src/liblzma/common/Makefile.inc        |   6 +
 src/liblzma/common/stream_decoder_mt.c | 976 +++++++++++++++++++++++++
 src/liblzma/liblzma.map                |   3 +-
 src/xz/coder.c                         |   4 +
 6 files changed, 994 insertions(+), 2 deletions(-)
 create mode 100644 src/liblzma/common/stream_decoder_mt.c

diff --git a/src/liblzma/Makefile.am b/src/liblzma/Makefile.am
index 6323e26aade10..1f2445d94a33c 100644
--- a/src/liblzma/Makefile.am
+++ b/src/liblzma/Makefile.am
@@ -13,7 +13,7 @@ doc_DATA =
 
 lib_LTLIBRARIES = liblzma.la
 liblzma_la_SOURCES =
-liblzma_la_CPPFLAGS = \
+liblzma_la_CPPFLAGS = -Wno-unused-parameter \
        -I$(top_srcdir)/src/liblzma/api \
        -I$(top_srcdir)/src/liblzma/common \
        -I$(top_srcdir)/src/liblzma/check \
diff --git a/src/liblzma/api/lzma/container.h b/src/liblzma/api/lzma/container.h
index 9fbf4df06178e..bd8e410215bdd 100644
--- a/src/liblzma/api/lzma/container.h
+++ b/src/liblzma/api/lzma/container.h
@@ -630,3 +630,8 @@ extern LZMA_API(lzma_ret) lzma_stream_buffer_decode(
                const uint8_t *in, size_t *in_pos, size_t in_size,
                uint8_t *out, size_t *out_pos, size_t out_size)
                lzma_nothrow lzma_attr_warn_unused_result;
+
+extern LZMA_API(uint64_t)
+       lzma_stream_decoder_mt_memusage(const lzma_mt *options);
+extern LZMA_API(lzma_ret)
+       lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options);
diff --git a/src/liblzma/common/Makefile.inc b/src/liblzma/common/Makefile.inc
index 0408f9a48c4db..3c140c2955475 100644
--- a/src/liblzma/common/Makefile.inc
+++ b/src/liblzma/common/Makefile.inc
@@ -78,4 +78,10 @@ liblzma_la_SOURCES += \
        common/stream_decoder.h \
        common/stream_flags_decoder.c \
        common/vli_decoder.c
+
+if COND_THREADS
+liblzma_la_SOURCES += \
+       common/stream_decoder_mt.c
+endif
+
 endif
diff --git a/src/liblzma/common/stream_decoder_mt.c 
b/src/liblzma/common/stream_decoder_mt.c
new file mode 100644
index 0000000000000..0101ab4339d59
--- /dev/null
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -0,0 +1,976 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+/// \file       stream_decoder_mt.c
+/// \brief      Multithreaded .xz Stream decoder
+//
+//  Author:     Sebastian Andrzej Siewior
+//
+//  This file has been put into the public domain.
+//  You can do whatever you want with this file.
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include "common.h"
+#include "block_decoder.h"
+#include "outqueue.h"
+#include "stream_decoder.h"
+#include "index.h"
+
+#include <stdio.h>
+
+/// States of a worker thread. ->state is protected by worker_thread.mutex.
+typedef enum {
+       /// Waiting for work. A worker whose Block decoder failed also
+       /// parks itself in this state, with thread_error set.
+       THR_IDLE,
+
+       /// Decoding is in progress.
+       THR_RUN,
+
+       /// The main thread wants the thread to stop whatever it was doing
+       /// but not exit. The worker acknowledges by switching to THR_IDLE.
+       THR_STOP,
+
+       /// The main thread wants the thread to exit. The worker then frees
+       /// its buffers and Block decoder, destroys its mutex and condition
+       /// variable, and returns (see threads_end()).
+       THR_EXIT,
+
+} worker_state;
+
+/// Output buffer of one worker thread. It holds the whole uncompressed
+/// Block and is reused for a later Block of the same size.
+struct out_buffer {
+       uint8_t *out;
+       size_t out_block_size;  // Size of ->out (the Block's uncompressed size)
+       size_t out_pos;         // Bytes decoded into ->out so far; written by
+                               // the worker under coder->mutex
+       size_t out_filled;      // Bytes of ->out already copied to the
+                               // application by the main thread
+};
+
+/// Per-thread state of one decoder worker. "(coordinator)" marks fields
+/// written by the main thread, "(worker)" fields written by the worker.
+struct worker_thread {
+       /// Requested/current state; protected by ->mutex.
+       worker_state state;
+
+       /// Input of the current Block: its Compressed Data, Block Padding
+       /// and Check (see comp_blk_size()). Allocated by the main thread,
+       /// freed by the worker once in_pos reaches in_block_size.
+       uint8_t *in;
+       size_t in_block_size;   // Size of ->in
+       size_t in_filled;       // Bytes written to ->in (coordinator)
+       size_t in_pos;          // Bytes consumed of ->in (worker)
+
+       /// Decoded output of the current Block.
+       struct out_buffer out;
+
+       /// Pointer to the main structure; the worker uses it to lock
+       /// coder->mutex and to signal coder->cond.
+       struct lzma_stream_coder *coder;
+
+       /// The allocator is set by the main thread. Since a copy of the
+       /// pointer is kept here, the application must not change the
+       /// allocator before calling lzma_end().
+       const lzma_allocator *allocator;
+
+       /// Block decoder
+       lzma_next_coder block_decoder;
+
+       /// Options of the Block being decoded (from its Block Header)
+       lzma_block block_options;
+
+       /// Next thread in whichever list this structure is on: the stack
+       /// of free threads (coder->threads_free) or the queue of threads
+       /// whose output is being read (coder->thr_read). Both lists are
+       /// protected by coder->mutex.
+       struct worker_thread *next;
+
+       /// Protects ->state, ->in, ->in_filled, ->in_block_size and
+       /// ->thread_error; the main thread also reads ->in_pos under it.
+       mythread_mutex mutex;
+
+       /// Wakes the worker when new input or a new state is available;
+       /// threads_stop() also waits on it for THR_IDLE.
+       mythread_cond cond;
+
+       /// Error returned by the Block decoder, or LZMA_OK.
+       lzma_ret thread_error;
+
+       /// The ID of this thread is used to join the thread
+       /// when it's not needed anymore.
+       mythread thread_id;
+};
+
+/// State of the multithreaded Stream decoder, owned by the main thread.
+struct lzma_stream_coder {
+       /// What the main thread expects to read next from the input.
+       enum {
+               SEQ_STREAM_HEADER,
+               SEQ_BLOCK_HEADER,
+               SEQ_BLOCK,
+               SEQ_INDEX,
+               SEQ_STREAM_FOOTER,
+               SEQ_STREAM_PADDING,
+       } sequence;
+
+       /// Array of allocated thread-specific structures
+       struct worker_thread *threads;
+
+       /// Number of structures in "threads" above. This is also the
+       /// number of threads that will be created at maximum.
+       uint32_t threads_max;
+
+       /// Number of thread structures that have been initialized, and
+       /// thus the number of worker threads actually created so far.
+       uint32_t threads_initialized;
+
+       /// Stack of free threads, linked via worker_thread.next. The main
+       /// thread pushes a thread here once all of its output has been
+       /// copied out (try_copy_decoded()). This starts as empty because
+       /// threads are created only when actually needed. Protected by
+       /// ->mutex.
+       struct worker_thread *threads_free;
+
+       /// Hash of the Block sizes decoded so far; compared against the
+       /// Index at the end of the Stream.
+       lzma_index_hash *index_hash;
+
+       /// Protects threads_free and the read queue links. Workers also
+       /// update their out.out_pos under this lock.
+       mythread_mutex mutex;
+
+       /// Signalled by workers when the thread at the head of the read
+       /// queue has produced output, and on any decoding error.
+       mythread_cond cond;
+
+       /// Head of the queue (linked via worker_thread.next) of threads
+       /// whose decoded output still has to be copied to the application,
+       /// in the order their Block Headers were read.
+       struct worker_thread *thr_read;
+
+       /// Tail of the thr_read queue.
+       struct worker_thread *thr_read_last;
+
+       /// Thread that receives the input of the current Block, or NULL if
+       /// none has been assigned yet.
+       struct worker_thread *thr_write;
+
+       /// Stream Flags decoded from the Stream Header.
+       lzma_stream_flags stream_flags;
+
+       /// Decoder behavior flags; presumably copied from the init options
+       /// (the initializer is not visible here — TODO confirm).
+       bool tell_no_check;
+       bool tell_unsupported_check;
+       bool tell_any_check;
+       bool ignore_check;
+       bool concatenated;
+
+       /// True until the first Stream Header has been decoded; for later
+       /// Streams LZMA_FORMAT_ERROR is turned into LZMA_DATA_ERROR.
+       bool first_stream;
+
+       /// Number of bytes collected in ->buffer so far (Stream Header,
+       /// Block Header or Stream Footer).
+       size_t pos;
+       uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX];
+};
+
+/// Memory usage estimate of the multithreaded decoder.
+///
+/// Not implemented yet: the result is zero regardless of the options.
+extern LZMA_API(uint64_t)
+lzma_stream_decoder_mt_memusage(const lzma_mt *options)
+{
+       (void)options;
+       return 0;
+}
+
+/// Upper bound on the number of input bytes handed to the Block decoder
+/// per call, so that decoded output is published in small steps.
+#define CHUNK_SIZE     16384
+
+/// Worker thread main loop.
+///
+/// The thread sleeps on thr->cond until the main thread puts it into
+/// THR_RUN and copies input into thr->in. It then decodes that input in
+/// steps of at most CHUNK_SIZE bytes into thr->out.out and publishes the
+/// new out_pos under coder->mutex. When in_pos reaches in_block_size the
+/// input buffer is freed; the output buffer is kept for reuse. A decoding
+/// error is stored in thr->thread_error and the thread parks in THR_IDLE.
+/// On THR_EXIT the thread frees its resources and returns.
+///
+/// This thread never holds thr->mutex and coder->mutex at the same time.
+static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr)
+{
+       struct worker_thread *thr = thr_ptr;
+       struct out_buffer *out = &thr->out;
+
+       mythread_mutex_lock(&thr->mutex);
+
+       while (thr->state != THR_EXIT) {
+               if (thr->state == THR_STOP) {
+                       // Acknowledge the stop request. threads_stop() may
+                       // be sleeping on thr->cond until the state becomes
+                       // THR_IDLE, so wake it up.
+                       thr->state = THR_IDLE;
+                       mythread_cond_signal(&thr->cond);
+               }
+
+               if (thr->state != THR_RUN) {
+                       // THR_IDLE: wait for new work or an exit request.
+                       assert(thr->state == THR_IDLE);
+                       mythread_cond_wait(&thr->cond, &thr->mutex);
+                       continue;
+               }
+
+               // THR_RUN: sleep until there is unconsumed input.
+               if (thr->in == NULL || thr->in_filled == thr->in_pos) {
+                       mythread_cond_wait(&thr->cond, &thr->mutex);
+                       continue;
+               }
+
+               // Decode from private copies so that thr->in_pos is only
+               // modified while holding thr->mutex; stream_decode_in()
+               // reads it under that lock to decide whether to wake us.
+               const uint8_t *const in = thr->in;
+               size_t in_pos = thr->in_pos;
+               size_t in_limit = thr->in_filled;
+               mythread_mutex_unlock(&thr->mutex);
+
+               if (in_limit - in_pos > CHUNK_SIZE)
+                       in_limit = in_pos + CHUNK_SIZE;
+
+               // While this thread has pending input it is the only one
+               // writing out->out_pos, so reading it needs no lock.
+               size_t out_pos = out->out_pos;
+               const lzma_ret ret = thr->block_decoder.code(
+                               thr->block_decoder.coder, thr->allocator,
+                               in, &in_pos, in_limit,
+                               out->out, &out_pos, out->out_block_size,
+                               LZMA_RUN);
+
+               mythread_mutex_lock(&thr->mutex);
+               thr->in_pos = in_pos;
+
+               if (ret != LZMA_OK && ret != LZMA_STREAM_END) {
+                       // Park the thread and record the error for the main
+                       // thread. Also wake a possible threads_stop() waiter.
+                       thr->state = THR_IDLE;
+                       thr->thread_error = ret;
+                       mythread_cond_signal(&thr->cond);
+                       mythread_mutex_unlock(&thr->mutex);
+
+                       mythread_mutex_lock(&thr->coder->mutex);
+                       mythread_cond_signal(&thr->coder->cond);
+                       mythread_mutex_unlock(&thr->coder->mutex);
+
+                       mythread_mutex_lock(&thr->mutex);
+                       continue;
+               }
+
+               // Release the input buffer once the whole Block has been
+               // consumed. The output buffer is kept for reuse.
+               void *free_ptr = NULL;
+               if (thr->in != NULL && thr->in_pos == thr->in_block_size) {
+                       free_ptr = thr->in;
+                       thr->in = NULL;
+                       thr->in_pos = 0;
+                       thr->in_block_size = 0;
+               }
+               mythread_mutex_unlock(&thr->mutex);
+
+               // Remember this before free_ptr becomes indeterminate.
+               const bool finished = free_ptr != NULL;
+               if (finished)
+                       lzma_free(free_ptr, thr->allocator);
+
+               // Publish new output. Also wake the main thread when the
+               // Block finished without new output, since it may be
+               // waiting on this thread.
+               mythread_mutex_lock(&thr->coder->mutex);
+               if (out_pos != out->out_pos || finished) {
+                       out->out_pos = out_pos;
+                       if (thr->coder->thr_read == thr)
+                               mythread_cond_signal(&thr->coder->cond);
+               }
+               mythread_mutex_unlock(&thr->coder->mutex);
+
+               mythread_mutex_lock(&thr->mutex);
+       }
+
+       // THR_EXIT: release everything this thread owns.
+       mythread_mutex_unlock(&thr->mutex);
+
+       lzma_free(thr->in, thr->allocator);
+       lzma_free(thr->out.out, thr->allocator);
+       lzma_next_end(&thr->block_decoder, thr->allocator);
+
+       mythread_mutex_destroy(&thr->mutex);
+       mythread_cond_destroy(&thr->cond);
+       return MYTHREAD_RET_VALUE;
+}
+
+/// Ask every created worker thread to exit, join them all, and free the
+/// thread array. Each worker frees its own buffers, Block decoder, mutex
+/// and condition variable before it returns.
+static void threads_end(struct lzma_stream_coder *coder,
+                       const lzma_allocator *allocator)
+{
+       const uint32_t count = coder->threads_initialized;
+
+       // Tell all threads to exit first so that they shut down in parallel...
+       for (uint32_t n = 0; n < count; n++) {
+               struct worker_thread *thr = &coder->threads[n];
+
+               mythread_mutex_lock(&thr->mutex);
+               thr->state = THR_EXIT;
+               mythread_cond_signal(&thr->cond);
+               mythread_mutex_unlock(&thr->mutex);
+       }
+
+       // ...then wait for each of them to finish.
+       for (uint32_t n = 0; n < count; n++) {
+               const int join_ret = mythread_join(coder->threads[n].thread_id);
+               assert(join_ret == 0);
+               (void)join_ret;
+       }
+
+       lzma_free(coder->threads, allocator);
+}
+
+/// Ask every created worker to abandon its current work and go idle.
+/// If wait_for_threads is true, block until each worker reports THR_IDLE.
+/// NOTE(review): the wait relies on the worker signalling thr->cond after
+/// it switches to THR_IDLE; without that the wait cannot finish.
+static void threads_stop(struct lzma_stream_coder *coder,
+                        bool wait_for_threads)
+{
+       const uint32_t count = coder->threads_initialized;
+
+       for (uint32_t n = 0; n < count; n++) {
+               struct worker_thread *thr = &coder->threads[n];
+
+               mythread_mutex_lock(&thr->mutex);
+               thr->state = THR_STOP;
+               mythread_cond_signal(&thr->cond);
+               mythread_mutex_unlock(&thr->mutex);
+       }
+
+       if (wait_for_threads) {
+               // Wait for the threads to settle in the idle state.
+               for (uint32_t n = 0; n < count; n++) {
+                       struct worker_thread *thr = &coder->threads[n];
+
+                       mythread_mutex_lock(&thr->mutex);
+                       while (thr->state != THR_IDLE)
+                               mythread_cond_wait(&thr->cond, &thr->mutex);
+                       mythread_mutex_unlock(&thr->mutex);
+               }
+       }
+}
+
+/// End callback of the decoder: shut down and join all worker threads,
+/// then free the Index hash and the coder itself.
+static void stream_decoder_mt_end(void *coder_ptr,
+                                 const lzma_allocator *allocator)
+{
+       struct lzma_stream_coder *const coder = coder_ptr;
+
+       // Threads go first: while running they use coder->mutex/coder->cond.
+       threads_end(coder, allocator);
+       lzma_index_hash_end(coder->index_hash, allocator);
+
+       // NOTE(review): coder->mutex and coder->cond are not destroyed here;
+       // check whether the initializer (not visible) creates them.
+       lzma_free(coder, allocator);
+}
+
+/// Copy application input into the input buffer of coder->thr_write.
+///
+/// Wakes the worker if it had consumed everything it was given before this
+/// copy. When the whole Block (in_block_size bytes) has been copied, the
+/// main thread goes back to SEQ_BLOCK_HEADER and thr_write is released.
+/// Returns the worker's error if it has already failed, LZMA_OK otherwise.
+static lzma_ret stream_decode_in(struct lzma_stream_coder *coder,
+                                const lzma_allocator *allocator,
+                                const uint8_t *restrict in,
+                                size_t *restrict in_pos,
+                                size_t in_size, lzma_action action)
+{
+       struct worker_thread *thr = coder->thr_write;
+
+       mythread_mutex_lock(&thr->mutex);
+
+       // A failed worker parks itself in THR_IDLE with thread_error set.
+       if (thr->state == THR_IDLE && thr->thread_error != LZMA_OK) {
+               const lzma_ret ret = thr->thread_error;
+               mythread_mutex_unlock(&thr->mutex);
+               return ret;
+       }
+
+       const size_t old_filled = thr->in_filled;
+       lzma_bufcpy(in, in_pos, in_size, thr->in, &thr->in_filled,
+                   thr->in_block_size);
+
+       // The worker sleeps once it has caught up with in_filled.
+       if (old_filled == thr->in_pos)
+               mythread_cond_signal(&thr->cond);
+
+       // Decide this while still holding the lock. As soon as the worker
+       // has consumed the whole Block it resets in_block_size to zero and
+       // ->in to NULL. A comparison after unlocking could therefore miss
+       // the end of the Block, and the next call would pass lzma_bufcpy()
+       // a NULL buffer whose size is smaller than in_filled.
+       const bool block_copied = thr->in_filled == thr->in_block_size;
+
+       mythread_mutex_unlock(&thr->mutex);
+
+       if (block_copied) {
+               coder->sequence = SEQ_BLOCK_HEADER;
+               coder->thr_write = NULL;
+       }
+
+       return LZMA_OK;
+}
+
+/// Initialize the next unused worker_thread structure and start its thread.
+/// On success the new thread becomes coder->thr_write. Returns
+/// LZMA_MEM_ERROR if the mutex, condition variable or thread cannot be
+/// created.
+static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder,
+                                     const lzma_allocator *allocator)
+{
+       struct worker_thread *const thr =
+                       coder->threads + coder->threads_initialized;
+
+       // Start from an all-zero structure (this also clears ->in and ->next).
+       memset(thr, 0, sizeof(*thr));
+
+       if (mythread_mutex_init(&thr->mutex) != 0)
+               return LZMA_MEM_ERROR;
+
+       if (mythread_cond_init(&thr->cond) != 0) {
+               mythread_mutex_destroy(&thr->mutex);
+               return LZMA_MEM_ERROR;
+       }
+
+       thr->state = THR_IDLE;
+       thr->thread_error = LZMA_OK;
+       thr->allocator = allocator;
+       thr->coder = coder;
+       thr->block_decoder = LZMA_NEXT_CODER_INIT;
+
+       if (mythread_create(&thr->thread_id, worker_decoder, thr) != 0) {
+               mythread_cond_destroy(&thr->cond);
+               mythread_mutex_destroy(&thr->mutex);
+               return LZMA_MEM_ERROR;
+       }
+
+       ++coder->threads_initialized;
+       coder->thr_write = thr;
+       return LZMA_OK;
+}
+
+
+/// Reserve a thread for the next Block as coder->thr_write. Pop one from
+/// the stack of free threads, or create a new one while fewer than
+/// threads_max exist. Returns LZMA_OK with thr_write still NULL when every
+/// thread is busy, or the error from initialize_new_thread().
+static lzma_ret get_thread(struct lzma_stream_coder *coder,
+                          const lzma_allocator *allocator)
+{
+       // If there is a free structure on the stack, use it.
+       mythread_mutex_lock(&coder->mutex);
+       if (coder->threads_free != NULL) {
+               coder->thr_write = coder->threads_free;
+               coder->threads_free = coder->threads_free->next;
+
+               // Unlink it from the stack. Its ->next would otherwise still
+               // point at the next free thread. Once this thread is
+               // appended to the read queue, try_copy_decoded() could
+               // follow that stale link to a thread with no Block assigned.
+               coder->thr_write->next = NULL;
+       }
+       mythread_mutex_unlock(&coder->mutex);
+
+       if (coder->thr_write == NULL) {
+               // If there are no uninitialized structures left, return.
+               if (coder->threads_initialized == coder->threads_max)
+                       return LZMA_OK;
+
+               // Initialize a new thread (it becomes coder->thr_write).
+               return_if_error(initialize_new_thread(coder, allocator));
+       }
+
+       // Prepare the thread for a new Block and let it run.
+       struct worker_thread *thr = coder->thr_write;
+       mythread_mutex_lock(&thr->mutex);
+       thr->in_filled = 0;
+       memset(&thr->block_options, 0, sizeof(thr->block_options));
+       thr->state = THR_RUN;
+       mythread_mutex_unlock(&thr->mutex);
+
+       return LZMA_OK;
+}
+
+/// Give coder->thr_write an output buffer for the whole uncompressed Block
+/// (reusing its previous buffer if the size matches) and append the thread
+/// to the tail of the read queue.
+///
+/// NOTE(review): the size comes straight from the Block Header and no
+/// memory limit is enforced yet, so a crafted header can request a huge
+/// allocation.
+static lzma_ret alloc_out_buffer(struct lzma_stream_coder *coder,
+                                const lzma_allocator *allocator)
+{
+       struct worker_thread *thr = coder->thr_write;
+       struct out_buffer *buf = &thr->out;
+       const lzma_vli uncomp_vli = thr->block_options.uncompressed_size;
+
+       // The Block is buffered in full, so its size must fit into size_t.
+       // Without this check the size would be silently truncated on 32-bit
+       // systems and the buffer would not match the Block.
+       if (uncomp_vli > SIZE_MAX)
+               return LZMA_MEM_ERROR;
+
+       const size_t uncomp_size = (size_t)uncomp_vli;
+
+       // Drop the previous buffer unless it has exactly the right size.
+       if (buf->out != NULL && buf->out_block_size != uncomp_size) {
+               lzma_free(buf->out, allocator);
+               buf->out = NULL;
+               buf->out_block_size = 0;
+       }
+
+       if (buf->out == NULL) {
+               buf->out = lzma_alloc(uncomp_size, allocator);
+               if (buf->out == NULL)
+                       return LZMA_MEM_ERROR;
+
+               buf->out_block_size = uncomp_size;
+       }
+
+       buf->out_pos = 0;
+       buf->out_filled = 0;
+
+       // Append to the read queue. Clear ->next first: a thread taken from
+       // the free stack may still link to the next free thread, which
+       // try_copy_decoded() would otherwise treat as the following Block.
+       mythread_mutex_lock(&coder->mutex);
+       thr->next = NULL;
+       if (coder->thr_read == NULL)
+               coder->thr_read = thr;
+       else
+               coder->thr_read_last->next = thr;
+
+       coder->thr_read_last = thr;
+       mythread_mutex_unlock(&coder->mutex);
+
+       return LZMA_OK;
+}
+
+/// Copy decoded data from the head of the read queue to the application's
+/// out buffer, in Block order. A thread whose output has been copied out
+/// completely is accounted in the Index hash and pushed onto the stack of
+/// free threads. Returns LZMA_OK when no more progress is possible right
+/// now, the error of the head thread's decoder if it failed, or an error
+/// from lzma_index_hash_append().
+///
+/// NOTE(review): a Block counts as finished once its output has been
+/// copied out, although the worker may still be consuming the Block
+/// Padding and Check. The thread can then be reused while it is still
+/// running, and a Check mismatch may be reported late or not at all.
+/// Completion should probably also wait for the worker to have consumed
+/// all of ->in.
+static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder,
+                            const lzma_allocator *allocator,
+                            uint8_t *restrict out,
+                            size_t *restrict out_pos, size_t out_size)
+{
+       // Only the main thread modifies coder->thr_read, so it can be read
+       // here without coder->mutex.
+       struct worker_thread *thr = coder->thr_read;
+
+       while (thr != NULL) {
+               struct out_buffer *out_buff = &thr->out;
+
+               // A failed worker will never produce the rest of its output.
+               mythread_mutex_lock(&thr->mutex);
+               const lzma_ret thr_ret = thr->thread_error;
+               mythread_mutex_unlock(&thr->mutex);
+               if (thr_ret != LZMA_OK)
+                       return thr_ret;
+
+               if (out_buff->out_filled == out_buff->out_block_size) {
+                       // Block fully copied out: account for it in the
+                       // Index hash and recycle the thread.
+                       const lzma_ret ret = lzma_index_hash_append(
+                                       coder->index_hash,
+                                       lzma_block_unpadded_size(
+                                               &thr->block_options),
+                                       thr->block_options.uncompressed_size);
+                       if (ret != LZMA_OK)
+                               return ret;
+
+                       mythread_mutex_lock(&coder->mutex);
+                       coder->thr_read = thr->next;
+                       thr->next = coder->threads_free;
+                       coder->threads_free = thr;
+                       mythread_mutex_unlock(&coder->mutex);
+
+                       thr = coder->thr_read;
+                       continue;
+               }
+
+               // Snapshot the published output size. The worker updates
+               // out_pos under coder->mutex after writing the data, so
+               // taking the lock also makes that data visible here.
+               mythread_mutex_lock(&coder->mutex);
+               const size_t decoded = out_buff->out_pos;
+               mythread_mutex_unlock(&coder->mutex);
+
+               // Everything decoded so far has already been copied.
+               if (decoded == out_buff->out_filled)
+                       return LZMA_OK;
+
+               // The application's output buffer is full.
+               if (*out_pos == out_size)
+                       return LZMA_OK;
+
+               lzma_bufcpy(out_buff->out, &out_buff->out_filled, decoded,
+                           out, out_pos, out_size);
+       }
+
+       return LZMA_OK;
+}
+
+/// Try to set coder->thr_write by taking a free thread or creating a new
+/// one. If all threads are busy and wait is true, sleep until a worker
+/// signals coder->cond. Callers re-check the state afterwards, so a single
+/// (possibly spurious) wakeup is enough.
+static void wait_for_idle_thread(struct lzma_stream_coder *coder,
+                                const lzma_allocator *allocator,
+                                uint8_t *restrict out,
+                                size_t *restrict out_pos,
+                                size_t out_size, bool wait)
+{
+       if (coder->thr_write != NULL)
+               return;
+
+       // NOTE(review): errors from get_thread() (e.g. a failed thread
+       // creation) are only caught by this assert; this function has no
+       // way to report them to its callers.
+       const lzma_ret ret = get_thread(coder, allocator);
+       assert(ret == LZMA_OK);
+       (void)ret;
+
+       if (coder->thr_write != NULL || !wait)
+               return;
+
+       mythread_mutex_lock(&coder->mutex);
+
+       // Without a read queue no worker would signal coder->cond, and
+       // there is no output to wait for.
+       struct worker_thread *thr = coder->thr_read;
+       if (thr != NULL && thr->out.out_pos == thr->out.out_filled) {
+               // A failed worker signals coder->cond only once, possibly
+               // before we got here, so sleeping could block forever.
+               // Taking thr->mutex while holding coder->mutex is safe
+               // because workers never hold both locks at once.
+               mythread_mutex_lock(&thr->mutex);
+               const bool failed = thr->thread_error != LZMA_OK;
+               mythread_mutex_unlock(&thr->mutex);
+
+               if (!failed)
+                       mythread_cond_wait(&coder->cond, &coder->mutex);
+       }
+
+       mythread_mutex_unlock(&coder->mutex);
+}
+
+/// Number of bytes of a Block that follow its Block Header: Compressed
+/// Data plus Block Padding, plus the Check field. Rounding the compressed
+/// size itself up to a multiple of four gives the padded size because the
+/// Block Header size is a multiple of four in the .xz format. Saturates at
+/// SIZE_MAX when the value does not fit into size_t, so that the following
+/// allocation fails instead of returning a truncated buffer.
+static size_t comp_blk_size(struct lzma_stream_coder *coder, lzma_vli size)
+{
+       const lzma_vli total = vli_ceil4(size)
+                       + lzma_check_size(coder->stream_flags.check);
+
+       return total > SIZE_MAX ? SIZE_MAX : (size_t)total;
+}
+
+/// Prepare for decoding the next Stream: (re)initialize the Index hash
+/// and expect a Stream Header next. Returns LZMA_MEM_ERROR if the Index
+/// hash cannot be initialized.
+static lzma_ret
+stream_decoder_reset(struct lzma_stream_coder *coder,
+                    const lzma_allocator *allocator)
+{
+       lzma_index_hash *const hash =
+                       lzma_index_hash_init(coder->index_hash, allocator);
+       coder->index_hash = hash;
+       if (hash == NULL)
+               return LZMA_MEM_ERROR;
+
+       coder->pos = 0;
+       coder->sequence = SEQ_STREAM_HEADER;
+       return LZMA_OK;
+}
+
+/// True if the current stream_decode_mt() call has consumed input or
+/// produced output. It expands to references to stream_decode_mt()'s
+/// locals start_in_pos/start_out_pos and its in_pos/out_pos parameters,
+/// so it is only usable inside that function.
+#define made_progress() \
+       (start_in_pos != *in_pos || start_out_pos != *out_pos)
+
+static lzma_ret
+stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
+                const uint8_t *restrict in, size_t *restrict in_pos, size_t 
in_size,
+                uint8_t *restrict out, size_t *restrict out_pos, size_t 
out_size, lzma_action action)
+{
+       struct lzma_stream_coder *coder = coder_ptr;
+       struct worker_thread *thr;
+       lzma_ret ret;
+       size_t start_in_pos = *in_pos;
+       size_t start_out_pos = *out_pos;
+
+
+       while (true)
+       switch (coder->sequence) {
+       case SEQ_STREAM_HEADER: {
+               // Copy the Stream Header to the internal buffer.
+               lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
+                           LZMA_STREAM_HEADER_SIZE);
+
+               // Return if we didn't get the whole Stream Header yet.
+               if (coder->pos < LZMA_STREAM_HEADER_SIZE)
+                       return LZMA_OK;
+
+               if (!coder->thr_write) {
+                       ret = try_copy_decoded(coder, allocator, out, out_pos, 
out_size);
+                       if (ret != LZMA_OK)
+                               return ret;
+                       wait_for_idle_thread(coder, allocator, out, out_pos,
+                                            out_size, true);
+                       if (!coder->thr_write) {
+                               return LZMA_OK;
+                       }
+               }
+
+               coder->pos = 0;
+
+               // Decode the Stream Header.
+               ret = lzma_stream_header_decode(&coder->stream_flags,
+                                               coder->buffer);
+               if (ret != LZMA_OK)
+                       return ret == LZMA_FORMAT_ERROR && !coder->first_stream
+                               ? LZMA_DATA_ERROR : ret;
+
+               // If we are decoding concatenated Streams, and the later
+               // Streams have invalid Header Magic Bytes, we give
+               // LZMA_DATA_ERROR instead of LZMA_FORMAT_ERROR.
+               coder->first_stream = false;
+
+               // Even if we return LZMA_*_CHECK below, we want
+               // to continue from Block Header decoding.
+               coder->sequence = SEQ_BLOCK_HEADER;
+
+               // Detect if there's no integrity check or if it is
+               // unsupported if those were requested by the application.
+               if (coder->tell_no_check && coder->stream_flags.check
+                    == LZMA_CHECK_NONE)
+                       return LZMA_NO_CHECK;
+
+               if (coder->tell_unsupported_check
+                    && !lzma_check_is_supported(coder->stream_flags.check))
+                        return LZMA_UNSUPPORTED_CHECK;
+
+                if (coder->tell_any_check)
+                        return LZMA_GET_CHECK;
+                               return LZMA_OK;
+                break;
+       }
+
+       case SEQ_BLOCK_HEADER: {
+              if (*in_pos >= in_size)
+                      return LZMA_OK;
+
+              thr = coder->thr_write;
+              if (!thr) {
+seq_blk_hdr_again:
+                      ret = try_copy_decoded(coder, allocator, out, out_pos, 
out_size);
+                      if (ret != LZMA_OK)
+                              return ret;
+
+                      wait_for_idle_thread(coder, allocator, out, out_pos, 
out_size, false);
+                      if (!coder->thr_write) {
+                              if (made_progress()) {
+                                      return LZMA_OK;
+                              } else {
+                                      wait_for_idle_thread(coder, allocator, 
out, out_pos, out_size, true);
+                              }
+                      }
+                      thr = coder->thr_write;
+                      if (!thr)
+                              goto seq_blk_hdr_again;
+              }
+
+              if (coder->pos == 0) {
+                      // Detect if it's Index.
+                      if (in[*in_pos] == 0x00) {
+                              coder->sequence = SEQ_INDEX;
+                              break;
+                      }
+
+                      // Calculate the size of the Block Header. Note that
+                      // Block Header decoder wants to see this byte too
+                      // so don't advance *in_pos.
+                      thr->block_options.header_size =
+                              lzma_block_header_size_decode(in[*in_pos]);
+              }
+
+              // Copy the Block Header to the internal buffer.
+              lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
+                          thr->block_options.header_size);
+
+              // Return if we didn't get the whole Block Header yet.
+              if (coder->pos < thr->block_options.header_size)
+                      return LZMA_OK;
+
+              coder->pos = 0;
+
+              // Version 1 is needed to support the .ignore_check option.
+              thr->block_options.version = 1;
+
+              // Set up a buffer to hold the filter chain. Block Header
+              // decoder will initialize all members of this array so
+              // we don't need to do it here.
+              lzma_filter filters[LZMA_FILTERS_MAX + 1];
+              thr->block_options.filters = filters;
+
+              // Copy the type of the Check so that Block Header and Block
+              // decoders see it.
+              thr->block_options.check = coder->stream_flags.check;
+
+              // Decode the Block Header.
+              return_if_error(lzma_block_header_decode(&thr->block_options,
+                                                       allocator, 
coder->buffer));
+
+              // If LZMA_IGNORE_CHECK was used, this flag needs to be set.
+              // It has to be set after lzma_block_header_decode() because
+              // it always resets this to false.
+              thr->block_options.ignore_check = coder->ignore_check;
+
+              if (thr->block_options.compressed_size == LZMA_VLI_UNKNOWN ||
+                  thr->block_options.uncompressed_size == LZMA_VLI_UNKNOWN) {
+                      fprintf(stderr, "Missing block sizes in blk hdr\n");
+                      return LZMA_PROG_ERROR;
+              }
+
+              ret = alloc_out_buffer(coder, allocator);
+              if (ret != LZMA_OK)
+                      return ret;
+
+              thr->in_block_size = comp_blk_size(coder, 
thr->block_options.compressed_size);
+
+              thr->in = lzma_alloc(thr->in_block_size, allocator);
+              if (!thr->in)
+                      return LZMA_MEM_ERROR;
+
+              // Check the memory usage limit.
+              const uint64_t memusage = lzma_raw_decoder_memusage(filters);
+
+              if (memusage == UINT64_MAX) {
+                      // One or more unknown Filter IDs.
+                      ret = LZMA_OPTIONS_ERROR;
+              } else {
+                      // Now we can set coder->memusage since we know that
+                      // the filter chain is valid. We don't want
+                      // lzma_memusage() to return UINT64_MAX in case of
+                      // invalid filter chain.
+
+                      /* coder->memusage = memusage; */
+
+                      if (0 /* memusage > coder->memlimit */) {
+                              // The chain would need too much memory.
+                              ret = LZMA_MEMLIMIT_ERROR;
+                      } else {
+                              // Memory usage is OK.
+                              // Initialize the Block decoder.
+                              ret = lzma_block_decoder_init(
+                                                            
&thr->block_decoder,
+                                                            allocator,
+                                                            
&thr->block_options);
+                      }
+              }
+
+              // Free the allocated filter options since they are needed
+              // only to initialize the Block decoder.
+              for (size_t i = 0; i < LZMA_FILTERS_MAX; ++i)
+                      lzma_free(filters[i].options, allocator);
+              thr->block_options.filters = NULL;
+              // Check if memory usage calculation and Block enocoder
+              // initialization succeeded.
+              if (ret != LZMA_OK)
+                      return ret;
+
+              coder->sequence = SEQ_BLOCK;
+              break;
+
+       case SEQ_BLOCK:
+              thr = coder->thr_write;
+
+              mythread_mutex_lock(&thr->mutex);
+              ret = thr->thread_error;
+              if (ret != LZMA_OK) {
+                      return ret;
+              }
+              mythread_mutex_unlock(&thr->mutex);
+
+              ret = try_copy_decoded(coder, allocator, out, out_pos, out_size);
+              if (ret != LZMA_OK)
+                      return ret;
+
+              ret = stream_decode_in(coder, allocator,
+                                     in, in_pos,
+                                     in_size, action);
+              if (ret != LZMA_OK) {
+                      threads_stop(coder, false);
+                      return ret;
+              }
+
+              if ((*in_pos >= in_size) || (*out_pos >= out_size))
+                      return LZMA_OK;
+
+              break;
+       }
+
+       case SEQ_INDEX: {
+                // If we don't have any input, don't call
+                // lzma_index_hash_decode() since it would return
+                // LZMA_BUF_ERROR, which we must not do here.
+                if (*in_pos >= in_size)
+                        return LZMA_OK;
+
+               /* first flush all worker threads, so the accounting of decoded
+                * blocks matches index's expectation.
+                */
+               while (coder->thr_read) {
+                       ret = try_copy_decoded(coder, allocator, out, out_pos, 
out_size);
+                       if (ret != LZMA_OK)
+                               return ret;
+
+                       if (*out_pos >= out_size)
+                               return LZMA_OK;
+
+                       mythread_mutex_lock(&coder->mutex);
+                       if (coder->thr_read->out.out_pos == 
coder->thr_read->out.out_filled)
+                               mythread_cond_wait(&coder->cond, &coder->mutex);
+                       mythread_mutex_unlock(&coder->mutex);
+               }
+
+                // Decode the Index and compare it to the hash calculated
+                // from the sizes of the Blocks (if any).
+               ret = lzma_index_hash_decode(coder->index_hash, in, in_pos,
+                                            in_size);
+                if (ret != LZMA_STREAM_END) {
+                        return ret;
+               }
+
+                coder->sequence = SEQ_STREAM_FOOTER;
+               break;
+       }
+
+       case SEQ_STREAM_FOOTER:
+
+               lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
+                           LZMA_STREAM_HEADER_SIZE);
+
+               // Return if we didn't get the whole Stream Footer yet.
+               if (coder->pos < LZMA_STREAM_HEADER_SIZE)
+                       return LZMA_OK;
+
+               coder->pos = 0;
+               // Decode the Stream Footer. The decoder gives
+               // LZMA_FORMAT_ERROR if the magic bytes don't match,
+               // so convert that return code to LZMA_DATA_ERROR.
+               lzma_stream_flags footer_flags;
+               ret = lzma_stream_footer_decode(&footer_flags, coder->buffer);
+               if (ret != LZMA_OK)
+                       return ret == LZMA_FORMAT_ERROR
+                               ? LZMA_DATA_ERROR : ret;
+
+               // Check that Index Size stored in the Stream Footer matches
+               // the real size of the Index field.
+               if (lzma_index_hash_size(coder->index_hash)
+                   != footer_flags.backward_size)
+                       return LZMA_DATA_ERROR;
+
+               // Compare that the Stream Flags fields are identical in
+               // both Stream Header and Stream Footer.
+               ret = lzma_stream_flags_compare(&coder->stream_flags, 
&footer_flags);
+               if (ret != LZMA_OK)
+                       return ret;
+
+               if (!coder->concatenated)
+                       return LZMA_STREAM_END;
+               coder->sequence = SEQ_STREAM_PADDING;
+               break;
+
+       case SEQ_STREAM_PADDING: {
+
+               // Skip over possible Stream Padding.
+               while (true) {
+                       if (*in_pos >= in_size) {
+                               // Unless LZMA_FINISH was used, we cannot
+                               // know if there's more input coming later.
+                               if (action != LZMA_FINISH) {
+                                       return LZMA_OK;
+                               }
+
+                               // Stream Padding must be a multiple of
+                               // four bytes.
+                               return coder->pos == 0
+                                       ? LZMA_STREAM_END
+                                       : LZMA_DATA_ERROR;
+                       }
+
+                       // If the byte is not zero, it probably indicates
+                       // beginning of a new Stream (or the file is corrupt).
+                       if (in[*in_pos] != 0x00)
+                               break;
+
+                       ++*in_pos;
+                       coder->pos = (coder->pos + 1) & 3;
+               }
+
+               // Stream Padding must be a multiple of four bytes (empty
+               // Stream Padding is OK).
+               if (coder->pos != 0) {
+                       ++*in_pos;
+                       return LZMA_DATA_ERROR;
+               }
+
+               // Prepare to decode the next Stream.
+               return_if_error(stream_decoder_reset(coder, allocator));
+               break;
+       }
+
+       default:
+                assert(0);
+                return LZMA_PROG_ERROR;
+       }
+       return LZMA_PROG_ERROR;
+}
+
+/*
+ * Default options for the multithreaded decoder. Only the number of
+ * worker threads is set; every other member is zero. The table is never
+ * modified, so it is const.
+ */
+static const lzma_mt mt_options = {
+       .threads = 16,
+};
+
+/*
+ * Initialize (or re-initialize) the multithreaded Stream decoder.
+ *
+ * On the first call the coder structure, its mutex and its condition
+ * variable are created and next->code / next->end are installed. On
+ * later calls the existing coder is reused: if the requested number of
+ * threads differs from the current one, the worker threads are torn down
+ * and the thread array is reallocated; otherwise the running threads are
+ * stopped and reused.
+ *
+ * options may be NULL, in which case the built-in defaults (mt_options)
+ * are used.
+ *
+ * Returns LZMA_OK on success, LZMA_OPTIONS_ERROR if the thread count is
+ * zero or larger than LZMA_THREADS_MAX, LZMA_MEM_ERROR if an allocation
+ * or the mutex/condition variable initialization fails, or the result
+ * of stream_decoder_reset().
+ */
+static lzma_ret
+stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
+                      const lzma_mt *options)
+{
+       struct lzma_stream_coder *coder;
+
+       // TODO: the decoder flags are not taken from the caller yet, so
+       // e.g. LZMA_CONCATENATED is never enabled.
+       const uint32_t flags = 0;
+
+       // Fall back to the built-in defaults when no options are given
+       // (src/xz/coder.c currently passes NULL).
+       if (options == NULL)
+               options = &mt_options;
+
+       // A zero thread count would lead to a zero-sized thread array.
+       // Capping the count also keeps the size calculation of the
+       // thread array below from overflowing.
+       if (options->threads == 0 || options->threads > LZMA_THREADS_MAX)
+               return LZMA_OPTIONS_ERROR;
+
+       lzma_next_coder_init(&stream_decoder_mt_init, next, allocator);
+
+       coder = next->coder;
+       if (coder == NULL) {
+               coder = lzma_alloc_zero(sizeof(struct lzma_stream_coder),
+                                       allocator);
+               if (coder == NULL)
+                       return LZMA_MEM_ERROR;
+
+               // The coder is not stored in next->coder yet, so it has
+               // to be released here if setting it up fails.
+               if (mythread_mutex_init(&coder->mutex)) {
+                       lzma_free(coder, allocator);
+                       return LZMA_MEM_ERROR;
+               }
+
+               if (mythread_cond_init(&coder->cond)) {
+                       mythread_mutex_destroy(&coder->mutex);
+                       lzma_free(coder, allocator);
+                       return LZMA_MEM_ERROR;
+               }
+
+               next->coder = coder;
+
+               next->code = stream_decode_mt;
+               next->end = stream_decoder_mt_end;
+               next->get_progress = NULL;
+       }
+
+       coder->sequence = SEQ_STREAM_HEADER;
+       coder->index_hash = NULL;
+#if 0
+       coder->memlimit = my_max(1, memlimit);
+       coder->memusage = LZMA_MEMUSAGE_BASE;
+#endif
+       coder->tell_no_check = (flags & LZMA_TELL_NO_CHECK) != 0;
+       coder->tell_unsupported_check
+               = (flags & LZMA_TELL_UNSUPPORTED_CHECK) != 0;
+       coder->tell_any_check = (flags & LZMA_TELL_ANY_CHECK) != 0;
+       coder->ignore_check = (flags & LZMA_IGNORE_CHECK) != 0;
+       coder->concatenated = (flags & LZMA_CONCATENATED) != 0;
+       coder->first_stream = true;
+
+       if (coder->threads_max != options->threads) {
+               threads_end(coder, allocator);
+
+               // Describe "no threads" before allocating so that a failed
+               // allocation leaves the coder in a consistent state.
+               coder->threads_max = 0;
+               coder->thr_read = NULL;
+               coder->thr_read_last = NULL;
+               coder->thr_write = NULL;
+
+               coder->threads = lzma_alloc(
+                               options->threads * sizeof(struct worker_thread),
+                               allocator);
+
+               // next->coder already points to the coder, so it must not
+               // be freed here (that would leave next->coder dangling).
+               // Cleanup is left to next->end (stream_decoder_mt_end).
+               if (coder->threads == NULL)
+                       return LZMA_MEM_ERROR;
+
+               coder->threads_max = options->threads;
+       } else {
+               // Reuse the old structures and threads. Tell the running
+               // threads to stop and wait until they have stopped.
+               threads_stop(coder, true);
+       }
+
+       return stream_decoder_reset(coder, allocator);
+}
+
+/*
+ * Public API: set up strm as a multithreaded .xz Stream decoder.
+ *
+ * options is handed to stream_decoder_mt_init() as is. After a successful
+ * initialization, LZMA_RUN and LZMA_FINISH are marked as supported
+ * actions on strm.
+ */
+extern LZMA_API(lzma_ret)
+lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options)
+{
+       lzma_next_strm_init(stream_decoder_mt_init, strm, options);
+
+       bool *const allowed = strm->internal->supported_actions;
+       allowed[LZMA_RUN] = true;
+       allowed[LZMA_FINISH] = true;
+
+       return LZMA_OK;
+}
diff --git a/src/liblzma/liblzma.map b/src/liblzma/liblzma.map
index bad8633c3b8d2..a1d07f8feea86 100644
--- a/src/liblzma/liblzma.map
+++ b/src/liblzma/liblzma.map
@@ -107,7 +107,8 @@ XZ_5.2 {
 XZ_5.3.1alpha {
 global:
        lzma_file_info_decoder;
-
+       lzma_stream_decoder_mt;
+       lzma_stream_decoder_mt_memusage;
 local:
        *;
 } XZ_5.2;
diff --git a/src/xz/coder.c b/src/xz/coder.c
index 85f954393d8bf..5f11b918378e3 100644
--- a/src/xz/coder.c
+++ b/src/xz/coder.c
@@ -520,9 +520,13 @@ coder_init(file_pair *pair)
                        break;
 
                case FORMAT_XZ:
+#if  0
                        ret = lzma_stream_decoder(&strm,
                                        hardware_memlimit_get(
                                                MODE_DECOMPRESS), flags);
+#else
+                       ret = lzma_stream_decoder_mt(&strm, NULL);
+#endif
                        break;
 
                case FORMAT_LZMA:
-- 
2.29.2


Reply via email to