Hi Matthias,

Thanks for your thorough explanations! I have a few follow-up questions.

1. I am curious about the behavior of the read operation within createvar.
How can I tell whether the inputs are loaded into driver memory or into the
executors? Can I assume the inputs are loaded in the executors whenever a
Spark checkpoint instruction is invoked?

2. I am also curious how you put a sum operation in a different DAG.
Currently, I put a "print one entry" instruction within a for loop (see the
sketch below). Is that sufficient to trigger the whole matrix
multiplication, without shortcuts such as a dot product between a single
row and column? At least according to the HOP explain output, the whole
matrix multiplication is scheduled.
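For concreteness, my current trigger looks roughly like this (a minimal
sketch of what I described, using the matrix names from my first script
below):

temp = S %*% wS + FK %*% (R %*% wR)
for (i in 1:1) {
    print(as.scalar(temp[1, 1]))   # print one entry to enforce execution
}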
3. About generating a "specific" sparse matrix in SystemML: say I need a
sparse matrix of 200,000,000 x 10,000,000 with exactly one non-zero value
in each row (the position could be random). Is there an efficient way to do
this? Currently, I generate such a matrix externally in text format, and it
cannot easily be converted to binary format with a simple read/write script
(it took quite a long time and eventually failed).
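I was hoping something along these lines might work directly in DML (just a
sketch, assuming table() with explicit output dimensions accepts a sequence
of row indices plus one random column index per row, with cell values
defaulting to 1):

N = 200000000
M = 10000000
# draw one random column index per row
cols = round(rand(rows=N, cols=1, min=1, max=M))
# contingency table: exactly one non-zero per row of the N x M output
FK = table(seq(1, N), cols, N, M)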
Regards,
Mingyang

On Thu, Apr 20, 2017 at 2:08 AM Matthias Boehm <mboe...@googlemail.com> wrote:

> Hi Mingyang,
>
> thanks for the questions - this is very valuable feedback. I was able to
> reproduce your performance issue on scenario 1 and I have a patch, which
> I'll push to master tomorrow after more thorough testing. Below are the
> details and the answers to your questions:
>
> 1) Expected performance and bottlenecks: In general, for these
> single-operation scripts, the read is indeed the expected bottleneck.
> However, excessive GC is usually an indicator of internal performance
> issues that can be addressed. Let's discuss the scenarios individually:
>
> a) Script 1 (in-memory operations): Given the mentioned data sizes, the
> inputs are read into the driver and all operations are executed as
> singlenode, in-memory operations. However, we typically read binary
> matrices at 1GB/s and perform these matrix-vector operations at peak
> memory bandwidth, i.e., 16-64GB/s on a single node.
>
> The problem in your scenario is the read of the ultra-sparse matrix FK,
> which has a sparsity of 10^-6, i.e., roughly a single cell per row. In
> my environment, the stats looked as follows:
>
> Total elapsed time:       48.274 sec.
> Total compilation time:   1.957 sec.
> Total execution time:     46.317 sec.
> Number of compiled MR Jobs: 0.
> Number of executed MR Jobs: 0.
> Cache hits (Mem, WB, FS, HDFS):  6/0/0/3.
> Cache writes (WB, FS, HDFS):     4/0/0.
> Cache times (ACQr/m, RLS, EXP):  45.078/0.001/0.005/0.000 sec.
> HOP DAGs recompiled (PRED, SB):  0/0.
> HOP DAGs recompile time: 0.000 sec.
> Total JIT compile time:  9.24 sec.
> Total JVM GC count:      23.
> Total JVM GC time:       35.181 sec.
> Heavy hitter instructions (name, time, count):
> -- 1)  ba+*       45.927 sec  3
> -- 2)  uak+       0.228 sec   1
> -- 3)  +          0.138 sec   1
> -- 4)  rand       0.023 sec   2
> -- 5)  print      0.001 sec   1
> -- 6)  ==         0.001 sec   1
> -- 7)  createvar  0.000 sec   9
> -- 8)  rmvar      0.000 sec   10
> -- 9)  assignvar  0.000 sec   1
> -- 10) cpvar      0.000 sec   1
>
> With the patch (which essentially leverages our CSR instead of MCSR
> sparse format for temporarily read blocks, in order to reduce the size
> overhead and allow for efficient reuse), the execution time improved as
> follows:
>
> Total elapsed time:       14.860 sec.
> Total compilation time:   1.922 sec.
> Total execution time:     12.938 sec.
> Number of compiled MR Jobs: 0.
> Number of executed MR Jobs: 0.
> Cache hits (Mem, WB, FS, HDFS):  6/0/0/3.
> Cache writes (WB, FS, HDFS):     4/0/0.
> Cache times (ACQr/m, RLS, EXP):  10.227/0.001/0.006/0.000 sec.
> HOP DAGs recompiled (PRED, SB):  0/0.
> HOP DAGs recompile time: 0.000 sec.
> Total JIT compile time:  7.529 sec.
> Total JVM GC count:      6.
> Total JVM GC time:       4.174 sec.
> Heavy hitter instructions (name, time, count):
> -- 1)  ba+*       12.442 sec  3
> -- 2)  uak+       0.380 sec   1
> -- 3)  +          0.097 sec   1
> -- 4)  rand       0.018 sec   2
> -- 5)  ==         0.001 sec   1
> -- 6)  print      0.000 sec   1
> -- 7)  createvar  0.000 sec   9
> -- 8)  rmvar      0.000 sec   10
> -- 9)  cpvar      0.000 sec   1
> -- 10) assignvar  0.000 sec   1
>
> b) Script 2 (distributed operations): This scenario looks as expected.
> However, the stats output can be a little misleading due to Spark's lazy
> evaluation. Since the read and matrix-vector multiplication are just
> transformations, the collect action triggers the entire pipeline and
> accordingly shows up as the heavy hitter. Again, here are the stats from
> my environment (where I used a sum in a different DAG to trigger the
> computation):
>
> Total elapsed time:       62.681 sec.
> Total compilation time:   1.790 sec.
> Total execution time:     60.891 sec.
> Number of compiled Spark inst: 2.
> Number of executed Spark inst: 2.
> Cache hits (Mem, WB, FS, HDFS):  1/0/0/1.
> Cache writes (WB, FS, HDFS):     1/0/0.
> Cache times (ACQr/m, RLS, EXP):  26.323/0.001/0.004/0.000 sec.
> HOP DAGs recompiled (PRED, SB):  0/1.
> HOP DAGs recompile time: 0.005 sec.
> Spark ctx create time (lazy):    33.687 sec.
> Spark trans counts (par,bc,col): 0/1/1.
> Spark trans times (par,bc,col):  0.000/0.011/26.322 secs.
> Total JIT compile time:  19.571 sec.
> Total JVM GC count:      12.
> Total JVM GC time:       0.536 sec.
> Heavy hitter instructions (name, time, count):
> -- 1)  sp_chkpoint  34.272 sec  1
> -- 2)  uak+         26.474 sec  1
> -- 3)  sp_mapmm     0.026 sec   1
> -- 4)  rand         0.023 sec   1
> -- 5)  rmvar        0.011 sec   7
> -- 6)  ==           0.001 sec   1
> -- 7)  print        0.000 sec   1
> -- 8)  createvar    0.000 sec   4
> -- 9)  assignvar    0.000 sec   1
> -- 10) cpvar        0.000 sec   1
>
> Note that 33s out of the 62s are required for Spark context creation
> (allocating and initializing the YARN containers for the executors). The
> collect is then triggered by the sum (uak+, i.e., unary aggregate kahan
> plus), so its time includes the collect. Furthermore, there is an
> unnecessary checkpoint instruction, which caches the input into storage
> level mem-and-disk. SystemML has rewrites to remove such unnecessary
> checkpoints, but they do not apply here. Finally, note that the Spark
> context creation and initial read are one-time costs that are amortized
> over many iterations.
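> As a rough sketch of that trigger (assuming your second script with T
> and w): each statement block is compiled into its own HOP DAG, so an
> empty if block cuts the DAG and places the sum in a separate DAG:
>
> temp = T %*% w
> if (1==1) { }      # empty statement block, only cuts the DAG
> print(sum(temp))   # the sum in the new DAG triggers the pending pipeline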
> 2) Performance tuning: The biggest tuning knobs are certainly the memory
> configurations. Increasing the driver heap size can help to reduce the
> garbage collection overhead of singlenode operations and allows
> broadcasting larger matrices, because these broadcasts have to be
> constructed at the driver. There are many additional tuning options such
> as compression, NUMA awareness, and code generation, but these require a
> more detailed description.
>
> 3) Time breakdown: The stats output as shown above has some indicators
> of where time is spent. For example, ACQr (acquire read) shows the time
> for pinning input matrices into driver memory before singlenode
> operations. This bufferpool primitive includes the local read time from
> HDFS, the restore of evicted matrices, and the collect of pending RDD
> operation outputs. The Spark transfer counts and times (par, bc, col)
> give a more detailed view of the time for RDD parallelization
> (driver->executors), broadcasts (driver->executors), and collects
> (executors->driver). For distributed operations, it is much more complex
> because the individual phases of read and compute overlap, but the Spark
> UI provides very nice summary statistics.
>
> 4) Resource estimation: Right now this requires a semi-manual
> configuration. You can look at the explain hops output, which gives you
> the memory estimates of all operations. So if you want to execute all
> operations in the driver, set the max heap such that the largest
> operation fits into 70% of the max heap. Additionally, the memory
> configurations also impact operator selection - for example, we only
> compile broadcast-based matrix multiplications if the smaller input fits
> twice in the driver and in the broadcast budget of the executors (which
> ensures that the broadcasts are not spilled). Looking forward, an
> automated resource advisor would be a very useful feature, especially
> for cloud environments, to assist with cluster provisioning.
>
> I hope this answers your questions, and thanks again for catching this
> performance issue.
>
> Regards,
> Matthias
>
> On Wed, Apr 19, 2017 at 5:48 PM, Mingyang Wang <miw...@eng.ucsd.edu>
> wrote:
>
>> Hi all,
>>
>> I have run some simple matrix multiplications in SystemML and found
>> that JVM GC time and Spark collect time are dominant.
>>
>> For example, given 4 executors with 20 cores and 100GB memory each, and
>> a driver with 10GB memory, one setting is
>>
>> R = read($R)   # 1,000,000 x 80 -> 612M
>> S = read($S)   # 20,000,000 x 20 -> 3G
>> FK = read($FK) # 20,000,000 x 1,000,000 (sparse) -> 358M
>> wS = Rand(rows=ncol(S), cols=1, min=0, max=1, pdf="uniform")
>> wR = Rand(rows=ncol(R), cols=1, min=0, max=1, pdf="uniform")
>>
>> temp = S %*% wS + FK %*% (R %*% wR)
>> # some code to enforce the execution
>>
>> It took 77.597s to execute, while JVM GC took 70.282s.
>>
>> Another setting is
>>
>> T = read($T) # 20,000,000 x 100 -> 15G
>> w = Rand(rows=ncol(T), cols=1, min=0, max=1, pdf="uniform")
>>
>> temp = T %*% w
>> # some code to enforce the execution
>>
>> It took 92.582s to execute, while Spark collect took 91.991s.
>>
>> My questions are:
>> 1. Are these behaviors expected, as it seems only a tiny fraction of
>> the time is spent on computation?
>> 2. How can I tweak the configuration to tune the performance?
>> 3. Is there any way to measure the time spent on data loading,
>> computation, disk accesses, and communication separately?
>> 4. Any rule of thumb to estimate the memory needed for a program in
>> SystemML?
>>
>> I really appreciate your inputs!
>>
>> Best,
>> Mingyang Wang