[PATCH 4/5] migration iaa-compress: Add IAA initialization and deinitialization

2023-10-19 Thread Yuan Liu
This patch defines the structure for IAA jobs related to data
compression and decompression, as well as the initialization and
deinitialization processes for IAA.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/iaa-ram-compress.c | 152 +++
 migration/iaa-ram-compress.h |  20 +
 migration/meson.build|   1 +
 migration/ram-compress.c |  21 +++--
 4 files changed, 189 insertions(+), 5 deletions(-)
 create mode 100644 migration/iaa-ram-compress.c
 create mode 100644 migration/iaa-ram-compress.h

diff --git a/migration/iaa-ram-compress.c b/migration/iaa-ram-compress.c
new file mode 100644
index 00..da45952594
--- /dev/null
+++ b/migration/iaa-ram-compress.c
@@ -0,0 +1,152 @@
+/*
+ * QEMU IAA compression support
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *  Written by:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/cutils.h"
+#include "qemu/error-report.h"
+#include "migration.h"
+#include "options.h"
+#include "io/channel-null.h"
+#include "exec/target_page.h"
+#include "exec/ramblock.h"
+#include "iaa-ram-compress.h"
+#include "qpl/qpl.h"
+
+/* The IAA work queue maximum depth */
+#define IAA_JOB_NUM (512)
+
+typedef struct {
+CompressResult result;
+ram_addr_t offset; /* The offset of the compressed page in the block */
+RAMBlock *block; /* The block of the compressed page */
+} iaa_comp_param;
+
+typedef struct {
+uint8_t *host; /* Target address for decompression page */
+} iaa_decomp_param;
+
+typedef struct IaaJob {
+QSIMPLEQ_ENTRY(IaaJob) entry;
+bool is_compression;
+uint32_t in_len;
+uint32_t out_len;
+uint8_t *in_buf;
+uint8_t *out_buf;
+qpl_job *qpl; /* It is used to submit (de)compression work to IAA */
+union {
+iaa_comp_param comp;
+iaa_decomp_param decomp;
+} param;
+} IaaJob;
+
+typedef struct IaaJobPool {
+uint32_t pos;
+uint32_t cnt;
+IaaJob *jobs[IAA_JOB_NUM];
+uint8_t *job_in_buf; /* The IAA device input buffers for all IAA jobs */
+uint8_t *job_out_buf; /* The IAA device output buffers for all IAA jobs */
+size_t buf_size;
+} IaaJobPool;
+
+static IaaJobPool iaa_job_pool;
+/* This is used to record jobs that have been submitted but not yet completed 
*/
+static QSIMPLEQ_HEAD(, IaaJob) polling_queue =
+   QSIMPLEQ_HEAD_INITIALIZER(polling_queue);
+
+void iaa_compress_deinit(void)
+{
+for (int i = 0; i < IAA_JOB_NUM; i++) {
+if (iaa_job_pool.jobs[i]) {
+if (iaa_job_pool.jobs[i]->qpl) {
+qpl_fini_job(iaa_job_pool.jobs[i]->qpl);
+g_free(iaa_job_pool.jobs[i]->qpl);
+}
+g_free(iaa_job_pool.jobs[i]);
+}
+}
+if (iaa_job_pool.job_in_buf) {
+munmap(iaa_job_pool.job_in_buf, iaa_job_pool.buf_size);
+iaa_job_pool.job_in_buf = NULL;
+}
+if (iaa_job_pool.job_out_buf) {
+munmap(iaa_job_pool.job_out_buf, iaa_job_pool.buf_size);
+iaa_job_pool.job_out_buf = NULL;
+}
+}
+
+int iaa_compress_init(bool is_decompression)
+{
+qpl_status status;
+IaaJob *job = NULL;
+uint32_t qpl_hw_size = 0;
+int flags = MAP_PRIVATE | MAP_POPULATE | MAP_ANONYMOUS;
+size_t buf_size = IAA_JOB_NUM * qemu_target_page_size();
+
+QSIMPLEQ_INIT(&polling_queue);
+memset(&iaa_job_pool, 0, sizeof(IaaJobPool));
+iaa_job_pool.buf_size = buf_size;
+iaa_job_pool.job_out_buf = mmap(NULL, buf_size, PROT_READ | PROT_WRITE,
+flags, -1, 0);
+if (iaa_job_pool.job_out_buf == MAP_FAILED) {
+error_report("Failed to allocate iaa output buffer, error %s",
+ strerror(errno));
+return -1;
+}
+/*
+ * There is no need to allocate an input buffer for the compression
+ * function, the IAA hardware can directly access the virtual machine
+ * memory through the host address through Share Virtual Memory(SVM)
+ */
+if (is_decompression) {
+iaa_job_pool.job_in_buf = mmap(NULL, buf_size, PROT_READ | PROT_WRITE,
+   flags, -1, 0);
+if (iaa_job_pool.job_in_buf == MAP_FAILED) {
+error_report("Failed to allocate iaa input buffer, error %s",
+ strerror(errno));
+goto init_err;
+}
+}
+status = qpl_get_job_size(qpl_path_hardware, &qpl_hw_size);
+if (status != QPL_STS_OK) {
+error_report("Failed to initialize iaa hardware, error %d", status);
+goto init_err;
+}
+for (int i = 0; i < IAA_JOB_NUM; i++) {
+size_t buf_offset = qemu_target_page_size

[PATCH 1/5] configure: add qpl meson option

2023-10-19 Thread Yuan Liu
Intel Query Processing Library (QPL) is an open-source library that
supports features of the new Intel In-Memory Analytics Accelerator (IAA)
available on Intel Xeon Sapphire Rapids processors, including
high-throughput compression and decompression.

add --enable-qpl and --disable-qpl options for data (de)compression
using IAA during the live migration process.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 9 -
 meson_options.txt | 2 ++
 scripts/meson-buildoptions.sh | 3 +++
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/meson.build b/meson.build
index 79aef19bdc..0a69bf68cf 100644
--- a/meson.build
+++ b/meson.build
@@ -1032,6 +1032,11 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto()
+qpl = dependency('libqpl', required: get_option('qpl'),
+ method: 'pkg-config')
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and targetos == 'linux' and pixman.found()
@@ -2158,6 +2163,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -3616,7 +3622,7 @@ libmigration = static_library('migration', sources: 
migration_files + genh,
   name_suffix: 'fa',
   build_by_default: false)
 migration = declare_dependency(link_with: libmigration,
-   dependencies: [zlib, qom, io])
+   dependencies: [zlib, qom, io, qpl])
 system_ss.add(migration)
 
 block_ss = block_ss.apply(config_targetos, strict: false)
@@ -4281,6 +4287,7 @@ summary_info += {'blkio support': blkio}
 summary_info += {'curl support':  curl}
 summary_info += {'Multipath support': mpathpersist}
 summary_info += {'Linux AIO support': libaio}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'Linux io_uring support': linux_io_uring}
 summary_info += {'ATTR/XATTR support': libattr}
 summary_info += {'RDMA support':  rdma}
diff --git a/meson_options.txt b/meson_options.txt
index 6a17b90968..e8e7e37893 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -251,6 +251,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 2a74b0275b..e2adb13ce5 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -206,6 +206,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -417,6 +418,8 @@ _meson_option_parse() {
 --disable-qga-vss) printf "%s" -Dqga_vss=disabled ;;
 --enable-qom-cast-debug) printf "%s" -Dqom_cast_debug=true ;;
 --disable-qom-cast-debug) printf "%s" -Dqom_cast_debug=false ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 --enable-rbd) printf "%s" -Drbd=enabled ;;
 --disable-rbd) printf "%s" -Drbd=disabled ;;
 --enable-rdma) printf "%s" -Drdma=enabled ;;
-- 
2.39.3




[PATCH 3/5] ram compress: Refactor ram compression functions

2023-10-19 Thread Yuan Liu
Refactor legacy RAM compression functions to support both IAA
compression and CPU compression.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/migration.c|  6 +--
 migration/ram-compress.c | 81 
 migration/ram-compress.h | 10 ++---
 migration/ram.c  | 18 ++---
 4 files changed, 86 insertions(+), 29 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 585d3c8f55..08a9c313d0 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -237,7 +237,7 @@ void migration_incoming_state_destroy(void)
 struct MigrationIncomingState *mis = migration_incoming_get_current();
 
 multifd_load_cleanup();
-compress_threads_load_cleanup();
+ram_compress_load_cleanup();
 
 if (mis->to_src_file) {
 /* Tell source that we are done */
@@ -524,7 +524,7 @@ process_incoming_migration_co(void *opaque)
 
 assert(mis->from_src_file);
 
-if (compress_threads_load_setup(mis->from_src_file)) {
+if (ram_compress_load_setup(mis->from_src_file)) {
 error_report("Failed to setup decompress threads");
 goto fail;
 }
@@ -577,7 +577,7 @@ fail:
 qemu_fclose(mis->from_src_file);
 
 multifd_load_cleanup();
-compress_threads_load_cleanup();
+ram_compress_load_cleanup();
 
 exit(EXIT_FAILURE);
 }
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index 06254d8c69..47357352f7 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -105,11 +105,11 @@ static void *do_data_compress(void *opaque)
 return NULL;
 }
 
-void compress_threads_save_cleanup(void)
+static void compress_threads_save_cleanup(void)
 {
 int i, thread_count;
 
-if (!migrate_compress() || !comp_param) {
+if (!comp_param) {
 return;
 }
 
@@ -144,13 +144,10 @@ void compress_threads_save_cleanup(void)
 comp_param = NULL;
 }
 
-int compress_threads_save_setup(void)
+static int compress_threads_save_setup(void)
 {
 int i, thread_count;
 
-if (!migrate_compress()) {
-return 0;
-}
 thread_count = migrate_compress_threads();
 compress_threads = g_new0(QemuThread, thread_count);
 comp_param = g_new0(CompressParam, thread_count);
@@ -370,6 +367,11 @@ int wait_for_decompress_done(void)
 return 0;
 }
 
+if (migrate_compress_with_iaa()) {
+/* Implement in next patch */
+return 0;
+}
+
 thread_count = migrate_decompress_threads();
qemu_mutex_lock(&decomp_done_lock);
 for (idx = 0; idx < thread_count; idx++) {
@@ -381,13 +383,10 @@ int wait_for_decompress_done(void)
 return qemu_file_get_error(decomp_file);
 }
 
-void compress_threads_load_cleanup(void)
+static void compress_threads_load_cleanup(void)
 {
 int i, thread_count;
 
-if (!migrate_compress()) {
-return;
-}
 thread_count = migrate_decompress_threads();
 for (i = 0; i < thread_count; i++) {
 /*
@@ -422,14 +421,10 @@ void compress_threads_load_cleanup(void)
 decomp_file = NULL;
 }
 
-int compress_threads_load_setup(QEMUFile *f)
+static int compress_threads_load_setup(QEMUFile *f)
 {
 int i, thread_count;
 
-if (!migrate_compress()) {
-return 0;
-}
-
 thread_count = migrate_decompress_threads();
 decompress_threads = g_new0(QemuThread, thread_count);
 decomp_param = g_new0(DecompressParam, thread_count);
@@ -457,7 +452,7 @@ exit:
 return -1;
 }
 
-void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
+static void decompress_data_with_multi_threads(QEMUFile *f, void *host, int 
len)
 {
 int idx, thread_count;
 
@@ -483,3 +478,57 @@ void decompress_data_with_multi_threads(QEMUFile *f, void 
*host, int len)
 }
 }
 }
+
+int ram_compress_save_setup(void)
+{
+if (!migrate_compress()) {
+return 0;
+}
+if (migrate_compress_with_iaa()) {
+/* Implement in next patch */
+return 0;
+}
+return compress_threads_save_setup();
+}
+
+void ram_compress_save_cleanup(void)
+{
+if (!migrate_compress()) {
+return;
+}
+if (migrate_compress_with_iaa()) {
+/* Implement in next patch */
+return;
+}
+compress_threads_save_cleanup();
+}
+
+void ram_decompress_data(QEMUFile *f, void *host, int len)
+{
+if (migrate_compress_with_iaa()) {
+/* Implement in next patch */
+}
+decompress_data_with_multi_threads(f, host, len);
+}
+
+int ram_compress_load_setup(QEMUFile *f)
+{
+if (!migrate_compress()) {
+return 0;
+}
+if (migrate_compress_with_iaa()) {
+/* Implement in next patch */
+}
+return compress_threads_load_setup(f);
+}
+
+void ram_compress_load_cleanup(void)
+{
+if (!migrate_compress()) {
+return;
+}
+if (migrate_compress_with_iaa()) {
+/* Implement in next patch */
+}
+compress_threads_load_cleanup();
+}
diff --git a/migration

[PATCH 2/5] qapi/migration: Introduce compress-with-iaa migration parameter

2023-10-19 Thread Yuan Liu
Introduce the compress-with-iaa=on/off option to enable or disable live
migration data (de)compression with the In-Memory Analytics Accelerator
(IAA).

The data (de)compression with IAA feature is based on the migration
compression capability, which is enabled by setting
migrate_set_capability compress on. If the migration compression
capability is enabled and the IAA compression parameter is set, IAA will
be used instead of CPU for data (de)compression.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/migration-hmp-cmds.c |  8 
 migration/options.c| 20 
 migration/options.h|  1 +
 qapi/migration.json|  4 +++-
 4 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c
index c115ef2d23..38e441bb37 100644
--- a/migration/migration-hmp-cmds.c
+++ b/migration/migration-hmp-cmds.c
@@ -281,6 +281,10 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict 
*qdict)
 monitor_printf(mon, "%s: %u\n",
 MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_THREADS),
 params->compress_threads);
+assert(params->has_compress_with_iaa);
+monitor_printf(mon, "%s: %s\n",
+MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_WITH_IAA),
+params->compress_with_iaa ? "on" : "off");
 assert(params->has_compress_wait_thread);
 monitor_printf(mon, "%s: %s\n",
 MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD),
@@ -517,6 +521,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict 
*qdict)
 p->has_compress_threads = true;
visit_type_uint8(v, param, &p->compress_threads, &err);
 break;
+case MIGRATION_PARAMETER_COMPRESS_WITH_IAA:
+p->has_compress_with_iaa = true;
+visit_type_bool(v, param, &p->compress_with_iaa, &err);
+break;
 case MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD:
 p->has_compress_wait_thread = true;
visit_type_bool(v, param, &p->compress_wait_thread, &err);
diff --git a/migration/options.c b/migration/options.c
index 1d1e1321b0..06d4b36b77 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -107,6 +107,8 @@ Property migration_properties[] = {
 DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
   parameters.compress_threads,
   DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+DEFINE_PROP_BOOL("x-compress-with-iaa", MigrationState,
+  parameters.compress_with_iaa, false),
 DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
   parameters.compress_wait_thread, true),
 DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
@@ -724,6 +726,13 @@ int migrate_compress_threads(void)
 return s->parameters.compress_threads;
 }
 
+bool migrate_compress_with_iaa(void)
+{
+MigrationState *s = migrate_get_current();
+
+return s->parameters.compress_with_iaa;
+}
+
 int migrate_compress_wait_thread(void)
 {
 MigrationState *s = migrate_get_current();
@@ -899,6 +908,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
**errp)
 params->compress_level = s->parameters.compress_level;
 params->has_compress_threads = true;
 params->compress_threads = s->parameters.compress_threads;
+params->has_compress_with_iaa = true;
+params->compress_with_iaa = s->parameters.compress_with_iaa;
 params->has_compress_wait_thread = true;
 params->compress_wait_thread = s->parameters.compress_wait_thread;
 params->has_decompress_threads = true;
@@ -969,6 +980,7 @@ void migrate_params_init(MigrationParameters *params)
 /* Set has_* up only for parameter checks */
 params->has_compress_level = true;
 params->has_compress_threads = true;
+params->has_compress_with_iaa = true;
 params->has_compress_wait_thread = true;
 params->has_decompress_threads = true;
 params->has_throttle_trigger_threshold = true;
@@ -1195,6 +1207,10 @@ static void 
migrate_params_test_apply(MigrateSetParameters *params,
 dest->decompress_threads = params->decompress_threads;
 }
 
+if (params->has_compress_with_iaa) {
+dest->compress_with_iaa = params->compress_with_iaa;
+}
+
 if (params->has_throttle_trigger_threshold) {
 dest->throttle_trigger_threshold = params->throttle_trigger_threshold;
 }
@@ -1300,6 +1316,10 @@ static void migrate_params_apply(MigrateSetParameters 
*params, Error **errp)
 s->parameters.decompress_threads = params->decompress_threads;
 }
 
