feasibility of ignite and alluxio for interfacing MPI and Spark
Do Ignite and Alluxio offer reasonable means of transferring data, in memory, from Spark to MPI? A straightforward way to transfer data is to use piping, but unless you have MPI processes running in a one-to-one mapping to the Spark partitions, this will require some complicated logic to get working (you'll have to handle multiple tasks sending their data to one process).

It seems like Ignite and Alluxio might allow you to pull the data you want into each of your MPI processes without worrying about such a requirement, but it's not clear to me from the high-level descriptions of the systems whether this can be readily realized. Is this the case?

Another issue: with the piping solution, you only need to store two copies of the data, one each on the Spark and MPI sides. With Ignite and Alluxio, would you need three? It seems that they let you replace the standard RDDs with RDDs backed by their memory stores, but do those perform as efficiently as standard Spark RDDs persisted in memory?

More generally, I'd be interested to know if there are existing solutions to this problem of transferring data between MPI and Spark. Thanks for any insight you can offer!
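For concreteness, here is a minimal sketch of the piping approach I have in mind (the mpi_ingest binary and its path are hypothetical; it would read one whitespace-separated row per line on stdin):

    import org.apache.spark.{SparkConf, SparkContext}

    // a sketch of the piping approach, with a toy stand-in for the real data
    val sc = new SparkContext(new SparkConf().setAppName("pipe-sketch"))
    val dataRDD = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)), 2)
    val rowsAsText = dataRDD.map(_.mkString(" "))
    // RDD.pipe launches one external process per partition, which is why the
    // clean case is MPI ranks mapping one-to-one onto partitions
    val ack = rowsAsText.pipe("/path/to/mpi_ingest").collect()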
how to write pyspark interface to scala code?
I have Scala Spark code for computing a matrix factorization. I'd like to make it possible to use this code from PySpark, so users can pass in a Python RDD and receive one back without knowing or caring that Scala code is being called. Please point me to an example of code (e.g. somewhere in the Spark codebase, if it's clean enough) from which I can learn how to do this.
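To make the question concrete, here is a sketch of the kind of Scala-side entry point I imagine exposing (the package, object, and method names are all hypothetical). My understanding is that PySpark can reach it over Py4J via sc._jvm, and that exchanging DataFrames (df._jdf on the Python side) avoids dealing with pickled RDD bytes directly, but I'd welcome corrections:

    package com.example.factorization // hypothetical package

    import org.apache.spark.sql.DataFrame

    object PythonAPI {
      // Callable from PySpark roughly as:
      //   jdf = sc._jvm.com.example.factorization.PythonAPI.factorize(df._jdf, rank)
      //   result = DataFrame(jdf, sqlContext)
      def factorize(df: DataFrame, rank: Int): DataFrame = {
        // ... call into the existing Scala factorization here ...
        df // placeholder: would return the factors as a DataFrame
      }
    }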
cause of RPC error?
I am simply trying to load an RDD from disk with

    transposeRowsRDD.avro(baseInputFname).rdd.map( ... )

and I get this error in my log:

    16/02/04 11:44:07 ERROR TaskSchedulerImpl: Lost executor 7 on nid00788: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

When I check the log for that node (I guess this is what it means by the driver log?) I see:

    16/02/04 11:43:55 INFO TorrentBroadcast: Started reading broadcast variable 152
    16/02/04 11:43:55 INFO MemoryStore: Block broadcast_152_piece0 stored as bytes in memory (estimated size 19.3 KB, free 8.8 GB)
    16/02/04 11:43:55 INFO TorrentBroadcast: Reading broadcast variable 152 took 4 ms
    16/02/04 11:43:55 INFO MemoryStore: Block broadcast_152 stored as values in memory (estimated size 364.3 KB, free 8.8 GB)
    16/02/04 11:43:56 INFO MemoryStore: Block rdd_1634_1637 stored as values in memory (estimated size 24.0 B, free 8.8 GB)
    16/02/04 11:43:56 INFO Executor: Finished task 1637.0 in stage 0.0 (TID 1637). 2602 bytes result sent to driver
    16/02/04 11:43:56 INFO CoarseGrainedExecutorBackend: Got assigned task 1643
    16/02/04 11:43:56 INFO Executor: Running task 1643.0 in stage 0.0 (TID 1643)
    16/02/04 11:43:56 INFO CacheManager: Partition rdd_1634_1643 not found, computing it
    16/02/04 11:43:56 INFO HadoopRDD: Input split: file:/global/cscratch1/sd/gittens/CFSRA/CFSRAparquetTranspose/CFSRAparquetTranspose0/part-r-7-ddeb3951-d7da-4926-a16c-d54d71850131.avro:134217728+33554432

So the executor seems to have crashed without any error message being emitted, with plenty of memory on hand (I also grepped for WARN messages and didn't see any). Any idea what might be happening, or how to debug it? Several other executors are also lost with the same behavior. I'm using Spark in standalone mode.
Re: cause of RPC error?
To clarify, that's the tail of the node's stderr log, so the last message shown is at the EOF.
failure to parallelize an RDD
I transpose a matrix (colChunkOfA), stored as a 200-by-54843210 array of rows in Array[Array[Float]] format, into another matrix (rowChunk), also stored row-wise as a 54843210-by-200 Array[Array[Float]], using the following code:

    val rowChunk = new Array[Tuple2[Int, Array[Float]]](numCols)
    val colIndices = (0 until colChunkOfA.length).toArray

    (0 until numCols).foreach( rowIdx => {
      rowChunk(rowIdx) = Tuple2(rowIdx, colIndices.map(colChunkOfA(_)(rowIdx)))
    })

This succeeds, but the following code, which attempts to turn rowChunk into an RDD, fails silently: spark-submit just ends, and none of the executor logs indicate any errors occurring.

    val parallelRowChunkRDD = sc.parallelize(rowChunk).cache
    parallelRowChunkRDD.count

What is the culprit here? Here is the log output starting from the count instruction:

    16/01/13 02:23:38 INFO SparkContext: Starting job: count at transposeAvroToAvroChunks.scala:129
    16/01/13 02:23:38 INFO DAGScheduler: Got job 3 (count at transposeAvroToAvroChunks.scala:129) with 928 output partitions
    16/01/13 02:23:38 INFO DAGScheduler: Final stage: ResultStage 3(count at transposeAvroToAvroChunks.scala:129)
    16/01/13 02:23:38 INFO DAGScheduler: Parents of final stage: List()
    16/01/13 02:23:38 INFO DAGScheduler: Missing parents: List()
    16/01/13 02:23:38 INFO DAGScheduler: Submitting ResultStage 3 (ParallelCollectionRDD[2448] at parallelize at transposeAvroToAvroChunks.scala:128), which has no missing parents
    16/01/13 02:23:38 INFO MemoryStore: ensureFreeSpace(1048) called with curMem=50917367, maxMem=127452201615
    16/01/13 02:23:38 INFO MemoryStore: Block broadcast_615 stored as values in memory (estimated size 1048.0 B, free 118.7 GB)
    16/01/13 02:23:38 INFO MemoryStore: ensureFreeSpace(740) called with curMem=50918415, maxMem=127452201615
    16/01/13 02:23:38 INFO MemoryStore: Block broadcast_615_piece0 stored as bytes in memory (estimated size 740.0 B, free 118.7 GB)
    16/01/13 02:23:38 INFO BlockManagerInfo: Added broadcast_615_piece0 in memory on 172.31.36.112:36581 (size: 740.0 B, free: 118.7 GB)
    16/01/13 02:23:38 INFO SparkContext: Created broadcast 615 from broadcast at DAGScheduler.scala:861
    16/01/13 02:23:38 INFO DAGScheduler: Submitting 928 missing tasks from ResultStage 3 (ParallelCollectionRDD[2448] at parallelize at transposeAvroToAvroChunks.scala:128)
    16/01/13 02:23:38 INFO TaskSchedulerImpl: Adding task set 3.0 with 928 tasks
    16/01/13 02:23:39 WARN TaskSetManager: Stage 3 contains a task of very large size (47027 KB). The maximum recommended task size is 100 KB.
    16/01/13 02:23:39 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 1219, 172.31.34.184, PROCESS_LOCAL, 48156290 bytes)
    ...
    16/01/13 02:27:13 INFO TaskSetManager: Starting task 927.0 in stage 3.0 (TID 2146, 172.31.42.67, PROCESS_LOCAL, 48224789 bytes)
    16/01/13 02:27:17 INFO BlockManagerInfo: Removed broadcast_419_piece0 on 172.31.36.112:36581 in memory (size: 17.4 KB, free: 118.7 GB)
    16/01/13 02:27:21 INFO BlockManagerInfo: Removed broadcast_419_piece0 on 172.31.35.157:51059 in memory (size: 17.4 KB, free: 10.4 GB)
    16/01/13 02:27:21 INFO BlockManagerInfo: Removed broadcast_419_piece0 on 172.31.47.118:34888 in memory (size: 17.4 KB, free: 10.4 GB)
    16/01/13 02:27:22 INFO BlockManagerInfo: Removed broadcast_419_piece0 on 172.31.38.42:48582 in memory (size: 17.4 KB, free: 10.4 GB)
    16/01/13 02:27:38 INFO BlockManagerInfo: Added broadcast_615_piece0 in memory on 172.31.41.68:59281 (size: 740.0 B, free: 10.4 GB)
    16/01/13 02:27:55 INFO BlockManagerInfo: Added broadcast_615_piece0 in memory on 172.31.47.118:59575 (size: 740.0 B, free: 10.4 GB)
    16/01/13 02:28:47 INFO BlockManagerInfo: Added broadcast_615_piece0 in memory on 172.31.40.24:55643 (size: 740.0 B, free: 10.4 GB)
    16/01/13 02:28:49 INFO BlockManagerInfo: Added broadcast_615_piece0 in memory on 172.31.47.118:53671 (size: 740.0 B, free: 10.4 GB)

This is the end of the log, so it looks like all 928 tasks got started, but presumably somewhere in running they ran into an error. Nothing shows up in the executor logs.
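In case it's relevant: since each task here carries about 47 MB of the parallelized array in its closure (per the TaskSetManager warning), one mitigation I plan to try is simply slicing the collection much more finely, so each task ships a few MB instead. A sketch (10000 slices is an arbitrary choice, not a recommendation):

    // sketch: same parallelize, but with many more slices so each task
    // closure carries roughly 4 MB rather than roughly 47 MB (still above
    // the 100 KB recommendation, but perhaps below whatever limit is hit)
    val parallelRowChunkRDD = sc.parallelize(rowChunk, 10000).cache
    parallelRowChunkRDD.count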
how to make a dataframe of Array[Doubles] ?
When I attempt to create a DataFrame of Array[Double], I get an error about RDD[Array[Double]] not having a toDF method:

    import sqlContext.implicits._
    val testvec = Array( Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
    val testrdd = sc.parallelize(testvec)
    testrdd.toDF

gives

    <console>:29: error: value toDF is not a member of org.apache.spark.rdd.RDD[Array[Double]]
           testrdd.toDF

On the other hand, if I make the DataFrame more complicated, e.g. Tuple2[String, Array[Double]], the transformation goes through:

    val testvec = Array( ("row 1", Array(1.0, 2.0, 3.0, 4.0)), ("row 2", Array(5.0, 6.0, 7.0, 8.0)) )
    val testrdd = sc.parallelize(testvec)
    testrdd.toDF

gives

    testrdd: org.apache.spark.rdd.RDD[(String, Array[Double])] = ParallelCollectionRDD[1] at parallelize at <console>:29
    res3: org.apache.spark.sql.DataFrame = [_1: string, _2: array<double>]

What's the cause of this, and how can I get around it to create a DataFrame of Array[Double]? My end goal is to store that DataFrame in Parquet (yes, I do want to store all the values in a single column, not individual columns). I am using Spark 1.5.2.
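For reference, here's the workaround I've sketched in the meantime (wrapping each array in a Tuple1 so the toDF implicits see a Product type, and naming the column explicitly); I'd still like to understand why the bare Array[Double] case fails:

    import sqlContext.implicits._

    // workaround sketch: wrap each array in a Tuple1, giving a single
    // array<double> column named "values"
    val testvec = Array(Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
    val testdf = sc.parallelize(testvec).map(Tuple1(_)).toDF("values")
    testdf.write.parquet("/tmp/testvec.parquet") // illustrative path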
distcp suddenly broken with spark-ec2 script setup
I've been using the same method to launch my clusters and then pull my data from S3 to local HDFS:

    $SPARKHOME/ec2/spark-ec2 -k mykey -i ~/.ssh/mykey.pem -s 29 \
      --instance-type=r3.8xlarge --placement-group=pcavariants \
      --copy-aws-credentials --hadoop-major-version=2 --spot-price=2.8 \
      launch mycluster --region=us-west-2

then

    ephemeral-hdfs/bin/hadoop distcp s3n://agittens/CFSRArawtars CFSRArawtars

Until recently this worked as I'd expect. Within the last several days, I've been getting this error when I run the distcp command:

    2015-12-10 00:16:43,113 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' - Unexpected response code 404, expected 200
    2015-12-10 00:16:43,207 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
    2015-12-10 00:16:43,422 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' - Unexpected response code 404, expected 200
    2015-12-10 00:16:43,513 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
    2015-12-10 00:16:43,737 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' - Unexpected response code 404, expected 200
    2015-12-10 00:16:43,830 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
    2015-12-10 00:16:44,015 WARN httpclient.RestS3Service (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' - Unexpected response code 404, expected 200
    2015-12-10 00:16:46,141 WARN conf.Configuration (Configuration.java:warnOnceIfDeprecated(824)) - io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
    2015-12-10 00:16:46,141 WARN conf.Configuration (Configuration.java:warnOnceIfDeprecated(824)) - io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
    2015-12-10 00:16:46,630 INFO service.AbstractService (AbstractService.java:init(81)) - Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
    2015-12-10 00:16:46,630 INFO service.AbstractService (AbstractService.java:start(94)) - Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
    2015-12-10 00:16:47,135 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(368)) - number of splits:21

Then the job hangs and does nothing until I kill it. Any idea what the problem is and how to fix it, or a workaround for getting my data off S3 quickly? It is around 4 TB.
Parquet runs out of memory when reading in a huge matrix
I am trying to multiply against a large matrix that is stored in Parquet format, so I am being careful not to persist the RDD in memory, but I'm getting an OOM error from the Parquet reader:

    15/12/06 05:23:36 WARN TaskSetManager: Lost task 950.0 in stage 4.0 (TID 28398, 172.31.34.233): java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755)
        at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        ...

Specifically, the matrix is a 46752-by-54843120 dense matrix of 32-bit floats stored in Parquet format (each row is about 1.7GB uncompressed). The following code loads this matrix as a Spark IndexedRowMatrix and multiplies it by a random vector (the rows are stored with an associated string label, and the floats have to be converted to doubles because IndexedRows can only use doubles):

    val rows = {
      sqlContext.read.parquet(datafname).rdd.map {
        case SQLRow(rowname: String, values: WrappedArray[Float]) =>
          // DenseVectors have to be doubles
          val vector = new DenseVector(values.toArray.map(v => v.toDouble))
          new IndexedRow(indexLUT(rowname), vector)
      }
    }
    val nrows: Long = 46752
    val ncols = 54843120
    val A = new IndexedRowMatrix(rows, nrows, ncols)
    A.rows.unpersist() // doesn't help avoid the OOM
    val x = new DenseMatrix(ncols, 1, BDV.rand(ncols).data)
    A.multiply(x).rows.collect

I am using the following options when running:

    --driver-memory 220G --num-executors 203 --executor-cores 4 --executor-memory 25G --conf spark.storage.memoryFraction=0

There are 25573 partitions to the Parquet file, so the uncompressed Float values of each partition should be less than 4Gb; I expect this should imply that the current executor memory is much more than sufficient (I cannot raise the executor-memory setting). Any ideas why this is running into OOM errors and how to fix it?

The only thought I've had is that it could be related to the fact that there are only about 4.5K physical part-files for the Parquet dataset, but Spark partitions it into 25K partitions when loading the dataframe, so some rows must be being distributed across partitions on different executors, and maybe it is caching portions of rows to aid with shuffling. If this is the case, any suggestions on how to ameliorate the situation?
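For what it's worth, here is a rough row-at-a-time alternative I've been sketching, in case IndexedRowMatrix.multiply itself is contributing; it assumes rows and ncols as above, with BDV aliasing breeze.linalg.DenseVector, and I'm not claiming it's the right fix:

    // sketch: broadcast the random vector once and do per-row dot products,
    // so each task needs only one row of A plus the broadcast vector
    val xBrdcst = rows.context.broadcast(BDV.rand(ncols))
    val Ax = rows.map { row =>
      val rowVec = new BDV(row.vector.toArray)
      (row.index, rowVec dot xBrdcst.value)
    }.collect()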
controlling parquet file sizes for faster transfer to S3 from HDFS
Is there a way to control how large the part- files are for a Parquet dataset? I'm currently using e.g.

    results.toDF.coalesce(60).write.mode("append").parquet(outputdir)

to manually reduce the number of parts, but this doesn't map linearly to fewer parts: I noticed that coalescing to 30 actually gives smaller parts. I'd like to be able to specify the size of the part- files directly rather than guess and check which coalesce value to use.

Why I care: my data is ~3 Tb in Parquet form, in about 16 thousand files of around 200 MB each. Transferring this from HDFS on EC2 to S3, based on the transfer rate I calculated from the yarn webui's progress indicator, will take more than 4 hours. By way of comparison, when I transferred 3.8 Tb of data from S3 to HDFS on EC2, it only took about 1.5 hours; there the files were 1.7 Gb each. Minimizing the transfer time is important because I'll be taking the dataset out of S3 many times.
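For reference, here is a sketch of the guess-free sizing I'd like to encode. It derives the coalesce factor from the on-disk size of an existing copy of the data (inputdir is a hypothetical path) and a target part size, which is admittedly a crude proxy for the eventual output size:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // sketch: pick the coalesce factor from total bytes / target part size,
    // instead of guessing and checking
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val totalBytes = fs.getContentSummary(new Path(inputdir)).getLength
    val targetPartBytes = 1L << 30 // aim for ~1 Gb part- files (illustrative)
    val numParts = math.max(1, math.ceil(totalBytes.toDouble / targetPartBytes).toInt)
    results.toDF.coalesce(numParts).write.mode("append").parquet(outputdir)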
Why does a 3.8 T dataset take up 11.59 Tb on HDFS
I downloaded a 3.8 Tb dataset from S3 to a freshly launched spark-ec2 cluster with 16.73 Tb of storage, using distcp. The dataset is a collection of tar files of about 1.7 Gb each. Nothing else was stored in HDFS, but after the download completed, the namenode page says that 11.59 Tb are in use. When I use hdfs dfs -du -s -h, I see that the dataset only takes up 3.8 Tb as expected. I navigated through the entire HDFS hierarchy from /, and don't see where the missing space is. (I do notice that 11.59 Tb is close to 3 x 3.8 Tb, so perhaps the default replication factor of 3 is involved, but then I'd expect du to reflect it.) Any ideas what is going on and how to rectify it?

I'm using the spark-ec2 script to launch, with the command

    spark-ec2 -k key -i ~/.ssh/key.pem -s 29 --instance-type=r3.8xlarge \
      --placement-group=pcavariants --copy-aws-credentials \
      --hadoop-major-version=yarn --spot-price=2.8 --region=us-west-2 \
      launch conversioncluster

and am not modifying any configuration files for Hadoop.
out of memory error with Parquet
I'm using Spark to read in data from many files and write it back out in Parquet format, for ease of use later on. Currently I'm using this code:

    val fnamesRDD = sc.parallelize(fnames,
      ceil(fnames.length.toFloat / numfilesperpartition).toInt)
    val results = fnamesRDD.mapPartitionsWithIndex((index, fnames) =>
      extractData(fnames, variablenames, index))
    results.toDF.saveAsParquetFile(outputdir)

extractData returns an array of tuples of (filename, array of floats) corresponding to all the files in the partition. Each partition yields about 0.6 Gb of data, corresponding to just 3 files per partition. The problem is that I have hundreds of files to convert, and apparently saveAsParquetFile tries to pull down the data from too many of the conversion tasks onto the driver at a time, which causes an OOM. E.g., I got an error about it trying to pull down more than 4 Gb of data, corresponding to 9 tasks, onto the driver. I could increase the driver memory, but this wouldn't help if saveAsParquetFile then decided to pull in 100 tasks at a time. Is there a way to avoid this OOM error?
Re: out of memory error with Parquet
Never mind: when I switched to Spark 1.5.0, my code works as written and is pretty fast! Looking at some Parquet-related Spark JIRAs, it seems that Parquet is known to have had memory issues with buffering and writing, and that at least some of them were resolved in Spark 1.5.0.
spark on yarn is slower than spark-ec2 standalone, how to tune?
I'm using a manual installation of Spark under Yarn on a 30-node r3.8xlarge EC2 cluster (each node has 244 Gb RAM and 600 Gb SSD). All my code runs much faster on a cluster launched with the spark-ec2 script, but there's a mysterious problem with nodes becoming inaccessible, so I switched to running Spark under Yarn, figuring Yarn wouldn't let Spark eat up all the resources and render a machine inaccessible. So far, this seems to be the case. Now my code runs to completion, but much more slowly, so I'm wondering how to tune my Spark-under-Yarn installation to make it as fast as the standalone Spark install.

The current code I'm interested in speeding up just loads a dense 1 Tb matrix from Parquet format and then computes a low-rank approximation by essentially doing a bunch of distributed matrix multiplies. Before, my code completed in half an hour from loading to writing the output; now I expect it to take 4 or so hours. My spark-submit options are:

    --master yarn \
    --num-executors 29 \
    --driver-memory 180G \
    --executor-memory 180G \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=$LOGDIR \
    --conf spark.driver.maxResultSize=50G \
    --conf spark.task.maxFailures=4 \
    --conf spark.worker.timeout=120 \
    --conf spark.network.timeout=120 \

The huge timeouts were necessary on EC2 to avoid losing executors; I'm not sure they've remained necessary after switching to Yarn. My yarn-site.xml has the following settings:

    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>236000</value>
    </property>
    <property>
      <name>yarn.scheduler.minimum-allocation-mb</name>
      <value>59000</value>
    </property>
    <property>
      <name>yarn.scheduler.maximum-allocation-mb</name>
      <value>22</value>
    </property>

Any suggestions?
what is cause of, and how to recover from, unresponsive nodes w/ spark-ec2 script
I'm using the spark-ec2 script to launch a 30-node r3.8xlarge cluster. Occasionally several nodes will become unresponsive: I will notice that HDFS complains it can't find some blocks; then, when I go to restart Hadoop, the messages indicate that the connection to some nodes timed out; and when I check, I can't ssh into those nodes at all.

Is this a problem others have experienced? What is causing this random failure (or where can I look to find relevant logs), and how can I recover from it, other than destroying the cluster and starting anew (time-consuming, tedious, and requiring that I pull my large dataset down from S3 to HDFS once again, but this is what I've been doing so far)?
spark hangs at broadcasting during a filter
I'm trying to load a 1 Tb file whose lines i,j,v represent the values of a matrix given as A_{ij} = v, so I can convert it to a Parquet file. Only some of the rows of A are relevant, so the following code first loads the triplets as text, splits them into (Int, Int, Double) components, drops triplets whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int, Double]]] for each row (if I'm judging the datatypes correctly):

    val valsrows = sc.textFile(valsinpath).map(_.split(",")).
      map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
      filter(x => !droprows.contains(x._1)).
      groupByKey.
      map(x => (x._1, x._2.toSeq.sortBy(_._1)))

Spark hangs during a broadcast that occurs during the filter step (according to the Spark UI). The last two lines in the log before it pauses are:

    15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
    15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

I've left Spark running for up to 17 minutes one time, and it never continues past this point. I'm using a cluster of 30 r3.8xlarge EC2 instances (244 Gb RAM, 32 cores each) with Spark in standalone mode, 220G executor and driver memory, and the Kryo serializer. Any ideas on what could be causing this hang?
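In case the closure capture of droprows is implicated, here's a variant I've sketched that broadcasts the skip-set explicitly (assuming droprows is a Set[Int] built on the driver):

    // sketch: ship the skip-set to executors once as a broadcast rather
    // than capturing it in the filter closure of every task
    val droprowsBrdcst = sc.broadcast(droprows)
    val valsrows = sc.textFile(valsinpath).map(_.split(",")).
      map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
      filter(x => !droprowsBrdcst.value.contains(x._1)).
      groupByKey.
      map(x => (x._1, x._2.toSeq.sortBy(_._1)))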
small accumulator gives out of memory error
When I call the following minimal working example, the accumulator matrix is 32-by-100K and each executor has 64G, but I get an out of memory error:

    Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit

Here BDM is a Breeze DenseMatrix:

    object BDMAccumulatorParam extends AccumulatorParam[BDM[Double]] {
      def zero(initialValue: BDM[Double]): BDM[Double] = {
        BDM.zeros[Double](initialValue.rows, initialValue.cols)
      }

      def addInPlace(m1: BDM[Double], m2: BDM[Double]): BDM[Double] = {
        m1 += m2
      }
    }

    def testfun(mat: IndexedRowMatrix, lhs: DenseMatrix): DenseMatrix = {
      val accum = mat.rows.context.accumulator(
        BDM.zeros[Double](lhs.numRows.toInt, mat.numCols.toInt))(BDMAccumulatorParam)
      mat.rows.foreach(row =>
        accum += BDM.ones[Double](lhs.numRows.toInt, mat.numCols.toInt))
      fromBreeze(accum.value)
    }

Any ideas or suggestions on how to avoid this error?
java heap error
I'm trying to compute the Frobenius norm error in approximating an IndexedRowMatrix A with the product L*R, where L and R are Breeze DenseMatrices. I've written the following function, which computes the squared error over each partition of rows and then sums to get the total squared error (ignore the mean argument; it is not used). It works on a smaller dataset I've been using to test my code, but fails on the full-sized A with an error about the Java heap being out of space. A here is an 8mil-by-100K matrix partitioned into 5015 parts, L is 8mil-by-16, and R is 16-by-100K. Together L and R take up far less memory than I have on each executor (64 Gb), so I don't understand the cause of the error.

    def calcCenteredFrobNormErr(mat: IndexedRowMatrix, lhsTall: BDM[Double],
                                rhsFat: BDM[Double], mean: BDV[Double]): Double = {
      val lhsFactor = mat.rows.context.broadcast(lhsTall)
      val rhsFactor = mat.rows.context.broadcast(rhsFat)

      def partitionDiffFrobNorm2(rowiter: Iterator[IndexedRow],
                                 lhsFactor: Broadcast[BDM[Double]],
                                 rhsFactor: Broadcast[BDM[Double]]): Iterator[Double] = {
        val lhsTall = lhsFactor.value
        val rhsFat = rhsFactor.value
        val rowlist = rowiter.toList
        val numrows = rowlist.length
        val matSubMat = BDM.zeros[Double](numrows, mat.numCols.toInt)
        val lhsSubMat = BDM.zeros[Double](numrows, lhsTall.cols)
        var currowindex = 0
        rowlist.foreach( (currow: IndexedRow) => {
          currow.vector.foreachActive { case (j, v) => matSubMat(currowindex, j) = v }
          lhsSubMat(currowindex, ::) := lhsTall(currow.index.toInt, ::)
          currowindex += 1
        })
        val diffmat = matSubMat - lhsSubMat * rhsFat
        List(sum(diffmat :* diffmat)).iterator
      }

      report("Beginning to compute Frobenius norm", true)
      val res = mat.rows.mapPartitions(rowiter =>
        partitionDiffFrobNorm2(rowiter, lhsFactor, rhsFactor)).reduce(_ + _)
      report("Finished computing Frobenius norm", true)
      math.sqrt(res)
    }
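As a point of comparison, here is a row-at-a-time variant I've sketched that never materializes the dense numrows-by-100K partition submatrix; the broadcasts are as above, and it relies on Breeze computing lhsTall(i, ::) * rhsFat as the corresponding row of L*R (a sketch, not a claim about what's causing the heap error):

    // sketch: accumulate squared error one row at a time, so the per-task
    // working set is a single dense 100K-entry row rather than a full block
    def calcFrobNormErrByRow(mat: IndexedRowMatrix, lhsTall: BDM[Double],
                             rhsFat: BDM[Double]): Double = {
      val lhsFactor = mat.rows.context.broadcast(lhsTall)
      val rhsFactor = mat.rows.context.broadcast(rhsFat)
      val res = mat.rows.map { row =>
        val approx = (lhsFactor.value(row.index.toInt, ::) * rhsFactor.value).t
        val diff = new BDV(row.vector.toArray) - approx
        diff dot diff
      }.reduce(_ + _)
      math.sqrt(res)
    }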
out of memory error in treeAggregate
I'm using the following function to compute B*A, where B is a 32-by-8mil Breeze DenseMatrix and A is an 8mil-by-100K IndexedRowMatrix:

    // computes B*A where B is a local matrix and A is distributed: let b_i denote the
    // ith col of B and a_i denote the ith row of A, then B*A = sum(b_i a_i)
    def leftMultiplyCenteredMatrixBy(mat: IndexedRowMatrix, lhs: DenseMatrix,
                                     avg: BDV[Double]): DenseMatrix = {
      val lhsBrz = lhs.toBreeze.asInstanceOf[BDM[Double]]
      val result = mat.rows.treeAggregate(BDM.zeros[Double](lhs.numRows.toInt, mat.numCols.toInt))(
        seqOp = (U: BDM[Double], row: IndexedRow) => {
          val rowBrz = row.vector.toBreeze.asInstanceOf[BSV[Double]] - avg
          U += lhsBrz(::, row.index.toInt) * rowBrz.t
        },
        combOp = (U1, U2) => U1 += U2
      )
      fromBreeze(result)
    }

The accumulator used by the treeAggregate call is only 32-by-100K, and B is about 2 Gb as doubles. The executors have 64 Gb RAM, yet the call fails with the error:

    Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1072)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1067)
        at org.apache.spark.mllib.linalg.distributed.SVDVariants$.leftMultiplyCenteredMatrixBy(SVDVariants.scala:120)

Any idea what's going on or how to fix it?
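One variant I've sketched, in case the closure capture of lhsBrz is the issue, broadcasts it explicitly instead (otherwise identical; fromBreeze is my own helper, as above):

    def leftMultiplyViaBroadcast(mat: IndexedRowMatrix, lhs: DenseMatrix,
                                 avg: BDV[Double]): DenseMatrix = {
      // sketch: ship B to the executors once as a broadcast rather than
      // inside every serialized task closure
      val lhsBrdcst = mat.rows.context.broadcast(lhs.toBreeze.asInstanceOf[BDM[Double]])
      val result = mat.rows.treeAggregate(
        BDM.zeros[Double](lhs.numRows.toInt, mat.numCols.toInt))(
        seqOp = (U: BDM[Double], row: IndexedRow) => {
          val rowBrz = row.vector.toBreeze.asInstanceOf[BSV[Double]] - avg
          U += lhsBrdcst.value(::, row.index.toInt) * rowBrz.t
        },
        combOp = (U1, U2) => U1 += U2
      )
      fromBreeze(result)
    }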
Re: breeze.linalg.DenseMatrix not found
I get the same error even when I define covOperator not to use a matrix at all:

    def covOperator(v: BDV[Double]): BDV[Double] = { v }
breeze.linalg.DenseMatrix not found
I'm trying to compute the eigendecomposition of a matrix in a portion of my code, using mllib.linalg.EigenValueDecomposition (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala) as follows:

    val tol = 1e-10
    val maxIter = 300
    var testmat = DenseMatrix.ones(50, 50).toBreeze.asInstanceOf[BDM[Double]]
    testmat *= testmat.t
    def covOperator(v: BDV[Double]): BDV[Double] = { testmat * v }
    val (lambda2, u2) = EigenValueDecomposition.symmetricEigs(covOperator, 50, rank, tol, maxIter)

The code compiles, but fails when I spark-submit the jar, with a java.lang.NoClassDefFoundError: breeze/linalg/DenseMatrix at the line where I call EigenValueDecomposition. Any ideas what the issue might be? I use breeze.linalg.DenseMatrix (as BDM) elsewhere in the code and had no runtime issues until I inserted the call to EigenValueDecomposition.
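In case it's a packaging issue, for reference here is roughly what my build looks like (a sketch; the project name is hypothetical and the versions are illustrative of what I believe I'm on). Spark is marked provided while everything else should be bundled by sbt-assembly:

    // build.sbt sketch: breeze must end up on the executor classpath, either
    // bundled into the assembly jar or supplied via --jars / --packages
    name := "matrix-experiments" // hypothetical project name

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"  % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-mllib" % "1.4.0" % "provided",
      "org.scalanlp"     %% "breeze"      % "0.11.2"
    )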