Well, after thinking some more about this issue, I have to correct myself,
but the workarounds still apply. The problem is not the "in-memory reblock"
itself but the collect of the reblocked RDD, which is similarly handed over
to the buffer pool and eventually evicted accordingly.

For a subset of operations such as aggregates and matrix multiplications, we
actually select the execution type transitively, i.e., if the input comes
from Spark, the operation is cheap, and it reduces the data size, we
transitively select a distributed operation even though the operation would
fit in memory. I would classify the missing transitive operator selection
for right indexing operations as a bug - we'll fix it in our upcoming
0.14 release. Thanks for catching it.

Regards,
Matthias

On Thu, Mar 23, 2017 at 9:55 PM, Matthias Boehm <mboe...@googlemail.com>
wrote:

> Thanks for the feedback, Mingyang. Let me quickly explain what happens here
> and then give you a couple of workarounds.
>
> 1. Understanding the Bottleneck: For any text inputs, we will initially
> compile distributed reblock operations that convert the text representation
> into RDDs of matrix indexes and matrix blocks. At runtime, however,
> SystemML decides to perform these conversions via a simple multi-threaded
> read into memory if the data comfortably fits in the memory budget of the
> driver (70% of max heap, i.e., 14GB in your case) and certain other
> constraints are met. After the read, the in-memory matrix is handed over to
> the buffer pool (15% of max heap), and if it doesn't fit, it's written to a
> local cache directory, which is likely the reason for the performance issue
> you encountered. You can verify this by looking at the "-stats" output,
> which includes the cache statistics:
>
> Cache hits (Mem, WB, FS, HDFS):    1717/0/0/2.
> Cache writes (WB, FS, HDFS):    573/0/1.
> Cache times (ACQr/m, RLS, EXP):    9.799/0.005/0.106/0.250 sec
>
> where ACQr stands for 'acquire read', which includes RDD collects and local
> reads, and RLS stands for 'release', which includes evictions to local disk.
>
> 2. Workarounds:
> a) Binary data: In case of binary input data, you would likely not
> encounter this issue, because we would not evict read binary matrices since
> they can be recovered from HDFS at any time. You could explicitly convert
> the input data to its binary representation via a simple read-write
> script (with format="binary"). From a benchmark perspective, this would be
> the preferable option anyway, unless you want to benchmark the data
> ingestion from external formats.
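>
> As a minimal sketch (the script name and the $input/$output nvargs are just
> placeholders, not something SystemML prescribes), such a conversion script
> could look like this:
>
> # convert_to_binary.dml (illustrative sketch)
> data = read($input, format="csv")        # read the csv source matrix
> write(data, $output, format="binary")    # persist it as a binary block matrix
>
> The benchmark script would then simply read the binary output path instead
> of the original csv file.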
>
> b) Memory budget: Since all decisions on local vs. distributed operations
> respect the available memory constraints, you can influence these
> decisions via (1) a smaller driver memory, to force more operations into
> distributed execution, or (2) a larger driver memory, to make the buffer
> pool large enough to avoid evictions.
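>
> Concretely, this only means changing the --driver-memory setting of the
> spark-submit invocation from your example; the values below are purely
> illustrative:
>
> # force more operations into distributed execution
> spark-submit --driver-memory 5G ... SystemML.jar -f simple_read.dml -stats ...
> # or make the buffer pool large enough to avoid local evictions
> spark-submit --driver-memory 40G ... SystemML.jar -f simple_read.dml -stats ...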
>
> c) Forced execution type: You can also force all operations - independent
> of the driver memory budget - to distributed execution by adding "-exec
> spark" to your command line invocation. From a benchmark perspective, I
> would, however, not recommend this.
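>
> For completeness, based on your command line this would look as follows
> (abbreviated sketch):
>
> spark-submit ... $SYSTEMML_HOME/target/SystemML.jar -f simple_read.dml \
>     -exec spark -stats -explain -nvargs RS=...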
>
> Thanks again for the feedback. While writing this reply, I actually came
> to the conclusion that we could handle even the csv input case better in
> order to avoid evictions in these scenarios.
>
>
> Regards,
> Matthias
>
>
>
> On Thu, Mar 23, 2017 at 8:41 PM, Mingyang Wang <miw...@eng.ucsd.edu>
> wrote:
>
>> Hi Everyone,
>>
>> I got some undesired results when running benchmarks with SystemML, and I
>> really hope to get some help from you all.
>>
>> Say I want to read a large matrix in csv format and retrieve the first cell
>> with the script
>>
>> # simple_read.dml
>> data = read($RS)
>> print("One Cell data[1,1]: " + toString(data[1,1]))
>>
>> and run it with a configuration such as
>>
>> spark-submit \
>>     --driver-memory 20G \
>>     --executor-memory 100G \
>>     --executor-cores 20 \
>>     --num-executors 1 \
>>     --conf spark.driver.maxResultSize=0 \
>>     --conf spark.rpc.message.maxSize=128 \
>>     $SYSTEMML_HOME/target/SystemML.jar \
>>     -f simple_read.dml \
>>     -stats \
>>     -explain \
>>     -nvargs RS=/oasis/scratch/comet/hyoung/temp_project/scatter_data/TR20_FR4.00/RS_join.csv
>>
>> When the input matrix is 2x10^7 by 103 (around 20GB), things worked fine,
>> as sp_rangeReIndex took 19.725s and sp_csvrblk took 11.330s. But when the
>> input matrix is reduced to 10^7 by 103 (around 10GB), interesting things
>> happened, as rangeReIndex took 92.024s and sp_csvrblk took 7.774s.
>>
>> These results were obtained with SystemML v0.13.0 on Spark 2.1.0 in
>> standalone mode with 128GB memory and 24 cores.
>>
>> From the log, it seems that for the latter case, the control program in the
>> driver took over the main work and caused lots of disk I/O, which slowed
>> down the whole program.
>>
>> I understand that assigning some tasks to the control program is a key
>> feature of SystemML. But this behavior really distorts the benchmarks in my
>> case. Any suggestions on how to choose a better configuration, or on
>> possible workarounds, so I can obtain fair benchmarks over a wide range of
>> data dimensions?
>>
>> If needed, I can attach the logs.
>>
>> I really appreciate your help!
>>
>>
>> Regards,
>> Mingyang Wang
>> Graduate Student in UCSD CSE Dept.
>>
>
>
