1) It's hard to debug these cases remotely, but errors like "Size exceeds Integer.MAX_VALUE" usually come from partitions that are too large and exceed Spark's 2GB limit per block. The logs would be helpful here, especially the number of partitions of the second (i.e., post-shuffle) stage of csvrblk.
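
As a rough illustration of the 2GB limit, here is a minimal Python sketch that checks a block size against Integer.MAX_VALUE and estimates how many partitions would keep the average partition below it. The helper names and the 50% headroom factor are assumptions for illustration only, and the estimate assumes evenly sized partitions, which a real shuffle does not guarantee.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE, in bytes


def exceeds_limit(block_bytes):
    """True if a single block is larger than Integer.MAX_VALUE bytes."""
    return block_bytes > INT_MAX


def min_partitions(total_bytes, headroom=0.5):
    """Smallest partition count whose average partition size stays
    below headroom * INT_MAX (headroom is an assumed safety factor)."""
    target = int(INT_MAX * headroom)
    return max(1, -(-total_bytes // target))  # integer ceiling division


# The block rdd_8_32 in the quoted log was reported as 3.9 GB on disk.
print(exceeds_limit(int(3.9 * 1024**3)))
# Rough partition count for about 10 GB of data with 50% headroom.
print(min_partitions(10 * 1024**3))
```

Comparing such an estimate with the actual number of post-shuffle partitions in the logs would show whether individual partitions can get close to the limit.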

2) If you're interested in the caching logic, you can look at the following code for Spark and CP, respectively. First, for Spark, we inject caching directives (CheckpointSPInstruction) for out-of-core matrices after any persistent read, reblock, or for variables used read-only in loops, unless they are used in simple update chains. You'll see the status of cached RDDs in the Web UI. Second, for CP, the buffer pool (see CacheableData and LazyWriteBuffer) handles caching: during operations, matrices are pinned via strong references, while in the unpinned state they are only softly reachable, which means that the garbage collector is free to drop them under memory pressure, in which case we have to restore them into memory. You can verify that no eviction happened by looking at the cache statistics, specifically cache hits and writes to FS, i.e., our cache directory on the local file system.
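
To make the check on the cache statistics concrete, here is a small Python sketch that parses the "Cache hits" and "Cache writes" lines in the format quoted further down in this thread. The function names are hypothetical, and treating any non-zero FS counter as a sign of eviction is my reading of the statistics, not an official rule.

```python
import re

# Matches e.g. "Cache hits (Mem, WB, FS, HDFS):    1717/0/0/2."
LINE = re.compile(r"^\s*Cache (hits|writes) \(([^)]*)\):\s*([0-9/]+)\.?\s*$")


def parse_cache_counts(stats_text):
    """Return {'hits': {...}, 'writes': {...}} from -stats output lines."""
    counts = {}
    for line in stats_text.splitlines():
        m = LINE.match(line)
        if m is None:
            continue  # skip unrelated lines, e.g. "Cache times"
        kind, names, values = m.groups()
        keys = [n.strip() for n in names.split(",")]
        counts[kind] = dict(zip(keys, (int(v) for v in values.split("/"))))
    return counts


def touched_local_fs(counts):
    """True if any block was written to or read back from the local cache dir."""
    return counts["writes"]["FS"] > 0 or counts["hits"]["FS"] > 0


sample = """Cache hits (Mem, WB, FS, HDFS):    1717/0/0/2.
Cache writes (WB, FS, HDFS):    573/0/1.
Cache times (ACQr/m, RLS, EXP):    9.799/0.005/0.106/0.250 sec"""
print(touched_local_fs(parse_cache_counts(sample)))
```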


Regards,
Matthias

On 3/24/2017 10:10 AM, Mingyang Wang wrote:
Hi Matthias,

Thanks for your thorough explanations and suggestions! But I observed some
strange behaviors when reducing the driver size.

Say, I simply reduced the driver size to 10GB with everything else
unchanged: it worked fine for the 20GB case, but errors occurred for the
10GB case. Similar behavior was observed after I further reduced the
driver to 5GB. Some error traces for driver size 10GB are attached below:

17/03/24 09:39:35 INFO ShuffleBlockFetcherIterator: Getting 313 non-empty blocks out of 313 blocks
17/03/24 09:39:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
17/03/24 09:39:38 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 198.202.119.114:37680 in memory (size: 3.2 KB, free: 5.2 GB)
17/03/24 09:39:41 INFO UnifiedMemoryManager: Will not store rdd_8_32 as the required space (1048576 bytes) exceeds our memory limit (241164 bytes)
17/03/24 09:39:41 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_8_32 in memory.
17/03/24 09:39:41 WARN MemoryStore: Not enough space to cache rdd_8_32 in memory! (computed 384.0 B so far)
17/03/24 09:39:41 INFO MemoryStore: Memory use = 235.5 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 235.5 KB. Storage limit = 235.5 KB.
17/03/24 09:39:41 WARN BlockManager: Persisting block rdd_8_32 to disk instead.
17/03/24 09:39:47 INFO BlockManagerInfo: Added rdd_8_32 on disk on 198.202.119.114:37680 (size: 3.9 GB)
17/03/24 09:39:47 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 939)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:462)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:698)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/03/24 09:39:47 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 939, localhost, executor driver): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:462)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:698)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)...

If needed, I can attach the whole log.

Also, I want to verify that after the instruction sp_csvrblk completes, the
data is loaded into memory, either in control program or Spark executors,
right? I have tried to verify this in the source code, but it is not that
straightforward.


Regards,
Mingyang

On Thu, Mar 23, 2017 at 11:36 PM Matthias Boehm <mboe...@googlemail.com>
wrote:

well, after thinking some more about this issue, I have to correct myself
but the workarounds still apply. The problem is not the "in-memory reblock"
but the collect of the reblocked RDD, which is similarly handed over to the
bufferpool and eventually evicted accordingly.

For a subset of operations such as aggregates and matrix multiplication, we
actually select the execution type transitively, i.e., if the input comes
from spark, the operation is cheap, and reduces the data size, we
transitively select a distributed operation even though the operation would
fit in memory. I would classify the missing transitive operator selection
for right indexing operations as a bug - we'll fix this in our upcoming
0.14 release. Thanks for catching it.

Regards,
Matthias

On Thu, Mar 23, 2017 at 9:55 PM, Matthias Boehm <mboe...@googlemail.com>
wrote:

thanks for the feedback Mingyang. Let me quickly explain what happens here
and subsequently give you a couple of workarounds.

1. Understanding the Bottleneck: For any text inputs, we will initially
compile distributed reblock operations that convert the text representation
into RDDs of matrix indexes and matrix blocks. At runtime, however,
SystemML decides to perform these conversions via a simple multi-threaded
read into memory if the data comfortably fits in the memory budget of the
driver (70% of max heap, i.e., 14GB in your case) and certain other
constraints are met. After the read, the in-memory matrix is handed over to
the buffer pool (15% of max heap), and if it doesn't fit, it's written to a
local cache directory, which is likely the reason for the performance issue
you encountered. You can verify this by looking at the "-stats" output,
which includes the cache statistics:

Cache hits (Mem, WB, FS, HDFS):    1717/0/0/2.
Cache writes (WB, FS, HDFS):    573/0/1.
Cache times (ACQr/m, RLS, EXP):    9.799/0.005/0.106/0.250 sec

where ACQ stands for 'acquire read', which includes RDD collects and local
reads, and RLS stands for 'release', which includes evictions to local disk.
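
To illustrate the budget arithmetic above, here is a minimal Python sketch that applies the 70% and 15% fractions to a 20GB driver heap and compares them with a rough in-memory size for the two matrices from the original question. The dense 8-bytes-per-cell size estimate and the function names are assumptions for illustration. SystemML's own memory estimates, the "comfortably fits" margin, and the other constraints mentioned above are not modeled.

```python
GIB = 1024 ** 3


def driver_budgets(max_heap_bytes):
    """Return (operation budget, buffer pool size) as 70% and 15% of max heap."""
    return max_heap_bytes * 70 // 100, max_heap_bytes * 15 // 100


def dense_size_bytes(rows, cols):
    """Assumed size estimate: dense matrix of 8-byte doubles, no overhead."""
    return rows * cols * 8


def likely_path(rows, cols, max_heap_bytes):
    """Rough classification of how the read is likely to be handled."""
    size = dense_size_bytes(rows, cols)
    op_budget, buffer_pool = driver_budgets(max_heap_bytes)
    if size > op_budget:
        return "distributed reblock"
    if size > buffer_pool:
        return "local read, then eviction to local disk"
    return "local read, kept in buffer pool"


# The two inputs from the original question, with a 20GB driver heap.
for rows in (20_000_000, 10_000_000):
    print(rows, likely_path(rows, 103, 20 * GIB))
```

Under these assumptions, the larger matrix exceeds the 14GB operation budget and stays distributed, while the smaller one fits the budget but not the 3GB buffer pool, which is consistent with the eviction behavior described above.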

2. Workarounds:
a) Binary data: In case of binary input data, you would likely not
encounter this issue, because we would not evict read binary matrices since
they can be recovered from HDFS at any time. You could explicitly convert
the input data to its binary representation via a simple read - write
script (with format="binary"). From a benchmark perspective, this would be
the preferable option anyway, unless you want to benchmark the data
ingestion part from external formats.

b) Smaller Memory budget: Since all decisions on local vs distributed
operations respect available memory constraints, you can influence these
decisions by (1) a smaller driver memory, to force more operations to
distributed execution, or (2) a larger driver memory, to make the buffer
pool large enough to avoid evictions.

c) Forced execution type: You can also force all operations - independent
of the driver memory budget - to distributed operations by putting "-exec
spark" into your command line invocation. From a benchmark perspective,
however, I would not recommend this.

Thanks again for the feedback. While writing this comment, I actually came
to the conclusion that we could handle even the case with input csv better
in order to avoid evictions in these scenarios.


Regards,
Matthias



On Thu, Mar 23, 2017 at 8:41 PM, Mingyang Wang <miw...@eng.ucsd.edu>
wrote:

Hi Everyone,

I got some undesired results when running some benchmarks with
SystemML, and I really hope to get some help from you.

Say, I want to read a large matrix in csv and retrieve the first cell
with the script

# simple_read.dml
data = read($RS)
print("One Cell data[1,1]: " + toString(data[1,1]))

and run it with the configuration such as

spark-submit \
    --driver-memory 20G \
    --executor-memory 100G \
    --executor-cores 20 \
    --num-executors 1 \
    --conf spark.driver.maxResultSize=0 \
    --conf spark.rpc.message.maxSize=128 \
    $SYSTEMML_HOME/target/SystemML.jar \
    -f simple_read.dml \
    -stats \
    -explain \
    -nvargs RS=/oasis/scratch/comet/hyoung/temp_project/scatter_data/TR20_FR4.00/RS_join.csv

When the input matrix is 2x10^7 by 103 (around 20GB), things worked fine,
as sp_rangeReIndex took 19.725s and sp_csvrblk took 11.330s. But when the
input matrix is reduced to 10^7 by 103 (around 10GB), interesting things
happened, as rangeReIndex took 92.024s and sp_csvrblk took 7.774s.

These results were obtained with SystemML v0.13.0 on Spark 2.1.0 in
standalone mode with 128GB memory and 24 cores.

From the log, it seems that for the latter case, the control program in the
driver took over the main job and caused lots of disk I/O, which slowed
down the whole program.

I understand that assigning the control program some tasks is a key
component in SystemML. But this feature really brings some chaos to the
benchmarks in my case. Any suggestion about how to choose a better
configuration or make some detours so I can obtain fair benchmarks on a
wide range of data dimensions?

If needed, I can attach the logs.

I really appreciate your help!


Regards,
Mingyang Wang
Graduate Student in UCSD CSE Dept.



