[ https://issues.apache.org/jira/browse/SYSTEMML-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15537897#comment-15537897 ]

Matthias Boehm commented on SYSTEMML-994:
-----------------------------------------

After a closer look, it turned out that the matrix-to-frame converter had two 
fundamental problems (in addition to the minor points mentioned above). The 
solutions are as follows: 

(3) Shuffle of matrix blocks instead of frame blocks: The previous approach was 
to create, for each matrix block, an extended frame block, followed by a 
mergeByKey (including a shuffle). However, each matrix block of column 
blocksize 1K is extended to a frame block spanning all columns (64K in this 
case), and because frame blocks are always represented in dense format, this 
created temporary memory requirements 64x (plus overhead) larger than the 
input. The solution is simple: instead of shuffling frame blocks, we should 
create and shuffle extended matrix blocks (which are represented in sparse 
format) and only convert full matrix blocks to full frame blocks. 
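
To make the effect concrete, here is a minimal standalone sketch (plain Java 
with hypothetical stand-in types, not the actual SystemML classes) of the 
revised pipeline: column blocks are merged by row-block key in a sparse 
representation, and only the final full-width block is materialized densely.

{code}
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types, not the SystemML API: a sketch of merging
// extended *sparse* blocks by key and densifying only the final result.
public class MatrixToFrameSketch {

  public static void main(String[] args) {
    final int ncol = 64000, blksz = 1000;

    // Simulated input: the column blocks of one block row, each covering
    // blksz columns (here a single cell per block, for brevity).
    Map<Integer, double[]> colBlocks = new HashMap<>();
    for (int bc = 0; bc < ncol / blksz; bc++)
      colBlocks.put(bc, new double[]{bc + 0.5});

    // Step 1 (the "shuffle"/mergeByKey): merge extended sparse blocks;
    // memory stays proportional to the non-zeros, not to ncol per block.
    Map<Integer, Double> merged = new HashMap<>();
    for (Map.Entry<Integer, double[]> e : colBlocks.entrySet())
      merged.put(e.getKey() * blksz, e.getValue()[0]);

    // Step 2: only the merged, full-width block becomes a dense row
    // (standing in for the always-dense frame block representation).
    Object[] frameRow = new Object[ncol];
    for (Map.Entry<Integer, Double> c : merged.entrySet())
      frameRow[c.getKey()] = c.getValue();

    System.out.println("sparse cells shuffled: " + merged.size()
        + ", dense columns materialized once: " + frameRow.length);
  }
}
{code}

Under the old scheme, each 1K-column block would instead have been densified 
to the full 64K width before the shuffle, i.e., roughly 64x the input 
footprint in flight.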

(4) Frame block conversion via column set appends: For the specific case of 
converting dense matrix blocks to frame blocks, we use a cache-conscious 
row-major to column-major conversion with column appends. However, the recent 
change to array-based frame block schema/metadata in SYSTEMML-557 requires an 
array reallocation per column append, which really hurts with a large number 
of columns (due to O(n^2) cell allocations/copies). We should instead append 
the entire prepared column set at once, which avoids the reallocations 
altogether.
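
For illustration, here is a toy sketch (hypothetical helpers, not the actual 
FrameBlock API) contrasting the two strategies: with n columns, per-column 
appends perform roughly n^2/2 reference copies overall, whereas the bulk 
append performs a single allocation and copy.

{code}
import java.util.Arrays;

// Toy illustration of per-column appends (one array reallocation per call,
// O(n^2) copies overall) vs. a single bulk append of the whole column set.
public class ColumnAppendSketch {

  // Per-column append: grows the backing array by one on every call.
  static Object[][] appendColumn(Object[][] cols, Object[] col) {
    Object[][] tmp = Arrays.copyOf(cols, cols.length + 1); // full copy
    tmp[cols.length] = col;
    return tmp;
  }

  // Bulk append: one allocation and one copy for the entire column set.
  static Object[][] appendColumnSet(Object[][] cols, Object[][] newCols) {
    Object[][] tmp = Arrays.copyOf(cols, cols.length + newCols.length);
    System.arraycopy(newCols, 0, tmp, cols.length, newCols.length);
    return tmp;
  }

  public static void main(String[] args) {
    int ncol = 64000;
    Object[][] prepared = new Object[ncol][0]; // the prepared column set

    long t0 = System.nanoTime();
    Object[][] a = new Object[0][];
    for (Object[] col : prepared)
      a = appendColumn(a, col); // ncol reallocations
    long t1 = System.nanoTime();
    Object[][] b = appendColumnSet(new Object[0][], prepared); // one copy
    long t2 = System.nanoTime();

    System.out.printf("per-column: %.1f ms (%d cols), bulk: %.3f ms (%d cols)%n",
        (t1 - t0) / 1e6, a.length, (t2 - t1) / 1e6, b.length);
  }
}
{code}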


> GC OOM: Binary Matrix to Frame Conversion
> -----------------------------------------
>
>                 Key: SYSTEMML-994
>                 URL: https://issues.apache.org/jira/browse/SYSTEMML-994
>             Project: SystemML
>          Issue Type: Bug
>            Reporter: Mike Dusenberry
>            Priority: Blocker
>
> I currently have a SystemML matrix saved to HDFS in binary block format, and 
> am attempting to read it in, convert it to a {{frame}}, and then pass that to 
> an algorithm so that I can pull batches out of it with minimal overhead.
> When attempting to run this, I am repeatedly hitting the following GC limit:
> {code}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>       at org.apache.sysml.runtime.matrix.data.FrameBlock.ensureAllocatedColumns(FrameBlock.java:281)
>       at org.apache.sysml.runtime.matrix.data.FrameBlock.copy(FrameBlock.java:979)
>       at org.apache.sysml.runtime.matrix.data.FrameBlock.copy(FrameBlock.java:965)
>       at org.apache.sysml.runtime.matrix.data.FrameBlock.<init>(FrameBlock.java:91)
>       at org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils$CreateBlockCombinerFunction.call(FrameRDDAggregateUtils.java:57)
>       at org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils$CreateBlockCombinerFunction.call(FrameRDDAggregateUtils.java:48)
>       at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
>       at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187)
>       at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186)
>       at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148)
>       at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>       at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>       at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Script:
> {code}
> train = read("train")
> val = read("val")
> trainf = as.frame(train)
> valf = as.frame(val)
> // Rest of algorithm, which passes the frames to DML functions, and performs row indexing to pull out batches, convert to matrices, and train.
> {code}
> Cluster setup:
> * Spark Standalone
> * 1 Master, 9 Workers
> * 47 cores, 124 GB available to Spark on each Worker (1 core + 1GB saved for OS)
> * spark.driver.memory 80g
> * spark.executor.memory 21g
> * spark.executor.cores 3
> * spark.default.parallelism 20000
> * spark.driver.maxResultSize 0
> * spark.akka.frameSize 128
> * spark.network.timeout 1000s
> Note: This is using the latest build as of 09.29.16, 1:30 PM PST.
> cc [~mboehm7], [~acs_s]