+if (params->has_compress_with_iaa) {
+s->parameters.compress_with_iaa = params->compress_with_iaa;
+}
+
 if (params-&g

[PATCH 5/5] migration iaa-compress: Implement IAA compression

2023-10-19 Thread Yuan Liu
Implement the functions of IAA for data compression and decompression.
The implementation uses non-blocking job submission and polling to check
the job completion status to reduce IAA's overhead in the live migration
process.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/iaa-ram-compress.c | 167 +++
 migration/iaa-ram-compress.h |   7 ++
 migration/ram-compress.c |  10 ++-
 migration/ram.c  |  56 ++--
 4 files changed, 232 insertions(+), 8 deletions(-)

diff --git a/migration/iaa-ram-compress.c b/migration/iaa-ram-compress.c
index da45952594..243aeb6d55 100644
--- a/migration/iaa-ram-compress.c
+++ b/migration/iaa-ram-compress.c
@@ -12,6 +12,7 @@
 
 #include "qemu/osdep.h"
 #include "qemu/cutils.h"
+
 #include "qemu/error-report.h"
 #include "migration.h"
 #include "options.h"
@@ -62,6 +63,31 @@ static IaaJobPool iaa_job_pool;
 static QSIMPLEQ_HEAD(, IaaJob) polling_queue =
QSIMPLEQ_HEAD_INITIALIZER(polling_queue);
 
+static IaaJob *get_job(send_iaa_data send_page)
+{
+IaaJob *job;
+
+retry:
+/* Wait for a job to complete when there is no available job */
+if (iaa_job_pool.cnt == IAA_JOB_NUM) {
+flush_iaa_jobs(false, send_page);
+goto retry;
+}
+job = iaa_job_pool.jobs[iaa_job_pool.pos];
+iaa_job_pool.pos++;
+iaa_job_pool.cnt++;
+if (iaa_job_pool.pos == IAA_JOB_NUM) {
+iaa_job_pool.pos = 0;
+}
+return job;
+}
+
+static void put_job(IaaJob *job)
+{
+assert(iaa_job_pool.cnt > 0);
+iaa_job_pool.cnt--;
+}
+
 void iaa_compress_deinit(void)
 {
 for (int i = 0; i < IAA_JOB_NUM; i++) {
@@ -150,3 +176,144 @@ init_err:
 iaa_compress_deinit();
 return -1;
 }
+
+static void process_completed_job(IaaJob *job, send_iaa_data send_page)
+{
+if (job->is_compression) {
+send_page(job->param.comp.block, job->param.comp.offset,
+  job->out_buf, job->out_len, job->param.comp.result);
+} else {
+assert(job->out_len == qemu_target_page_size());
+memcpy(job->param.decomp.host, job->out_buf, job->out_len);
+}
+put_job(job);
+}
+
+static qpl_status check_job_status(IaaJob *job, bool block)
+{
+qpl_status status;
+qpl_job *qpl = job->qpl;
+
+status = block ? qpl_wait_job(qpl) : qpl_check_job(qpl);
+if (status == QPL_STS_OK) {
+job->out_len = qpl->total_out;
+if (job->is_compression) {
+job->param.comp.result = RES_COMPRESS;
+/* if no compression benefit, send a normal page for migration */
+if (job->out_len == qemu_target_page_size()) {
+iaa_comp_param *param = &(job->param.comp);
+memcpy(job->out_buf, (param->block->host + param->offset),
+   job->out_len);
+job->param.comp.result = RES_NONE;
+}
+}
+} else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+if (job->is_compression) {
+/*
+ * if the compressed data is larger than the original data, send a
+ * normal page for migration, in this case, IAA has copied the
+ * original data to job->out_buf automatically.
+ */
+job->out_len = qemu_target_page_size();
+job->param.comp.result = RES_NONE;
+status = QPL_STS_OK;
+}
+}
+return status;
+}
+
+static void check_polling_jobs(send_iaa_data send_page)
+{
+IaaJob *job, *job_next;
+qpl_status status;
+
+QSIMPLEQ_FOREACH_SAFE(job, &polling_queue, entry, job_next) {
+status = check_job_status(job, false);
+if (status == QPL_STS_OK) { /* job has done */
+process_completed_job(job, send_page);
+QSIMPLEQ_REMOVE_HEAD(&polling_queue, entry);
+} else if (status == QPL_STS_BEING_PROCESSED) { /* job is running */
+break;
+} else {
+abort();
+}
+}
+}
+
+static int submit_new_job(IaaJob *job)
+{
+qpl_status status;
+qpl_job *qpl = job->qpl;
+
+qpl->op = job->is_compression ? qpl_op_compress : qpl_op_decompress;
+qpl->next_in_ptr = job->in_buf;
+qpl->next_out_ptr = job->out_buf;
+qpl->available_in = job->in_len;
+qpl->available_out = qemu_target_page_size(); /* outbuf maximum size */
+qpl->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+qpl->level = 1; /* only level 1 compression is supported */
+
+do {
+status = qpl_submit_job(qpl);
+} while (status == QPL_STS_QUEUES_ARE_BUSY_ERR);
+
+if (status != QPL_STS_OK) {
+error_report("Failed to submit iaa job, error %d", status);
+return -1;
+}
+QSIMPLEQ_INSERT_TAIL(&polling_queue, job, entry);
+return 0;
+}
+
+int flush_

[PATCH 0/5] Live Migration Acceleration with IAA Compression

2023-10-19 Thread Yuan Liu
Hi,

I am writing to submit a code change aimed at enhancing live migration
acceleration by leveraging the compression capability of the Intel
In-Memory Analytics Accelerator (IAA).

Enabling compression functionality during the live migration process can
enhance performance, thereby reducing downtime and network bandwidth
requirements. However, this improvement comes at the cost of additional
CPU resources, posing a challenge for cloud service providers in terms of
resource allocation. To address this challenge, I have focused on offloading
the compression overhead to the IAA hardware, resulting in performance gains.

The implementation of the IAA (de)compression code is based on Intel Query
Processing Library (QPL), an open-source software project designed for
IAA high-level software programming.

Best regards,
Yuan Liu

Yuan Liu (5):
  configure: add qpl meson option
  qapi/migration: Introduce compress-with-iaa migration parameter
  ram compress: Refactor ram compression interfaces
  migration iaa-compress: Add IAA initialization and deinitialization
  migration iaa-compress: Implement IAA compression

 meson.build|   9 +-
 meson_options.txt  |   2 +
 migration/iaa-ram-compress.c   | 319 +
 migration/iaa-ram-compress.h   |  27 +++
 migration/meson.build  |   1 +
 migration/migration-hmp-cmds.c |   8 +
 migration/migration.c  |   6 +-
 migration/options.c|  20 +++
 migration/options.h|   1 +
 migration/ram-compress.c   |  96 --
 migration/ram-compress.h   |  10 +-
 migration/ram.c|  68 ++-
 qapi/migration.json|   4 +-
 scripts/meson-buildoptions.sh  |   3 +
 14 files changed, 541 insertions(+), 33 deletions(-)
 create mode 100644 migration/iaa-ram-compress.c
 create mode 100644 migration/iaa-ram-compress.h

-- 
2.39.3




[PATCH v2 4/4] multifd: Introduce QPL compression accelerator

2023-11-09 Thread Yuan Liu
Intel Query Processing Library (QPL) is an open-source library
for data compression, it supports the deflate compression algorithm,
compatible with Zlib and GZIP.

QPL supports both software compression and hardware compression.
Software compression is based on instruction optimization to accelerate
data compression, and it can be widely used on Intel CPUs. Hardware
compression utilizes the Intel In-Memory Analytics Accelerator (IAA)
hardware which is available on Intel Xeon Sapphire Rapids processors.

During multifd live migration, the QPL accelerator can be specified to
accelerate the Zlib compression algorithm. QPL can automatically choose
software or hardware acceleration based on the platform.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/meson.build   |   1 +
 migration/multifd-qpl.c | 326 
 2 files changed, 327 insertions(+)
 create mode 100644 migration/multifd-qpl.c

diff --git a/migration/meson.build b/migration/meson.build
index 92b1cc4297..c155c2d781 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -40,6 +40,7 @@ if get_option('live_block_migration').allowed()
   system_ss.add(files('block.c'))
 endif
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))

 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..9d2ca9e44e
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,326 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/rcu.h"
+#include "exec/ramblock.h"
+#include "exec/target_page.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "trace.h"
+#include "options.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+#define MAX_BUF_SIZE (MULTIFD_PACKET_SIZE * 2)
+
+static bool support_compression_methods[MULTIFD_COMPRESSION__MAX];
+
+struct qpl_data {
+qpl_job *job;
+/* compressed data buffer */
+uint8_t *buf;
+/* decompressed data buffer */
+uint8_t *zbuf;
+};
+
+static int init_qpl(struct qpl_data *qpl, uint8_t channel_id,  Error **errp)
+{
+qpl_status status;
+qpl_path_t path = qpl_path_auto;
+uint32_t job_size = 0;
+
+status = qpl_get_job_size(path, &job_size);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multfd: %u: failed to get QPL size, error %d",
+   channel_id, status);
+return -1;
+}
+
+qpl->job = g_try_malloc0(job_size);
+if (!qpl->job) {
+error_setg(errp, "multfd: %u: failed to allocate QPL job", channel_id);
+return -1;
+}
+
+status = qpl_init_job(path, qpl->job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multfd: %u: failed to init QPL hardware, error %d",
+   channel_id, status);
+g_free(qpl->job);
+return -1;
+}
+return 0;
+}
+
+static void deinit_qpl(struct qpl_data *qpl)
+{
+if (qpl->job) {
+qpl_fini_job(qpl->job);
+g_free(qpl->job);
+}
+}
+
+/**
+ * qpl_send_setup: setup send side
+ *
+ * Setup each channel with QPL compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int qpl_send_setup(MultiFDSendParams *p, Error **errp)
+{
+struct qpl_data *qpl = g_new0(struct qpl_data, 1);
+/* prefault the memory to avoid the IO page faults */
+int flags = MAP_PRIVATE | MAP_POPULATE | MAP_ANONYMOUS;
+const char *err_msg;
+
+if (init_qpl(qpl, p->id, errp) != 0) {
+err_msg = "failed to initialize QPL\n";
+goto err_qpl_init;
+}
+qpl->zbuf = mmap(NULL, MAX_BUF_SIZE, PROT_READ | PROT_WRITE, flags, -1, 0);
+if (qpl->zbuf == MAP_FAILED) {
+err_msg = "failed to allocate QPL zbuf\n";
+goto err_zbuf_mmap;
+}
+p->data = qpl;
+return 0;
+
+err_zbuf_mmap:
+deinit_qpl(qpl);
+err_qpl_init:
+g_free(qpl);
+error_setg(errp, "multifd %u: %s", p->id, err_msg);
+return -1;
+}
+
+/**
+ * qpl_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static void qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+struct qpl_data *qpl = p->data;
+
+deinit_qpl(qpl);
+if (qpl->zbuf) {
+munmap(qpl->zbuf, MAX_BUF_SIZE);
+qpl->zbuf = NULL;
+}
+

[PATCH v2 1/4] migration: Introduce multifd-compression-accel parameter

2023-11-09 Thread Yuan Liu
Introduce the multifd-compression-accel option to enable or disable live
migration data (de)compression accelerator.

The default value of multifd-compression-accel is auto, and the enabling
and selection of the accelerator are automatically detected. By setting
multifd-compression-accel=none, the acceleration function can be disabled.
Similarly, users can explicitly specify a specific accelerator name, such
as multifd-compression-accel=qpl.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 hw/core/qdev-properties-system.c| 11 +++
 include/hw/qdev-properties-system.h |  4 
 migration/migration-hmp-cmds.c  | 10 ++
 migration/options.c | 24 
 migration/options.h |  1 +
 qapi/migration.json | 26 +-
 6 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index 688340610e..ed23035845 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -673,6 +673,17 @@ const PropertyInfo qdev_prop_multifd_compression = {
 .set_default_value = qdev_propinfo_set_default_value_enum,
 };

+/* --- MultiFD Compression Accelerator --- */
+
+const PropertyInfo qdev_prop_multifd_compression_accel = {
+.name = "MultiFDCompressionAccel",
+.description = "MultiFD Compression Accelerator, "
+   "auto/none/qpl",
+.enum_table = &MultiFDCompressionAccel_lookup,
+.get = qdev_propinfo_get_enum,
+.set = qdev_propinfo_set_enum,
+.set_default_value = qdev_propinfo_set_default_value_enum,
+};
 /* --- Reserved Region --- */

 /*
diff --git a/include/hw/qdev-properties-system.h 
b/include/hw/qdev-properties-system.h
index 0ac327ae60..da086bd836 100644
--- a/include/hw/qdev-properties-system.h
+++ b/include/hw/qdev-properties-system.h
@@ -7,6 +7,7 @@ extern const PropertyInfo qdev_prop_chr;
 extern const PropertyInfo qdev_prop_macaddr;
 extern const PropertyInfo qdev_prop_reserved_region;
 extern const PropertyInfo qdev_prop_multifd_compression;
+extern const PropertyInfo qdev_prop_multifd_compression_accel;
 extern const PropertyInfo qdev_prop_losttickpolicy;
 extern const PropertyInfo qdev_prop_blockdev_on_error;
 extern const PropertyInfo qdev_prop_bios_chs_trans;
@@ -41,6 +42,9 @@ extern const PropertyInfo qdev_prop_pcie_link_width;
 #define DEFINE_PROP_MULTIFD_COMPRESSION(_n, _s, _f, _d) \
 DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_multifd_compression, \
MultiFDCompression)
+#define DEFINE_PROP_MULTIFD_COMPRESSION_ACCEL(_n, _s, _f, _d) \
+DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_multifd_compression_accel, \
+   MultiFDCompressionAccel)
 #define DEFINE_PROP_LOSTTICKPOLICY(_n, _s, _f, _d) \
 DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_losttickpolicy, \
 LostTickPolicy)
diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c
index a82597f18e..3a278c89d9 100644
--- a/migration/migration-hmp-cmds.c
+++ b/migration/migration-hmp-cmds.c
@@ -344,6 +344,11 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict 
*qdict)
 monitor_printf(mon, "%s: %s\n",
 MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_COMPRESSION),
 MultiFDCompression_str(params->multifd_compression));
+assert(params->has_multifd_compression_accel);
+monitor_printf(mon, "%s: %s\n",
+MigrationParameter_str(
+MIGRATION_PARAMETER_MULTIFD_COMPRESSION_ACCEL),
+MultiFDCompressionAccel_str(params->multifd_compression_accel));
 monitor_printf(mon, "%s: %" PRIu64 " bytes\n",
 MigrationParameter_str(MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE),
 params->xbzrle_cache_size);
@@ -610,6 +615,11 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict 
*qdict)
visit_type_MultiFDCompression(v, param, &p->multifd_compression,
  &err);
 break;
+case MIGRATION_PARAMETER_MULTIFD_COMPRESSION_ACCEL:
+p->has_multifd_compression_accel = true;
+visit_type_MultiFDCompressionAccel(v, param,
+   >multifd_compression_accel, 
);
+break;
 case MIGRATION_PARAMETER_MULTIFD_ZLIB_LEVEL:
 p->has_multifd_zlib_level = true;
visit_type_uint8(v, param, &p->multifd_zlib_level, &err);
diff --git a/migration/options.c b/migration/options.c
index 42fb818956..4c567c49e6 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -59,6 +59,8 @@
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
 #define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
 #define DEFAULT_MIGRATE_MULTIFD_COMPRESSION MULTIFD_COMPRESSION_NONE
+/* By default use the accelerator for multifd compression */
#define DEFAULT_MIGRATE_MULTIFD_COMPRESSION_ACCEL MULTIFD_COMPRESSION_ACCEL_AUTO

[PATCH v2 0/4] Live Migration Acceleration with IAA Compression

2023-11-09 Thread Yuan Liu
   |
 ++-+++-+--+

v2:
  - add support for multifd compression accelerator
  - add support for the QPL accelerator in the multifd
compression accelerator
  - fixed the issue that QPL was compiled into the migration
module by default

Yuan Liu (4):
  migration: Introduce multifd-compression-accel parameter
  multifd: Implement multifd compression accelerator
  configure: add qpl option
  multifd: Introduce QPL compression accelerator

 hw/core/qdev-properties-system.c|  11 +
 include/hw/qdev-properties-system.h |   4 +
 meson.build |   7 +
 meson_options.txt   |   2 +
 migration/meson.build   |   1 +
 migration/migration-hmp-cmds.c  |  10 +
 migration/multifd-qpl.c | 326 
 migration/multifd.c |  38 +++-
 migration/multifd.h |   8 +
 migration/options.c |  24 ++
 migration/options.h |   1 +
 qapi/migration.json |  26 ++-
 scripts/meson-buildoptions.sh   |   3 +
 13 files changed, 458 insertions(+), 3 deletions(-)
 create mode 100644 migration/multifd-qpl.c

--
2.39.3




[PATCH v2 2/4] multifd: Implement multifd compression accelerator

2023-11-09 Thread Yuan Liu
When starting multifd live migration, if a compression method is
enabled, the compression method can be accelerated using accelerators.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd.c | 38 --
 migration/multifd.h |  8 
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 1fe53d3b98..7149e67867 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -165,6 +165,34 @@ static MultiFDMethods multifd_nocomp_ops = {
 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
 [MULTIFD_COMPRESSION_NONE] = _nocomp_ops,
 };
+static MultiFDAccelMethods *accel_multifd_ops[MULTIFD_COMPRESSION_ACCEL__MAX];
+
+static MultiFDMethods *get_multifd_ops(void)
+{
+MultiFDCompression comp = migrate_multifd_compression();
+MultiFDCompressionAccel accel = migrate_multifd_compression_accel();
+
+if (comp == MULTIFD_COMPRESSION_NONE ||
+accel == MULTIFD_COMPRESSION_ACCEL_NONE) {
+return multifd_ops[comp];
+}
+if (accel == MULTIFD_COMPRESSION_ACCEL_AUTO) {
+for (int i = 0; i < MULTIFD_COMPRESSION_ACCEL__MAX; i++) {
+if (accel_multifd_ops[i] &&
+accel_multifd_ops[i]->is_supported(comp)) {
+return accel_multifd_ops[i]->get_multifd_methods();
+}
+}
+return multifd_ops[comp];
+}
+
+/* Check if a specified accelerator is available */
+if (accel_multifd_ops[accel] &&
+accel_multifd_ops[accel]->is_supported(comp)) {
+return accel_multifd_ops[accel]->get_multifd_methods();
+}
+return multifd_ops[comp];
+}

 void multifd_register_ops(int method, MultiFDMethods *ops)
 {
@@ -172,6 +200,12 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
 multifd_ops[method] = ops;
 }

+void multifd_register_accel_ops(int accel, MultiFDAccelMethods *ops)
+{
+assert(0 < accel && accel < MULTIFD_COMPRESSION_ACCEL__MAX);
+accel_multifd_ops[accel] = ops;
+}
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
 MultiFDInit_t msg = {};
@@ -922,7 +956,7 @@ int multifd_save_setup(Error **errp)
 multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
-multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
+multifd_send_state->ops = get_multifd_ops();

 for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -1180,7 +1214,7 @@ int multifd_load_setup(Error **errp)
 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
qatomic_set(&multifd_recv_state->count, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
-multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
+multifd_recv_state->ops = get_multifd_ops();

 for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
diff --git a/migration/multifd.h b/migration/multifd.h
index a835643b48..c40ff79443 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -206,7 +206,15 @@ typedef struct {
 int (*recv_pages)(MultiFDRecvParams *p, Error **errp);
 } MultiFDMethods;

+typedef struct {
+/* Check if the compression method supports acceleration */
+bool (*is_supported) (MultiFDCompression compression);
+/* Get multifd methods of the accelerator */
+MultiFDMethods* (*get_multifd_methods)(void);
+} MultiFDAccelMethods;
+
 void multifd_register_ops(int method, MultiFDMethods *ops);
+void multifd_register_accel_ops(int accel, MultiFDAccelMethods *ops);

 #endif

--
2.39.3




[PATCH v2 3/4] configure: add qpl option

2023-11-09 Thread Yuan Liu
The Query Processing Library (QPL) is an open-source library that
supports data compression and decompression features.

add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression accelerator. The QPL compression accelerator
can accelerate the Zlib compression algorithm during the live migration.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 7 +++
 meson_options.txt | 2 ++
 scripts/meson-buildoptions.sh | 3 +++
 3 files changed, 12 insertions(+)

diff --git a/meson.build b/meson.build
index 259dc5f308..b4ba30b4fa 100644
--- a/meson.build
+++ b/meson.build
@@ -1032,6 +1032,11 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto()
+qpl = dependency('libqpl', required: get_option('qpl'),
+ method: 'pkg-config')
+endif
 virgl = not_found

 have_vhost_user_gpu = have_tools and targetos == 'linux' and pixman.found()
@@ -2165,6 +2170,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4325,6 +4331,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index 3c7398f3c6..71cd533985 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -255,6 +255,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 7ca4b77eae..0909d1d517 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -220,6 +220,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -556,6 +557,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
--
2.39.3




[PATCH v3 1/4] migration: Introduce multifd-compression-accel parameter

2024-01-03 Thread Yuan Liu
Introduce the multifd-compression-accel option to enable or disable live
migration data (de)compression accelerator.

The default value of multifd-compression-accel is auto, and the enabling
and selection of the accelerator are automatically detected. By setting
multifd-compression-accel=none, the acceleration function can be disabled.
Similarly, users can explicitly specify a specific accelerator name, such
as multifd-compression-accel=qpl.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 hw/core/qdev-properties-system.c| 11 ++
 include/hw/qdev-properties-system.h |  4 
 migration/migration-hmp-cmds.c  | 10 ++
 migration/options.c | 28 ++
 migration/options.h |  1 +
 qapi/migration.json | 31 -
 6 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index 688340610e..ed23035845 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -673,6 +673,17 @@ const PropertyInfo qdev_prop_multifd_compression = {
 .set_default_value = qdev_propinfo_set_default_value_enum,
 };
 
+/* --- MultiFD Compression Accelerator --- */
+
+const PropertyInfo qdev_prop_multifd_compression_accel = {
+.name = "MultiFDCompressionAccel",
+.description = "MultiFD Compression Accelerator, "
+   "auto/none/qpl",
+.enum_table = &MultiFDCompressionAccel_lookup,
+.get = qdev_propinfo_get_enum,
+.set = qdev_propinfo_set_enum,
+.set_default_value = qdev_propinfo_set_default_value_enum,
+};
 /* --- Reserved Region --- */
 
 /*
diff --git a/include/hw/qdev-properties-system.h 
b/include/hw/qdev-properties-system.h
index 0ac327ae60..3c125db3a3 100644
--- a/include/hw/qdev-properties-system.h
+++ b/include/hw/qdev-properties-system.h
@@ -7,6 +7,7 @@ extern const PropertyInfo qdev_prop_chr;
 extern const PropertyInfo qdev_prop_macaddr;
 extern const PropertyInfo qdev_prop_reserved_region;
 extern const PropertyInfo qdev_prop_multifd_compression;
+extern const PropertyInfo qdev_prop_multifd_compression_accel;
 extern const PropertyInfo qdev_prop_losttickpolicy;
 extern const PropertyInfo qdev_prop_blockdev_on_error;
 extern const PropertyInfo qdev_prop_bios_chs_trans;
@@ -41,6 +42,9 @@ extern const PropertyInfo qdev_prop_pcie_link_width;
 #define DEFINE_PROP_MULTIFD_COMPRESSION(_n, _s, _f, _d) \
 DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_multifd_compression, \
MultiFDCompression)
+#define DEFINE_PROP_MULTIFD_COMP_ACCEL(_n, _s, _f, _d) \
+DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_multifd_compression_accel, \
+   MultiFDCompressionAccel)
 #define DEFINE_PROP_LOSTTICKPOLICY(_n, _s, _f, _d) \
 DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_losttickpolicy, \
 LostTickPolicy)
diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c
index a82597f18e..3a278c89d9 100644
--- a/migration/migration-hmp-cmds.c
+++ b/migration/migration-hmp-cmds.c
@@ -344,6 +344,11 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict 
*qdict)
 monitor_printf(mon, "%s: %s\n",
 MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_COMPRESSION),
 MultiFDCompression_str(params->multifd_compression));
+assert(params->has_multifd_compression_accel);
+monitor_printf(mon, "%s: %s\n",
+MigrationParameter_str(
+MIGRATION_PARAMETER_MULTIFD_COMPRESSION_ACCEL),
+MultiFDCompressionAccel_str(params->multifd_compression_accel));
 monitor_printf(mon, "%s: %" PRIu64 " bytes\n",
 MigrationParameter_str(MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE),
 params->xbzrle_cache_size);
@@ -610,6 +615,11 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict 
*qdict)
 visit_type_MultiFDCompression(v, param, &p->multifd_compression,
   &err);
 break;
+case MIGRATION_PARAMETER_MULTIFD_COMPRESSION_ACCEL:
+p->has_multifd_compression_accel = true;
+visit_type_MultiFDCompressionAccel(v, param,
+   &p->multifd_compression_accel, &err);
+break;
 case MIGRATION_PARAMETER_MULTIFD_ZLIB_LEVEL:
 p->has_multifd_zlib_level = true;
 visit_type_uint8(v, param, >multifd_zlib_level, );
diff --git a/migration/options.c b/migration/options.c
index 42fb818956..6ef06d1816 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -59,6 +59,12 @@
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
 #define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
 #define DEFAULT_MIGRATE_MULTIFD_COMPRESSION MULTIFD_COMPRESSION_NONE
+
+/*
+ * When the compression method is available and supported by the
+ * accelerator, data compre

[PATCH v3 2/4] multifd: Implement multifd compression accelerator

2024-01-03 Thread Yuan Liu
When starting multifd live migration, if a compression method is
enabled, the compression method can be accelerated using accelerators.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Fabiano Rosas 
---
 migration/multifd.c | 40 ++--
 migration/multifd.h |  8 
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 1fe53d3b98..8ee083b691 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -165,6 +165,36 @@ static MultiFDMethods multifd_nocomp_ops = {
 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
 [MULTIFD_COMPRESSION_NONE] = _nocomp_ops,
 };
+static MultiFDAccelMethods *accel_multifd_ops[MULTIFD_COMPRESSION_ACCEL__MAX];
+
+static MultiFDMethods *get_multifd_ops(void)
+{
+MultiFDCompression comp = migrate_multifd_compression();
+MultiFDCompressionAccel accel = migrate_multifd_compression_accel();
+
+assert(comp < MULTIFD_COMPRESSION__MAX);
+assert(accel < MULTIFD_COMPRESSION_ACCEL__MAX);
+if (comp == MULTIFD_COMPRESSION_NONE ||
+accel == MULTIFD_COMPRESSION_ACCEL_NONE) {
+return multifd_ops[comp];
+}
+if (accel == MULTIFD_COMPRESSION_ACCEL_AUTO) {
+for (int i = 0; i < MULTIFD_COMPRESSION_ACCEL__MAX; i++) {
+if (accel_multifd_ops[i] &&
+accel_multifd_ops[i]->is_supported(comp)) {
+return accel_multifd_ops[i]->get_multifd_methods();
+}
+}
+return multifd_ops[comp];
+}
+
+/* Check if a specified accelerator is available */
+if (accel_multifd_ops[accel] &&
+accel_multifd_ops[accel]->is_supported(comp)) {
+return accel_multifd_ops[accel]->get_multifd_methods();
+}
+return multifd_ops[comp];
+}
 
 void multifd_register_ops(int method, MultiFDMethods *ops)
 {
@@ -172,6 +202,12 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
 multifd_ops[method] = ops;
 }
 
