Tim,
> I’m trying to perform on-the-fly compression during serialization of my
> custom Image class using the Blosc library.
Not sure if this helps answer your question, but HPX already supports
compressing the serialized data for actions you mark up accordingly. For this
you need to:
a) Build HPX with compression support. We currently support three compression
schemes: bzip2, zlib, and snappy. All can be enabled at the same time by adding
-DHPX_WITH_COMPRESSION_BZIP2=On, -DHPX_WITH_COMPRESSION_ZLIB=On, and/or
-DHPX_WITH_COMPRESSION_SNAPPY=On to the cmake command line.
b) Mark up the actions you want HPX to compress the data for, for instance:
    void test(your_image_data_type const& data)
    {
        // work with the image data here
    }

    HPX_DECLARE_PLAIN_ACTION(test, test_action);

    // use whichever compression scheme HPX was built with
    #if defined(HPX_HAVE_COMPRESSION_BZIP2)
    HPX_ACTION_USES_BZIP2_COMPRESSION(test_action);
    #elif defined(HPX_HAVE_COMPRESSION_ZLIB)
    HPX_ACTION_USES_ZLIB_COMPRESSION(test_action);
    #elif defined(HPX_HAVE_COMPRESSION_SNAPPY)
    HPX_ACTION_USES_SNAPPY_COMPRESSION(test_action);
    #endif

    HPX_PLAIN_ACTION(test, test_action);

(or similar for component actions).
For a full example see here:
https://github.com/STEllAR-GROUP/hpx/blob/master/tests/unit/parcelset/put_parcels_with_compression.cpp
HTH
Regards Hartmut
---------------
http://boost-spirit.com
http://stellar.cct.lsu.edu
> While compression during save() seems to work fine, the decompression at
> load() randomly crashes or throws an exception due to corrupted source
> data.
> For verification I also included the same decompression routine directly
> after the compression step in save(), where it always produces correct
> results, as expected.
>
> See my code below. Note that serialization without compression works
> fine. I’m using the MPI parcelport.
>
>
> --------------------------------------
> private:
>     friend class hpx::serialization::access;
>
>     /**
>      * Serialization.
>      */
>     template <typename Archive>
>     void save(Archive& ar, const unsigned int) const
>     {
>         ar << this->offset << this->size << this->scanlineBytes;
>
>         int numBytes = this->size.y * this->scanlineBytes;
>
>         if (compression)
>         {
>             byte* buffer = new byte[numBytes + BLOSC_MAX_OVERHEAD];
>
>             // lz4 (level 4) - 4 threads
>             int numBytesCompressed = blosc_compress_ctx(4,
>                 BLOSC_NOSHUFFLE, sizeof(byte), numBytes, this->pixels,
>                 buffer, numBytes + BLOSC_MAX_OVERHEAD, "lz4", 0, 4);
>
>             if (numBytesCompressed <= 0)
>                 LogError() << "(save) Compressed size: "
>                            << numBytesCompressed;
>
>             ar << numBytesCompressed;
>             ar << hpx::serialization::make_array<byte>(buffer,
>                 numBytesCompressed);
>
>             // For debugging only: Verify that decompression works on
>             // previously compressed data
>             {
>                 byte* tmp = new byte[numBytes];
>
>                 int numBytesDecompressed = blosc_decompress_ctx(buffer,
>                     tmp, numBytes, 4);
>
>                 // Check if decompression succeeded
>                 if (numBytesDecompressed <= 0)
>                     LogError() << "(save verification) Decompressed bytes: "
>                                << numBytesDecompressed;
>                 else if (numBytesDecompressed != numBytes)
>                     LogWarning() << "(save verification) Decompress "
>                                     "mismatch! Pixel bytes: " << numBytes
>                                  << ", Decompressed bytes: "
>                                  << numBytesDecompressed;
>
>                 delete[] tmp;
>             }
>
>             delete[] buffer;
>         }
>         else
>         {
>             ar << hpx::serialization::make_array<byte>(this->pixels,
>                 numBytes);
>         }
>     }
>
>     /**
>      * Deserialization.
>      */
>     template <typename Archive>
>     void load(Archive& ar, const unsigned int)
>     {
>         ar >> this->offset >> this->size >> this->scanlineBytes;
>
>         int numBytes = this->size.y * this->scanlineBytes;
>         this->pixels = (byte*) boost::alignment::aligned_alloc(32,
>             numBytes);
>
>         if (compression)
>         {
>             int numBytesCompressed;
>             ar >> numBytesCompressed;
>
>             byte* buffer = new byte[numBytesCompressed];
>             ar >> hpx::serialization::make_array<byte>(buffer,
>                 numBytesCompressed);
>
>             int numBytesDecompressed = blosc_decompress_ctx(buffer,
>                 this->pixels, numBytes, 4);
>
>             // Check if decompression succeeded
>             if (numBytesDecompressed <= 0)
>                 LogError() << "(load) Decompressed bytes: "
>                            << numBytesDecompressed;
>             else if (numBytesDecompressed != numBytes)
>                 LogWarning() << "(load) Decompress mismatch! Pixel bytes: "
>                              << numBytes << ", Decompressed bytes: "
>                              << numBytesDecompressed;
>
>             delete[] buffer;
>         }
>         else
>         {
>             ar >> hpx::serialization::make_array<byte>(this->pixels,
>                 numBytes);
>         }
>     }
>
>     HPX_SERIALIZATION_SPLIT_MEMBER();
>
> --------------------------------------
>
>
>
> Since I’m using a temporary buffer for the compressed results, what about
> the following theory: Does
> "ar << hpx::serialization::make_array<byte>(buffer, numBytesCompressed);"
> directly copy the buffer contents? Or is only the buffer address and size
> stored, with its contents transferred at a later stage (after I have
> already deleted the temporary buffer)? If so, what would be the ideal
> approach in this case?
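
On the buffer lifetime theory above: if the archive records only the address
and size of the array and reads the bytes at a later stage, then deleting the
temporary buffer at the end of save() would leave it reading freed memory,
which would be consistent with corruption that only shows up in load().
Whether HPX actually defers the copy for a given array is something to confirm
against the HPX serialization documentation. The sketch below is only a toy
model under that assumption, not HPX code: ToyArchive, ToyImage, add_chunk,
flush and send are hypothetical names, and the compression step is a
placeholder copy. It shows one possible way to keep the compressed bytes alive
by storing them in the object instead of in a temporary.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

using byte = unsigned char;

// Hypothetical stand-in for an archive that defers copying: add_chunk()
// only records (pointer, size); the bytes are read later, in flush().
class ToyArchive
{
public:
    void add_chunk(const byte* data, std::size_t size)
    {
        chunks_.emplace_back(data, size);
    }

    // Models the later stage at which the recorded bytes are actually read.
    std::vector<byte> flush() const
    {
        std::vector<byte> wire;
        for (const auto& c : chunks_)
            wire.insert(wire.end(), c.first, c.first + c.second);
        return wire;
    }

private:
    std::vector<std::pair<const byte*, std::size_t>> chunks_;
};

// Hypothetical image type. The compressed bytes are cached in a mutable
// member, so they live as long as the image rather than as long as save().
class ToyImage
{
public:
    explicit ToyImage(std::vector<byte> pixels) : pixels_(std::move(pixels)) {}

    void save(ToyArchive& ar) const
    {
        // Placeholder for the real compression call: copies pixels unchanged.
        // Filled only once, so repeated save() calls reuse the same storage.
        if (compressed_.empty())
            compressed_ = pixels_;

        ar.add_chunk(compressed_.data(), compressed_.size());
        // Nothing is freed here: the buffer must outlive the archive's flush().
    }

private:
    std::vector<byte> pixels_;
    mutable std::vector<byte> compressed_;
};

// Save first, flush later; the recorded pointer is still valid at flush time.
inline std::vector<byte> send(const ToyImage& img)
{
    ToyArchive ar;
    img.save(ar);
    return ar.flush();
}
```

Because the compressed bytes are cached in the object, several save() calls
for the same image would reuse one buffer and compress only once. The mutable
cache is not thread-safe as written, so a real version would need to guard it
or compress before the image is handed to the action.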
>
>
> Another general issue I noted while tracking load()/save() calls: For
> each transferred image, there are usually 3x save() and 1x load()
> calls. What’s the reason for this?
>
>
> Thank you very much!
>
> Best,
> Tim
>
>
>