Hi Matthias,

Thanks for your thorough explanations! And I have some other questions.

1. I am curious about the behaviors of the read operation within createvar.
How can I differentiate whether the inputs are loaded in the driver memory
or loaded in executors? Can I assume the inputs are loaded in executors if
a Spark checkpoint instruction is invoked?

2. I am also curious how do you put a sum operation in a different DAG?
Currently, I put a "print one entry" instruction within a for loop, is it
sufficient to trigger the whole matrix multiplication without some
shortcuts like a dot product between a row and a column? At least, from the
HOP explains, the whole matrix multiplication is scheduled.

3. About generating a "specific" sparse matrix in SystemML. Say, I need a
sparse matrix of 200,000,000 x 10,000,000 and there is exactly one non-zero
value in each row (the position could be random). Is there any efficient
way to do it? Currently, I am generating such matrix externally in text
format, and it cannot be easily converted to binary format with a simple
read/write script (it took quite a long time and failed).


Regards,
Mingyang

On Thu, Apr 20, 2017 at 2:08 AM Matthias Boehm <mboe...@googlemail.com>
wrote:

> Hi Mingyang,
>
> thanks for the questions - this is very valuable feedback. I was able to
> reproduce your performance issue on scenario 1 and I have a patch, which
> I'll push to master tomorrow after a more thorough testing. Below are the
> details and the answers to your questions:
>
> 1) Expected performance and bottlenecks: In general, for these single
> operation scripts, the read is indeed the expected bottleneck. However,
> excessive GC is usually an indicator for internal performance issues that
> can be addressed. Let's discuss the scenarios individually:
>
> a) Script 1 (in-memory operations): Given the mentioned data sizes, the
> inputs are read into the driver and all operations are executed as
> singlenode, in-memory operations. However, typically we read binary
> matrices at 1GB/s and perform these matrix-vector operations at peak memory
> bandwidth, i.e., 16-64GB/s on a single node.
>
> The problem in your scenario is the read of the ultra-sparse matrix FK,
> which has a sparsity of 10^-6, i.e., roughly a single cell per row. In my
> environment the stats looked as follows:
>
> Total elapsed time:             48.274 sec.
> Total compilation time:         1.957 sec.
> Total execution time:           46.317 sec.
> Number of compiled MR Jobs:     0.
> Number of executed MR Jobs:     0.
> Cache hits (Mem, WB, FS, HDFS): 6/0/0/3.
> Cache writes (WB, FS, HDFS):    4/0/0.
> Cache times (ACQr/m, RLS, EXP): 45.078/0.001/0.005/0.000 sec.
> HOP DAGs recompiled (PRED, SB): 0/0.
> HOP DAGs recompile time:        0.000 sec.
> Total JIT compile time:         9.24 sec.
> Total JVM GC count:             23.
> Total JVM GC time:              35.181 sec.
> Heavy hitter instructions (name, time, count):
> -- 1)   ba+*    45.927 sec      3
> -- 2)   uak+    0.228 sec       1
> -- 3)   +       0.138 sec       1
> -- 4)   rand    0.023 sec       2
> -- 5)   print   0.001 sec       1
> -- 6)   ==      0.001 sec       1
> -- 7)   createvar       0.000 sec       9
> -- 8)   rmvar   0.000 sec       10
> -- 9)   assignvar       0.000 sec       1
> -- 10)  cpvar   0.000 sec       1
>
> With the patch (that essentially leverages our CSR instead of MCSR sparse
> format for temporarily read blocks in order to reduce the size overhead and
> allow for efficient reuse), the execution time improved to the following
>
> Total elapsed time:             14.860 sec.
> Total compilation time:         1.922 sec.
> Total execution time:           12.938 sec.
> Number of compiled MR Jobs:     0.
> Number of executed MR Jobs:     0.
> Cache hits (Mem, WB, FS, HDFS): 6/0/0/3.
> Cache writes (WB, FS, HDFS):    4/0/0.
> Cache times (ACQr/m, RLS, EXP): 10.227/0.001/0.006/0.000 sec.
> HOP DAGs recompiled (PRED, SB): 0/0.
> HOP DAGs recompile time:        0.000 sec.
> Total JIT compile time:         7.529 sec.
> Total JVM GC count:             6.
> Total JVM GC time:              4.174 sec.
> Heavy hitter instructions (name, time, count):
> -- 1)   ba+*    12.442 sec      3
> -- 2)   uak+    0.380 sec       1
> -- 3)   +       0.097 sec       1
> -- 4)   rand    0.018 sec       2
> -- 5)   ==      0.001 sec       1
> -- 6)   print   0.000 sec       1
> -- 7)   createvar       0.000 sec       9
> -- 8)   rmvar   0.000 sec       10
> -- 9)   cpvar   0.000 sec       1
> -- 10)  assignvar       0.000 sec       1
>
> b) Script 2 (distributed operations): This scenario looks as expected.
> However, the stats output can be a little misleading due to Sparks lazy
> evaluation. Since the read and matrix-vector multiplication are just
> transformations, the collect action then triggers the entire pipeline and
> accordingly shows up as the heavy hitter. Again, here are the stats from my
> environment (where I used a sum in a different DAG to trigger compute):
>
> Total elapsed time:             62.681 sec.
> Total compilation time:         1.790 sec.
> Total execution time:           60.891 sec.
> Number of compiled Spark inst:  2.
> Number of executed Spark inst:  2.
> Cache hits (Mem, WB, FS, HDFS): 1/0/0/1.
> Cache writes (WB, FS, HDFS):    1/0/0.
> Cache times (ACQr/m, RLS, EXP): 26.323/0.001/0.004/0.000 sec.
> HOP DAGs recompiled (PRED, SB): 0/1.
> HOP DAGs recompile time:        0.005 sec.
> Spark ctx create time (lazy):   33.687 sec.
> Spark trans counts (par,bc,col):0/1/1.
> Spark trans times (par,bc,col): 0.000/0.011/26.322 secs.
> Total JIT compile time:         19.571 sec.
> Total JVM GC count:             12.
> Total JVM GC time:              0.536 sec.
> Heavy hitter instructions (name, time, count):
> -- 1)   sp_chkpoint     34.272 sec      1
> -- 2)   uak+    26.474 sec      1
> -- 3)   sp_mapmm        0.026 sec       1
> -- 4)   rand    0.023 sec       1
> -- 5)   rmvar   0.011 sec       7
> -- 6)   ==      0.001 sec       1
> -- 7)   print   0.000 sec       1
> -- 8)   createvar       0.000 sec       4
> -- 9)   assignvar       0.000 sec       1
> -- 10)  cpvar   0.000 sec       1
>
> Note, that 33s out of 62s are required for spark context creation
> (allocating and initializing the yarn containers for executors). The
> collect is then triggered by the sum (uak+, i.e., unary aggregate kahan
> plus) which includes the collect. Furthermore, there is an unnecessary
> checkpoint instructions which caches the input into storage level
> mem-and-disk. SystemML has rewrites to remove these unnecessary checkpoints
> but they do not apply here. Finally, note that the spark context creation
> and initial read are one time costs that are amortized over many
> iterations.
>
> 2) Performance tuning: The biggest tuning knobs are certainly the memory
> configurations. Increasing the driver heap size can help to reduce garbage
> collection overhead of singlenode operations and allow broadcasting larger
> matrices because these broadcasts have to be constructed at the driver.
> There are many additional tuning options such as compression, NUMA
> awareness, and code generation but these require a more detailed
> description.
>
> 3) Time breakdown: The stats output as shown above has some indicators
> where time is spent. For example, ACQr (acquire read) shows the time for
> pinning input matrices into driver memory before singlenode operations.
> This bufferpool primitive includes the local read time from HDFS, restore
> of evicted matrices, and collect of pending RDD operation outputs. The
> Spark transfer counts and times (par,bc,col) give a more detailed view on
> the time for RDD parallelization (driver->executors), broadcasts
> (driver->executors), and collect (executors->driver). For distributed
> operations, it's much more complex as the individual phases of read and
> compute are overlapping, but the Spark UI provides very nice summary
> statistics.
>
> 4) Resource estimation: Right now this requires a semi-manual
> configuration. You can look at the explain hops output which gives you the
> memory estimates of all operations. So if you want to execute all
> operations in the driver, set the max heap such that the largest operation
> fits into 70% of the max heap. Additionally, memory configurations also
> impact operator selection - for example, we only compile broadcast-based
> matrix multiplications if the smaller input fits twice in the driver and in
> the broadcast budget of executors (which ensures that the broadcasts are
> not spilled out). Looking forward, having an automated resource advisor
> would be a very useful feature especially for cloud environments to assist
> with cluster provisioning.
>
> I hope this answers your questions and thanks again for catching this
> performance issue.
>
>
> Regards,
> Matthias
>
>
> On Wed, Apr 19, 2017 at 5:48 PM, Mingyang Wang <miw...@eng.ucsd.edu>
> wrote:
>
>> Hi all,
>>
>> I have run some simple matrix multiplication in SystemML and found that
>> JVM
>> GC time and Spark collect time are dominant.
>>
>> For example, given 4 executors with 20 cores and 100GB memory each, and a
>> driver with 10GB memory, one setting is
>>
>> R = read($R) # 1,000,000 x 80 -> 612M
>> S = read($S) # 20,000,000 x 20 -> 3G
>> FK = read($FK) # 20,000,000 x 1,000,000 (sparse) -> 358M
>> wS = Rand(rows=ncol(S), cols=1, min=0, max=1, pdf="uniform")
>> wR = Rand(rows=ncol(R), cols=1, min=0, max=1, pdf="uniform")
>>
>> temp = S %*% wS + FK %*% (R %*% wR)
>> # some code to enforce the execution
>>
>> It took 77.597s to execute while JVM GC took 70.282s.
>>
>> Another setting is
>>
>> T = read($T) # 20,000,000 x 100 -> 15G
>> w = Rand(rows=ncol(T), cols=1, min=0, max=1, pdf="uniform")
>>
>> temp = T %*% w
>> # some code to enforce the execution
>>
>> It took 92.582s to execute while Spark collect took 91.991s.
>>
>> My questions are
>> 1. Are these behaviors expected, as it seems only a tiny fraction of time
>> are spent on computation?
>> 2. How can I tweak the configuration to tune the performance?
>> 3. Is there any way to measure the time spent on data loading,
>> computation,
>> disk accesses, and communication separately?
>> 4. Any rule of thumb to estimate the memory needed for a program in
>> SystemML?
>>
>> I really appreciate your inputs!
>>
>>
>> Best,
>> Mingyang Wang
>>
>
>

Reply via email to