+void multifd_register_accel_ops(int accel, MultiFDAccelMethods *ops)
+{
+assert(0 < accel && accel < MULTIFD_COMPRESSION_ACCEL__MAX);
+accel_multifd_ops[accel] = ops;
+}
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
 MultiFDInit_t msg = {};
@@ -922,7 +958,7 @@ int multifd_save_setup(Error **errp)
 multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
-multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
+multifd_send_state->ops = get_multifd_ops();
 
 for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -1180,7 +1216,7 @@ int multifd_load_setup(Error **errp)
 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
qatomic_set(&multifd_recv_state->count, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
-multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
+multifd_recv_state->ops = get_multifd_ops();
 
 for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
diff --git a/migration/multifd.h b/migration/multifd.h
index a835643b48..c40ff79443 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -206,7 +206,15 @@ typedef struct {
 int (*recv_pages)(MultiFDRecvParams *p, Error **errp);
 } MultiFDMethods;
 
+typedef struct {
+/* Check if the compression method supports acceleration */
+bool (*is_supported) (MultiFDCompression compression);
+/* Get multifd methods of the accelerator */
+MultiFDMethods* (*get_multifd_methods)(void);
+} MultiFDAccelMethods;
+
 void multifd_register_ops(int method, MultiFDMethods *ops);
+void multifd_register_accel_ops(int accel, MultiFDAccelMethods *ops);
 
 #endif
 
-- 
2.39.3




[PATCH v3 3/4] configure: add qpl option

2024-01-03 Thread Yuan Liu
The Query Processing Library (QPL) is an open-source library that
supports data compression and decompression features.

add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression accelerator. The QPL compression accelerator
can accelerate the Zlib compression algorithm during the live migration.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 18 ++
 meson_options.txt |  2 ++
 scripts/meson-buildoptions.sh |  3 +++
 3 files changed, 23 insertions(+)

diff --git a/meson.build b/meson.build
index 259dc5f308..f2bb81f9cb 100644
--- a/meson.build
+++ b/meson.build
@@ -1032,6 +1032,22 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto()
+  libqpl = cc.find_library('qpl', required: false)
+  if not libqpl.found()
+error('libqpl not found, please install it from ' +
+
'https://intel.github.io/qpl/documentation/get_started_docs/installation.html')
+  endif
+  libaccel = cc.find_library('accel-config', required: false)
+  if not libaccel.found()
+error('libaccel-config not found, please install it from ' +
+'https://github.com/intel/idxd-config')
+  endif
+  qpl = declare_dependency(dependencies: [libqpl, libaccel,
+cc.find_library('dl', required: get_option('qpl'))],
+link_args: ['-lstdc++'])
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and targetos == 'linux' and pixman.found()
@@ -2165,6 +2181,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4325,6 +4342,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index 3c7398f3c6..71cd533985 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -255,6 +255,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 7ca4b77eae..0909d1d517 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -220,6 +220,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -556,6 +557,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
-- 
2.39.3




[PATCH v3 0/4] Live Migration Acceleration with IAA Compression

2024-01-03 Thread Yuan Liu
   |
 ++-+++-+--+

v2:   
  - add support for multifd compression accelerator
  - add support for the QPL accelerator in the multifd
compression accelerator
  - fixed the issue that QPL was compiled into the migration
module by default

v3:
  - use Meson instead of pkg-config to resolve QPL build
dependency issue
  - fix coding style
  - fix a CI issue for get_multifd_ops function in multifd.c file

Yuan Liu (4):
  migration: Introduce multifd-compression-accel parameter
  multifd: Implement multifd compression accelerator
  configure: add qpl option
  multifd: Introduce QPL compression accelerator

 hw/core/qdev-properties-system.c|  11 +
 include/hw/qdev-properties-system.h |   4 +
 meson.build |  18 ++
 meson_options.txt   |   2 +
 migration/meson.build   |   1 +
 migration/migration-hmp-cmds.c  |  10 +
 migration/multifd-qpl.c | 323 
 migration/multifd.c |  40 +++-
 migration/multifd.h |   8 +
 migration/options.c |  28 +++
 migration/options.h |   1 +
 qapi/migration.json |  31 ++-
 scripts/meson-buildoptions.sh   |   3 +
 13 files changed, 477 insertions(+), 3 deletions(-)
 create mode 100644 migration/multifd-qpl.c

-- 
2.39.3




[PATCH v3 4/4] multifd: Introduce QPL compression accelerator

2024-01-03 Thread Yuan Liu
Intel Query Processing Library (QPL) is an open-source library
for data compression, it supports the deflate compression algorithm,
compatible with Zlib and GZIP.

QPL supports both software compression and hardware compression.
Software compression is based on instruction optimization to accelerate
data compression, and it can be widely used on Intel CPUs. Hardware
compression utilizes the Intel In-Memory Analytics Accelerator (IAA)
hardware which is available on Intel Xeon Sapphire Rapids processors.

During multifd live migration, the QPL accelerator can be specified to
accelerate the Zlib compression algorithm. QPL can automatically choose
software or hardware acceleration based on the platform.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/meson.build   |   1 +
 migration/multifd-qpl.c | 323 
 2 files changed, 324 insertions(+)
 create mode 100644 migration/multifd-qpl.c

diff --git a/migration/meson.build b/migration/meson.build
index 92b1cc4297..c155c2d781 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -40,6 +40,7 @@ if get_option('live_block_migration').allowed()
   system_ss.add(files('block.c'))
 endif
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..88ebe87c09
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,323 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/rcu.h"
+#include "exec/ramblock.h"
+#include "exec/target_page.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "trace.h"
+#include "options.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+#define MAX_BUF_SIZE (MULTIFD_PACKET_SIZE * 2)
+static bool support_compression_methods[MULTIFD_COMPRESSION__MAX];
+
+struct qpl_data {
+qpl_job *job;
+/* compressed data buffer */
+uint8_t *buf;
+/* decompressed data buffer */
+uint8_t *zbuf;
+};
+
+static int init_qpl(struct qpl_data *qpl, uint8_t channel_id,  Error **errp)
+{
+qpl_status status;
+qpl_path_t path = qpl_path_auto;
+uint32_t job_size = 0;
+
+status = qpl_get_job_size(path, &job_size);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multfd: %u: failed to get QPL size, error %d",
+   channel_id, status);
+return -1;
+}
+
+qpl->job = g_try_malloc0(job_size);
+if (!qpl->job) {
+error_setg(errp, "multfd: %u: failed to allocate QPL job", channel_id);
+return -1;
+}
+
+status = qpl_init_job(path, qpl->job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multfd: %u: failed to init QPL hardware, error %d",
+   channel_id, status);
+return -1;
+}
+return 0;
+}
+
+static void deinit_qpl(struct qpl_data *qpl)
+{
+if (qpl->job) {
+qpl_fini_job(qpl->job);
+g_free(qpl->job);
+}
+}
+
+/**
+ * qpl_send_setup: setup send side
+ *
+ * Setup each channel with QPL compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int qpl_send_setup(MultiFDSendParams *p, Error **errp)
+{
+struct qpl_data *qpl = g_new0(struct qpl_data, 1);
+int flags = MAP_PRIVATE | MAP_POPULATE | MAP_ANONYMOUS;
+const char *err_msg;
+
+if (init_qpl(qpl, p->id, errp) != 0) {
+err_msg = "failed to initialize QPL\n";
+goto err_qpl_init;
+}
+qpl->zbuf = mmap(NULL, MAX_BUF_SIZE, PROT_READ | PROT_WRITE, flags, -1, 0);
+if (qpl->zbuf == MAP_FAILED) {
+err_msg = "failed to allocate QPL zbuf\n";
+goto err_zbuf_mmap;
+}
+p->data = qpl;
+return 0;
+
+err_zbuf_mmap:
+deinit_qpl(qpl);
+err_qpl_init:
+g_free(qpl);
+error_setg(errp, "multifd %u: %s", p->id, err_msg);
+return -1;
+}
+
+/**
+ * qpl_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static void qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+struct qpl_data *qpl = p->data;
+
+deinit_qpl(qpl);
+if (qpl->zbuf) {
+munmap(qpl->zbuf, MAX_BUF_SIZE);
+qpl->zbuf = NULL;
+}
+g_free(p->data);
+p->data = NULL;
+}
+
+/**
+ * qpl_send_prepare: prep

[PATCH v6 4/7] migration/multifd: add qpl compression method

2024-05-06 Thread Yuan Liu
add the Query Processing Library (QPL) compression method

Introduce the qpl as a new multifd migration compression method, it can
use In-Memory Analytics Accelerator(IAA) to accelerate compression and
decompression, which can not only reduce network bandwidth requirement
but also reduce host compression and decompression CPU overhead.

How to enable qpl compression during migration:
migrate_set_parameter multifd-compression qpl

The qpl method only supports one compression level, there is no qpl
compression level parameter added, users do not need to specify the
qpl compression level.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 hw/core/qdev-properties-system.c |  2 +-
 migration/meson.build|  1 +
 migration/multifd-qpl.c  | 20 
 migration/multifd.h  |  1 +
 qapi/migration.json  |  7 ++-
 5 files changed, 29 insertions(+), 2 deletions(-)
 create mode 100644 migration/multifd-qpl.c

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index d79d6f4b53..6ccd7224f6 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -659,7 +659,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compression = {
 .name = "MultiFDCompression",
 .description = "multifd_compression values, "
-   "none/zlib/zstd",
+   "none/zlib/zstd/qpl",
 .enum_table = &MultiFDCompression_lookup,
 .get = qdev_propinfo_get_enum,
 .set = qdev_propinfo_set_enum,
diff --git a/migration/meson.build b/migration/meson.build
index f76b1ba328..1d432d5328 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -43,6 +43,7 @@ if get_option('live_block_migration').allowed()
   system_ss.add(files('block.c'))
 endif
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..056a68a060
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,20 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "qemu/module.h"
+
+static void multifd_qpl_register(void)
+{
+/* noop */
+}
+
+migration_init(multifd_qpl_register);
diff --git a/migration/multifd.h b/migration/multifd.h
index c9d9b09239..5b7d9b15f8 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -40,6 +40,7 @@ MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_NOCOMP (0 << 1)
 #define MULTIFD_FLAG_ZLIB (1 << 1)
 #define MULTIFD_FLAG_ZSTD (2 << 1)
+#define MULTIFD_FLAG_QPL (4 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
diff --git a/qapi/migration.json b/qapi/migration.json
index 8c65b90328..854e8609bd 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -628,11 +628,16 @@
 #
 # @zstd: use zstd compression method.
 #
+# @qpl: use qpl compression method. Query Processing Library(qpl) is based on
+#   the deflate compression algorithm and use the Intel In-Memory Analytics
+#   Accelerator(IAA) accelerated compression and decompression. (Since 9.1)
+#
 # Since: 5.0
 ##
 { 'enum': 'MultiFDCompression',
   'data': [ 'none', 'zlib',
-{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' } ] }
+{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' },
+{ 'name': 'qpl', 'if': 'CONFIG_QPL' } ] }
 
 ##
 # @MigMode:
-- 
2.39.3




[PATCH v6 6/7] migration/multifd: implement qpl compression and decompression

2024-05-06 Thread Yuan Liu
each qpl job is used to (de)compress a normal page and it can
be processed independently by the IAA hardware. All qpl jobs
are submitted to the hardware at once, and wait for all jobs
completion. If hardware path(IAA) is not available, use software
for compression and decompression.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 284 +++-
 1 file changed, 280 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 89fa51091a..9a1fddbdd0 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,6 +13,7 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "exec/ramblock.h"
 #include "migration.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
@@ -204,6 +205,139 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams 
*p, Error **errp)
 p->iov = NULL;
 }
 
+/**
+ * multifd_qpl_prepare_job: prepare a compression or decompression job
+ *
+ * Prepare a compression or decompression job and configure job attributes
+ * including job compression level and flags.
+ *
+ * @job: pointer to the qpl_job structure
+ * @is_compression: compression or decompression indication
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the size of the output data buffer
+ */
+static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
+uint8_t *input, uint32_t input_len,
+uint8_t *output, uint32_t output_len)
+{
+job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+job->next_in_ptr = input;
+job->next_out_ptr = output;
+job->available_in = input_len;
+job->available_out = output_len;
+job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+/* only supports one compression level */
+job->level = 1;
+}
+
+/**
+ * multifd_qpl_build_packet: build a qpl compressed data packet
+ *
+ * The qpl compressed data packet consists of two parts, one part stores
+ * the compressed length of each page, and the other part is the compressed
+ * data of each page. The zbuf_hdr stores the compressed length of all pages,
+ * and a separate IOV is used to store the compressed data of each page.
+ *
+ * @qpl: pointer to the QplData structure
+ * @p: Params for the channel that we are using
+ * @idx: The index of the compressed length array
+ * @addr: pointer to the compressed data
+ * @len: The length of the compressed data
+ */
+static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams *p,
+ uint32_t idx, uint8_t *addr, uint32_t len)
+{
+qpl->zbuf_hdr[idx] = cpu_to_be32(len);
+p->iov[p->iovs_num].iov_base = addr;
+p->iov[p->iovs_num].iov_len = len;
+p->iovs_num++;
+p->next_packet_size += len;
+}
+
+/**
+ * multifd_qpl_compress_pages: compress normal pages
+ *
+ * Each normal page will be compressed independently, and the compression jobs
+ * will be submitted to the IAA hardware in non-blocking mode, waiting for all
+ * jobs to be completed and filling the compressed length and data into the
+ * sending IOVs. If IAA device is not available, the software path is used.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error **errp)
+{
+qpl_status status;
+QplData *qpl = p->compress_data;
+MultiFDPages_t *pages = p->pages;
+uint8_t *zbuf = qpl->zbuf;
+uint8_t *host = pages->block->host;
+uint32_t job_num = pages->normal_num;
+qpl_job *job = NULL;
+
+assert(job_num <= qpl->total_job_num);
+/* submit all compression jobs */
+for (int i = 0; i < job_num; i++) {
+job = qpl->job_array[i];
+multifd_qpl_prepare_job(job, true, host + pages->offset[i],
+p->page_size, zbuf, p->page_size - 1);
+/* if hardware path(IAA) is unavailable, call the software path */
+if (!qpl->iaa_avail) {
+status = qpl_execute_job(job);
+if (status == QPL_STS_OK) {
+multifd_qpl_build_packet(qpl, p, i, zbuf, job->total_out);
+} else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+/* compressed length exceeds page size, send page directly */
+multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
+ p->page_size);
+} else {
+error_setg(errp, "multifd %u: qpl_execute_job error %d",
+   p->id, status);
+

[PATCH v6 2/7] migration/multifd: put IOV initialization into compression method

2024-05-06 Thread Yuan Liu
Different compression methods may require different numbers of IOVs.
Based on streaming compression of zlib and zstd, all pages will be
compressed to a data block, so two IOVs are needed for packet header
and compressed data block.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-zlib.c |  7 +++
 migration/multifd-zstd.c |  8 +++-
 migration/multifd.c  | 22 --
 3 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 737a9645d2..2ced69487e 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p, Error 
**errp)
 goto err_free_zbuff;
 }
 p->compress_data = z;
+
+/* Needs 2 IOVs, one for packet header and one for compressed data */
+p->iov = g_new0(struct iovec, 2);
+
 return 0;
 
 err_free_zbuff:
@@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 z->buf = NULL;
 g_free(p->compress_data);
 p->compress_data = NULL;
+
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 256858df0a..ca17b7e310 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
 struct zstd_data *z = g_new0(struct zstd_data, 1);
 int res;
 
-p->compress_data = z;
 z->zcs = ZSTD_createCStream();
 if (!z->zcs) {
 g_free(z);
@@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p, Error 
**errp)
 error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 return -1;
 }
+p->compress_data = z;
+
+/* Needs 2 IOVs, one for packet header and one for compressed data */
+p->iov = g_new0(struct iovec, 2);
 return 0;
 }
 
@@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 z->zbuff = NULL;
 g_free(p->compress_data);
 p->compress_data = NULL;
+
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd.c b/migration/multifd.c
index f317bff077..d82885fdbb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error 
**errp)
 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
 }
 
+if (multifd_use_packets()) {
+/* We need one extra place for the packet header */
+p->iov = g_new0(struct iovec, p->page_count + 1);
+} else {
+p->iov = g_new0(struct iovec, p->page_count);
+}
+
 return 0;
 }
 
@@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error 
**errp)
  */
 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
 {
+g_free(p->iov);
+p->iov = NULL;
 return;
 }
 
@@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error 
**errp)
  */
 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
+p->iov = g_new0(struct iovec, p->page_count);
 return 0;
 }
 
@@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p, Error 
**errp)
  */
 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 {
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
@@ -783,8 +795,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams 
*p, Error **errp)
 p->packet_len = 0;
 g_free(p->packet);
 p->packet = NULL;
-g_free(p->iov);
-p->iov = NULL;
 multifd_send_state->ops->send_cleanup(p, errp);
 
 return *errp == NULL;
@@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
 p->packet = g_malloc0(p->packet_len);
 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
-
-/* We need one extra place for the packet header */
-p->iov = g_new0(struct iovec, page_count + 1);
-} else {
-p->iov = g_new0(struct iovec, page_count);
 }
 p->name = g_strdup_printf("multifdsend_%d", i);
 p->page_size = qemu_target_page_size();
@@ -1353,8 +1358,6 @@ static void 
multifd_recv_cleanup_channel(MultiFDRecvParams *p)
 p->packet_len = 0;
 g_free(p->packet);
 p->packet = NULL;
-g_free(p->iov);
-p->iov = NULL;
 g_free(p->normal);
 p->normal = NULL;
 g_free(p->zero);
@@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
 p->packet = g_malloc0(p->packet_len);
 }
 p->name = g_strdup_printf("multifdrecv_%d", i);
-p->iov = g_new0(struct iovec, page_count);
 p->normal = g_new0(ram_addr_t, page_count);
 p->zero = g_new0(ram_addr_t, page_count);
 p->page_count = page_count;
-- 
2.39.3




[PATCH v6 3/7] configure: add --enable-qpl build option

2024-05-06 Thread Yuan Liu
add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression method for multifd migration.

The Query Processing Library (QPL) is an open-source library
that supports data compression and decompression features. It
is based on the deflate compression algorithm and uses Intel
In-Memory Analytics Accelerator(IAA) hardware for compression
and decompression acceleration.

For more details on live migration with IAA, please refer to the document
docs/devel/migration/qpl-compression.rst

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 8 
 meson_options.txt | 2 ++
 scripts/meson-buildoptions.sh | 3 +++
 3 files changed, 13 insertions(+)

diff --git a/meson.build b/meson.build
index 5db2dbc12e..2a8d8385fe 100644
--- a/meson.build
+++ b/meson.build
@@ -1204,6 +1204,12 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto() or have_system
+  qpl = dependency('qpl', version: '>=1.5.0',
+required: get_option('qpl'),
+method: 'pkg-config')
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and host_os == 'linux' and pixman.found()
@@ -2309,6 +2315,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4436,6 +4443,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index adc77bae0c..562db29ab4 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -259,6 +259,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 0a29d35fdb..26bf9e21fd 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -222,6 +222,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -562,6 +563,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
-- 
2.39.3




[PATCH v6 1/7] docs/migration: add qpl compression feature

2024-05-06 Thread Yuan Liu
add Intel Query Processing Library (QPL) compression method
introduction

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 docs/devel/migration/features.rst|   1 +
 docs/devel/migration/qpl-compression.rst | 262 +++
 2 files changed, 263 insertions(+)
 create mode 100644 docs/devel/migration/qpl-compression.rst

diff --git a/docs/devel/migration/features.rst 
b/docs/devel/migration/features.rst
index d5ca7b86d5..bc98b65075 100644
--- a/docs/devel/migration/features.rst
+++ b/docs/devel/migration/features.rst
@@ -12,3 +12,4 @@ Migration has plenty of features to support different use 
cases.
virtio
mapped-ram
CPR
+   qpl-compression
diff --git a/docs/devel/migration/qpl-compression.rst 
b/docs/devel/migration/qpl-compression.rst
new file mode 100644
index 00..13fb7a67b1
--- /dev/null
+++ b/docs/devel/migration/qpl-compression.rst
@@ -0,0 +1,262 @@
+===
+QPL Compression
+===
+The Intel Query Processing Library (Intel ``QPL``) is an open-source library to
+provide compression and decompression features and it is based on deflate
+compression algorithm (RFC 1951).
+
+The ``QPL`` compression relies on Intel In-Memory Analytics 
Accelerator(``IAA``)
+and Shared Virtual Memory(``SVM``) technology, they are new features supported
+from Intel 4th Gen Intel Xeon Scalable processors, codenamed Sapphire Rapids
+processor(``SPR``).
+
+For more ``QPL`` introduction, please refer to `QPL Introduction
+<https://intel.github.io/qpl/documentation/introduction_docs/introduction.html>`_
+
+QPL Compression Framework
+=
+
+::
+
+  ++   +--+
+  | MultiFD Thread |   |accel-config tool |
+  +---++   ++-+
+  | |
+  | |
+  |compress/decompress  |
+  +---++| Setup IAA
+  |  QPL library   || Resources
+  +---+---++|
+  |   | |
+  |   +-+---+
+  |   Open IAA  |
+  |   Devices +-+-+
+  |   |idxd driver|
+  |   +-+-+
+  | |
+  | |
+  |   +-+-+
+  +---+IAA Devices|
+  Submit jobs +---+
+  via enqcmd
+
+
+QPL Build And Installation
+--
+
+.. code-block:: shell
+
+  $git clone --recursive https://github.com/intel/qpl.git qpl
+  $mkdir qpl/build
+  $cd qpl/build
+  $cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr 
-DQPL_LIBRARY_TYPE=SHARED ..
+  $sudo cmake --build . --target install
+
+For more details about ``QPL`` installation, please refer to `QPL Installation
+<https://intel.github.io/qpl/documentation/get_started_docs/installation.html>`_
+
+IAA Device Management
+-
+
+The number of ``IAA`` devices will vary depending on the Xeon product model.
+On a ``SPR`` server, there can be a maximum of 8 ``IAA`` devices, with up to
+4 devices per socket.
+
+By default, all ``IAA`` devices are disabled and need to be configured and
+enabled by users manually.
+
+Check the number of devices through the following command
+
+.. code-block:: shell
+
+  #lspci -d 8086:0cfe
+  6a:02.0 System peripheral: Intel Corporation Device 0cfe
+  6f:02.0 System peripheral: Intel Corporation Device 0cfe
+  74:02.0 System peripheral: Intel Corporation Device 0cfe
+  79:02.0 System peripheral: Intel Corporation Device 0cfe
+  e7:02.0 System peripheral: Intel Corporation Device 0cfe
+  ec:02.0 System peripheral: Intel Corporation Device 0cfe
+  f1:02.0 System peripheral: Intel Corporation Device 0cfe
+  f6:02.0 System peripheral: Intel Corporation Device 0cfe
+
+IAA Device Configuration And Enabling
+^
+
+The ``accel-config`` tool is used to enable ``IAA`` devices and configure
+``IAA`` hardware resources(work queues and engines). One ``IAA`` device
+has 8 work queues and 8 processing engines, multiple engines can be assigned
+to a work queue via ``group`` attribute.
+
+For ``accel-config`` installation, please refer to `accel-config installation
+<https://github.com/intel/idxd-config>`_
+
+One example of configuring and enabling an ``IAA`` device.
+
+.. code-block:: shell
+
+  #accel-config config-engine iax1/engine1.0 -g 0
+  #accel-config config-engine iax1/engine1.1 -g 0
+  #accel-config config-engine iax1/engine1.2 -g 0
+  #accel-config config-engine iax1/engine1.3 -g 0
+  #accel-config config-engine iax1/engine1.4 -g 0
+  #accel-config config-engine iax1/engine1.5 -g 0
+  #accel-config config-engine iax1/engine1.6 -g 0
+  #accel-config config-engine iax1/engine1.7 -g 0
+  #accel-config config-wq iax1/wq1.0 -g 0 -s 128 -p 10 -b 1 -t 128 -m shared 
-y user -n app1 -d user
+  #accel-config enable-device iax1
+  #acce

