To summarize, this was an issue of selecting serialized representations
for large ultra-sparse matrices. Thanks again for sharing your feedback
with us.
1) In-memory representation: In CSR every non-zero will require 12 bytes
- this is 240MB in your case. The overall memory consumption, however,
depends on the distribution of non-zeros: In CSR, each block with at
least one non-zero requires 4KB for row pointers. Assuming uniform
distribution (the worst case), this gives us 80GB. This is likely the
problem here. Every empty block would have an overhead of 44 bytes, but
under the worst-case assumption, there are no empty blocks left. We do not
use COO for checkpoints because it would slow down subsequent operations.
2) Serialized/on-disk representation: For sparse datasets that are
expected to exceed aggregate memory, we used to use a serialized
representation (with storage level MEMORY_AND_DISK_SER), which uses sparse,
ultra-sparse, or empty representations. In this form, ultra-sparse
blocks require 9 + 16*nnz bytes and empty blocks require 9 bytes.
Therefore, with this representation selected, your dataset should
easily fit in aggregate memory. Also, note that chkpoint is only a
transformation that persists the RDD; the subsequent operation then
pulls the data into memory.
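
The numbers in 1) and 2) can be reproduced with a quick back-of-the-envelope
calculation. The following is only a rough sketch: the block size of
1000 x 1000 is an assumption (it is not stated in this thread, but it is
consistent with the 4KB of row pointers per block), and the variable names
are made up for illustration.

```python
# Rough size estimates for a 2e7 x 1e6 matrix with 2e7 non-zeros.
# Assumption: blocks of 1000 x 1000 cells (not stated in this thread).
ROWS, COLS, NNZ = 20_000_000, 1_000_000, 20_000_000
BLOCK = 1000


def ceil_div(a, b):
    """Integer division rounded up."""
    return -(-a // b)


num_blocks = ceil_div(ROWS, BLOCK) * ceil_div(COLS, BLOCK)

# 1) In-memory CSR: 12 bytes per non-zero, plus 4 bytes of row pointer
#    per block row. Worst case: every block holds at least one non-zero.
csr_nnz_bytes = 12 * NNZ
csr_row_ptr_bytes = 4 * BLOCK * num_blocks

# 2) Serialized form: 9 bytes per empty block, 9 + 16*nnz bytes per
#    ultra-sparse block. If every non-empty block is ultra-sparse, the
#    total is independent of how the non-zeros are distributed.
serialized_bytes = 9 * num_blocks + 16 * NNZ

print(f"blocks:               {num_blocks:,}")
print(f"CSR non-zeros:        {csr_nnz_bytes / 1e9:.2f} GB")
print(f"CSR row pointers:     {csr_row_ptr_bytes / 1e9:.2f} GB")
print(f"serialized (approx.): {serialized_bytes / 1e9:.2f} GB")
```

Under these assumptions, the non-zeros account for 240MB, the worst-case
row pointers for 80GB, and the serialized form for about 500MB.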
At a high level, this was a bug. We missed ultra-sparse representations
when introducing an improvement that converts sparse matrices from MCSR
to CSR format on checkpoints, which eliminated the need to use a
serialized storage level. I just delivered a fix. Now we again store such
ultra-sparse matrices in serialized form, which should significantly
reduce the memory pressure.
Regards,
Matthias
On 5/3/2017 9:38 AM, Mingyang Wang wrote:
Hi all,
I was playing with a super sparse matrix FK, 2e7 by 1e6, with only one
non-zero value on each row, that is 2e7 non-zero values in total.
With driver memory of 1GB and executor memory of 100GB, I found the HOP
"Spark chkpoint", which is used to pin the FK matrix in memory, is really
expensive, as it invokes lots of disk operations.
FK is stored in binary format as 24 blocks of ~45MB each, ~1GB in total.
For example, with the script as
"""
FK = read($FK)
print("Sum of FK = " + sum(FK))
"""
things worked fine, and it took ~8s.
While with the script as
"""
FK = read($FK)
if (1 == 1) {}
print("Sum of FK = " + sum(FK))
"""
things changed. It took ~92s and I observed lots of disk spills from logs.
Based on the stats from Spark UI, it seems the materialized FK requires
54GB storage and thus introduces disk operations.
I was wondering, is this the expected behavior of a super sparse matrix?
Regards,
Mingyang