[ 
https://issues.apache.org/jira/browse/SYSTEMML-1281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872610#comment-15872610
 ] 

Felix Schüler commented on SYSTEMML-1281:
-----------------------------------------

I have not really looked into reproducing/debugging it, so this is a little bit 
guessing here:

When we implemented the Flink conversions we ran into a similar issue which had 
to do with the number of slots that were allocated on one TaskManager (I think 
the corresponding names for Spark are task and executor). 
The problem in Flink was even worse because flink lazily allocated operator 
memory which made it hard for systemml to estimate remaining memory. Now here 
could be a similar issue where you have too many threads running on one machine 
that all at the same time allocate buffers inside Spark Aggregation UDFs and 
therefore surpass the available heap space. 

Decreasing the number of executors/threads that run on one machine might help? 
In Flink we put in some executor-aware memory estimate that would look at how 
many executors are running and what the total memory budget per worker node is. 
Each executor would then only get the correct fraction.

> OOM Error On Binary Write
> -------------------------
>
>                 Key: SYSTEMML-1281
>                 URL: https://issues.apache.org/jira/browse/SYSTEMML-1281
>             Project: SystemML
>          Issue Type: Bug
>    Affects Versions: SystemML 0.13
>            Reporter: Mike Dusenberry
>            Priority: Blocker
>
> I'm running into the following heap space OOM error while attempting to save 
> a large Spark DataFrame to a SystemML binary format via DML {{write}} 
> statements.
> Script:
> {code}
> tr_sample_filename = os.path.join("data", "train_{}{}.parquet".format(size, 
> "_grayscale" if grayscale else ""))
> val_sample_filename = os.path.join("data", "val_{}{}.parquet".format(size, 
> "_grayscale" if grayscale else ""))
> train_df = sqlContext.read.load(tr_sample_filename)
> val_df = sqlContext.read.load(val_sample_filename)
> train_df, val_df
> # Note: Must use the row index column, or X may not
> # necessarily correspond correctly to Y
> X_df = train_df.select("__INDEX", "sample")
> X_val_df = val_df.select("__INDEX", "sample")
> y_df = train_df.select("__INDEX", "tumor_score")
> y_val_df = val_df.select("__INDEX", "tumor_score")
> X_df, X_val_df, y_df, y_val_df
> script = """
> # Scale images to [-1,1]
> X = X / 255
> X_val = X_val / 255
> X = X * 2 - 1
> X_val = X_val * 2 - 1
> # One-hot encode the labels
> num_tumor_classes = 3
> n = nrow(y)
> n_val = nrow(y_val)
> Y = table(seq(1, n), y, n, num_tumor_classes)
> Y_val = table(seq(1, n_val), y_val, n_val, num_tumor_classes)
> """
> outputs = ("X", "X_val", "Y", "Y_val")
> script = dml(script).input(X=X_df, X_val=X_val_df, y=y_df, 
> y_val=y_val_df).output(*outputs)
> X, X_val, Y, Y_val = ml.execute(script).get(*outputs)
> X, X_val, Y, Y_val
> script = """
> write(X, "data/systemml/X_"+size+"_"+c+"_binary", format="binary")
> write(Y, "data/systemml/Y_"+size+"_"+c+"_binary", format="binary")
> write(X_val, "data/systemml/X_val_"+size+"_"+c+"_binary", format="binary")
> write(Y_val, "data/systemml/Y_val_"+size+"_"+c+"_binary", format="binary")
> """
> script = dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val, size=size, c=c)
> ml.execute(script)
> {code}
> General error:
> {code}
> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception 
> occurred while executing runtime program
>       at 
> org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:371)
>       at 
> org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:292)
>       at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:293)
>       ... 12 more
> Caused by: org.apache.sysml.runtime.DMLRuntimeException: 
> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program 
> block generated from statement block between lines 1 and 11 -- Error 
> evaluating instruction: CP°mvvar°X°¶_Var49¶°binaryblock
>       at 
> org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>       at 
> org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:369)
>       ... 14 more
> Caused by: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error 
> in program block generated from statement block between lines 1 and 11 -- 
> Error evaluating instruction: CP°mvvar°X°¶_Var49¶°binaryblock
>       at 
> org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:320)
>       at 
> org.apache.sysml.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:221)
>       at 
> org.apache.sysml.runtime.controlprogram.ProgramBlock.execute(ProgramBlock.java:168)
>       at 
> org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:123)
>       ... 15 more
> Caused by: org.apache.sysml.runtime.controlprogram.caching.CacheException: 
> Move to data/systemml/X_256_3_binary failed.
>       at 
> org.apache.sysml.runtime.controlprogram.caching.CacheableData.moveData(CacheableData.java:1329)
>       at 
> org.apache.sysml.runtime.instructions.cp.VariableCPInstruction.processMoveInstruction(VariableCPInstruction.java:706)
>       at 
> org.apache.sysml.runtime.instructions.cp.VariableCPInstruction.processInstruction(VariableCPInstruction.java:511)
>       at 
> org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:290)
>       ... 18 more
> Caused by: org.apache.sysml.runtime.controlprogram.caching.CacheException: 
> Export to data/systemml/X_256_3_binary failed.
>       at 
> org.apache.sysml.runtime.controlprogram.caching.CacheableData.exportData(CacheableData.java:800)
>       at 
> org.apache.sysml.runtime.controlprogram.caching.CacheableData.exportData(CacheableData.java:688)
>       at 
> org.apache.sysml.runtime.controlprogram.caching.CacheableData.moveData(CacheableData.java:1315)
>       ... 21 more
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 269 in stage 40.0 failed 4 times, most recent failure: Lost task 269.3 
> in stage 40.0 (TID 61177, 9.30.110.145, executor 10): ExecutorLostFailure 
> (executor 10 exited caused by one of the running tasks) Reason: Remote RPC 
> client disassociated. Likely due to containers exceeding thresholds, or 
> network issues. Check driver logs for WARN messages.
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1456)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1444)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1443)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1671)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1626)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1615)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2015)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2036)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1154)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1095)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1095)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1095)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1069)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
>       at 
> org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:803)
>       at 
> org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext.writeRDDtoHDFS(SparkExecutionContext.java:976)
>       at 
> org.apache.sysml.runtime.controlprogram.caching.MatrixObject.writeBlobFromRDDtoHDFS(MatrixObject.java:558)
>       at 
> org.apache.sysml.runtime.controlprogram.caching.CacheableData.exportData(CacheableData.java:796)
>       ... 23 more
> {code}
> Actual OOM error on one of many tasks:
> {code}
> java.lang.OutOfMemoryError: Java heap space
>     at 
> org.apache.sysml.runtime.matrix.data.MatrixBlock.allocateDenseBlock(MatrixBlock.java:363)
>     at 
> org.apache.sysml.runtime.matrix.data.MatrixBlock.copyDenseToDense(MatrixBlock.java:1308)
>     at 
> org.apache.sysml.runtime.matrix.data.MatrixBlock.copy(MatrixBlock.java:1271)
>     at 
> org.apache.sysml.runtime.matrix.data.MatrixBlock.copy(MatrixBlock.java:1249)
>     at 
> org.apache.sysml.runtime.matrix.data.MatrixBlock.<init>(MatrixBlock.java:153)
>     at 
> org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$CreateBlockCombinerFunction.call(RDDAggregateUtils.java:260)
>     at 
> org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$CreateBlockCombinerFunction.call(RDDAggregateUtils.java:251)
>     at 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>     at 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>     at 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>     at 
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>     at 
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>     at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>     at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>     at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>     at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:113)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to