[PATCH v6 7/7] tests/migration-test: add qpl compression test

2024-05-06 Thread Yuan Liu
add qpl to compression method test for multifd migration

the qpl compression supports software path and hardware
path(IAA device), and the hardware path is used first by
default. If the hardware path is unavailable, it will
automatically fallback to the software path for testing.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 tests/qtest/migration-test.c | 24 
 1 file changed, 24 insertions(+)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 5d6d8cd634..0f75ed7c49 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2777,6 +2777,15 @@ test_migrate_precopy_tcp_multifd_zstd_start(QTestState 
*from,
 }
 #endif /* CONFIG_ZSTD */
 
+#ifdef CONFIG_QPL
+static void *
+test_migrate_precopy_tcp_multifd_qpl_start(QTestState *from,
+QTestState *to)
+{
+return test_migrate_precopy_tcp_multifd_start_common(from, to, "qpl");
+}
+#endif /* CONFIG_QPL */
+
 static void test_multifd_tcp_uri_none(void)
 {
 MigrateCommon args = {
@@ -2857,6 +2866,17 @@ static void test_multifd_tcp_zstd(void)
 }
 #endif
 
+#ifdef CONFIG_QPL
+static void test_multifd_tcp_qpl(void)
+{
+MigrateCommon args = {
+.listen_uri = "defer",
+.start_hook = test_migrate_precopy_tcp_multifd_qpl_start,
+};
+test_precopy_common();
+}
+#endif
+
 #ifdef CONFIG_GNUTLS
 static void *
 test_migrate_multifd_tcp_tls_psk_start_match(QTestState *from,
@@ -3760,6 +3780,10 @@ int main(int argc, char **argv)
 migration_test_add("/migration/multifd/tcp/plain/zstd",
test_multifd_tcp_zstd);
 #endif
+#ifdef CONFIG_QPL
+migration_test_add("/migration/multifd/tcp/plain/qpl",
+   test_multifd_tcp_qpl);
+#endif
 #ifdef CONFIG_GNUTLS
 migration_test_add("/migration/multifd/tcp/tls/psk/match",
test_multifd_tcp_tls_psk_match);
-- 
2.39.3




[PATCH v6 5/7] migration/multifd: implement initialization of qpl compression

2024-05-06 Thread Yuan Liu
the qpl initialization includes memory allocation for compressed
data and the qpl job initialization.

the qpl job initialization will check if the In-Memory Analytics
Accelerator(IAA) device is available and use the IAA device first.
If the platform does not have IAA device or the IAA device is not
available, the qpl compression will fallback to the software path.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 272 +++-
 1 file changed, 271 insertions(+), 1 deletion(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 056a68a060..89fa51091a 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -9,12 +9,282 @@
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
  */
+
 #include "qemu/osdep.h"
 #include "qemu/module.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+typedef struct {
+qpl_job **job_array;
+/* the number of allocated jobs */
+uint32_t total_job_num;
+/* compressed data buffer */
+uint8_t *zbuf;
+/* the length of compressed data */
+uint32_t *zbuf_hdr;
+/* the status of IAA device */
+bool iaa_avail;
+} QplData;
+
+/**
+ * check_iaa_avail: check if IAA device is available
+ *
+ * If the system does not have an IAA device, the IAA device is
+ * not enabled, or the IAA work queue is not configured in shared
+ * mode, the QPL hardware path initialization will fail.
+ *
+ * Returns true if IAA device is available, otherwise false.
+ */
+static bool check_iaa_avail(void)
+{
+qpl_job *job = NULL;
+uint32_t job_size = 0;
+qpl_path_t path = qpl_path_hardware;
+
+if (qpl_get_job_size(path, _size) != QPL_STS_OK) {
+return false;
+}
+job = g_malloc0(job_size);
+if (qpl_init_job(path, job) != QPL_STS_OK) {
+g_free(job);
+return false;
+}
+g_free(job);
+return true;
+}
+
+/**
+ * multifd_qpl_free_jobs: cleanup jobs
+ *
+ * Free all job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_jobs(QplData *qpl)
+{
+assert(qpl != NULL);
+for (int i = 0; i < qpl->total_job_num; i++) {
+qpl_fini_job(qpl->job_array[i]);
+g_free(qpl->job_array[i]);
+qpl->job_array[i] = NULL;
+}
+g_free(qpl->job_array);
+qpl->job_array = NULL;
+}
+
+/**
+ * multifd_qpl_init_jobs: initialize jobs
+ *
+ * Initialize all jobs
+ *
+ * @qpl: pointer to the QplData structure
+ * @chan_id: multifd channel number
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
+{
+qpl_path_t path;
+qpl_status status;
+uint32_t job_size = 0;
+qpl_job *job = NULL;
+
+path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
+status = qpl_get_job_size(path, _size);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
+   chan_id, status);
+return -1;
+}
+qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
+for (int i = 0; i < qpl->total_job_num; i++) {
+job = g_malloc0(job_size);
+status = qpl_init_job(path, job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
+   chan_id, status);
+multifd_qpl_free_jobs(qpl);
+return -1;
+}
+qpl->job_array[i] = job;
+}
+return 0;
+}
+
+/**
+ * multifd_qpl_init: initialize QplData structure
+ *
+ * Allocate and initialize a QplData structure
+ *
+ * Returns QplData pointer for success or NULL for error
+ *
+ * @job_num: the number of qpl jobs to allocate
+ * @job_size: the buffer size of the job
+ * @chan_id: multifd channel number
+ * @errp: pointer to an error
+ */
+static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
+ uint8_t chan_id, Error **errp)
+{
+QplData *qpl;
+
+qpl = g_new0(QplData, 1);
+qpl->total_job_num = job_num;
+qpl->iaa_avail = check_iaa_avail();
+if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
+g_free(qpl);
+return NULL;
+}
+qpl->zbuf = g_malloc0(job_size * job_num);
+qpl->zbuf_hdr = g_new0(uint32_t, job_num);
+return qpl;
+}
+
+/**
+ * multifd_qpl_deinit: cleanup QplData structure
+ *
+ * Free jobs, compressed buffers and QplData structure
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_deinit(QplData *qpl)
+{
+if (qpl != NULL) {
+multifd_qpl_free_jobs(qpl);
+g_free(qpl->zbuf_hdr);
+g_free(qpl->zbuf);
+g_free(qpl);
+}
+}
+
+/**
+ * multifd_qpl_send

[PATCH v5 7/7] tests/migration-test: add qpl compression test

2024-03-20 Thread Yuan Liu
add qpl to compression method test for multifd migration

the migration with qpl compression needs to access IAA hardware
resources. Please run "check-qtest" with sudo or root permission,
otherwise the migration test will fail.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 tests/qtest/migration-test.c | 24 
 1 file changed, 24 insertions(+)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 71895abb7f..052d0d60fd 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2815,6 +2815,15 @@ test_migrate_precopy_tcp_multifd_zstd_start(QTestState 
*from,
 }
 #endif /* CONFIG_ZSTD */
 
+#ifdef CONFIG_QPL
+static void *
+test_migrate_precopy_tcp_multifd_qpl_start(QTestState *from,
+QTestState *to)
+{
+return test_migrate_precopy_tcp_multifd_start_common(from, to, "qpl");
+}
+#endif /* CONFIG_QPL */
+
 static void test_multifd_tcp_none(void)
 {
 MigrateCommon args = {
@@ -2880,6 +2889,17 @@ static void test_multifd_tcp_zstd(void)
 }
 #endif
 
+#ifdef CONFIG_QPL
+static void test_multifd_tcp_qpl(void)
+{
+MigrateCommon args = {
+.listen_uri = "defer",
+.start_hook = test_migrate_precopy_tcp_multifd_qpl_start,
+};
+test_precopy_common();
+}
+#endif
+
 #ifdef CONFIG_GNUTLS
 static void *
 test_migrate_multifd_tcp_tls_psk_start_match(QTestState *from,
@@ -3789,6 +3809,10 @@ int main(int argc, char **argv)
 migration_test_add("/migration/multifd/tcp/plain/zstd",
test_multifd_tcp_zstd);
 #endif
+#ifdef CONFIG_QPL
+migration_test_add("/migration/multifd/tcp/plain/qpl",
+   test_multifd_tcp_qpl);
+#endif
 #ifdef CONFIG_GNUTLS
 migration_test_add("/migration/multifd/tcp/tls/psk/match",
test_multifd_tcp_tls_psk_match);
-- 
2.39.3




[PATCH v5 5/7] migration/multifd: implement initialization of qpl compression

2024-03-20 Thread Yuan Liu
the qpl initialization includes memory allocation for compressed
data and the qpl job initialization.

the qpl initialization will check whether the In-Memory Analytics
Accelerator(IAA) hardware is available, if the platform does not
have IAA hardware or the IAA hardware is not available, the QPL
compression initialization will fail.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 243 +++-
 1 file changed, 242 insertions(+), 1 deletion(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 056a68a060..6de65e9da7 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -9,12 +9,253 @@
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
  */
+
 #include "qemu/osdep.h"
 #include "qemu/module.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+typedef struct {
+qpl_job **job_array;
+/* the number of allocated jobs */
+uint32_t job_num;
+/* the size of data processed by a qpl job */
+uint32_t data_size;
+/* compressed data buffer */
+uint8_t *zbuf;
+/* the length of compressed data */
+uint32_t *zbuf_hdr;
+} QplData;
+
+static void free_zbuf(QplData *qpl)
+{
+if (qpl->zbuf != NULL) {
+munmap(qpl->zbuf, qpl->job_num * qpl->data_size);
+qpl->zbuf = NULL;
+}
+if (qpl->zbuf_hdr != NULL) {
+g_free(qpl->zbuf_hdr);
+qpl->zbuf_hdr = NULL;
+}
+}
+
+static int alloc_zbuf(QplData *qpl, uint8_t chan_id, Error **errp)
+{
+int flags = MAP_PRIVATE | MAP_POPULATE | MAP_ANONYMOUS;
+uint32_t size = qpl->job_num * qpl->data_size;
+uint8_t *buf;
+
+buf = (uint8_t *) mmap(NULL, size, PROT_READ | PROT_WRITE, flags, -1, 0);
+if (buf == MAP_FAILED) {
+error_setg(errp, "multifd: %u: alloc_zbuf failed, job num %u, size %u",
+   chan_id, qpl->job_num, qpl->data_size);
+return -1;
+}
+qpl->zbuf = buf;
+qpl->zbuf_hdr = g_new0(uint32_t, qpl->job_num);
+return 0;
+}
+
+static void free_jobs(QplData *qpl)
+{
+for (int i = 0; i < qpl->job_num; i++) {
+qpl_fini_job(qpl->job_array[i]);
+g_free(qpl->job_array[i]);
+qpl->job_array[i] = NULL;
+}
+g_free(qpl->job_array);
+qpl->job_array = NULL;
+}
+
+static int alloc_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
+{
+qpl_status status;
+uint32_t job_size = 0;
+qpl_job *job = NULL;
+/* always use IAA hardware accelerator */
+qpl_path_t path = qpl_path_hardware;
+
+status = qpl_get_job_size(path, _size);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
+   chan_id, status);
+return -1;
+}
+qpl->job_array = g_new0(qpl_job *, qpl->job_num);
+for (int i = 0; i < qpl->job_num; i++) {
+job = g_malloc0(job_size);
+status = qpl_init_job(path, job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
+   chan_id, status);
+free_jobs(qpl);
+return -1;
+}
+qpl->job_array[i] = job;
+}
+return 0;
+}
+
+static int init_qpl(QplData *qpl, uint32_t job_num, uint32_t data_size,
+uint8_t chan_id, Error **errp)
+{
+qpl->job_num = job_num;
+qpl->data_size = data_size;
+if (alloc_zbuf(qpl, chan_id, errp) != 0) {
+return -1;
+}
+if (alloc_jobs(qpl, chan_id, errp) != 0) {
+free_zbuf(qpl);
+return -1;
+}
+return 0;
+}
+
+static void deinit_qpl(QplData *qpl)
+{
+if (qpl != NULL) {
+free_jobs(qpl);
+free_zbuf(qpl);
+qpl->job_num = 0;
+qpl->data_size = 0;
+}
+}
+
+/**
+ * qpl_send_setup: setup send side
+ *
+ * Setup each channel with QPL compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int qpl_send_setup(MultiFDSendParams *p, Error **errp)
+{
+QplData *qpl;
+
+qpl = g_new0(QplData, 1);
+if (init_qpl(qpl, p->page_count, p->page_size, p->id, errp) != 0) {
+g_free(qpl);
+return -1;
+}
+p->compress_data = qpl;
+
+assert(p->iov == NULL);
+/*
+ * Each page will be compressed independently and sent using an IOV. The
+ * additional two IOVs are used to store packet header and compressed data
+ * length
+ */
+p->iov = g_new0(struct iovec, p->page_count + 2);
+return 0;
+}
+
+/**
+ * qpl_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @

[PATCH v5 1/7] docs/migration: add qpl compression feature

2024-03-20 Thread Yuan Liu
add Intel Query Processing Library (QPL) compression method
introduction

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 docs/devel/migration/features.rst|   1 +
 docs/devel/migration/qpl-compression.rst | 231 +++
 2 files changed, 232 insertions(+)
 create mode 100644 docs/devel/migration/qpl-compression.rst

diff --git a/docs/devel/migration/features.rst 
b/docs/devel/migration/features.rst
index d5ca7b86d5..bc98b65075 100644
--- a/docs/devel/migration/features.rst
+++ b/docs/devel/migration/features.rst
@@ -12,3 +12,4 @@ Migration has plenty of features to support different use 
cases.
virtio
mapped-ram
CPR
+   qpl-compression
diff --git a/docs/devel/migration/qpl-compression.rst 
b/docs/devel/migration/qpl-compression.rst
new file mode 100644
index 00..42c7969d30
--- /dev/null
+++ b/docs/devel/migration/qpl-compression.rst
@@ -0,0 +1,231 @@
+===
+QPL Compression
+===
+The Intel Query Processing Library (Intel ``QPL``) is an open-source library to
+provide compression and decompression features and it is based on deflate
+compression algorithm (RFC 1951).
+
+The ``QPL`` compression relies on Intel In-Memory Analytics 
Accelerator(``IAA``)
+and Shared Virtual Memory(``SVM``) technology, they are new features supported
+from Intel 4th Gen Intel Xeon Scalable processors, codenamed Sapphire Rapids
+processor(``SPR``).
+
+For more ``QPL`` introduction, please refer to:
+
+https://intel.github.io/qpl/documentation/introduction_docs/introduction.html
+
+QPL Compression Framework
+=
+
+::
+
+  ++   +--+
+  | MultiFD Service|   |accel-config tool |
+  +---++   ++-+
+  | |
+  | |
+  +---++| Setup IAA
+  |  QPL library   || Resources
+  +---+---++|
+  |   | |
+  |   +-+---+
+  |   Open IAA  |
+  |   Devices +-+-+
+  |   |idxd driver|
+  |   +-+-+
+  | |
+  | |
+  |   +-+-+
+  +---+IAA Devices|
+  Submit jobs +---+
+  via enqcmd
+
+
+Intel In-Memory Analytics Accelerator (Intel IAA) Introduction
+
+
+Intel ``IAA`` is an accelerator that has been designed to help benefit
+in-memory databases and analytic workloads. There are three main areas
+that Intel ``IAA`` can assist with analytics primitives (scan, filter, etc.),
+sparse data compression and memory tiering.
+
+``IAA`` Manual Documentation:
+
+https://www.intel.com/content/www/us/en/content-details/721858/intel-in-memory-analytics-accelerator-architecture-specification
+
+IAA Device Enabling
+---
+
+- Enabling ``IAA`` devices for platform configuration, please refer to:
+
+https://www.intel.com/content/www/us/en/content-details/780887/intel-in-memory-analytics-accelerator-intel-iaa.html
+
+- ``IAA`` device driver is ``Intel Data Accelerator Driver (idxd)``, it is
+  recommended that the minimum version of Linux kernel is 5.18.
+
+- Add ``"intel_iommu=on,sm_on"`` parameter to kernel command line
+  for ``SVM`` feature enabling.
+
+Here is an easy way to verify ``IAA`` device driver and ``SVM``, refer to:
+
+https://github.com/intel/idxd-config/tree/stable/test
+
+IAA Device Management
+-
+
+The number of ``IAA`` devices will vary depending on the Xeon product model.
+On a ``SPR`` server, there can be a maximum of 8 ``IAA`` devices, with up to
+4 devices per socket.
+
+By default, all ``IAA`` devices are disabled and need to be configured and
+enabled by users manually.
+
+Check the number of devices through the following command
+
+.. code-block:: shell
+
+  # lspci -d 8086:0cfe
+  # 6a:02.0 System peripheral: Intel Corporation Device 0cfe
+  # 6f:02.0 System peripheral: Intel Corporation Device 0cfe
+  # 74:02.0 System peripheral: Intel Corporation Device 0cfe
+  # 79:02.0 System peripheral: Intel Corporation Device 0cfe
+  # e7:02.0 System peripheral: Intel Corporation Device 0cfe
+  # ec:02.0 System peripheral: Intel Corporation Device 0cfe
+  # f1:02.0 System peripheral: Intel Corporation Device 0cfe
+  # f6:02.0 System peripheral: Intel Corporation Device 0cfe
+
+IAA Device Configuration
+
+
+The ``accel-config`` tool is used to enable ``IAA`` devices and configure
+``IAA`` hardware resources(work queues and engines). One ``IAA`` device
+has 8 work queues and 8 processing engines, multiple engines can be assigned
+to a work queue via ``group`` attribute.
+
+One example of configuring and enabling an ``IAA`` device.
+
+.. code-block:: shell
+
+  # accel-config config-engine iax1/engine1.0 -g 0

[PATCH v5 4/7] migration/multifd: add qpl compression method

2024-03-20 Thread Yuan Liu
add the Query Processing Library (QPL) compression method

Although both qpl and zlib support deflate compression, qpl will
only use the In-Memory Analytics Accelerator(IAA) for compression
and decompression, and IAA is not compatible with the Zlib in
migration, so qpl is used as a new compression method for migration.

How to enable qpl compression during migration:
migrate_set_parameter multifd-compression qpl

The qpl only supports one compression level, there is no qpl
compression level parameter added, users do not need to specify
the qpl compression level.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 hw/core/qdev-properties-system.c |  2 +-
 migration/meson.build|  1 +
 migration/multifd-qpl.c  | 20 
 migration/multifd.h  |  1 +
 qapi/migration.json  |  7 ++-
 5 files changed, 29 insertions(+), 2 deletions(-)
 create mode 100644 migration/multifd-qpl.c

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index d79d6f4b53..6ccd7224f6 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -659,7 +659,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compression = {
 .name = "MultiFDCompression",
 .description = "multifd_compression values, "
-   "none/zlib/zstd",
+   "none/zlib/zstd/qpl",
 .enum_table = &MultiFDCompression_lookup,
 .get = qdev_propinfo_get_enum,
 .set = qdev_propinfo_set_enum,
diff --git a/migration/meson.build b/migration/meson.build
index 1eeb915ff6..cb177de1d2 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -41,6 +41,7 @@ if get_option('live_block_migration').allowed()
   system_ss.add(files('block.c'))
 endif
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..056a68a060
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,20 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "qemu/module.h"
+
+static void multifd_qpl_register(void)
+{
+/* noop */
+}
+
+migration_init(multifd_qpl_register);
diff --git a/migration/multifd.h b/migration/multifd.h
index c9d9b09239..5b7d9b15f8 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -40,6 +40,7 @@ MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_NOCOMP (0 << 1)
 #define MULTIFD_FLAG_ZLIB (1 << 1)
 #define MULTIFD_FLAG_ZSTD (2 << 1)
+#define MULTIFD_FLAG_QPL (4 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
diff --git a/qapi/migration.json b/qapi/migration.json
index aa1b39bce1..dceb35db5b 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -629,11 +629,16 @@
 #
 # @zstd: use zstd compression method.
 #
