[
https://issues.apache.org/jira/browse/SYSTEMML-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15537897#comment-15537897
]
Matthias Boehm commented on SYSTEMML-994:
-----------------------------------------
After a closer look, it turned out that the matrix-to-frame converter had two
fundamental problems (in addition to the minor points mentioned above). The
solutions are as follows:
(3) Shuffle of matrix blocks instead of frame blocks: The current approach was
to create an extended frame block for each matrix block, followed by a
mergeByKey (including a shuffle). However, since each matrix block of column
blocksize 1K is extended to a frame block spanning the full number of columns
(64K in this case), and since frame blocks are always represented in dense
format, this created temporary memory requirements 64x (plus overhead) larger
than the inputs. The solution is simple: instead of shuffling frame blocks, we
should create and shuffle extended matrix blocks (which are represented in
sparse format) and only convert full matrix blocks to full frame blocks.
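To make the memory blow-up concrete, here is a small back-of-envelope sketch (the class and helper are illustrative, not SystemML code): a 1K x 1K matrix block extended to a dense frame block over all 64K columns materializes every cell at 8 bytes, while a sparse extended matrix block only carries the populated column range.

```java
// Illustrative back-of-envelope sketch (not SystemML code): compares the
// temporary memory of a dense extended frame block against a sparse
// extended matrix block for one 1K x 1K input block of a 64K-column matrix.
public class ShuffleMemorySketch {

    // Ratio of dense frame block size to sparse matrix block size,
    // ignoring sparse-row overhead (hence "plus overhead" in the text).
    static long blowUpFactor(long rows, long blkCols, long totalCols) {
        long denseFrameBytes = rows * totalCols * 8L;  // every cell materialized
        long sparseMatrixBytes = rows * blkCols * 8L;  // only populated columns
        return denseFrameBytes / sparseMatrixBytes;
    }

    public static void main(String[] args) {
        // 1K-column block extended to 64K columns -> 64x temporary memory
        System.out.println("blow-up factor: "
            + blowUpFactor(1000, 1000, 64000) + "x");
    }
}
```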
(4) Frame block conversion via column set appends: For the specific case of
converting dense matrix blocks to frame blocks, we use a cache-conscious
row-major to column-major conversion with column appends. However, the recent
change to array-based frame block schema/meta data in SYSTEMML-557 requires an
array reallocation per column append, which really hurts with a large number of
columns (due to O(n^2) cell allocations/copies). We should instead append the
entire prepared column set at once, which avoids reallocations altogether.
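The quadratic cost can be sketched as follows (hypothetical helper methods, not the actual FrameBlock API): growing an array by one slot per column append copies O(n^2) elements over n columns, whereas a single column-set append needs just one allocation and one write per column.

```java
import java.util.Arrays;

// Illustrative sketch (hypothetical methods, not the actual FrameBlock API):
// contrasts per-column appends, which reallocate and copy the schema/meta
// data arrays on every append, with a single column-set append.
public class ColumnAppendSketch {

    // Element copies incurred by growing the array one column at a time:
    // 1 + 2 + ... + n = n*(n+1)/2, i.e., O(n^2).
    static long copiesPerColumnAppend(int nCols) {
        long copies = 0;
        String[] schema = new String[0];
        for (int j = 0; j < nCols; j++) {
            schema = Arrays.copyOf(schema, schema.length + 1); // realloc + copy
            copies += schema.length;
        }
        return copies;
    }

    // Appending the entire prepared column set once: a single allocation
    // and exactly one write per column, i.e., O(n).
    static long copiesPerColumnSetAppend(int nCols) {
        String[] schema = new String[nCols]; // single allocation
        for (int j = 0; j < nCols; j++)
            schema[j] = "DOUBLE"; // one write per column (value is illustrative)
        return nCols;
    }

    public static void main(String[] args) {
        int n = 1000; // the reported case had 64K columns; 1K keeps this quick
        System.out.println("per-column appends: "
            + copiesPerColumnAppend(n) + " element copies");
        System.out.println("column-set append:  "
            + copiesPerColumnSetAppend(n) + " element copies");
    }
}
```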
> GC OOM: Binary Matrix to Frame Conversion
> -----------------------------------------
>
> Key: SYSTEMML-994
> URL: https://issues.apache.org/jira/browse/SYSTEMML-994
> Project: SystemML
> Issue Type: Bug
> Reporter: Mike Dusenberry
> Priority: Blocker
>
> I currently have a SystemML matrix saved to HDFS in binary block format, and
> am attempting to read it in, convert it to a {{frame}}, and then pass that to
> an algorithm so that I can pull batches out of it with minimal overhead.
> When attempting to run this, I am repeatedly hitting the following GC limit:
> {code}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.apache.sysml.runtime.matrix.data.FrameBlock.ensureAllocatedColumns(FrameBlock.java:281)
> at org.apache.sysml.runtime.matrix.data.FrameBlock.copy(FrameBlock.java:979)
> at org.apache.sysml.runtime.matrix.data.FrameBlock.copy(FrameBlock.java:965)
> at org.apache.sysml.runtime.matrix.data.FrameBlock.<init>(FrameBlock.java:91)
> at org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils$CreateBlockCombinerFunction.call(FrameRDDAggregateUtils.java:57)
> at org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils$CreateBlockCombinerFunction.call(FrameRDDAggregateUtils.java:48)
> at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
> at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187)
> at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186)
> at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148)
> at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Script:
> {code}
> train = read("train")
> val = read("val")
> trainf = as.frame(train)
> valf = as.frame(val)
> // Rest of algorithm, which passes the frames to DML functions, and performs row indexing to pull out batches, convert to matrices, and train.
> {code}
> Cluster setup:
> * Spark Standalone
> * 1 Master, 9 Workers
> * 47 cores, 124 GB available to Spark on each Worker (1 core + 1GB saved for OS)
> * spark.driver.memory 80g
> * spark.executor.memory 21g
> * spark.executor.cores 3
> * spark.default.parallelism 20000
> * spark.driver.maxResultSize 0
> * spark.akka.frameSize 128
> * spark.network.timeout 1000s
> Note: This is using today's latest build as of 09.29.16 1:30PM PST.
> cc [~mboehm7], [~acs_s]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)