1) Understanding execution plans: Our local bufferpool reads matrices lazily, i.e., on the first singlenode (CP) operation that tries to pin the matrix into memory. Similarly, distributed matrices are read into the aggregated executor memory on the first Spark instruction. Hence, you can differentiate these scenarios by following the data dependencies, i.e., which kinds of instructions consume the particular matrix. Spark checkpoint instructions are a good indicator too, but there are special cases where they will not exist.

2) Forcing computation: I typically use 'if(1==1){}' to create a statement block cut (and thus a DAG cut), followed by a simple 'print(sum(temp))', because we apply most algebraic rewrites only within the scope of individual statement blocks.
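
For example, here is a minimal, self-contained sketch of the pattern (toy sizes, just for illustration):

A = rand(rows=1000, cols=1000)
B = rand(rows=1000, cols=1)
temp = A %*% B           # the computation we want to force
if(1==1){}               # statement block cut -> the sum lands in a new DAG, so the multiply is not rewritten away
print(sum(temp))         # the sum in the next DAG triggers the full matrix multiplication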

3) Permutation matrices: If FK has a single entry of value 1 per row, you could store it as a column vector with FK2 = rowIndexMax(FK) and subsequently reconstruct it via FK = table(seq(1,nrow(FK2)), FK2, nrow(FK2), N), for which we will compile a dedicated operator that does row expansions. You don't necessarily need the last two arguments, which only ensure padding and thus matching dimensions for the subsequent matrix multiplication.
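
Regarding your question 3, an untested sketch of this idea that generates such a matrix directly in DML instead of reading it from text (sizes taken from your example; drawing the positions via round(rand(...)) is just one simple way to do it):

N = 10000000
# one random column position per row (the compact representation, analogous to FK2 above)
FK2 = round(rand(rows=200000000, cols=1, min=1, max=N))
# expand to the ultra-sparse 0/1 matrix; the last two arguments only pad to N columns
FK = table(seq(1, nrow(FK2)), FK2, nrow(FK2), N)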


Regards,
Matthias

On 4/20/2017 11:05 AM, Mingyang Wang wrote:
Hi Matthias,

Thanks for your thorough explanations! And I have some other questions.

1. I am curious about the behavior of the read operation within createvar.
How can I differentiate whether the inputs are loaded into driver memory
or into the executors? Can I assume the inputs are loaded in the executors
if a Spark checkpoint instruction is invoked?

2. I am also curious how you put a sum operation in a different DAG.
Currently, I put a "print one entry" instruction within a for loop; is that
sufficient to trigger the whole matrix multiplication without shortcuts
like a dot product between a row and a column? At least, from the HOP
explain output, the whole matrix multiplication is scheduled.

3. About generating a "specific" sparse matrix in SystemML. Say, I need a
sparse matrix of 200,000,000 x 10,000,000 with exactly one non-zero value
in each row (the position could be random). Is there any efficient way to
do this? Currently, I am generating such a matrix externally in text
format, and it cannot be easily converted to binary format with a simple
read/write script (it took quite a long time and failed).


Regards,
Mingyang

On Thu, Apr 20, 2017 at 2:08 AM Matthias Boehm <mboe...@googlemail.com>
wrote:

Hi Mingyang,

thanks for the questions - this is very valuable feedback. I was able to
reproduce your performance issue on scenario 1 and I have a patch, which
I'll push to master tomorrow after more thorough testing. Below are the
details and the answers to your questions:

1) Expected performance and bottlenecks: In general, for these
single-operation scripts, the read is indeed the expected bottleneck.
However, excessive GC is usually an indicator of internal performance
issues that can be addressed. Let's discuss the scenarios individually:

a) Script 1 (in-memory operations): Given the mentioned data sizes, the
inputs are read into the driver and all operations are executed as
singlenode, in-memory operations. For reference, we typically read binary
matrices at 1GB/s and perform these matrix-vector operations at peak memory
bandwidth, i.e., 16-64GB/s, on a single node.

The problem in your scenario is the read of the ultra-sparse matrix FK,
which has a sparsity of 10^-6, i.e., roughly a single cell per row. In my
environment the stats looked as follows:

Total elapsed time:             48.274 sec.
Total compilation time:         1.957 sec.
Total execution time:           46.317 sec.
Number of compiled MR Jobs:     0.
Number of executed MR Jobs:     0.
Cache hits (Mem, WB, FS, HDFS): 6/0/0/3.
Cache writes (WB, FS, HDFS):    4/0/0.
Cache times (ACQr/m, RLS, EXP): 45.078/0.001/0.005/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time:        0.000 sec.
Total JIT compile time:         9.24 sec.
Total JVM GC count:             23.
Total JVM GC time:              35.181 sec.
Heavy hitter instructions (name, time, count):
-- 1)   ba+*    45.927 sec      3
-- 2)   uak+    0.228 sec       1
-- 3)   +       0.138 sec       1
-- 4)   rand    0.023 sec       2
-- 5)   print   0.001 sec       1
-- 6)   ==      0.001 sec       1
-- 7)   createvar       0.000 sec       9
-- 8)   rmvar   0.000 sec       10
-- 9)   assignvar       0.000 sec       1
-- 10)  cpvar   0.000 sec       1

With the patch (which essentially leverages our CSR instead of the MCSR
sparse format for temporarily-read blocks, in order to reduce the size
overhead and allow for efficient reuse), the execution time improved as
follows:

Total elapsed time:             14.860 sec.
Total compilation time:         1.922 sec.
Total execution time:           12.938 sec.
Number of compiled MR Jobs:     0.
Number of executed MR Jobs:     0.
Cache hits (Mem, WB, FS, HDFS): 6/0/0/3.
Cache writes (WB, FS, HDFS):    4/0/0.
Cache times (ACQr/m, RLS, EXP): 10.227/0.001/0.006/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time:        0.000 sec.
Total JIT compile time:         7.529 sec.
Total JVM GC count:             6.
Total JVM GC time:              4.174 sec.
Heavy hitter instructions (name, time, count):
-- 1)   ba+*    12.442 sec      3
-- 2)   uak+    0.380 sec       1
-- 3)   +       0.097 sec       1
-- 4)   rand    0.018 sec       2
-- 5)   ==      0.001 sec       1
-- 6)   print   0.000 sec       1
-- 7)   createvar       0.000 sec       9
-- 8)   rmvar   0.000 sec       10
-- 9)   cpvar   0.000 sec       1
-- 10)  assignvar       0.000 sec       1

b) Script 2 (distributed operations): This scenario looks as expected.
However, the stats output can be a little misleading due to Spark's lazy
evaluation. Since the read and matrix-vector multiplication are just
transformations, the collect action triggers the entire pipeline and
accordingly shows up as the heavy hitter. Again, here are the stats from my
environment (where I used a sum in a different DAG to trigger the computation):

Total elapsed time:             62.681 sec.
Total compilation time:         1.790 sec.
Total execution time:           60.891 sec.
Number of compiled Spark inst:  2.
Number of executed Spark inst:  2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/1.
Cache writes (WB, FS, HDFS):    1/0/0.
Cache times (ACQr/m, RLS, EXP): 26.323/0.001/0.004/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/1.
HOP DAGs recompile time:        0.005 sec.
Spark ctx create time (lazy):   33.687 sec.
Spark trans counts (par,bc,col):0/1/1.
Spark trans times (par,bc,col): 0.000/0.011/26.322 secs.
Total JIT compile time:         19.571 sec.
Total JVM GC count:             12.
Total JVM GC time:              0.536 sec.
Heavy hitter instructions (name, time, count):
-- 1)   sp_chkpoint     34.272 sec      1
-- 2)   uak+    26.474 sec      1
-- 3)   sp_mapmm        0.026 sec       1
-- 4)   rand    0.023 sec       1
-- 5)   rmvar   0.011 sec       7
-- 6)   ==      0.001 sec       1
-- 7)   print   0.000 sec       1
-- 8)   createvar       0.000 sec       4
-- 9)   assignvar       0.000 sec       1
-- 10)  cpvar   0.000 sec       1

Note that 33s out of the 62s are required for Spark context creation
(allocating and initializing the YARN containers for the executors). The
collect is then triggered by the sum (uak+, i.e., unary aggregate Kahan
plus), whose reported time therefore includes the collect. Furthermore,
there is an unnecessary checkpoint instruction which caches the input into
storage level mem-and-disk. SystemML has rewrites to remove such
unnecessary checkpoints, but they do not apply here. Finally, note that the
Spark context creation and initial read are one-time costs that are
amortized over many iterations.

2) Performance tuning: The biggest tuning knobs are certainly the memory
configurations. Increasing the driver heap size can help to reduce garbage
collection overhead of singlenode operations and allow broadcasting larger
matrices because these broadcasts have to be constructed at the driver.
There are many additional tuning options such as compression, NUMA
awareness, and code generation but these require a more detailed
description.
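
For illustration only, assuming the standard spark-submit batch invocation
of SystemML.jar (the memory values below are placeholders, not
recommendations for your cluster):

spark-submit --driver-memory 20g \
  --num-executors 4 --executor-cores 20 --executor-memory 100g \
  SystemML.jar -f script.dml -stats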

3) Time breakdown: The stats output shown above has some indicators of
where time is spent. For example, ACQr (acquire read) shows the time for
pinning input matrices into driver memory before singlenode operations.
This bufferpool primitive includes the local read time from HDFS, the
restore of evicted matrices, and the collect of pending RDD operation
outputs. The Spark transfer counts and times (par,bc,col) give a more
detailed view of the time for RDD parallelization (driver->executors),
broadcasts (driver->executors), and collects (executors->driver). For
distributed operations, it's more complex because the individual phases of
read and compute overlap, but the Spark UI provides very nice summary
statistics.

4) Resource estimation: Right now this requires a semi-manual
configuration. You can look at the 'explain hops' output, which gives you
the memory estimates of all operations. So if you want to execute all
operations in the driver, set the max heap such that the largest operation
fits into 70% of the max heap. Additionally, memory configurations also
impact operator selection - for example, we only compile broadcast-based
matrix multiplications if the smaller input fits twice into the driver
memory and into the broadcast budget of the executors (which ensures that
the broadcasts are not spilled). Looking forward, an automated resource
advisor would be a very useful feature, especially for cloud environments,
to assist with cluster provisioning.
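
To make the 70% rule concrete (the numbers here are purely illustrative):
if the largest operation in the '-explain hops' output is estimated at
14GB, the driver max heap should be at least 14GB / 0.7 = 20GB for that
operation to be compiled as a singlenode, in-memory operation.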

I hope this answers your questions and thanks again for catching this
performance issue.


Regards,
Matthias


On Wed, Apr 19, 2017 at 5:48 PM, Mingyang Wang <miw...@eng.ucsd.edu>
wrote:

Hi all,

I have run some simple matrix multiplications in SystemML and found that
JVM GC time and Spark collect time are dominant.

For example, given 4 executors with 20 cores and 100GB memory each, and a
driver with 10GB memory, one setting is

R = read($R) # 1,000,000 x 80 -> 612M
S = read($S) # 20,000,000 x 20 -> 3G
FK = read($FK) # 20,000,000 x 1,000,000 (sparse) -> 358M
wS = Rand(rows=ncol(S), cols=1, min=0, max=1, pdf="uniform")
wR = Rand(rows=ncol(R), cols=1, min=0, max=1, pdf="uniform")

temp = S %*% wS + FK %*% (R %*% wR)
# some code to enforce the execution

It took 77.597s to execute while JVM GC took 70.282s.

Another setting is

T = read($T) # 20,000,000 x 100 -> 15G
w = Rand(rows=ncol(T), cols=1, min=0, max=1, pdf="uniform")

temp = T %*% w
# some code to enforce the execution

It took 92.582s to execute while Spark collect took 91.991s.

My questions are
1. Are these behaviors expected, as it seems only a tiny fraction of the
time is spent on computation?
2. How can I tweak the configuration to tune the performance?
3. Is there any way to measure the time spent on data loading, computation,
disk accesses, and communication separately?
4. Any rule of thumb to estimate the memory needed for a program in
SystemML?

I really appreciate your inputs!


Best,
Mingyang Wang