+# @qpl: use qpl compression method. The Query Processing Library (qpl) is based
+#   on the deflate compression algorithm and uses the Intel In-Memory Analytics
+#   Accelerator (IAA) for accelerated compression and decompression. (Since 9.0)
+#
 # Since: 5.0
 ##
 { 'enum': 'MultiFDCompression',
   'data': [ 'none', 'zlib',
-{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' } ] }
+{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' },
+{ 'name': 'qpl', 'if': 'CONFIG_QPL' } ] }
 
 ##
 # @MigMode:
-- 
2.39.3




[PATCH v5 3/7] configure: add --enable-qpl build option

2024-03-20 Thread Yuan Liu
add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression method for multifd migration.

the Query Processing Library (QPL) is an open-source library
that supports data compression and decompression features.

The QPL compression is based on the deflate compression algorithm
and use Intel In-Memory Analytics Accelerator(IAA) hardware for
compression and decompression acceleration.

Please refer to the following for more information about QPL
https://intel.github.io/qpl/documentation/introduction_docs/introduction.html

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 16 
 meson_options.txt |  2 ++
 scripts/meson-buildoptions.sh |  3 +++
 3 files changed, 21 insertions(+)

diff --git a/meson.build b/meson.build
index b375248a76..bee7dcd53b 100644
--- a/meson.build
+++ b/meson.build
@@ -1200,6 +1200,20 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto()
+  libqpl = cc.find_library('qpl', required: false)
+  if not libqpl.found()
+error('libqpl not found, please install it from ' +
+
'https://intel.github.io/qpl/documentation/get_started_docs/installation.html')
+  endif
+  libaccel = dependency('libaccel-config', version: '>=4.0.0',
+required: true,
+method: 'pkg-config')
+  qpl = declare_dependency(dependencies: [libqpl, libaccel,
+cc.find_library('dl', required: get_option('qpl'))],
+link_args: ['-lstdc++'])
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and host_os == 'linux' and pixman.found()
@@ -2305,6 +2319,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4462,6 +4477,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index 0a99a059ec..06cd675572 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -259,6 +259,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 680fa3f581..784f74fde9 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -222,6 +222,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -562,6 +563,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
-- 
2.39.3




[PATCH v5 2/7] migration/multifd: put IOV initialization into compression method

2024-03-20 Thread Yuan Liu
Different compression methods may require different numbers of IOVs.
Based on streaming compression of zlib and zstd, all pages will be
compressed to a data block, so two IOVs are needed for packet header
and compressed data block.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-zlib.c | 4 
 migration/multifd-zstd.c | 6 +-
 migration/multifd.c  | 8 +---
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 99821cd4d5..8095ef8e28 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p, Error 
**errp)
 goto err_free_zbuff;
 }
 p->compress_data = z;
+
+assert(p->iov == NULL);
+/* For packet header and zlib streaming compression block */
+p->iov = g_new0(struct iovec, 2);
 return 0;
 
 err_free_zbuff:
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 02112255ad..9c9217794e 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
 struct zstd_data *z = g_new0(struct zstd_data, 1);
 int res;
 
-p->compress_data = z;
 z->zcs = ZSTD_createCStream();
 if (!z->zcs) {
 g_free(z);
@@ -77,6 +76,11 @@ static int zstd_send_setup(MultiFDSendParams *p, Error 
**errp)
 error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 return -1;
 }
+p->compress_data = z;
+
+assert(p->iov == NULL);
+/* For packet header and zstd streaming compression block */
+p->iov = g_new0(struct iovec, 2);
 return 0;
 }
 
diff --git a/migration/multifd.c b/migration/multifd.c
index 0179422f6d..5155e02ae3 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1181,9 +1181,11 @@ bool multifd_send_setup(void)
 p->packet = g_malloc0(p->packet_len);
 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
-
-/* We need one extra place for the packet header */
-p->iov = g_new0(struct iovec, page_count + 1);
+/* IOVs are initialized in send_setup of compression method */
+if (!migrate_multifd_compression()) {
+/* We need one extra place for the packet header */
+p->iov = g_new0(struct iovec, page_count + 1);
+}
 } else {
 p->iov = g_new0(struct iovec, page_count);
 }
-- 
2.39.3




[PATCH v5 6/7] migration/multifd: implement qpl compression and decompression

2024-03-20 Thread Yuan Liu
each qpl job is used to (de)compress a normal page and it can
be processed independently by the IAA hardware. All qpl jobs
are submitted to the hardware at once, and wait for all jobs
completion.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 229 +++-
 1 file changed, 225 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6de65e9da7..479b051b24 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,6 +13,7 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "exec/ramblock.h"
 #include "migration.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
@@ -171,6 +172,112 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 p->compress_data = NULL;
 }
 
+static inline void prepare_job(qpl_job *job, uint8_t *input, uint32_t 
input_len,
+   uint8_t *output, uint32_t output_len,
+   bool is_compression)
+{
+job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+job->next_in_ptr = input;
+job->next_out_ptr = output;
+job->available_in = input_len;
+job->available_out = output_len;
+job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+/* only supports one compression level */
+job->level = 1;
+}
+
+/**
+ * set_raw_data_hdr: set the length of raw data
+ *
+ * If the length of the compressed output data is greater than or equal to
+ * the page size, then set the compressed data length to the data size and
+ * send raw data directly.
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the compression job header
+ */
+static inline void set_raw_data_hdr(QplData *qpl, uint32_t index)
+{
+assert(index < qpl->job_num);
+qpl->zbuf_hdr[index] = cpu_to_be32(qpl->data_size);
+}
+
+/**
+ * is_raw_data: check if the data is raw data
+ *
+ * The raw data length is always equal to data size, which is the
+ * size of one page.
+ *
+ * Returns true if the data is raw data, otherwise false
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the decompressed job header
+ */
+static inline bool is_raw_data(QplData *qpl, uint32_t index)
+{
+assert(index < qpl->job_num);
+return qpl->zbuf_hdr[index] == qpl->data_size;
+}
+
+static int run_comp_jobs(MultiFDSendParams *p, Error **errp)
+{
+qpl_status status;
+QplData *qpl = p->compress_data;
+MultiFDPages_t *pages = p->pages;
+uint32_t job_num = pages->normal_num;
+qpl_job *job = NULL;
+uint32_t off = 0;
+
+assert(job_num <= qpl->job_num);
+/* submit all compression jobs */
+for (int i = 0; i < job_num; i++) {
+job = qpl->job_array[i];
+/* the compressed data size should be less than one page */
+prepare_job(job, pages->block->host + pages->offset[i], qpl->data_size,
+qpl->zbuf + off, qpl->data_size - 1, true);
+retry:
+status = qpl_submit_job(job);
+if (status == QPL_STS_OK) {
+off += qpl->data_size;
+} else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+goto retry;
+} else {
+error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+   p->id, status);
+return -1;
+}
+}
+
+/* wait for all jobs to complete */
+for (int i = 0; i < job_num; i++) {
+job = qpl->job_array[i];
+status = qpl_wait_job(job);
+if (status == QPL_STS_OK) {
+qpl->zbuf_hdr[i] = cpu_to_be32(job->total_out);
+p->iov[p->iovs_num].iov_len = job->total_out;
+p->iov[p->iovs_num].iov_base = qpl->zbuf + (qpl->data_size * i);
+p->next_packet_size += job->total_out;
+} else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+/*
+ * the compression job does not fail, the output data
+ * size is larger than the provided memory size. In this
+ * case, raw data is sent directly to the destination.
+ */
+set_raw_data_hdr(qpl, i);
+p->iov[p->iovs_num].iov_len = qpl->data_size;
+p->iov[p->iovs_num].iov_base = pages->block->host +
+   pages->offset[i];
+p->next_packet_size += qpl->data_size;
+} else {
+error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+   p->id, status);
+return -1;
+}
+p->iovs_num++;
+}
+return 0;
+}
+
 /**
  * qpl_send_prepare: prepare data to be able to send
  *
@@ -184,8 +29

[PATCH 0/1] Solve zero page causing multiple page faults

2024-04-02 Thread Yuan Liu
ensure that the received zero pages memory data is 0?

In this case, the performance impact of live migration is not big
because the destination is not the bottleneck.

When using QPL (SVM-capable device), even if IOTLB is improved, the
overall performance will still be seriously degraded because a large
number of IO page faults are still generated.

Previous discussion link:
1. 
https://lore.kernel.org/all/CAAYibXib+TWnJpV22E=adncdBmwXJRqgRjJXK7X71J=bdfa...@mail.gmail.com/
2. 
https://lore.kernel.org/all/ph7pr11mb594123f7eefebfce219af100a3...@ph7pr11mb5941.namprd11.prod.outlook.com/

Yuan Liu (1):
  migration/multifd: solve zero page causing multiple page faults

 migration/multifd-zero-page.c | 4 +++-
 migration/multifd-zlib.c  | 1 +
 migration/multifd-zstd.c  | 1 +
 migration/multifd.c   | 1 +
 migration/ram.c   | 4 
 migration/ram.h   | 1 +
 6 files changed, 11 insertions(+), 1 deletion(-)

-- 
2.39.3




[PATCH 1/1] migration/multifd: solve zero page causing multiple page faults

2024-04-02 Thread Yuan Liu
Implemented recvbitmap tracking of received pages in multifd.

If the zero page appears for the first time in the recvbitmap, this
page is not checked and set.

If the zero page has already appeared in the recvbitmap, there is no
need to check the data but directly set the data to 0, because it is
unlikely that the zero page will be migrated multiple times.

Signed-off-by: Yuan Liu 
---
 migration/multifd-zero-page.c | 4 +++-
 migration/multifd-zlib.c  | 1 +
 migration/multifd-zstd.c  | 1 +
 migration/multifd.c   | 1 +
 migration/ram.c   | 4 
 migration/ram.h   | 1 +
 6 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
index 1ba38be636..e1b8370f88 100644
--- a/migration/multifd-zero-page.c
+++ b/migration/multifd-zero-page.c
@@ -80,8 +80,10 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
 {
 for (int i = 0; i < p->zero_num; i++) {
 void *page = p->host + p->zero[i];
-if (!buffer_is_zero(page, p->page_size)) {
+if (ramblock_recv_bitmap_test_byte_offset(p->block, p->zero[i])) {
 memset(page, 0, p->page_size);
+} else {
+ramblock_recv_bitmap_set_offset(p->block, p->zero[i]);
 }
 }
 }
diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 8095ef8e28..6246ecca2b 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -288,6 +288,7 @@ static int zlib_recv(MultiFDRecvParams *p, Error **errp)
 int flush = Z_NO_FLUSH;
 unsigned long start = zs->total_out;
 
+ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
 if (i == p->normal_num - 1) {
 flush = Z_SYNC_FLUSH;
 }
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 9c9217794e..989333b572 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -282,6 +282,7 @@ static int zstd_recv(MultiFDRecvParams *p, Error **errp)
 z->in.pos = 0;
 
 for (i = 0; i < p->normal_num; i++) {
+ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
 z->out.dst = p->host + p->normal[i];
 z->out.size = p->page_size;
 z->out.pos = 0;
diff --git a/migration/multifd.c b/migration/multifd.c
index 72712fc31f..c9f544dba0 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -277,6 +277,7 @@ static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
 for (int i = 0; i < p->normal_num; i++) {
 p->iov[i].iov_base = p->host + p->normal[i];
 p->iov[i].iov_len = p->page_size;
+ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
 }
 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
 }
diff --git a/migration/ram.c b/migration/ram.c
index 8deb84984f..3aa70794c1 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -275,6 +275,10 @@ void ramblock_recv_bitmap_set_range(RAMBlock *rb, void 
*host_addr,
   nr);
 }
 
+void ramblock_recv_bitmap_set_offset(RAMBlock *rb, uint64_t byte_offset)
+{
+set_bit_atomic(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
+}
 #define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
 
 /*
diff --git a/migration/ram.h b/migration/ram.h
index 08feecaf51..bc0318b834 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -69,6 +69,7 @@ int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr);
 bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset);
 void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr);
 void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr, size_t nr);
+void ramblock_recv_bitmap_set_offset(RAMBlock *rb, uint64_t byte_offset);
 int64_t ramblock_recv_bitmap_send(QEMUFile *file,
   const char *block_name);
 bool ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *rb, Error **errp);
-- 
2.39.3




[PATCH v4 1/8] docs/migration: add qpl compression feature

2024-03-04 Thread Yuan Liu
add QPL compression method introduction

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 docs/devel/migration/features.rst|   1 +
 docs/devel/migration/qpl-compression.rst | 231 +++
 2 files changed, 232 insertions(+)
 create mode 100644 docs/devel/migration/qpl-compression.rst

diff --git a/docs/devel/migration/features.rst 
b/docs/devel/migration/features.rst
index a9acaf618e..9819393c12 100644
--- a/docs/devel/migration/features.rst
+++ b/docs/devel/migration/features.rst
@@ -10,3 +10,4 @@ Migration has plenty of features to support different use 
cases.
dirty-limit
vfio
virtio
+   qpl-compression
diff --git a/docs/devel/migration/qpl-compression.rst 
b/docs/devel/migration/qpl-compression.rst
new file mode 100644
index 00..42c7969d30
--- /dev/null
+++ b/docs/devel/migration/qpl-compression.rst
@@ -0,0 +1,231 @@
+===
+QPL Compression
+===
+The Intel Query Processing Library (Intel ``QPL``) is an open-source library to
+provide compression and decompression features and it is based on deflate
+compression algorithm (RFC 1951).
+
+The ``QPL`` compression relies on Intel In-Memory Analytics 
Accelerator(``IAA``)
+and Shared Virtual Memory(``SVM``) technology, they are new features supported
+from Intel 4th Gen Intel Xeon Scalable processors, codenamed Sapphire Rapids
+processor(``SPR``).
+
+For more ``QPL`` introduction, please refer to:
+
+https://intel.github.io/qpl/documentation/introduction_docs/introduction.html
+
+QPL Compression Framework
+=
+
+::
+
+  ++   +--+
+  | MultiFD Service|   |accel-config tool |
+  +---++   ++-+
+  | |
+  | |
+  +---++| Setup IAA
+  |  QPL library   || Resources
+  +---+---++|
+  |   | |
+  |   +-+---+
+  |   Open IAA  |
+  |   Devices +-+-+
+  |   |idxd driver|
+  |   +-+-+
+  | |
+  | |
+  |   +-+-+
+  +---+IAA Devices|
+  Submit jobs +---+
+  via enqcmd
+
+
+Intel In-Memory Analytics Accelerator (Intel IAA) Introduction
+
+
+Intel ``IAA`` is an accelerator that has been designed to help benefit
+in-memory databases and analytic workloads. There are three main areas
+that Intel ``IAA`` can assist with analytics primitives (scan, filter, etc.),
+sparse data compression and memory tiering.
+
+``IAA`` Manual Documentation:
+
+https://www.intel.com/content/www/us/en/content-details/721858/intel-in-memory-analytics-accelerator-architecture-specification
+
+IAA Device Enabling
+---
+
+- Enabling ``IAA`` devices for platform configuration, please refer to:
+
+https://www.intel.com/content/www/us/en/content-details/780887/intel-in-memory-analytics-accelerator-intel-iaa.html
+
+- ``IAA`` device driver is ``Intel Data Accelerator Driver (idxd)``, it is
+  recommended that the minimum version of Linux kernel is 5.18.
+
+- Add ``"intel_iommu=on,sm_on"`` parameter to kernel command line
+  for ``SVM`` feature enabling.
+
+Here is an easy way to verify ``IAA`` device driver and ``SVM``, refer to:
+
+https://github.com/intel/idxd-config/tree/stable/test
+
+IAA Device Management
+-
+
+The number of ``IAA`` devices will vary depending on the Xeon product model.
+On a ``SPR`` server, there can be a maximum of 8 ``IAA`` devices, with up to
+4 devices per socket.
+
+By default, all ``IAA`` devices are disabled and need to be configured and
+enabled by users manually.
+
+Check the number of devices through the following command
+
+.. code-block:: shell
+
+  # lspci -d 8086:0cfe
+  # 6a:02.0 System peripheral: Intel Corporation Device 0cfe
+  # 6f:02.0 System peripheral: Intel Corporation Device 0cfe
+  # 74:02.0 System peripheral: Intel Corporation Device 0cfe
+  # 79:02.0 System peripheral: Intel Corporation Device 0cfe
+  # e7:02.0 System peripheral: Intel Corporation Device 0cfe
+  # ec:02.0 System peripheral: Intel Corporation Device 0cfe
+  # f1:02.0 System peripheral: Intel Corporation Device 0cfe
+  # f6:02.0 System peripheral: Intel Corporation Device 0cfe
+
+IAA Device Configuration
+
+
+The ``accel-config`` tool is used to enable ``IAA`` devices and configure
+``IAA`` hardware resources (work queues and engines). One ``IAA`` device
+has 8 work queues and 8 processing engines, multiple engines can be assigned
+to a work queue via ``group`` attribute.
+
+One example of configuring and enabling an ``IAA`` device.
+
+.. code-block:: shell
+
+  # accel-config config-engine iax1/engine1.0 -g 0
+  # accel-config config-engine iax1

[PATCH v4 3/8] configure: add --enable-qpl build option

2024-03-04 Thread Yuan Liu
add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression method for multifd migration.

the Query Processing Library (QPL) is an open-source library
that supports data compression and decompression features.

The QPL compression is based on the deflate compression algorithm
and use Intel In-Memory Analytics Accelerator(IAA) hardware for
compression and decompression acceleration.

Please refer to the following for more information about QPL
https://intel.github.io/qpl/documentation/introduction_docs/introduction.html

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 18 ++
 meson_options.txt |  2 ++
 scripts/meson-buildoptions.sh |  3 +++
 3 files changed, 23 insertions(+)

diff --git a/meson.build b/meson.build
index c1dc83e4c0..2dea1e6834 100644
--- a/meson.build
+++ b/meson.build
@@ -1197,6 +1197,22 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto()
+  libqpl = cc.find_library('qpl', required: false)
+  if not libqpl.found()
+error('libqpl not found, please install it from ' +
+
'https://intel.github.io/qpl/documentation/get_started_docs/installation.html')
+  endif
+  libaccel = cc.find_library('accel-config', required: false)
+  if not libaccel.found()
+error('libaccel-config not found, please install it from ' +
+'https://github.com/intel/idxd-config')
+  endif
+  qpl = declare_dependency(dependencies: [libqpl, libaccel,
+cc.find_library('dl', required: get_option('qpl'))],
+link_args: ['-lstdc++'])
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and host_os == 'linux' and pixman.found()
@@ -2298,6 +2314,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4438,6 +4455,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index 0a99a059ec..06cd675572 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -259,6 +259,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 680fa3f581..784f74fde9 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -222,6 +222,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -562,6 +563,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
-- 
2.39.3




[PATCH v4 4/8] migration/multifd: add qpl compression method

2024-03-04 Thread Yuan Liu
add the Query Processing Library (QPL) compression method

Although both qpl and zlib support deflate compression, qpl will
only use the In-Memory Analytics Accelerator(IAA) for compression
and decompression, and IAA is not compatible with the Zlib in
migration, so qpl is used as a new compression method for migration.

How to enable qpl compression during migration:
migrate_set_parameter multifd-compression qpl

The qpl only supports one compression level, there is no qpl
compression level parameter added, users do not need to specify
the qpl compression level.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 hw/core/qdev-properties-system.c |   2 +-
 migration/meson.build|   1 +
 migration/multifd-qpl.c  | 158 +++
 migration/multifd.h  |   1 +
 qapi/migration.json  |   7 +-
 5 files changed, 167 insertions(+), 2 deletions(-)
 create mode 100644 migration/multifd-qpl.c

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index 1a396521d5..b4f0e5cbdb 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -658,7 +658,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compression = {
 .name = "MultiFDCompression",
 .description = "multifd_compression values, "
-   "none/zlib/zstd",
+   "none/zlib/zstd/qpl",
 .enum_table = &MultiFDCompression_lookup,
 .get = qdev_propinfo_get_enum,
 .set = qdev_propinfo_set_enum,
diff --git a/migration/meson.build b/migration/meson.build
index 92b1cc4297..c155c2d781 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -40,6 +40,7 @@ if get_option('live_block_migration').allowed()
   system_ss.add(files('block.c'))
 endif
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..6b94e732ac
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,158 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/rcu.h"
+#include "exec/ramblock.h"
+#include "exec/target_page.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "trace.h"
+#include "options.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+struct qpl_data {
+qpl_job **job_array;
+/* the number of allocated jobs */
+uint32_t job_num;
+/* the size of data processed by a qpl job */
+uint32_t data_size;
+/* compressed data buffer */
+uint8_t *zbuf;
+/* the length of compressed data */
+uint32_t *zbuf_hdr;
+};
+
+/**
+ * qpl_send_setup: setup send side
+ *
+ * Setup each channel with QPL compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int qpl_send_setup(MultiFDSendParams *p, Error **errp)
+{
+/* Implement in next patch */
+return -1;
+}
+
+/**
+ * qpl_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static void qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+/* Implement in next patch */
+}
+
+/**
+ * qpl_send_prepare: prepare data to be able to send
+ *
+ * Create a compressed buffer with all the pages that we are going to
+ * send.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int qpl_send_prepare(MultiFDSendParams *p, Error **errp)
+{
+/* Implement in next patch */
+return -1;
+}
+
+/**
+ * qpl_recv_setup: setup receive side
+ *
+ * Create the compressed channel and buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+/* Implement in next patch */
+return -1;
+}
+
+/**
+ * qpl_recv_cleanup: cleanup receive side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void qpl_recv_cleanup(MultiFDRecvParams *p)
+{
+/* Implement in next patch */
+}
+
+/**
+ * qpl_recv_pages: read the data from the channel into actual pages
+ *
+ * Read the compressed buffer, and uncompress it into the actual
+ * pages.
+ *
+ * Returns 0 for success or -1 for error
+ *

[PATCH v4 8/8] tests/migration-test: add qpl compression test

2024-03-04 Thread Yuan Liu
add qpl to compression method test for multifd migration

the migration with qpl compression needs to access IAA hardware
resource, please run "check-qtest" with sudo or root permission,
otherwise migration test will fail

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 tests/qtest/migration-test.c | 24 
 1 file changed, 24 insertions(+)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 23d50fe599..96842f9515 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2653,6 +2653,15 @@ test_migrate_precopy_tcp_multifd_zstd_start(QTestState 
*from,
 }
 #endif /* CONFIG_ZSTD */
 
