Hi Matthias,

Thanks for the thorough explanations and suggestions! However, I observed
some strange behavior when reducing the driver memory.

When I reduced the driver memory to 10GB, with everything else unchanged,
the 20GB input still worked fine, but errors occurred for the 10GB input. I
observed similar behavior after further reducing the driver memory to 5GB.
Some error traces for the 10GB driver case are attached below:

17/03/24 09:39:35 INFO ShuffleBlockFetcherIterator: Getting 313 non-empty blocks out of 313 blocks
17/03/24 09:39:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
17/03/24 09:39:38 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 198.202.119.114:37680 in memory (size: 3.2 KB, free: 5.2 GB)
17/03/24 09:39:41 INFO UnifiedMemoryManager: Will not store rdd_8_32 as the required space (1048576 bytes) exceeds our memory limit (241164 bytes)
17/03/24 09:39:41 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_8_32 in memory.
17/03/24 09:39:41 WARN MemoryStore: Not enough space to cache rdd_8_32 in memory! (computed 384.0 B so far)
17/03/24 09:39:41 INFO MemoryStore: Memory use = 235.5 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 235.5 KB. Storage limit = 235.5 KB.
17/03/24 09:39:41 WARN BlockManager: Persisting block rdd_8_32 to disk instead.
17/03/24 09:39:47 INFO BlockManagerInfo: Added rdd_8_32 on disk on 198.202.119.114:37680 (size: 3.9 GB)
17/03/24 09:39:47 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 939)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:462)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:698)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/03/24 09:39:47 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 939, localhost, executor driver): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:462)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:698)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)...

If needed, I can attach the whole log.

Also, I want to verify my understanding: after the sp_csvrblk instruction
completes, the data is loaded into memory, either in the control program or
in the Spark executors, right? I have tried to confirm this in the source
code, but it is not that straightforward.


Regards,
Mingyang

On Thu, Mar 23, 2017 at 11:36 PM Matthias Boehm <mboe...@googlemail.com>
wrote:

Well, after thinking some more about this issue, I have to correct myself,
but the workarounds still apply. The problem is not the "in-memory reblock"
but the collect of the reblocked RDD, which is similarly handed over to the
buffer pool and eventually evicted accordingly.

For a subset of operations such as aggregates and matrix multiplication, we
actually select the execution type transitively, i.e., if the input comes
from Spark, the operation is cheap, and it reduces the data size, we
transitively select a distributed operation even though the operation would
fit in memory. I would classify the missing transitive operator selection
for right indexing operations as a bug - we'll fix this in our upcoming
0.14 release. Thanks for catching it.

Regards,
Matthias

On Thu, Mar 23, 2017 at 9:55 PM, Matthias Boehm <mboe...@googlemail.com>
wrote:

> Thanks for the feedback, Mingyang. Let me quickly explain what happens here
> and subsequently give you a couple of workarounds.
>
> 1. Understanding the Bottleneck: For any text inputs, we will initially
> compile distributed reblock operations that convert the text representation
> into RDDs of matrix indexes and matrix blocks. At runtime, however,
> SystemML decides to perform these conversions via a simple multi-threaded
> read into memory if the data comfortably fits in the memory budget of the
> driver (70% of max heap, i.e., 14GB in your case) and certain other
> constraints are met. After the read, the in-memory matrix is handed over to
> the buffer pool (15% of max heap), and if it doesn't fit, it's written to a
> local cache directory, which is likely the reason for the performance issue
> you encountered. You can verify this by looking at the "-stats" output,
> which includes the cache statistics,
>
> Cache hits (Mem, WB, FS, HDFS):    1717/0/0/2.
> Cache writes (WB, FS, HDFS):    573/0/1.
> Cache times (ACQr/m, RLS, EXP):    9.799/0.005/0.106/0.250 sec
>
> where ACQ stands for 'acquire read', which includes RDD collects and local
> reads, and RLS stands for 'release', which includes evictions to local disk.
>
> 2. Workarounds:
> a) Binary data: In case of binary input data, you would likely not
> encounter this issue, because we would not evict read binary matrices,
> since they can be recovered from HDFS at any time. You could explicitly
> convert the input data to its binary representation via a simple read-write
> script (with format="binary"). From a benchmark perspective, this would be
> the preferable option anyway, unless you want to benchmark the data
> ingestion part from external formats.
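>
> For illustration, a minimal sketch of such a conversion script might look
> like this (IN and OUT are placeholder arguments for your actual csv input
> and binary output paths):
>
> # csv_to_binary.dml
> data = read($IN, format="csv")
> write(data, $OUT, format="binary")
>
> It could be run with the same spark-submit setup as your read script,
> passing -nvargs IN=... OUT=... , and subsequent benchmarks would then read
> the binary output instead of the csv file.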
>
> b) Smaller memory budget: Since all decisions on local vs. distributed
> operations respect the available memory constraints, you can influence
> these decisions with (1) a smaller driver memory, to force more operations
> into distributed execution, or (2) a larger driver memory, to make the
> buffer pool large enough to avoid evictions.
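>
> As a rough illustration of the numbers involved (using the 70% and 15%
> budgets mentioned above): with --driver-memory 20G, the operation memory
> budget is roughly 0.70 * 20GB = 14GB and the buffer pool roughly
> 0.15 * 20GB = 3GB; with --driver-memory 10G, these shrink to about 7GB and
> 1.5GB, respectively, which makes either distributed execution or buffer
> pool evictions much more likely.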
>
> c) Forced execution type: You can also force all operations - independent
> of the driver memory budget - to distributed operations by putting "-exec
> spark" into your command line invocation. From a benchmark perspective,
> however, I would not recommend this.
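>
> For illustration, with your earlier invocation this would amount to
> something like the following (everything else unchanged, paths elided):
>
> spark-submit \
>     --driver-memory 20G \
>     ... \
>     $SYSTEMML_HOME/target/SystemML.jar \
>     -f simple_read.dml \
>     -exec spark \
>     -stats \
>     -explain \
>     -nvargs RS=...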
>
> Thanks again for the feedback. While writing this comment, I actually came
> to the conclusion that we could handle even the csv input case better in
> order to avoid evictions in these scenarios.
>
>
> Regards,
> Matthias
>
>
>
> On Thu, Mar 23, 2017 at 8:41 PM, Mingyang Wang <miw...@eng.ucsd.edu>
> wrote:
>
>> Hi Everyone,
>>
>> I got some undesired results when running some benchmarks with SystemML,
>> and I really hope to get some help from you guys.
>>
>> Say, I want to read a large matrix in csv and retrieve the first cell with
>> the script
>>
>> # simple_read.dml
>> data = read($RS)
>> print("One Cell data[1,1]: " + toString(data[1,1]))
>>
>> and run it with a configuration such as
>>
>> spark-submit \
>>     --driver-memory 20G \
>>     --executor-memory 100G \
>>     --executor-cores 20 \
>>     --num-executors 1 \
>>     --conf spark.driver.maxResultSize=0 \
>>     --conf spark.rpc.message.maxSize=128 \
>>     $SYSTEMML_HOME/target/SystemML.jar \
>>     -f simple_read.dml \
>>     -stats \
>>     -explain \
>>     -nvargs RS=/oasis/scratch/comet/hyoung/temp_project/scatter_data/TR20_FR4.00/RS_join.csv
>>
>> When the input matrix is 2x10^7 by 103 (around 20GB), things worked fine,
>> as sp_rangeReIndex took 19.725s and sp_csvrblk took 11.330s. But when the
>> input matrix is reduced to 10^7 by 103 (around 10GB), interesting things
>> happened, as rangeReIndex took 92.024s and sp_csvrblk took 7.774s.
>>
>> These results were obtained with SystemML v0.13.0 on Spark 2.1.0 in
>> standalone mode with 128GB memory and 24 cores.
>>
>> From the log, it seems that in the latter case the control program in the
>> driver did most of the work and caused lots of disk I/O, which slowed down
>> the whole program.
>>
>> I understand that assigning some tasks to the control program is a key
>> feature of SystemML, but it really complicates the benchmarks in my case.
>> Any suggestions on how to choose a better configuration, or on possible
>> workarounds, so that I can obtain fair benchmarks across a wide range of
>> data dimensions?
>>
>> If needed, I can attach the logs.
>>
>> I really appreciate your help!
>>
>>
>> Regards,
>> Mingyang Wang
>> Graduate Student in UCSD CSE Dept.
>>
>
>
