Quick update: the poor runtime of scenario (3) is now fixed in master. The causes were an unnecessary shuffle and load imbalance in Spark rexpand operations with a small input vector and a large, ultra-sparse output matrix. Thanks for pointing this out, Mingyang.
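Scenario (3), further down in this thread, builds FK with `table(seq(1,nrow(FK_colvec)), FK_colvec, nrow(FK_colvec), 1e6)`, which appears as the `sp_rexpand` instruction in its statistics. The small Python sketch below only illustrates what that expansion produces. It does not show how SystemML executes the operation or what the fix changed, and `rexpand_rows` and `density` are hypothetical helpers.

```python
def rexpand_rows(colvec, ncol):
    """Hypothetical helper: COO triples of the n x ncol matrix described by
    table(seq(1, n), colvec, n, ncol), i.e. row i (1-based) holds a single
    1.0 in column colvec[i-1]."""
    rows, cols, vals = [], [], []
    for i, c in enumerate(colvec, start=1):
        c = int(c)
        # this sketch rejects out-of-range indexes; it does not model
        # how DML's table() treats them
        if not 1 <= c <= ncol:
            raise ValueError(f"column index {c} outside [1, {ncol}]")
        rows.append(i)
        cols.append(c)
        vals.append(1.0)
    return rows, cols, vals


def density(nrow, ncol, nnz):
    """Fraction of cells that are non-zero."""
    return nnz / (nrow * ncol)


rows, cols, vals = rexpand_rows([3, 1, 3, 2], ncol=5)
print(list(zip(rows, cols, vals)))  # [(1, 3, 1.0), (2, 1, 1.0), (3, 3, 1.0), (4, 2, 1.0)]

# FK in this thread: 2e7 x 1e6 with exactly one non-zero per row
print(density(2e7, 1e6, 2e7))  # 1e-06
```

Each row carries exactly one non-zero, so the output has as many non-zeros as the input vector has entries (2e7 for FK), spread over a matrix with 2e13 cells.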
Regards,
Matthias

On Mon, May 8, 2017 at 3:09 PM, Matthias Boehm <mboe...@googlemail.com> wrote:

> ok thanks for sharing - I'll have a look later this week.
>
> Regards,
> Matthias
>
> On Mon, May 8, 2017 at 2:20 PM, Mingyang Wang <miw...@eng.ucsd.edu> wrote:
>
>> Hi Matthias,
>>
>> With a driver memory of 10GB, all operations were executed on CP, and I did observe that the version of reading FK as a vector and then converting it was faster, which took 8.337s (6.246s on GC) while the version of reading FK as a matrix took 31.680s (26.256s on GC).
>>
>> For the distributed caching, I have re-run all scripts with the following Spark configuration:
>>
>> --driver-memory 1G \
>> --executor-memory 100G \
>> --executor-cores 20 \
>> --num-executors 1 \
>> --conf spark.driver.maxResultSize=0 \
>> --conf spark.rpc.message.maxSize=128 \
>>
>> And it seems that both versions have some problems.
>>
>> 1) Sum of FK in matrix form
>> ```
>> FK = read($FK)
>> print("Sum of FK = " + sum(FK))
>> ```
>> Worked as expected. Took 8.786s.
>>
>> 2) Sum of FK in matrix form, with checkpoints
>> ```
>> FK = read($FK)
>> if (1 == 1) {}
>> print("Sum of FK = " + sum(FK))
>> ```
>> It took 89.731s, with detailed stats shown below.
>>
>> 17/05/08 13:15:00 INFO api.ScriptExecutorUtils: SystemML Statistics:
>> Total elapsed time: 91.619 sec.
>> Total compilation time: 1.889 sec.
>> Total execution time: 89.731 sec.
>> Number of compiled Spark inst: 2.
>> Number of executed Spark inst: 2.
>> Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
>> Cache writes (WB, FS, HDFS): 0/0/0.
>> Cache times (ACQr/m, RLS, EXP): 0.000/0.001/0.000/0.000 sec.
>> HOP DAGs recompiled (PRED, SB): 0/0.
>> HOP DAGs recompile time: 0.000 sec.
>> Spark ctx create time (lazy): 0.895 sec.
>> Spark trans counts (par,bc,col):0/0/0.
>> Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> Total JIT compile time: 5.001 sec.
>> Total JVM GC count: 8.
>> Total JVM GC time: 0.161 sec.
>> Heavy hitter instructions (name, time, count):
>> -- 1) sp_uak+ 89.349 sec 1
>> -- 2) sp_chkpoint 0.381 sec 1
>> -- 3) == 0.001 sec 1
>> -- 4) + 0.000 sec 1
>> -- 5) print 0.000 sec 1
>> -- 6) castdts 0.000 sec 1
>> -- 7) createvar 0.000 sec 3
>> -- 8) rmvar 0.000 sec 7
>> -- 9) assignvar 0.000 sec 1
>> -- 10) cpvar 0.000 sec 1
>>
>> 3) Sum of FK in vector form
>> ```
>> FK_colvec = read($FK_colvec)
>> FK = table(seq(1,nrow(FK_colvec)), FK_colvec, nrow(FK_colvec), 1e6)
>> print("Sum of FK = " + sum(FK))
>> ```
>> Things really went wrong. It took ~10 mins.
>>
>> 17/05/08 13:26:36 INFO api.ScriptExecutorUtils: SystemML Statistics:
>> Total elapsed time: 605.688 sec.
>> Total compilation time: 1.857 sec.
>> Total execution time: 603.832 sec.
>> Number of compiled Spark inst: 2.
>> Number of executed Spark inst: 2.
>> Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
>> Cache writes (WB, FS, HDFS): 0/0/0.
>> Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
>> HOP DAGs recompiled (PRED, SB): 0/1.
>> HOP DAGs recompile time: 0.002 sec.
>> Spark ctx create time (lazy): 0.858 sec.
>> Spark trans counts (par,bc,col):0/0/0.
>> Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> Total JIT compile time: 3.682 sec.
>> Total JVM GC count: 5.
>> Total JVM GC time: 0.064 sec.
>> Heavy hitter instructions (name, time, count):
>> -- 1) sp_uak+ 603.447 sec 1
>> -- 2) sp_rexpand 0.381 sec 1
>> -- 3) createvar 0.002 sec 3
>> -- 4) rmvar 0.000 sec 5
>> -- 5) + 0.000 sec 1
>> -- 6) print 0.000 sec 1
>> -- 7) castdts 0.000 sec 1
>>
>> Also, the executor log showed some disk spilling:
>>
>> 17/05/08 13:20:00 INFO ExternalSorter: Thread 109 spilling in-memory map of 33.8 GB to disk (1 time so far)
>> 17/05/08 13:20:20 INFO ExternalSorter: Thread 116 spilling in-memory map of 31.2 GB to disk (1 time so far)
>>
>> ...
>>
>> 17/05/08 13:24:50 INFO ExternalAppendOnlyMap: Thread 116 spilling in-memory map of 26.9 GB to disk (1 time so far)
>> 17/05/08 13:25:08 INFO ExternalAppendOnlyMap: Thread 109 spilling in-memory map of 26.6 GB to disk (1 time so far)
>>
>> Regards,
>> Mingyang
>>
>> On Sat, May 6, 2017 at 9:12 PM Matthias Boehm <mboe...@googlemail.com> wrote:
>>
>> > Yes, even with the previous patch for improved memory efficiency of ultra-sparse matrices in MCSR format, there is still some unnecessary overhead that leads to garbage collection. For this reason, I would recommend reading it as a vector and converting it in memory to an ultra-sparse matrix. I also just pushed a minor performance improvement for reading ultra-sparse matrices, but the major bottleneck still exists.
>> >
>> > The core issue is that we can't read these ultra-sparse matrices into a CSR representation because CSR does not allow for efficient incremental construction (with unordered inputs and multi-threaded reads). However, I created SYSTEMML-1587 to solve this in the general case. The idea is to read ultra-sparse matrices into thread-local COO deltas and finally merge them into a CSR representation. The initial results are very promising, and it's safe because the temporary memory requirements are covered by the MCSR estimate, but it will take a while because I want to introduce this consistently for all readers (single-/multi-threaded, all formats).
>> >
>> > In contrast to the read issue, I was not able to reproduce the described performance issue of distributed caching. Could you please double-check that this test also used the current master build and perhaps share the detailed setup again (e.g., number of executors, data distribution, etc.)? Thanks.
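The SYSTEMML-1587 approach described above (collect non-zeros in thread-local COO deltas during a multi-threaded, unordered read, then merge them into CSR once at the end) can be illustrated with the minimal Python sketch below. It is not SystemML's reader code: Python threads and lists stand in for the reader threads and their deltas, indexes are 0-based, and all names are hypothetical.

```python
import threading
from collections import Counter


def merge_coo_deltas_to_csr(deltas, nrow):
    """Merge unordered COO deltas, each a list of (row, col, value) with
    0-based indexes, into CSR arrays (row_ptr, col_idx, values).
    Hypothetical sketch; duplicate (row, col) pairs are not expected."""
    # pass 1: count non-zeros per row across all deltas
    counts = Counter(r for delta in deltas for r, _, _ in delta)
    row_ptr = [0] * (nrow + 1)
    for r in range(nrow):
        row_ptr[r + 1] = row_ptr[r] + counts.get(r, 0)
    nnz = row_ptr[nrow]
    col_idx = [0] * nnz
    values = [0.0] * nnz
    # pass 2: scatter every entry into the slot range of its row
    nxt = row_ptr[:-1]  # slice copy: next free position per row
    for delta in deltas:
        for r, c, v in delta:
            col_idx[nxt[r]] = c
            values[nxt[r]] = v
            nxt[r] += 1
    # pass 3: deltas arrive unordered, so sort column indexes within each row
    for r in range(nrow):
        lo, hi = row_ptr[r], row_ptr[r + 1]
        order = sorted(range(lo, hi), key=lambda p: col_idx[p])
        col_idx[lo:hi] = [col_idx[p] for p in order]
        values[lo:hi] = [values[p] for p in order]
    return row_ptr, col_idx, values


def read_split(cells, deltas, slot):
    # each simulated reader thread appends only to its own delta list,
    # so no lock is needed while reading
    for r, c, v in cells:
        deltas[slot].append((r, c, v))


# two input splits whose cells arrive in no particular order
splits = [[(2, 4, 5.0), (0, 1, 1.0)], [(1, 0, 2.0), (2, 0, 3.0)]]
deltas = [[] for _ in splits]
threads = [threading.Thread(target=read_split, args=(s, deltas, i))
           for i, s in enumerate(splits)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(merge_coo_deltas_to_csr(deltas, nrow=3))
# ([0, 1, 2, 4], [1, 0, 0, 4], [1.0, 2.0, 3.0, 5.0])
```

Counting the rows first lets the CSR arrays be allocated once at their final size, so the merge never has to grow or shift CSR arrays incrementally, which is exactly what makes direct incremental CSR construction expensive.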
>> >
>> > Regards,
>> > Matthias
>> >
>> > On Thu, May 4, 2017 at 9:55 PM, Mingyang Wang <miw...@eng.ucsd.edu> wrote:
>> >
>> > > Out of curiosity, I increased the driver memory to 10GB, and then all operations were executed on CP. It took 37.166s, but JVM GC took 30.534s. Is this the expected behavior?
>> > >
>> > > Total elapsed time: 38.093 sec.
>> > > Total compilation time: 0.926 sec.
>> > > Total execution time: 37.166 sec.
>> > > Number of compiled Spark inst: 0.
>> > > Number of executed Spark inst: 0.
>> > > Cache hits (Mem, WB, FS, HDFS): 0/0/0/1.
>> > > Cache writes (WB, FS, HDFS): 0/0/0.
>> > > Cache times (ACQr/m, RLS, EXP): 30.400/0.000/0.001/0.000 sec.
>> > > HOP DAGs recompiled (PRED, SB): 0/0.
>> > > HOP DAGs recompile time: 0.000 sec.
>> > > Spark ctx create time (lazy): 0.000 sec.
>> > > Spark trans counts (par,bc,col):0/0/0.
>> > > Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> > > Total JIT compile time: 22.302 sec.
>> > > Total JVM GC count: 11.
>> > > Total JVM GC time: 30.534 sec.
>> > > Heavy hitter instructions (name, time, count):
>> > > -- 1) uak+ 37.166 sec 1
>> > > -- 2) == 0.001 sec 1
>> > > -- 3) + 0.000 sec 1
>> > > -- 4) print 0.000 sec 1
>> > > -- 5) rmvar 0.000 sec 5
>> > > -- 6) createvar 0.000 sec 1
>> > > -- 7) assignvar 0.000 sec 1
>> > > -- 8) cpvar 0.000 sec 1
>> > >
>> > > Regards,
>> > > Mingyang
>> > >
>> > > On Thu, May 4, 2017 at 9:48 PM Mingyang Wang <miw...@eng.ucsd.edu> wrote:
>> > >
>> > > > Hi Matthias,
>> > > >
>> > > > Thanks for the patch.
>> > > >
>> > > > I have re-run the experiment and observed that there was indeed no more memory pressure, but it still took ~90s for this simple script. What is the bottleneck in this case?
>> > > >
>> > > > Total elapsed time: 94.800 sec.
>> > > > Total compilation time: 1.826 sec.
>> > > > Total execution time: 92.974 sec.
>> > > > Number of compiled Spark inst: 2.
>> > > > Number of executed Spark inst: 2.
>> > > > Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
>> > > > Cache writes (WB, FS, HDFS): 0/0/0.
>> > > > Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
>> > > > HOP DAGs recompiled (PRED, SB): 0/0.
>> > > > HOP DAGs recompile time: 0.000 sec.
>> > > > Spark ctx create time (lazy): 0.860 sec.
>> > > > Spark trans counts (par,bc,col):0/0/0.
>> > > > Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> > > > Total JIT compile time: 3.498 sec.
>> > > > Total JVM GC count: 5.
>> > > > Total JVM GC time: 0.064 sec.
>> > > > Heavy hitter instructions (name, time, count):
>> > > > -- 1) sp_uak+ 92.597 sec 1
>> > > > -- 2) sp_chkpoint 0.377 sec 1
>> > > > -- 3) == 0.001 sec 1
>> > > > -- 4) print 0.000 sec 1
>> > > > -- 5) + 0.000 sec 1
>> > > > -- 6) castdts 0.000 sec 1
>> > > > -- 7) createvar 0.000 sec 3
>> > > > -- 8) rmvar 0.000 sec 7
>> > > > -- 9) assignvar 0.000 sec 1
>> > > > -- 10) cpvar 0.000 sec 1
>> > > >
>> > > > Regards,
>> > > > Mingyang
>> > > >
>> > > > On Wed, May 3, 2017 at 8:54 AM Matthias Boehm <mboe...@googlemail.com> wrote:
>> > > >
>> > > >> To summarize, this was an issue of selecting serialized representations for large ultra-sparse matrices. Thanks again for sharing your feedback with us.
>> > > >>
>> > > >> 1) In-memory representation: In CSR, every non-zero requires 12 bytes, which is 240MB in your case. The overall memory consumption, however, depends on the distribution of non-zeros: in CSR, each block with at least one non-zero requires 4KB for row pointers. Assuming a uniform distribution (the worst case), this gives us 80GB. This is likely the problem here. Every empty block would have an overhead of 44 bytes, but under the worst-case assumption there are no empty blocks left. We do not use COO for checkpoints because it would slow down subsequent operations.
>> > > >>
>> > > >> 2) Serialized/on-disk representation: For sparse datasets that are expected to exceed aggregate memory, we used to use a serialized representation (with storage level MEMORY_AND_DISK_SER), which uses sparse, ultra-sparse, or empty representations. In this form, ultra-sparse blocks require 9 + 16*nnz bytes and empty blocks require 9 bytes. Therefore, with this representation selected, your dataset should easily fit in aggregate memory. Also, note that chkpoint is only a transformation that persists the RDD; the subsequent operation then pulls the data into memory.
>> > > >>
>> > > >> At a high level, this was a bug: when we introduced an improvement that stores sparse matrices in MCSR format as CSR on checkpoints (which eliminated the need for a serialized storage level), we missed ultra-sparse representations. I just delivered a fix. Now we store such ultra-sparse matrices in serialized form again, which should significantly reduce the memory pressure.
>> > > >>
>> > > >> Regards,
>> > > >> Matthias
>> > > >>
>> > > >> On 5/3/2017 9:38 AM, Mingyang Wang wrote:
>> > > >> > Hi all,
>> > > >> >
>> > > >> > I was playing with a super sparse matrix FK, 2e7 by 1e6, with only one non-zero value in each row, that is, 2e7 non-zero values in total.
>> > > >> >
>> > > >> > With a driver memory of 1GB and an executor memory of 100GB, I found that the HOP "Spark chkpoint", which is used to pin the FK matrix in memory, is really expensive, as it invokes lots of disk operations.
>> > > >> >
>> > > >> > FK is stored in binary format with 24 blocks, each ~45MB, and ~1GB in total.
>> > > >> >
>> > > >> > For example, with the script
>> > > >> >
>> > > >> > """
>> > > >> > FK = read($FK)
>> > > >> > print("Sum of FK = " + sum(FK))
>> > > >> > """
>> > > >> >
>> > > >> > things worked fine, and it took ~8s.
>> > > >> >
>> > > >> > With the script
>> > > >> >
>> > > >> > """
>> > > >> > FK = read($FK)
>> > > >> > if (1 == 1) {}
>> > > >> > print("Sum of FK = " + sum(FK))
>> > > >> > """
>> > > >> >
>> > > >> > however, things changed. It took ~92s, and I observed lots of disk spills in the logs. Based on the stats from the Spark UI, it seems the materialized FK requires >54GB of storage and thus introduces disk operations.
>> > > >> >
>> > > >> > Is this the expected behavior for a super sparse matrix?
>> > > >> >
>> > > >> > Regards,
>> > > >> > Mingyang
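Matthias's May 3 explanation above can be checked with a little arithmetic. The Python sketch below plugs FK's dimensions (2e7 x 1e6, 2e7 non-zeros) into the per-block and per-non-zero costs he quotes. It assumes a block size of 1000 x 1000 (SystemML's default, and consistent with the quoted ~4KB of row pointers per CSR block at 4 bytes each) and his worst case of uniformly spread non-zeros. The helper names are hypothetical.

```python
# Byte costs quoted in the thread; BLOCK = 1000 is an assumption (SystemML's
# default block size), consistent with 1000 row pointers x 4 bytes ~ 4KB.
BLOCK = 1000
CSR_BYTES_PER_NNZ = 12    # per non-zero in CSR
EMPTY_BLOCK_BYTES = 44    # in-memory overhead of an empty block
SER_BLOCK_BYTES = 9       # serialized header of an ultra-sparse or empty block
SER_BYTES_PER_NNZ = 16    # serialized ultra-sparse cost per non-zero


def num_blocks(nrow, ncol, block=BLOCK):
    """Number of block x block tiles covering an nrow x ncol matrix."""
    return -(-nrow // block) * -(-ncol // block)  # ceiling divisions


def worst_case_bytes(nrow, ncol, nnz, block=BLOCK):
    """Uniform worst case: non-zeros occupy as many distinct blocks as possible."""
    blocks = num_blocks(nrow, ncol, block)
    nonempty = min(nnz, blocks)
    empty = blocks - nonempty
    # CSR: non-zero data + 4-byte row pointers per row of each non-empty block
    csr = (nnz * CSR_BYTES_PER_NNZ + nonempty * 4 * block
           + empty * EMPTY_BLOCK_BYTES)
    # serialized: every block pays the header, plus 16 bytes per non-zero
    serialized = blocks * SER_BLOCK_BYTES + nnz * SER_BYTES_PER_NNZ
    return csr, serialized


csr, ser = worst_case_bytes(20_000_000, 1_000_000, 20_000_000)
print(f"CSR:        {csr / 1e9:.2f} GB")  # 80.24 GB
print(f"serialized: {ser / 1e9:.2f} GB")  # 0.50 GB
```

With the non-zeros spread this thinly, the per-block row pointers, not the 240MB of non-zero data, dominate the CSR footprint, while the serialized ultra-sparse form stays around half a gigabyte. Clustering the non-zeros into fewer blocks would shrink the CSR figure accordingly.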