+#ifdef CONFIG_QPL
+static void *
+test_migrate_precopy_tcp_multifd_qpl_start(QTestState *from,
+QTestState *to)
+{
+return test_migrate_precopy_tcp_multifd_start_common(from, to, "qpl");
+}
+#endif /* CONFIG_QPL */
+
 static void test_multifd_tcp_none(void)
 {
 MigrateCommon args = {
@@ -2688,6 +2697,17 @@ static void test_multifd_tcp_zstd(void)
 }
 #endif
 
+#ifdef CONFIG_QPL
+static void test_multifd_tcp_qpl(void)
+{
+MigrateCommon args = {
+.listen_uri = "defer",
+.start_hook = test_migrate_precopy_tcp_multifd_qpl_start,
+};
+test_precopy_common();
+}
+#endif
+
 #ifdef CONFIG_GNUTLS
 static void *
 test_migrate_multifd_tcp_tls_psk_start_match(QTestState *from,
@@ -3574,6 +3594,10 @@ int main(int argc, char **argv)
 migration_test_add("/migration/multifd/tcp/plain/zstd",
test_multifd_tcp_zstd);
 #endif
+#ifdef CONFIG_QPL
+migration_test_add("/migration/multifd/tcp/plain/qpl",
+   test_multifd_tcp_qpl);
+#endif
 #ifdef CONFIG_GNUTLS
 migration_test_add("/migration/multifd/tcp/tls/psk/match",
test_multifd_tcp_tls_psk_match);
-- 
2.39.3




[PATCH v4 7/8] migration/multifd: fix zlib and zstd compression levels not working

2024-03-04 Thread Yuan Liu
add zlib and zstd compression levels in multifd parameter
testing and application, and add compression level tests

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reported-by: Xiaohui Li 
---
 migration/options.c  | 12 
 tests/qtest/migration-test.c | 16 
 2 files changed, 28 insertions(+)

diff --git a/migration/options.c b/migration/options.c
index 3e3e0b93b4..1cd3cc7c33 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -1312,6 +1312,12 @@ static void 
migrate_params_test_apply(MigrateSetParameters *params,
 if (params->has_multifd_compression) {
 dest->multifd_compression = params->multifd_compression;
 }
+if (params->has_multifd_zlib_level) {
+dest->multifd_zlib_level = params->multifd_zlib_level;
+}
+if (params->has_multifd_zstd_level) {
+dest->multifd_zstd_level = params->multifd_zstd_level;
+}
 if (params->has_xbzrle_cache_size) {
 dest->xbzrle_cache_size = params->xbzrle_cache_size;
 }
@@ -1447,6 +1453,12 @@ static void migrate_params_apply(MigrateSetParameters 
*params, Error **errp)
 if (params->has_multifd_compression) {
 s->parameters.multifd_compression = params->multifd_compression;
 }
+if (params->has_multifd_zlib_level) {
+s->parameters.multifd_zlib_level = params->multifd_zlib_level;
+}
+if (params->has_multifd_zstd_level) {
+s->parameters.multifd_zstd_level = params->multifd_zstd_level;
+}
 if (params->has_xbzrle_cache_size) {
 s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
 xbzrle_cache_resize(params->xbzrle_cache_size, errp);
diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 8a5bb1752e..23d50fe599 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2621,10 +2621,24 @@ test_migrate_precopy_tcp_multifd_start(QTestState *from,
 return test_migrate_precopy_tcp_multifd_start_common(from, to, "none");
 }
 
+static void
+test_and_set_multifd_compression_level(QTestState *who, const char *param)
+{
+/* The default compression level is 1, test a level other than 1 */
+int level = 2;
+
+migrate_set_parameter_int(who, param, level);
+migrate_check_parameter_int(who, param, level);
+/* only test compression level 1 during migration */
+migrate_set_parameter_int(who, param, 1);
+}
+
 static void *
 test_migrate_precopy_tcp_multifd_zlib_start(QTestState *from,
 QTestState *to)
 {
+/* the compression level is used only on the source side. */
+test_and_set_multifd_compression_level(from, "multifd-zlib-level");
 return test_migrate_precopy_tcp_multifd_start_common(from, to, "zlib");
 }
 
@@ -2633,6 +2647,8 @@ static void *
 test_migrate_precopy_tcp_multifd_zstd_start(QTestState *from,
 QTestState *to)
 {
+/* the compression level is used only on the source side. */
+test_and_set_multifd_compression_level(from, "multifd-zstd-level");
 return test_migrate_precopy_tcp_multifd_start_common(from, to, "zstd");
 }
 #endif /* CONFIG_ZSTD */
-- 
2.39.3




[PATCH v4 2/8] migration/multifd: add get_iov_count in the multifd method

2024-03-04 Thread Yuan Liu
the new function get_iov_count is used to get the number of
IOVs required by a specified multifd method

Different multifd methods may require different numbers of IOVs.
Based on streaming compression of zlib and zstd, all pages will be
compressed to a data block, so an IOV is required to send this data
block. For no compression, each IOV is used to send a page, so the
number of IOVs required is the same as the number of pages.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-zlib.c | 18 +-
 migration/multifd-zstd.c | 18 +-
 migration/multifd.c  | 24 +---
 migration/multifd.h  |  2 ++
 4 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 012e3bdea1..35187f2aff 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -313,13 +313,29 @@ static int zlib_recv_pages(MultiFDRecvParams *p, Error 
**errp)
 return 0;
 }
 
+/**
+ * zlib_get_iov_count: get the count of IOVs
+ *
+ * For zlib streaming compression, all pages will be compressed into a data
+ * block, and an IOV is requested for sending this block.
+ *
+ * Returns the count of the IOVs
+ *
+ * @page_count: Indicate the maximum count of pages processed by multifd
+ */
+static uint32_t zlib_get_iov_count(uint32_t page_count)
+{
+return 1;
+}
+
 static MultiFDMethods multifd_zlib_ops = {
 .send_setup = zlib_send_setup,
 .send_cleanup = zlib_send_cleanup,
 .send_prepare = zlib_send_prepare,
 .recv_setup = zlib_recv_setup,
 .recv_cleanup = zlib_recv_cleanup,
-.recv_pages = zlib_recv_pages
+.recv_pages = zlib_recv_pages,
+.get_iov_count = zlib_get_iov_count
 };
 
 static void multifd_zlib_register(void)
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index dc8fe43e94..25ed1add2a 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -304,13 +304,29 @@ static int zstd_recv_pages(MultiFDRecvParams *p, Error 
**errp)
 return 0;
 }
 
+/**
+ * zstd_get_iov_count: get the count of IOVs
+ *
+ * For zstd streaming compression, all pages will be compressed into a data
+ * block, and an IOV is requested for sending this block.
+ *
+ * Returns the count of the IOVs
+ *
+ * @page_count: Indicate the maximum count of pages processed by multifd
+ */
+static uint32_t zstd_get_iov_count(uint32_t page_count)
+{
+return 1;
+}
+
 static MultiFDMethods multifd_zstd_ops = {
 .send_setup = zstd_send_setup,
 .send_cleanup = zstd_send_cleanup,
 .send_prepare = zstd_send_prepare,
 .recv_setup = zstd_recv_setup,
 .recv_cleanup = zstd_recv_cleanup,
-.recv_pages = zstd_recv_pages
+.recv_pages = zstd_recv_pages,
+.get_iov_count = zstd_get_iov_count
 };
 
 static void multifd_zstd_register(void)
diff --git a/migration/multifd.c b/migration/multifd.c
index adfe8c9a0a..787402247e 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -209,13 +209,29 @@ static int nocomp_recv_pages(MultiFDRecvParams *p, Error 
**errp)
 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
 }
 
+/**
+ * nocomp_get_iov_count: get the count of IOVs
+ *
+ * For no compression, the count of IOVs required is the same as the count of
+ * pages
+ *
+ * Returns the count of the IOVs
+ *
+ * @page_count: Indicate the maximum count of pages processed by multifd
+ */
+static uint32_t nocomp_get_iov_count(uint32_t page_count)
+{
+return page_count;
+}
+
 static MultiFDMethods multifd_nocomp_ops = {
 .send_setup = nocomp_send_setup,
 .send_cleanup = nocomp_send_cleanup,
 .send_prepare = nocomp_send_prepare,
 .recv_setup = nocomp_recv_setup,
 .recv_cleanup = nocomp_recv_cleanup,
-.recv_pages = nocomp_recv_pages
+.recv_pages = nocomp_recv_pages,
+.get_iov_count = nocomp_get_iov_count
 };
 
 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
@@ -998,6 +1014,8 @@ bool multifd_send_setup(void)
 Error *local_err = NULL;
 int thread_count, ret = 0;
 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+/* We need one extra place for the packet header */
+uint32_t iov_count = 1;
 uint8_t i;
 
 if (!migrate_multifd()) {
@@ -1012,6 +1030,7 @@ bool multifd_send_setup(void)
 qemu_sem_init(_send_state->channels_ready, 0);
 qatomic_set(_send_state->exiting, 0);
 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
+iov_count += multifd_send_state->ops->get_iov_count(page_count);
 
 for (i = 0; i < thread_count; i++) {
 MultiFDSendParams *p = _send_state->params[i];
@@ -1026,8 +1045,7 @@ bool multifd_send_setup(void)
 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
 p->name = g_strdup_printf("multifdsend_%d", i);
-/* We need one extra place for the packet header *

[PATCH v4 6/8] migration/multifd: implement qpl compression and decompression

2024-03-04 Thread Yuan Liu
each qpl job is used to (de)compress a normal page and it can
be processed independently by the IAA hardware. All qpl jobs
are submitted to the hardware at once, and wait for all jobs
completion.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 219 +++-
 1 file changed, 215 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index f4db97ca01..eb815ea3be 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -167,6 +167,112 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 p->data = NULL;
 }
 
+static inline void prepare_job(qpl_job *job, uint8_t *input, uint32_t 
input_len,
+   uint8_t *output, uint32_t output_len,
+   bool is_compression)
+{
+job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+job->next_in_ptr = input;
+job->next_out_ptr = output;
+job->available_in = input_len;
+job->available_out = output_len;
+job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+/* only supports one compression level */
+job->level = 1;
+}
+
+/**
+ * set_raw_data_hdr: set the length of raw data
+ *
+ * If the length of the compressed output data is greater than or equal to
+ * the page size, then set the compressed data length to the data size and
+ * send raw data directly.
+ *
+ * @qpl: pointer to the qpl_data structure
+ * @index: the index of the compression job header
+ */
+static inline void set_raw_data_hdr(struct qpl_data *qpl, uint32_t index)
+{
+assert(index < qpl->job_num);
+qpl->zbuf_hdr[index] = cpu_to_be32(qpl->data_size);
+}
+
+/**
+ * is_raw_data: check if the data is raw data
+ *
+ * The raw data length is always equal to data size, which is the
+ * size of one page.
+ *
+ * Returns true if the data is raw data, otherwise false
+ *
+ * @qpl: pointer to the qpl_data structure
+ * @index: the index of the decompressed job header
+ */
+static inline bool is_raw_data(struct qpl_data *qpl, uint32_t index)
+{
+assert(index < qpl->job_num);
+return qpl->zbuf_hdr[index] == qpl->data_size;
+}
+
+static int run_comp_jobs(MultiFDSendParams *p, Error **errp)
+{
+qpl_status status;
+struct qpl_data *qpl = p->data;
+MultiFDPages_t *pages = p->pages;
+uint32_t job_num = pages->num;
+qpl_job *job = NULL;
+uint32_t off = 0;
+
+assert(job_num <= qpl->job_num);
+/* submit all compression jobs */
+for (int i = 0; i < job_num; i++) {
+job = qpl->job_array[i];
+/* the compressed data size should be less than one page */
+prepare_job(job, pages->block->host + pages->offset[i], qpl->data_size,
+qpl->zbuf + off, qpl->data_size - 1, true);
+retry:
+status = qpl_submit_job(job);
+if (status == QPL_STS_OK) {
+off += qpl->data_size;
+} else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+goto retry;
+} else {
+error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+   p->id, status);
+return -1;
+}
+}
+
+/* wait all jobs to complete */
+for (int i = 0; i < job_num; i++) {
+job = qpl->job_array[i];
+status = qpl_wait_job(job);
+if (status == QPL_STS_OK) {
+qpl->zbuf_hdr[i] = cpu_to_be32(job->total_out);
+p->iov[p->iovs_num].iov_len = job->total_out;
+p->iov[p->iovs_num].iov_base = qpl->zbuf + (qpl->data_size * i);
+p->next_packet_size += job->total_out;
+} else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+/*
+ * the compression job does not fail, the output data
+ * size is larger than the provided memory size. In this
+ * case, raw data is sent directly to the destination.
+ */
+set_raw_data_hdr(qpl, i);
+p->iov[p->iovs_num].iov_len = qpl->data_size;
+p->iov[p->iovs_num].iov_base = pages->block->host +
+   pages->offset[i];
+p->next_packet_size += qpl->data_size;
+} else {
+error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+   p->id, status);
+return -1;
+}
+p->iovs_num++;
+}
+return 0;
+}
+
 /**
  * qpl_send_prepare: prepare data to be able to send
  *
@@ -180,8 +286,25 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error 
**errp)
  */
 static int qpl_send_prepare(MultiFDSendParams *p, Error **errp)
 {
-/* Implement in next patch */
-return -1;
+struct qpl_data *qpl = p->data;
+uint32_t hdr_size = p->pages->nu

[PATCH v4 5/8] migration/multifd: implement initialization of qpl compression

2024-03-04 Thread Yuan Liu
the qpl initialization includes memory allocation for compressed
data and the qpl job initialization.

the qpl initialization will check whether the In-Memory Analytics
Accelerator(IAA) hardware is available, if the platform does not
have IAA hardware or the IAA hardware is not available, the QPL
compression initialization will fail.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 128 ++--
 1 file changed, 122 insertions(+), 6 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6b94e732ac..f4db97ca01 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -33,6 +33,100 @@ struct qpl_data {
 uint32_t *zbuf_hdr;
 };
 
+static void free_zbuf(struct qpl_data *qpl)
+{
+if (qpl->zbuf != NULL) {
+munmap(qpl->zbuf, qpl->job_num * qpl->data_size);
+qpl->zbuf = NULL;
+}
+if (qpl->zbuf_hdr != NULL) {
+g_free(qpl->zbuf_hdr);
+qpl->zbuf_hdr = NULL;
+}
+}
+
+static int alloc_zbuf(struct qpl_data *qpl, uint8_t chan_id, Error **errp)
+{
+int flags = MAP_PRIVATE | MAP_POPULATE | MAP_ANONYMOUS;
+uint32_t size = qpl->job_num * qpl->data_size;
+uint8_t *buf;
+
+buf = (uint8_t *) mmap(NULL, size, PROT_READ | PROT_WRITE, flags, -1, 0);
+if (buf == MAP_FAILED) {
+error_setg(errp, "multifd: %u: alloc_zbuf failed, job num %u, size %u",
+   chan_id, qpl->job_num, qpl->data_size);
+return -1;
+}
+qpl->zbuf = buf;
+qpl->zbuf_hdr = g_new0(uint32_t, qpl->job_num);
+return 0;
+}
+
+static void free_jobs(struct qpl_data *qpl)
+{
+for (int i = 0; i < qpl->job_num; i++) {
+qpl_fini_job(qpl->job_array[i]);
+g_free(qpl->job_array[i]);
+qpl->job_array[i] = NULL;
+}
+g_free(qpl->job_array);
+qpl->job_array = NULL;
+}
+
+static int alloc_jobs(struct qpl_data *qpl, uint8_t chan_id, Error **errp)
+{
+qpl_status status;
+uint32_t job_size = 0;
+qpl_job *job = NULL;
+/* always use IAA hardware accelerator */
+qpl_path_t path = qpl_path_hardware;
+
+status = qpl_get_job_size(path, _size);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
+   chan_id, status);
+return -1;
+}
+qpl->job_array = g_new0(qpl_job *, qpl->job_num);
+for (int i = 0; i < qpl->job_num; i++) {
+job = g_malloc0(job_size);
+status = qpl_init_job(path, job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
+   chan_id, status);
+free_jobs(qpl);
+return -1;
+}
+qpl->job_array[i] = job;
+}
+return 0;
+}
+
+static int init_qpl(struct qpl_data *qpl, uint32_t job_num, uint32_t data_size,
+uint8_t chan_id, Error **errp)
+{
+qpl->job_num = job_num;
+qpl->data_size = data_size;
+if (alloc_zbuf(qpl, chan_id, errp) != 0) {
+return -1;
+}
+if (alloc_jobs(qpl, chan_id, errp) != 0) {
+free_zbuf(qpl);
+return -1;
+}
+return 0;
+}
+
+static void deinit_qpl(struct qpl_data *qpl)
+{
+if (qpl != NULL) {
+free_jobs(qpl);
+free_zbuf(qpl);
+qpl->job_num = 0;
+qpl->data_size = 0;
+}
+}
+
 /**
  * qpl_send_setup: setup send side
  *
@@ -45,8 +139,15 @@ struct qpl_data {
  */
 static int qpl_send_setup(MultiFDSendParams *p, Error **errp)
 {
-/* Implement in next patch */
-return -1;
+struct qpl_data *qpl;
+
+qpl = g_new0(struct qpl_data, 1);
+if (init_qpl(qpl, p->page_count, p->page_size, p->id, errp) != 0) {
+g_free(qpl);
+return -1;
+}
+p->data = qpl;
+return 0;
 }
 
 /**
@@ -59,7 +160,11 @@ static int qpl_send_setup(MultiFDSendParams *p, Error 
**errp)
  */
 static void qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
 {
-/* Implement in next patch */
+struct qpl_data *qpl = p->data;
+
+deinit_qpl(qpl);
+g_free(p->data);
+p->data = NULL;
 }
 
 /**
@@ -91,8 +196,15 @@ static int qpl_send_prepare(MultiFDSendParams *p, Error 
**errp)
  */
 static int qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
-/* Implement in next patch */
-return -1;
+struct qpl_data *qpl;
+
+qpl = g_new0(struct qpl_data, 1);
+if (init_qpl(qpl, p->page_count, p->page_size, p->id, errp) != 0) {
+g_free(qpl);
+return -1;
+}
+p->data = qpl;
+return 0;
 }
 
 /**
@@ -104,7 +216,11 @@ static int qpl_recv_setup(MultiFDRecvParams *p, Error 
**errp)
  */
 static void qpl_recv_cleanup(MultiFDRecvParams *p)
 {
-/* Implement in next patch */
+struct qpl_data *qpl = p->data;
+
+deinit_qpl(qpl);
+g_free(p->data);
+p->data = NULL;
 }
 
 /**
-- 
2.39.3




[PATCH v7 3/7] configure: add --enable-qpl build option

2024-06-04 Thread Yuan Liu
add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression method for multifd migration.

The Query Processing Library (QPL) is an open-source library
that supports data compression and decompression features. It
is based on the deflate compression algorithm and use Intel
In-Memory Analytics Accelerator(IAA) hardware for compression
and decompression acceleration.

For more live migration with IAA, please refer to the document
docs/devel/migration/qpl-compression.rst

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 meson.build   | 8 
 meson_options.txt | 2 ++
 scripts/meson-buildoptions.sh | 3 +++
 3 files changed, 13 insertions(+)

diff --git a/meson.build b/meson.build
index 6386607144..d97f312a42 100644
--- a/meson.build
+++ b/meson.build
@@ -1197,6 +1197,12 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto() or have_system
+  qpl = dependency('qpl', version: '>=1.5.0',
+required: get_option('qpl'),
+method: 'pkg-config')
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and host_os == 'linux' and pixman.found()
@@ -2331,6 +2337,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4439,6 +4446,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index 4c1583eb40..dd680a5faf 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -259,6 +259,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 6ce5a8b72a..73ae8cedfc 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -220,6 +220,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -558,6 +559,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
-- 
2.43.0




[PATCH v7 4/7] migration/multifd: add qpl compression method

2024-06-04 Thread Yuan Liu
add the Query Processing Library (QPL) compression method

Introduce the qpl as a new multifd migration compression method, it can
use In-Memory Analytics Accelerator(IAA) to accelerate compression and
decompression, which can not only reduce network bandwidth requirement
but also reduce host compression and decompression CPU overhead.

How to enable qpl compression during migration:
migrate_set_parameter multifd-compression qpl

There is no qpl compression level parameter added since it only supports
level one, users do not need to specify the qpl compression level.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Peter Xu 
---
 hw/core/qdev-properties-system.c |  2 +-
 migration/meson.build|  1 +
 migration/multifd-qpl.c  | 20 
 migration/multifd.h  |  1 +
 qapi/migration.json  |  7 ++-
 5 files changed, 29 insertions(+), 2 deletions(-)
 create mode 100644 migration/multifd-qpl.c

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index d79d6f4b53..6ccd7224f6 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -659,7 +659,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compression = {
 .name = "MultiFDCompression",
 .description = "multifd_compression values, "
-   "none/zlib/zstd",
+   "none/zlib/zstd/qpl",
 .enum_table = _lookup,
 .get = qdev_propinfo_get_enum,
 .set = qdev_propinfo_set_enum,
diff --git a/migration/meson.build b/migration/meson.build
index bdc3244bce..5f146fe8a9 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -39,6 +39,7 @@ endif
 
 system_ss.add(when: rdma, if_true: files('rdma.c'))
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..056a68a060
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,20 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "qemu/module.h"
+
+static void multifd_qpl_register(void)
+{
+/* noop */
+}
+
+migration_init(multifd_qpl_register);
diff --git a/migration/multifd.h b/migration/multifd.h
index c9d9b09239..5b7d9b15f8 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -40,6 +40,7 @@ MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_NOCOMP (0 << 1)
 #define MULTIFD_FLAG_ZLIB (1 << 1)
 #define MULTIFD_FLAG_ZSTD (2 << 1)
+#define MULTIFD_FLAG_QPL (4 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
diff --git a/qapi/migration.json b/qapi/migration.json
index a351fd3714..f97bc3bb93 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -554,11 +554,16 @@
 #
 # @zstd: use zstd compression method.
 #
+# @qpl: use qpl compression method. Query Processing Library(qpl) is based on
+#   the deflate compression algorithm and uses the Intel In-Memory Analytics
+#   Accelerator (IAA) to accelerate compression and decompression. (Since 9.1)
+#
 # Since: 5.0
 ##
 { 'enum': 'MultiFDCompression',
   'data': [ 'none', 'zlib',
-{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' } ] }
+{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' },
+{ 'name': 'qpl', 'if': 'CONFIG_QPL' } ] }
 
 ##
 # @MigMode:
-- 
2.43.0




[PATCH v7 1/7] docs/migration: add qpl compression feature

2024-06-04 Thread Yuan Liu
add Intel Query Processing Library (QPL) compression method
introduction

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 docs/devel/migration/features.rst|   1 +
 docs/devel/migration/qpl-compression.rst | 262 +++
 2 files changed, 263 insertions(+)
 create mode 100644 docs/devel/migration/qpl-compression.rst

diff --git a/docs/devel/migration/features.rst 
b/docs/devel/migration/features.rst
index d5ca7b86d5..bc98b65075 100644
--- a/docs/devel/migration/features.rst
+++ b/docs/devel/migration/features.rst
@@ -12,3 +12,4 @@ Migration has plenty of features to support different use 
cases.
virtio
mapped-ram
CPR
+   qpl-compression
diff --git a/docs/devel/migration/qpl-compression.rst 
b/docs/devel/migration/qpl-compression.rst
new file mode 100644
index 00..13fb7a67b1
--- /dev/null
+++ b/docs/devel/migration/qpl-compression.rst
@@ -0,0 +1,262 @@
+===
+QPL Compression
+===
+The Intel Query Processing Library (Intel ``QPL``) is an open-source library to
+provide compression and decompression features and it is based on the deflate
+compression algorithm (RFC 1951).
+
+The ``QPL`` compression relies on Intel In-Memory Analytics 
Accelerator(``IAA``)
+and Shared Virtual Memory(``SVM``) technology, they are new features supported
+from Intel 4th Gen Intel Xeon Scalable processors, codenamed Sapphire Rapids
+processor(``SPR``).
+
+For more ``QPL`` introduction, please refer to `QPL Introduction
+<https://intel.github.io/qpl/documentation/introduction_docs/introduction.html>`_
+
+QPL Compression Framework
+=
+
+::
+
+  ++   +--+
+  | MultiFD Thread |   |accel-config tool |
+  +---++   ++-+
+  | |
+  | |
+  |compress/decompress  |
+  +---++| Setup IAA
+  |  QPL library   || Resources
+  +---+---++|
+  |   | |
+  |   +-+---+
+  |   Open IAA  |
+  |   Devices +-+-+
+  |   |idxd driver|
+  |   +-+-+
+  | |
+  | |
+  |   +-+-+
+  +---+IAA Devices|
+  Submit jobs +---+
+  via enqcmd
+
+
+QPL Build And Installation
+--
+
+.. code-block:: shell
+
+  $git clone --recursive https://github.com/intel/qpl.git qpl
+  $mkdir qpl/build
+  $cd qpl/build
+  $cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr 
-DQPL_LIBRARY_TYPE=SHARED ..
+  $sudo cmake --build . --target install
+
+For more details about ``QPL`` installation, please refer to `QPL Installation
+<https://intel.github.io/qpl/documentation/get_started_docs/installation.html>`_
+
+IAA Device Management
+-
+
+The number of ``IAA`` devices will vary depending on the Xeon product model.
+On a ``SPR`` server, there can be a maximum of 8 ``IAA`` devices, with up to
+4 devices per socket.
+
+By default, all ``IAA`` devices are disabled and need to be configured and
+enabled by users manually.
+
+Check the number of devices through the following command
+
+.. code-block:: shell
+
+  #lspci -d 8086:0cfe
+  6a:02.0 System peripheral: Intel Corporation Device 0cfe
+  6f:02.0 System peripheral: Intel Corporation Device 0cfe
+  74:02.0 System peripheral: Intel Corporation Device 0cfe
+  79:02.0 System peripheral: Intel Corporation Device 0cfe
+  e7:02.0 System peripheral: Intel Corporation Device 0cfe
+  ec:02.0 System peripheral: Intel Corporation Device 0cfe
+  f1:02.0 System peripheral: Intel Corporation Device 0cfe
+  f6:02.0 System peripheral: Intel Corporation Device 0cfe
+
+IAA Device Configuration And Enabling
+^
+
+The ``accel-config`` tool is used to enable ``IAA`` devices and configure
+``IAA`` hardware resources(work queues and engines). One ``IAA`` device
+has 8 work queues and 8 processing engines, multiple engines can be assigned
+to a work queue via ``group`` attribute.
+
+For ``accel-config`` installation, please refer to `accel-config installation
+<https://github.com/intel/idxd-config>`_
+
+One example of configuring and enabling an ``IAA`` device.
+
+.. code-block:: shell
+
+  #accel-config config-engine iax1/engine1.0 -g 0
+  #accel-config config-engine iax1/engine1.1 -g 0
+  #accel-config config-engine iax1/engine1.2 -g 0
+  #accel-config config-engine iax1/engine1.3 -g 0
+  #accel-config config-engine iax1/engine1.4 -g 0
+  #accel-config config-engine iax1/engine1.5 -g 0
+  #accel-config config-engine iax1/engine1.6 -g 0
+  #accel-config config-engine iax1/engine1.7 -g 0
+  #accel-config config-wq iax1/wq1.0 -g 0 -s 128 -p 10 -b 1 -t 128 -m shared 
-y user -n app1 -d user
+  #accel-config enable-device iax1
+  #acce

[PATCH v7 5/7] migration/multifd: implement initialization of qpl compression

2024-06-04 Thread Yuan Liu
during initialization, a software job is allocated to each channel
for software path fallback when the IAA hardware is unavailable or
the hardware job submission fails. If the IAA hardware is available,
multiple hardware jobs are allocated for batch processing.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 328 +++-
 1 file changed, 327 insertions(+), 1 deletion(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 056a68a060..6791a204d5 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -9,12 +9,338 @@
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
  */
+
 #include "qemu/osdep.h"
 #include "qemu/module.h"
+#include "qapi/error.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+typedef struct {
+/* the QPL hardware path job */
+qpl_job *job;
+/* indicates if fallback to software path is required */
+bool fallback_sw_path;
+/* output data from the software path */
+uint8_t *sw_output;
+/* output data length from the software path */
+uint32_t sw_output_len;
+} QplHwJob;
+
+typedef struct {
+/* array of hardware jobs, the number of jobs equals the number pages */
+QplHwJob *hw_jobs;
+/* the QPL software job for the slow path and software fallback */
+qpl_job *sw_job;
+/* the number of pages that the QPL needs to process at one time */
+uint32_t page_num;
+/* array of compressed page buffers */
+uint8_t *zbuf;
+/* array of compressed page lengths */
+uint32_t *zlen;
+/* the status of the hardware device */
+bool hw_avail;
+} QplData;
+
+/**
+ * check_hw_avail: check if IAA hardware is available
+ *
+ * If the IAA hardware does not exist or is unavailable,
+ * the QPL hardware job initialization will fail.
+ *
+ * Returns true if IAA hardware is available, otherwise false.
+ *
+ * @job_size: indicates the hardware job size if hardware is available
+ */
+static bool check_hw_avail(uint32_t *job_size)
+{
+qpl_path_t path = qpl_path_hardware;
+uint32_t size = 0;
+qpl_job *job;
+
+if (qpl_get_job_size(path, ) != QPL_STS_OK) {
+return false;
+}
+assert(size > 0);
+job = g_malloc0(size);
+if (qpl_init_job(path, job) != QPL_STS_OK) {
+g_free(job);
+return false;
+}
+g_free(job);
+*job_size = size;
+return true;
+}
+
+/**
+ * multifd_qpl_free_sw_job: clean up software job
+ *
+ * Free the software job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_sw_job(QplData *qpl)
+{
+assert(qpl);
+if (qpl->sw_job) {
+qpl_fini_job(qpl->sw_job);
+g_free(qpl->sw_job);
+qpl->sw_job = NULL;
+}
+}
+
+/**
+ * multifd_qpl_free_hw_job: clean up hardware jobs
+ *
+ * Free all hardware job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_hw_job(QplData *qpl)
+{
+assert(qpl);
+if (qpl->hw_jobs) {
+for (int i = 0; i < qpl->page_num; i++) {
+qpl_fini_job(qpl->hw_jobs[i].job);
+g_free(qpl->hw_jobs[i].job);
+qpl->hw_jobs[i].job = NULL;
+}
+g_free(qpl->hw_jobs);
+qpl->hw_jobs = NULL;
+}
+}
+
+/**
+ * multifd_qpl_init_sw_job: initialize a software job
+ *
+ * Use the QPL software path to initialize a job
+ *
+ * @qpl: pointer to the QplData structure
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_init_sw_job(QplData *qpl, Error **errp)
+{
+qpl_path_t path = qpl_path_software;
+uint32_t size = 0;
+qpl_job *job = NULL;
+qpl_status status;
+
+status = qpl_get_job_size(path, );
+if (status != QPL_STS_OK) {
+error_setg(errp, "qpl_get_job_size failed with error %d", status);
+return -1;
+}
+job = g_malloc0(size);
+status = qpl_init_job(path, job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "qpl_init_job failed with error %d", status);
+g_free(job);
+return -1;
+}
+qpl->sw_job = job;
+return 0;
+}
+
+/**
+ * multifd_qpl_init_hw_job: initialize hardware jobs
+ *
+ * Use the QPL hardware path to initialize jobs
+ *
+ * @qpl: pointer to the QplData structure
+ * @size: the size of QPL hardware path job
+ * @errp: pointer to an error
+ */
+static void multifd_qpl_init_hw_job(QplData *qpl, uint32_t size, Error **errp)
+{
+qpl_path_t path = qpl_path_hardware;
+qpl_job *job = NULL;
+qpl_status status;
+
+qpl->hw_jobs = g_new0(QplHwJob, qpl->page_num);
+for (int i = 0; i < qpl->page_num; i++) {
+job = g_malloc0(size);
+status = qpl_init_job(path, job);
+/* the job initialization should succeed after check_hw_avail */
+assert(status == QPL_STS_O

[PATCH v7 7/7] tests/migration-test: add qpl compression test

2024-06-04 Thread Yuan Liu
add qpl to compression method test for multifd migration

the qpl compression supports software path and hardware
path(IAA device), and the hardware path is used first by
default. If the hardware path is unavailable, it will
automatically fallback to the software path for testing.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Peter Xu 
---
 tests/qtest/migration-test.c | 24 
 1 file changed, 24 insertions(+)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index b7e3406471..ef0c3f5e28 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2661,6 +2661,15 @@ test_migrate_precopy_tcp_multifd_zstd_start(QTestState 
*from,
 }
 #endif /* CONFIG_ZSTD */
 
+#ifdef CONFIG_QPL
+static void *
+test_migrate_precopy_tcp_multifd_qpl_start(QTestState *from,
+QTestState *to)
+{
+return test_migrate_precopy_tcp_multifd_start_common(from, to, "qpl");
+}
+#endif /* CONFIG_QPL */
+
 static void test_multifd_tcp_uri_none(void)
 {
 MigrateCommon args = {
@@ -2741,6 +2750,17 @@ static void test_multifd_tcp_zstd(void)
 }
 #endif
 
+#ifdef CONFIG_QPL
+static void test_multifd_tcp_qpl(void)
+{
+MigrateCommon args = {
+.listen_uri = "defer",
+.start_hook = test_migrate_precopy_tcp_multifd_qpl_start,
+};
+test_precopy_common(&args);
+}
+#endif
+
 #ifdef CONFIG_GNUTLS
 static void *
 test_migrate_multifd_tcp_tls_psk_start_match(QTestState *from,
@@ -3626,6 +3646,10 @@ int main(int argc, char **argv)
 migration_test_add("/migration/multifd/tcp/plain/zstd",
test_multifd_tcp_zstd);
 #endif
+#ifdef CONFIG_QPL
+migration_test_add("/migration/multifd/tcp/plain/qpl",
+   test_multifd_tcp_qpl);
+#endif
 #ifdef CONFIG_GNUTLS
 migration_test_add("/migration/multifd/tcp/tls/psk/match",
test_multifd_tcp_tls_psk_match);
-- 
2.43.0




[PATCH v7 2/7] migration/multifd: put IOV initialization into compression method

2024-06-04 Thread Yuan Liu
Different compression methods may require different numbers of IOVs.
Based on streaming compression of zlib and zstd, all pages will be
compressed to a data block, so two IOVs are needed for packet header
and compressed data block.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Fabiano Rosas 
Reviewed-by: Peter Xu 
---
 migration/multifd-zlib.c |  7 +++
 migration/multifd-zstd.c |  8 +++-
 migration/multifd.c  | 22 --
 3 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 737a9645d2..2ced69487e 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p, Error 
**errp)
 goto err_free_zbuff;
 }
 p->compress_data = z;
+
+/* Needs 2 IOVs, one for packet header and one for compressed data */
+p->iov = g_new0(struct iovec, 2);
+
 return 0;
 
 err_free_zbuff:
@@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 z->buf = NULL;
 g_free(p->compress_data);
 p->compress_data = NULL;
+
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 256858df0a..ca17b7e310 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
 struct zstd_data *z = g_new0(struct zstd_data, 1);
 int res;
 
-p->compress_data = z;
 z->zcs = ZSTD_createCStream();
 if (!z->zcs) {
 g_free(z);
@@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p, Error 
**errp)
 error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 return -1;
 }
+p->compress_data = z;
+
+/* Needs 2 IOVs, one for packet header and one for compressed data */
+p->iov = g_new0(struct iovec, 2);
 return 0;
 }
 
@@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 z->zbuff = NULL;
 g_free(p->compress_data);
 p->compress_data = NULL;
+
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd.c b/migration/multifd.c
index f317bff077..d82885fdbb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error 
**errp)
 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
 }
 
+if (multifd_use_packets()) {
+/* We need one extra place for the packet header */
+p->iov = g_new0(struct iovec, p->page_count + 1);
+} else {
+p->iov = g_new0(struct iovec, p->page_count);
+}
+
 return 0;
 }
 
@@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error 
**errp)
  */
 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
 {
+g_free(p->iov);
+p->iov = NULL;
 return;
 }
 
@@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error 
**errp)
  */
 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
+p->iov = g_new0(struct iovec, p->page_count);
 return 0;
 }
 
@@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p, Error 
**errp)
  */
 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 {
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
@@ -783,8 +795,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams 
*p, Error **errp)
 p->packet_len = 0;
 g_free(p->packet);
 p->packet = NULL;
-g_free(p->iov);
-p->iov = NULL;
 multifd_send_state->ops->send_cleanup(p, errp);
 
 return *errp == NULL;
@@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
 p->packet = g_malloc0(p->packet_len);
 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
-
-/* We need one extra place for the packet header */
-p->iov = g_new0(struct iovec, page_count + 1);
-} else {
-p->iov = g_new0(struct iovec, page_count);
 }
 p->name = g_strdup_printf("multifdsend_%d", i);
 p->page_size = qemu_target_page_size();
@@ -1353,8 +1358,6 @@ static void 
multifd_recv_cleanup_channel(MultiFDRecvParams *p)
 p->packet_len = 0;
 g_free(p->packet);
 p->packet = NULL;
-g_free(p->iov);
-p->iov = NULL;
 g_free(p->normal);
 p->normal = NULL;
 g_free(p->zero);
@@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
 p->packet = g_malloc0(p->packet_len);
 }
 p->name = g_strdup_printf("multifdrecv_%d", i);
-p->iov = g_new0(struct iovec, page_count);
 p->normal = g_new0(ram_addr_t, page_count);
 p->zero = g_new0(ram_addr_t, page_count);
 p->page_count = page_count;
-- 
2.43.0




[PATCH v7 6/7] migration/multifd: implement qpl compression and decompression

2024-06-04 Thread Yuan Liu
QPL compression and decompression will use IAA hardware first.
If IAA hardware is not available, it will automatically fall
back to QPL software path, if the software job also fails,
the uncompressed page is sent directly.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 412 +++-
 1 file changed, 408 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6791a204d5..18b3384bd5 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,9 +13,14 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
+#include "exec/ramblock.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
 
+/* Maximum number of retries to resubmit a job if IAA work queues are full */
+#define MAX_SUBMIT_RETRY_NUM (3)
+
 typedef struct {
 /* the QPL hardware path job */
 qpl_job *job;
@@ -260,6 +265,219 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams 
*p, Error **errp)
 p->iov = NULL;
 }
 
+/**
+ * multifd_qpl_prepare_job: prepare the job
+ *
+ * Set the QPL job parameters and properties.
+ *
+ * @job: pointer to the qpl_job structure
+ * @is_compression: indicates compression and decompression
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
+uint8_t *input, uint32_t input_len,
+uint8_t *output, uint32_t output_len)
+{
+job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+job->next_in_ptr = input;
+job->next_out_ptr = output;
+job->available_in = input_len;
+job->available_out = output_len;
+job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+/* only supports compression level 1 */
+job->level = 1;
+}
+
+/**
+ * multifd_qpl_prepare_comp_job: prepare the compression job
+ *
+ * Set the compression job parameters and properties.
+ *
+ * @job: pointer to the qpl_job structure
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
+ uint32_t input_len, uint8_t *output,
+ uint32_t output_len)
+{
+multifd_qpl_prepare_job(job, true, input, input_len, output, output_len);
+}
+
+/**
+ * multifd_qpl_prepare_decomp_job: prepare the decompression job
+ *
+ * Set the decompression job parameters and properties.
+ *
+ * @job: pointer to the qpl_job structure
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
+   uint32_t input_len, uint8_t *output,
+   uint32_t output_len)
+{
+multifd_qpl_prepare_job(job, false, input, input_len, output, output_len);
+}
+
+/**
+ * multifd_qpl_fill_iov: fill in the IOV
+ *
+ * Fill in the QPL packet IOV
+ *
+ * @p: Params for the channel being used
+ * @data: pointer to the IOV data
+ * @len: The length of the IOV data
+ */
+static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
+ uint32_t len)
+{
+p->iov[p->iovs_num].iov_base = data;
+p->iov[p->iovs_num].iov_len = len;
+p->iovs_num++;
+p->next_packet_size += len;
+}
+
+/**
+ * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
+ *
+ * Fill the compressed page length and IOV into the QPL packet
+ *
+ * @idx: The index of the compressed length array
+ * @p: Params for the channel being used
+ * @data: pointer to the compressed page buffer
+ * @len: The length of the compressed page
+ */
+static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
+uint8_t *data, uint32_t len)
+{
+QplData *qpl = p->compress_data;
+
+qpl->zlen[idx] = cpu_to_be32(len);
+multifd_qpl_fill_iov(p, data, len);
+}
+
+/**
+ * multifd_qpl_submit_job: submit a job to the hardware
+ *
+ * Submit a QPL hardware job to the IAA device
+ *
+ * Returns true if the job is submitted successfully, otherwise false.
+ *
+ * @job: pointer to the qpl_job structure
+ */
+static bool multifd_qpl_submit_job(qpl_job *job)
+{
+qpl_status status;
+uint32_t num = 0;
+
+retry:
+status = qpl_submit_job(job);
+if (status ==

[PATCH v8 1/7] docs/migration: add qpl compression feature

2024-06-11 Thread Yuan Liu
add Intel Query Processing Library (QPL) compression method
introduction

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Fabiano Rosas 
Acked-by: Peter Xu 
---
 docs/devel/migration/features.rst|   1 +
 docs/devel/migration/qpl-compression.rst | 260 +++
 2 files changed, 261 insertions(+)
 create mode 100644 docs/devel/migration/qpl-compression.rst

diff --git a/docs/devel/migration/features.rst 
b/docs/devel/migration/features.rst
index d5ca7b86d5..bc98b65075 100644
--- a/docs/devel/migration/features.rst
+++ b/docs/devel/migration/features.rst
@@ -12,3 +12,4 @@ Migration has plenty of features to support different use 
cases.
virtio
mapped-ram
CPR
+   qpl-compression
diff --git a/docs/devel/migration/qpl-compression.rst 
b/docs/devel/migration/qpl-compression.rst
new file mode 100644
index 00..990992d786
--- /dev/null
+++ b/docs/devel/migration/qpl-compression.rst
@@ -0,0 +1,260 @@
+===
+QPL Compression
+===
+The Intel Query Processing Library (Intel ``QPL``) is an open-source library to
+provide compression and decompression features and it is based on deflate
+compression algorithm (RFC 1951).
+
+The ``QPL`` compression relies on Intel In-Memory Analytics 
Accelerator(``IAA``)
+and Shared Virtual Memory(``SVM``) technology, they are new features supported
+from Intel 4th Gen Intel Xeon Scalable processors, codenamed Sapphire Rapids
+processor(``SPR``).
+
+For more ``QPL`` introduction, please refer to `QPL Introduction
+<https://intel.github.io/qpl/documentation/introduction_docs/introduction.html>`_
+
+QPL Compression Framework
+=
+
+::
+
+  ++   +--+
+  | MultiFD Thread |   |accel-config tool |
+  +---++   ++-+
+  | |
+  | |
+  |compress/decompress  |
+  +---++| Setup IAA
+  |  QPL library   || Resources
+  +---+---++|
+  |   | |
+  |   +-+---+
+  |   Open IAA  |
+  |   Devices +-+-+
+  |   |idxd driver|
+  |   +-+-+
+  | |
+  | |
+  |   +-+-+
+  +---+IAA Devices|
+  Submit jobs +---+
+  via enqcmd
+
+
+QPL Build And Installation
+--
+
+.. code-block:: shell
+
+  $git clone --recursive https://github.com/intel/qpl.git qpl
+  $mkdir qpl/build
+  $cd qpl/build
+  $cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr 
-DQPL_LIBRARY_TYPE=SHARED ..
+  $sudo cmake --build . --target install
+
+For more details about ``QPL`` installation, please refer to `QPL Installation
+<https://intel.github.io/qpl/documentation/get_started_docs/installation.html>`_
+
+IAA Device Management
+-
+
+The number of ``IAA`` devices will vary depending on the Xeon product model.
+On a ``SPR`` server, there can be a maximum of 8 ``IAA`` devices, with up to
+4 devices per socket.
+
+By default, all ``IAA`` devices are disabled and need to be configured and
+enabled by users manually.
+
+Check the number of devices through the following command
+
+.. code-block:: shell
+
+  #lspci -d 8086:0cfe
+  6a:02.0 System peripheral: Intel Corporation Device 0cfe
+  6f:02.0 System peripheral: Intel Corporation Device 0cfe
+  74:02.0 System peripheral: Intel Corporation Device 0cfe
+  79:02.0 System peripheral: Intel Corporation Device 0cfe
+  e7:02.0 System peripheral: Intel Corporation Device 0cfe
+  ec:02.0 System peripheral: Intel Corporation Device 0cfe
+  f1:02.0 System peripheral: Intel Corporation Device 0cfe
+  f6:02.0 System peripheral: Intel Corporation Device 0cfe
+
+IAA Device Configuration And Enabling
+^
+
+The ``accel-config`` tool is used to enable ``IAA`` devices and configure
+``IAA`` hardware resources(work queues and engines). One ``IAA`` device
+has 8 work queues and 8 processing engines, multiple engines can be assigned
+to a work queue via ``group`` attribute.
+
+For ``accel-config`` installation, please refer to `accel-config installation
+<https://github.com/intel/idxd-config>`_
+
+One example of configuring and enabling an ``IAA`` device.
+
+.. code-block:: shell
+
+  #accel-config config-engine iax1/engine1.0 -g 0
+  #accel-config config-engine iax1/engine1.1 -g 0
+  #accel-config config-engine iax1/engine1.2 -g 0
+  #accel-config config-engine iax1/engine1.3 -g 0
+  #accel-config config-engine iax1/engine1.4 -g 0
+  #accel-config config-engine iax1/engine1.5 -g 0
+  #accel-config config-engine iax1/engine1.6 -g 0
+  #accel-config config-engine iax1/engine1.7 -g 0
+  #accel-config config-wq iax1/wq1.0 -g 0 -s 128 -p 10 -b 1 -t 128 -m shared 
-y user -n app1 -d use

[PATCH v8 4/7] migration/multifd: add qpl compression method

2024-06-11 Thread Yuan Liu
add the Query Processing Library (QPL) compression method

Introduce the qpl as a new multifd migration compression method, it can
use In-Memory Analytics Accelerator(IAA) to accelerate compression and
decompression, which can not only reduce network bandwidth requirement
but also reduce host compression and decompression CPU overhead.

How to enable qpl compression during migration:
migrate_set_parameter multifd-compression qpl

There is no qpl compression level parameter added since it only supports
level one, users do not need to specify the qpl compression level.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Peter Xu 
Reviewed-by: Fabiano Rosas 
---
 hw/core/qdev-properties-system.c |  2 +-
 migration/meson.build|  1 +
 migration/multifd-qpl.c  | 20 
 migration/multifd.h  |  1 +
 qapi/migration.json  |  7 ++-
 5 files changed, 29 insertions(+), 2 deletions(-)
 create mode 100644 migration/multifd-qpl.c

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index d79d6f4b53..6ccd7224f6 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -659,7 +659,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compression = {
 .name = "MultiFDCompression",
 .description = "multifd_compression values, "
-   "none/zlib/zstd",
+   "none/zlib/zstd/qpl",
 .enum_table = &MultiFDCompression_lookup,
 .get = qdev_propinfo_get_enum,
 .set = qdev_propinfo_set_enum,
diff --git a/migration/meson.build b/migration/meson.build
index bdc3244bce..5f146fe8a9 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -39,6 +39,7 @@ endif
 
 system_ss.add(when: rdma, if_true: files('rdma.c'))
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 00..056a68a060
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,20 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "qemu/module.h"
+
+static void multifd_qpl_register(void)
+{
+/* noop */
+}
+
+migration_init(multifd_qpl_register);
diff --git a/migration/multifd.h b/migration/multifd.h
index c9d9b09239..5b7d9b15f8 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -40,6 +40,7 @@ MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_NOCOMP (0 << 1)
 #define MULTIFD_FLAG_ZLIB (1 << 1)
 #define MULTIFD_FLAG_ZSTD (2 << 1)
+#define MULTIFD_FLAG_QPL (4 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
diff --git a/qapi/migration.json b/qapi/migration.json
index a351fd3714..f97bc3bb93 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -554,11 +554,16 @@
 #
 # @zstd: use zstd compression method.
 #
+# @qpl: use qpl compression method. Query Processing Library(qpl) is based on
+#   the deflate compression algorithm and use the Intel In-Memory Analytics
+#   Accelerator(IAA) accelerated compression and decompression. (Since 9.1)
+#
 # Since: 5.0
 ##
 { 'enum': 'MultiFDCompression',
   'data': [ 'none', 'zlib',
-{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' } ] }
+{ 'name': 'zstd', 'if': 'CONFIG_ZSTD' },
+{ 'name': 'qpl', 'if': 'CONFIG_QPL' } ] }
 
 ##
 # @MigMode:
-- 
2.43.0




[PATCH v8 3/7] configure: add --enable-qpl build option

2024-06-11 Thread Yuan Liu
add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression method for multifd migration.

The Query Processing Library (QPL) is an open-source library
that supports data compression and decompression features. It
is based on the deflate compression algorithm and use Intel
In-Memory Analytics Accelerator(IAA) hardware for compression
and decompression acceleration.

For more live migration with IAA, please refer to the document
docs/devel/migration/qpl-compression.rst

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Fabiano Rosas 
---
 meson.build   | 8 
 meson_options.txt | 2 ++
 scripts/meson-buildoptions.sh | 3 +++
 3 files changed, 13 insertions(+)

diff --git a/meson.build b/meson.build
index ec59effca2..43f260f9eb 100644
--- a/meson.build
+++ b/meson.build
@@ -1201,6 +1201,12 @@ if not get_option('zstd').auto() or have_block
 required: get_option('zstd'),
 method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto() or have_system
+  qpl = dependency('qpl', version: '>=1.5.0',
+required: get_option('qpl'),
+method: 'pkg-config')
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and host_os == 'linux' and pixman.found()
@@ -2335,6 +2341,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', 
has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4449,6 +4456,7 @@ summary_info += {'snappy support':snappy}
 summary_info += {'bzip2 support': libbzip2}
 summary_info += {'lzfse support': liblzfse}
 summary_info += {'zstd support':  zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':  capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index 4c1583eb40..dd680a5faf 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -259,6 +259,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+   description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 6ce5a8b72a..73ae8cedfc 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -220,6 +220,7 @@ meson_options_help() {
   printf "%s\n" '  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon   xkbcommon support'
   printf "%s\n" '  zstdzstd compression support'
+  printf "%s\n" '  qpl Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -558,6 +559,8 @@ _meson_option_parse() {
 --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
 --enable-zstd) printf "%s" -Dzstd=enabled ;;
 --disable-zstd) printf "%s" -Dzstd=disabled ;;
+--enable-qpl) printf "%s" -Dqpl=enabled ;;
+--disable-qpl) printf "%s" -Dqpl=disabled ;;
 *) return 1 ;;
   esac
 }
-- 
2.43.0




[PATCH v8 7/7] tests/migration-test: add qpl compression test

2024-06-11 Thread Yuan Liu
add qpl to compression method test for multifd migration

the qpl compression supports software path and hardware
path(IAA device), and the hardware path is used first by
default. If the hardware path is unavailable, it will
automatically fallback to the software path for testing.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Peter Xu 
Reviewed-by: Fabiano Rosas 
---
 tests/qtest/migration-test.c | 24 
 1 file changed, 24 insertions(+)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index b7e3406471..ef0c3f5e28 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2661,6 +2661,15 @@ test_migrate_precopy_tcp_multifd_zstd_start(QTestState 
*from,
 }
 #endif /* CONFIG_ZSTD */
 
+#ifdef CONFIG_QPL
+static void *
+test_migrate_precopy_tcp_multifd_qpl_start(QTestState *from,
+QTestState *to)
+{
+return test_migrate_precopy_tcp_multifd_start_common(from, to, "qpl");
+}
+#endif /* CONFIG_QPL */
+
 static void test_multifd_tcp_uri_none(void)
 {
 MigrateCommon args = {
@@ -2741,6 +2750,17 @@ static void test_multifd_tcp_zstd(void)
 }
 #endif
 
+#ifdef CONFIG_QPL
+static void test_multifd_tcp_qpl(void)
+{
+MigrateCommon args = {
+.listen_uri = "defer",
+.start_hook = test_migrate_precopy_tcp_multifd_qpl_start,
+};
+test_precopy_common(&args);
+}
+#endif
+
 #ifdef CONFIG_GNUTLS
 static void *
 test_migrate_multifd_tcp_tls_psk_start_match(QTestState *from,
@@ -3626,6 +3646,10 @@ int main(int argc, char **argv)
 migration_test_add("/migration/multifd/tcp/plain/zstd",
test_multifd_tcp_zstd);
 #endif
+#ifdef CONFIG_QPL
+migration_test_add("/migration/multifd/tcp/plain/qpl",
+   test_multifd_tcp_qpl);
+#endif
 #ifdef CONFIG_GNUTLS
 migration_test_add("/migration/multifd/tcp/tls/psk/match",
test_multifd_tcp_tls_psk_match);
-- 
2.43.0




[PATCH v8 2/7] migration/multifd: put IOV initialization into compression method

2024-06-11 Thread Yuan Liu
Different compression methods may require different numbers of IOVs.
Based on streaming compression of zlib and zstd, all pages will be
compressed to a data block, so two IOVs are needed for packet header
and compressed data block.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Fabiano Rosas 
Reviewed-by: Peter Xu 
---
 migration/multifd-zlib.c |  7 +++
 migration/multifd-zstd.c |  8 +++-
 migration/multifd.c  | 22 --
 3 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 737a9645d2..2ced69487e 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p, Error 
**errp)
 goto err_free_zbuff;
 }
 p->compress_data = z;
+
+/* Needs 2 IOVs, one for packet header and one for compressed data */
+p->iov = g_new0(struct iovec, 2);
+
 return 0;
 
 err_free_zbuff:
@@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 z->buf = NULL;
 g_free(p->compress_data);
 p->compress_data = NULL;
+
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 256858df0a..ca17b7e310 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
 struct zstd_data *z = g_new0(struct zstd_data, 1);
 int res;
 
-p->compress_data = z;
 z->zcs = ZSTD_createCStream();
 if (!z->zcs) {
 g_free(z);
@@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p, Error 
**errp)
 error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 return -1;
 }
+p->compress_data = z;
+
+/* Needs 2 IOVs, one for packet header and one for compressed data */
+p->iov = g_new0(struct iovec, 2);
 return 0;
 }
 
@@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error 
**errp)
 z->zbuff = NULL;
 g_free(p->compress_data);
 p->compress_data = NULL;
+
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd.c b/migration/multifd.c
index f317bff077..d82885fdbb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error 
**errp)
 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
 }
 
+if (multifd_use_packets()) {
+/* We need one extra place for the packet header */
+p->iov = g_new0(struct iovec, p->page_count + 1);
+} else {
+p->iov = g_new0(struct iovec, p->page_count);
+}
+
 return 0;
 }
 
@@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error 
**errp)
  */
 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
 {
+g_free(p->iov);
+p->iov = NULL;
 return;
 }
 
@@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error 
**errp)
  */
 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
+p->iov = g_new0(struct iovec, p->page_count);
 return 0;
 }
 
@@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p, Error 
**errp)
  */
 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 {
+g_free(p->iov);
+p->iov = NULL;
 }
 
 /**
@@ -783,8 +795,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams 
*p, Error **errp)
 p->packet_len = 0;
 g_free(p->packet);
 p->packet = NULL;
-g_free(p->iov);
-p->iov = NULL;
 multifd_send_state->ops->send_cleanup(p, errp);
 
 return *errp == NULL;
@@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
 p->packet = g_malloc0(p->packet_len);
 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
-
-/* We need one extra place for the packet header */
-p->iov = g_new0(struct iovec, page_count + 1);
-} else {
-p->iov = g_new0(struct iovec, page_count);
 }
 p->name = g_strdup_printf("multifdsend_%d", i);
 p->page_size = qemu_target_page_size();
@@ -1353,8 +1358,6 @@ static void 
multifd_recv_cleanup_channel(MultiFDRecvParams *p)
 p->packet_len = 0;
 g_free(p->packet);
 p->packet = NULL;
-g_free(p->iov);
-p->iov = NULL;
 g_free(p->normal);
 p->normal = NULL;
 g_free(p->zero);
@@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
 p->packet = g_malloc0(p->packet_len);
 }
 p->name = g_strdup_printf("multifdrecv_%d", i);
-p->iov = g_new0(struct iovec, page_count);
 p->normal = g_new0(ram_addr_t, page_count);
 p->zero = g_new0(ram_addr_t, page_count);
 p->page_count = page_count;
-- 
2.43.0




[PATCH v8 5/7] migration/multifd: implement initialization of qpl compression

2024-06-11 Thread Yuan Liu
during initialization, a software job is allocated to each channel
for software path fallback when the IAA hardware is unavailable or
the hardware job submission fails. If the IAA hardware is available,
multiple hardware jobs are allocated for batch processing.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
Reviewed-by: Fabiano Rosas 
---
 migration/multifd-qpl.c | 328 +++-
 1 file changed, 327 insertions(+), 1 deletion(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 056a68a060..6791a204d5 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -9,12 +9,338 @@
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
  */
+
 #include "qemu/osdep.h"
 #include "qemu/module.h"
+#include "qapi/error.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+typedef struct {
+/* the QPL hardware path job */
+qpl_job *job;
+/* indicates if fallback to software path is required */
+bool fallback_sw_path;
+/* output data from the software path */
+uint8_t *sw_output;
+/* output data length from the software path */
+uint32_t sw_output_len;
+} QplHwJob;
+
+typedef struct {
+/* array of hardware jobs, the number of jobs equals the number pages */
+QplHwJob *hw_jobs;
+/* the QPL software job for the slow path and software fallback */
+qpl_job *sw_job;
+/* the number of pages that the QPL needs to process at one time */
+uint32_t page_num;
+/* array of compressed page buffers */
+uint8_t *zbuf;
+/* array of compressed page lengths */
+uint32_t *zlen;
+/* the status of the hardware device */
+bool hw_avail;
+} QplData;
+
+/**
+ * check_hw_avail: check if IAA hardware is available
+ *
+ * If the IAA hardware does not exist or is unavailable,
+ * the QPL hardware job initialization will fail.
+ *
+ * Returns true if IAA hardware is available, otherwise false.
+ *
+ * @job_size: indicates the hardware job size if hardware is available
+ */
+static bool check_hw_avail(uint32_t *job_size)
+{
+qpl_path_t path = qpl_path_hardware;
+uint32_t size = 0;
+qpl_job *job;
+
+if (qpl_get_job_size(path, &size) != QPL_STS_OK) {
+return false;
+}
+assert(size > 0);
+job = g_malloc0(size);
+if (qpl_init_job(path, job) != QPL_STS_OK) {
+g_free(job);
+return false;
+}
+g_free(job);
+*job_size = size;
+return true;
+}
+
+/**
+ * multifd_qpl_free_sw_job: clean up software job
+ *
+ * Free the software job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_sw_job(QplData *qpl)
+{
+assert(qpl);
+if (qpl->sw_job) {
+qpl_fini_job(qpl->sw_job);
+g_free(qpl->sw_job);
+qpl->sw_job = NULL;
+}
+}
+
+/**
+ * multifd_qpl_free_hw_job: clean up hardware jobs
+ *
+ * Free all hardware job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_hw_job(QplData *qpl)
+{
+assert(qpl);
+if (qpl->hw_jobs) {
+for (int i = 0; i < qpl->page_num; i++) {
+qpl_fini_job(qpl->hw_jobs[i].job);
+g_free(qpl->hw_jobs[i].job);
+qpl->hw_jobs[i].job = NULL;
+}
+g_free(qpl->hw_jobs);
+qpl->hw_jobs = NULL;
+}
+}
+
+/**
+ * multifd_qpl_init_sw_job: initialize a software job
+ *
+ * Use the QPL software path to initialize a job
+ *
+ * @qpl: pointer to the QplData structure
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_init_sw_job(QplData *qpl, Error **errp)
+{
+qpl_path_t path = qpl_path_software;
+uint32_t size = 0;
+qpl_job *job = NULL;
+qpl_status status;
+
+status = qpl_get_job_size(path, &size);
+if (status != QPL_STS_OK) {
+error_setg(errp, "qpl_get_job_size failed with error %d", status);
+return -1;
+}
+job = g_malloc0(size);
+status = qpl_init_job(path, job);
+if (status != QPL_STS_OK) {
+error_setg(errp, "qpl_init_job failed with error %d", status);
+g_free(job);
+return -1;
+}
+qpl->sw_job = job;
+return 0;
+}
+
+/**
+ * multifd_qpl_init_hw_job: initialize hardware jobs
+ *
+ * Use the QPL hardware path to initialize jobs
+ *
+ * @qpl: pointer to the QplData structure
+ * @size: the size of QPL hardware path job
+ * @errp: pointer to an error
+ */
+static void multifd_qpl_init_hw_job(QplData *qpl, uint32_t size, Error **errp)
+{
+qpl_path_t path = qpl_path_hardware;
+qpl_job *job = NULL;
+qpl_status status;
+
+qpl->hw_jobs = g_new0(QplHwJob, qpl->page_num);
+for (int i = 0; i < qpl->page_num; i++) {
+job = g_malloc0(size);
+status = qpl_init_job(path, job);
+/* the job initialization should succeed after check_hw_avail */
+   

[PATCH v8 6/7] migration/multifd: implement qpl compression and decompression

2024-06-11 Thread Yuan Liu
QPL compression and decompression will use IAA hardware path if the IAA
hardware is available. Otherwise the QPL library software path is used.

The hardware path will automatically fall back to QPL software path if
the IAA queues are busy. In some scenarios, this may happen frequently,
such as configuring 4 channels but only one IAA device is available. In
the case of insufficient IAA hardware resources, retry and fallback can
help optimize performance:

 1. Retry + SW fallback:
total time: 14649 ms
downtime: 25 ms
throughput: 17666.57 mbps
pages-per-second: 1509647

 2. No fallback, always wait for work queues to become available
total time: 18381 ms
downtime: 25 ms
throughput: 13698.65 mbps
pages-per-second: 859607

If both the hardware and software paths fail, the uncompressed page is
sent directly.

Signed-off-by: Yuan Liu 
Reviewed-by: Nanhai Zou 
---
 migration/multifd-qpl.c | 424 +++-
 1 file changed, 420 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6791a204d5..9265098ee7 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,9 +13,14 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
+#include "exec/ramblock.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
 
+/* Maximum number of retries to resubmit a job if IAA work queues are full */
+#define MAX_SUBMIT_RETRY_NUM (3)
+
 typedef struct {
 /* the QPL hardware path job */
 qpl_job *job;
@@ -260,6 +265,225 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams 
*p, Error **errp)
 p->iov = NULL;
 }
 
+/**
+ * multifd_qpl_prepare_job: configure a QPL job
+ *
+ * Fill in the parameters for a single-shot QPL (de)compression job.
+ *
+ * @job: pointer to the qpl_job structure
+ * @is_compression: true for compression, false for decompression
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
+                                    uint8_t *input, uint32_t input_len,
+                                    uint8_t *output, uint32_t output_len)
+{
+    if (is_compression) {
+        job->op = qpl_op_compress;
+    } else {
+        job->op = qpl_op_decompress;
+    }
+    job->next_in_ptr = input;
+    job->available_in = input_len;
+    job->next_out_ptr = output;
+    job->available_out = output_len;
+    /* single-shot job: first and last chunk, no verification pass */
+    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+    /* only supports compression level 1 */
+    job->level = 1;
+}
+
+/**
+ * multifd_qpl_prepare_comp_job: configure a compression job
+ *
+ * Set up a QPL job that compresses one page.
+ *
+ * @job: pointer to the qpl_job structure
+ * @input: pointer to the input data buffer
+ * @output: pointer to the output data buffer
+ * @size: the page size
+ */
+static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
+                                         uint8_t *output, uint32_t size)
+{
+    /*
+     * Cap the output at one byte below the page size so the job fails
+     * whenever the data does not actually shrink. Such a page is sent
+     * uncompressed and the destination skips decompression for it.
+     */
+    uint32_t out_limit = size - 1;
+
+    multifd_qpl_prepare_job(job, true, input, size, output, out_limit);
+}
+
+/**
+ * multifd_qpl_prepare_decomp_job: configure a decompression job
+ *
+ * Set up a QPL job that decompresses one page. The output buffer must
+ * be able to hold a full page.
+ *
+ * @job: pointer to the qpl_job structure
+ * @input: pointer to the input data buffer
+ * @len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @size: the page size
+ */
+static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
+                                           uint32_t len, uint8_t *output,
+                                           uint32_t size)
+{
+    multifd_qpl_prepare_job(job, false, input, len, output, size);
+}
+
+/**
+ * multifd_qpl_fill_iov: append a buffer to the channel IOV
+ *
+ * Record one data buffer in the channel's IOV array and grow the
+ * packet size accordingly.
+ *
+ * @p: Params for the channel being used
+ * @data: pointer to the IOV data
+ * @len: The length of the IOV data
+ */
+static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
+                                 uint32_t len)
+{
+    uint32_t idx = p->iovs_num;
+
+    p->iov[idx].iov_base = data;
+    p->iov[idx].iov_len = len;
+    p->iovs_num = idx + 1;
+    p->next_packet_size += len;
+}
+
+/**
+ * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
+ *
+ * Fill the compressed page length and IOV into the QPL packet
+ *
+ * @idx: The index of the compressed length array
+ * @p: Params for the channel being used
+ * @data: pointer to the compressed page buffer
+ * @len