feasibility of ignite and alluxio for interfacing MPI and Spark

2016-09-16 Thread AlexG
Do Ignite and Alluxio offer reasonable means of transferring data, in memory,
from Spark to MPI? A straightforward way to transfer data is to use piping, but
unless you have MPI processes running in a one-to-one mapping to the Spark
partitions, this will require some complicated logic to get working (you'll
have to handle multiple tasks sending their data to one process). 
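
For concreteness, here is a minimal sketch of the piping approach using
RDD.pipe; the executable path and the text serialization are assumptions for
illustration, and it still launches one external process per partition, which
is exactly the one-to-one mapping issue described above:

// Minimal piping sketch: each Spark partition is streamed, line by line, to
// the stdin of one external process (here a hypothetical MPI-side reader),
// and whatever that process prints is read back as an RDD of strings.
val rows = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))  // stand-in data
val piped = rows
  .map(_.mkString(","))               // assumed text serialization
  .pipe("/opt/mpi_bridge/reader")     // hypothetical MPI-side executable
piped.count()                         // forces the handoff to run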

It seems that Ignite and Alluxio might allow you to pull the data you want
into each of your MPI processes without worrying about such a requirement,
but it's not clear to me from the high-level descriptions of these systems
whether this can be readily realized. Is this the case?

Another issue is that with the piping solution, you only need to store two
copies of the data: one each on the Spark and MPI sides. With Ignite and
Alluxio, would you need three? It seems that they let you replace the
standard RDDs with RDDs backed by their memory stores, but do those
perform as efficiently as the standard Spark RDDs that are persisted in
memory?
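
To make the Alluxio idea concrete, the pattern I have in mind is roughly the
following sketch; the alluxio:// URI, host, port, and text format are
assumptions on my part, and whether this avoids the extra copy is exactly
what I'm unsure about:

// Sketch of a shared-memory-filesystem handoff: write the RDD to an
// Alluxio-backed path so the MPI processes can read it through Alluxio's
// client, independently of how Spark partitioned the data. Host/port/path
// are made up.
val rows = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))  // stand-in data
rows.map(_.mkString(","))
  .saveAsTextFile("alluxio://alluxio-master:19998/exchange/matrix")
// The MPI side would then read /exchange/matrix/part-* via the Alluxio client.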

More generally, I'd be interested to know if there are existing solutions to
this problem of transferring data between MPI and Spark. Thanks for any
insight you can offer!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/feasibility-of-ignite-and-alluxio-for-interfacing-MPI-and-Spark-tp27745.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how to write pyspark interface to scala code?

2016-04-12 Thread AlexG
I have Scala Spark code for computing a matrix factorization. I'd like to
make it possible to use this code from PySpark, so users can pass in a
Python RDD and receive one back, without knowing or caring that Scala code is
being called.

Please point me to an example of code (e.g. somewhere in the Spark codebase,
if it's clean enough) from which I can learn how to do this.
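
To make the question concrete, here is a rough sketch of the kind of
Scala-side entry point I imagine writing, loosely modeled on how MLlib
exposes its Scala implementations to Python through the JVM gateway; the
object name, the factorize method, and its placeholder body are all
hypothetical:

import org.apache.spark.api.java.JavaRDD

// Hypothetical Scala-side bridge: accept and return JavaRDDs of simple types
// so that py4j can pass the references across from Python.
object MatrixFactorizationBridge {
  def factorize(rows: JavaRDD[Array[Double]], rank: Int): JavaRDD[Array[Double]] = {
    val result = rows.rdd.map(row => row.take(rank))  // placeholder, not the real factorization
    result.toJavaRDD()
  }
}

// On the Python side, something like
//   sc._jvm.MatrixFactorizationBridge.factorize(jrdd, rank)
// would invoke this, with the caveat that a PySpark RDD's _jrdd holds pickled
// byte arrays, so some conversion layer (as in MLlib's PythonMLLibAPI) is
// needed in between.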





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-write-pyspark-interface-to-scala-code-tp26765.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



cause of RPC error?

2016-02-04 Thread AlexG
I am simply trying to load an RDD from disk with
transposeRowsRDD.avro(baseInputFname).rdd.map( )
and I get this error in my log:

16/02/04 11:44:07 ERROR TaskSchedulerImpl: Lost executor 7 on nid00788:
Remote RPC client disassociated. Likely due to containers exceeding
thresholds, or network issues. Check driver logs for WARN messages.

When I check the log for that node (I guess this is what it means by the
driver?), I see:

16/02/04 11:43:55 INFO TorrentBroadcast: Started reading broadcast variable
152
16/02/04 11:43:55 INFO MemoryStore: Block broadcast_152_piece0 stored as
bytes in memory (estimated size 19.3 KB, free 8.8 GB)
16/02/04 11:43:55 INFO TorrentBroadcast: Reading broadcast variable 152 took
4 ms
16/02/04 11:43:55 INFO MemoryStore: Block broadcast_152 stored as values in
memory (estimated size 364.3 KB, free 8.8 GB)
16/02/04 11:43:56 INFO MemoryStore: Block rdd_1634_1637 stored as values in
memory (estimated size 24.0 B, free 8.8 GB)
16/02/04 11:43:56 INFO Executor: Finished task 1637.0 in stage 0.0 (TID
1637). 2602 bytes result sent to driver
16/02/04 11:43:56 INFO CoarseGrainedExecutorBackend: Got assigned task 1643
16/02/04 11:43:56 INFO Executor: Running task 1643.0 in stage 0.0 (TID 1643)
16/02/04 11:43:56 INFO CacheManager: Partition rdd_1634_1643 not found,
computing it
16/02/04 11:43:56 INFO HadoopRDD: Input split:
file:/global/cscratch1/sd/gittens/CFSRA/CFSRAparquetTranspose/CFSRAparquetTranspose0/part-r-7-ddeb3951-d7da-4926-a16c-d54d71850131.avro:134217728+33554432

So the executor seems to have crashed without emitting any error message,
with plenty of memory on hand (I also grepped for WARN messages but didn't
see any). Any idea what might be happening, or how to debug it?
Several other executors are also lost with the same behavior. I'm using
Spark in standalone mode.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cause-of-RPC-error-tp26151.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: cause of RPC error?

2016-02-04 Thread AlexG
To clarify, that's the tail of the node stderr log, so the last message shown
is at the EOF.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cause-of-RPC-error-tp26151p26152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



failure to parallelize an RDD

2016-01-12 Thread AlexG
I transpose a matrix (colChunkOfA), stored as a 200-by-54843210 array of
rows in Array[Array[Float]] format, into another matrix (rowChunk), also
stored row-wise as a 54843210-by-200 Array[Array[Float]], using the
following code:

val rowChunk = new Array[Tuple2[Int,Array[Float]]](numCols)
val colIndices = (0 until colChunkOfA.length).toArray

(0 until numCols).foreach( rowIdx => { 
  rowChunk(rowIdx) = Tuple2(rowIdx, colIndices.map(colChunkOfA(_)(rowIdx)))
})   

This succeeds, but the following code, which attempts to turn rowChunk into
an RDD, fails silently: spark-submit just ends, and none of the executor logs
indicate that any error occurred.

val parallelRowChunkRDD = sc.parallelize(rowChunk).cache
parallelRowChunkRDD.count

What is the culprit here?

Here is the log output starting from the count instruction:

16/01/13 02:23:38 INFO SparkContext: Starting job: count at
transposeAvroToAvroChunks.scala:129
16/01/13 02:23:38 INFO DAGScheduler: Got job 3 (count at
transposeAvroToAvroChunks.scala:129) with 928 output partitions
16/01/13 02:23:38 INFO DAGScheduler: Final stage: ResultStage 3(count at
transposeAvroToAvroChunks.scala:129)
16/01/13 02:23:38 INFO DAGScheduler: Parents of final stage: List()
16/01/13 02:23:38 INFO DAGScheduler: Missing parents: List()
16/01/13 02:23:38 INFO DAGScheduler: Submitting ResultStage 3
(ParallelCollectionRDD[2448] at parallelize at
transposeAvroToAvroChunks.scala:128), which has no missing parents
16/01/13 02:23:38 INFO MemoryStore: ensureFreeSpace(1048) called with
curMem=50917367, maxMem=127452201615
16/01/13 02:23:38 INFO MemoryStore: Block broadcast_615 stored as values in
memory (estimated size 1048.0 B, free 118.7 GB)
16/01/13 02:23:38 INFO MemoryStore: ensureFreeSpace(740) called with
curMem=50918415, maxMem=127452201615
16/01/13 02:23:38 INFO MemoryStore: Block broadcast_615_piece0 stored as
bytes in memory (estimated size 740.0 B, free 118.7 GB)
16/01/13 02:23:38 INFO BlockManagerInfo: Added broadcast_615_piece0 in
memory on 172.31.36.112:36581 (size: 740.0 B, free: 118.7 GB)
16/01/13 02:23:38 INFO SparkContext: Created broadcast 615 from broadcast at
DAGScheduler.scala:861
16/01/13 02:23:38 INFO DAGScheduler: Submitting 928 missing tasks from
ResultStage 3 (ParallelCollectionRDD[2448] at parallelize at
transposeAvroToAvroChunks.scala:128)
16/01/13 02:23:38 INFO TaskSchedulerImpl: Adding task set 3.0 with 928 tasks
16/01/13 02:23:39 WARN TaskSetManager: Stage 3 contains a task of very large
size (47027 KB). The maximum recommended task size is 100 KB.
16/01/13 02:23:39 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
1219, 172.31.34.184, PROCESS_LOCAL, 48156290 bytes)
...
16/01/13 02:27:13 INFO TaskSetManager: Starting task 927.0 in stage 3.0 (TID
2146, 172.31.42.67, PROCESS_LOCAL, 48224789 bytes)
16/01/13 02:27:17 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
172.31.36.112:36581 in memory (size: 17.4 KB, free: 118.7 GB)
16/01/13 02:27:21 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
172.31.35.157:51059 in memory (size: 17.4 KB, free: 10.4 GB)
16/01/13 02:27:21 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
172.31.47.118:34888 in memory (size: 17.4 KB, free: 10.4 GB)
16/01/13 02:27:22 INFO BlockManagerInfo: Removed broadcast_419_piece0 on
172.31.38.42:48582 in memory (size: 17.4 KB, free: 10.4 GB)
16/01/13 02:27:38 INFO BlockManagerInfo: Added broadcast_615_piece0 in
memory on 172.31.41.68:59281 (size: 740.0 B, free: 10.4 GB)
16/01/13 02:27:55 INFO BlockManagerInfo: Added broadcast_615_piece0 in
memory on 172.31.47.118:59575 (size: 740.0 B, free: 10.4 GB)
16/01/13 02:28:47 INFO BlockManagerInfo: Added broadcast_615_piece0 in
memory on 172.31.40.24:55643 (size: 740.0 B, free: 10.4 GB)
16/01/13 02:28:49 INFO BlockManagerInfo: Added broadcast_615_piece0 in
memory on 172.31.47.118:53671 (size: 740.0 B, free: 10.4 GB)
This is the end of the log, so it looks like all 928 tasks got started, but
presumably they ran into an error somewhere while running. Nothing shows up
in the executor logs.
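
Given the TaskSetManager warning about 47 MB tasks, one variation I may try
is shipping the local array to the cluster in smaller batches, roughly as
below; the batch size and slice count are arbitrary assumptions on my part:

// Sketch: parallelize rowChunk in batches so no single task has to carry
// ~47 MB of serialized row data (batch size and slices per batch are guesses).
val batchSize = 500000
val batchRDDs = rowChunk.grouped(batchSize).toSeq
  .map(batch => sc.parallelize(batch, 64))
val parallelRowChunkRDD = sc.union(batchRDDs).cache()
parallelRowChunkRDD.count()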



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/failure-to-parallelize-an-RDD-tp25950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to make a dataframe of Array[Doubles] ?

2015-12-14 Thread AlexG
When I attempt to create a dataframe of Array[Double], I get an error about
RDD[Array[Double]] not having a toDF method:

import sqlContext.implicits._
val testvec = Array( Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
val testrdd = sc.parallelize(testvec)
testrdd.toDF

gives

:29: error: value toDF is not a member of
org.apache.spark.rdd.RDD[Array[Double]]
       testrdd.toDF

On the other hand, if I make the dataframe more complicated, e.g.
Tuple2[String, Array[Double]], the transformation goes through:

val testvec = Array( ("row 1", Array(1.0, 2.0, 3.0, 4.0)), ("row 2",
Array(5.0, 6.0, 7.0, 8.0)) )
val testrdd = sc.parallelize(testvec)
testrdd.toDF

gives
testrdd: org.apache.spark.rdd.RDD[(String, Array[Double])] =
ParallelCollectionRDD[1] at parallelize at :29
res3: org.apache.spark.sql.DataFrame = [_1: string, _2: array<double>]

What's the cause of this, and how can I get around it to create a dataframe
of Array[Double]? My end goal is to store that dataframe in Parquet (yes, I
do want to store all the values in a single column, not individual columns).

I am using Spark 1.5.2
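
One workaround that seems consistent with the Tuple2 behavior above is to
wrap each array in a Tuple1 so the same implicit conversion applies; a
sketch (the column name and output path are just for illustration):

import sqlContext.implicits._

// Wrapping each Array[Double] in a Tuple1 makes the element type a Product,
// which toDF accepts, and yields a single array-typed column.
val testvec = Array(Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
val testdf = sc.parallelize(testvec).map(Tuple1.apply).toDF("values")
testdf.write.parquet("/tmp/testvec.parquet")  // illustrative output path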




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-dataframe-of-Array-Doubles-tp25704.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



distcp suddenly broken with spark-ec2 script setup

2015-12-09 Thread AlexG
I've been using the same method to launch my clusters and then pull my data
from S3 to local HDFS:

$SPARKHOME/ec2/spark-ec2 -k mykey -i ~/.ssh/mykey.pem -s 29
--instance-type=r3.8xlarge --placement-group=pcavariants
--copy-aws-credentials --hadoop-major-version=2 --spot-price=2.8 launch
mycluster --region=us-west-2

then

ephemeral-hdfs/bin/hadoop distcp s3n://agittens/CFSRArawtars CFSRArawtars

This used to work as expected. Within the last several days, however, I've
been getting this error when I run the distcp command:
2015-12-10 00:16:43,113 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:43,207 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:16:43,422 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:43,513 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:16:43,737 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:43,830 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:16:44,015 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:46,141 WARN  conf.Configuration
(Configuration.java:warnOnceIfDeprecated(824)) - io.sort.mb is deprecated.
Instead, use mapreduce.task.io.sort.mb
2015-12-10 00:16:46,141 WARN  conf.Configuration
(Configuration.java:warnOnceIfDeprecated(824)) - io.sort.factor is
deprecated. Instead, use mapreduce.task.io.sort.factor
2015-12-10 00:16:46,630 INFO  service.AbstractService
(AbstractService.java:init(81)) -
Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
2015-12-10 00:16:46,630 INFO  service.AbstractService
(AbstractService.java:start(94)) -
Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
2015-12-10 00:16:47,135 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:submitJobInternal(368)) - number of splits:21

Then the job hangs and does nothing until I kill it. Any idea what the
problem is and how to fix it, or a work-around for getting my data off S3
quickly? It is around 4 TB.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/distcp-suddenly-broken-with-spark-ec2-script-setup-tp25658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parquet runs out of memory when reading in a huge matrix

2015-12-05 Thread AlexG
I am trying to multiply against a large matrix that is stored in Parquet
format, so I am being careful not to store the RDD in memory, but I am
getting an OOM error from the Parquet reader:
 
15/12/06 05:23:36 WARN TaskSetManager: Lost task 950.0 in stage 4.0   
(TID 28398, 172.31.34.233): java.lang.OutOfMemoryError: Java heap space
at
org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755)
at
org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
at
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
...

Specifically, the matrix is a 46752-by-54843120 dense matrix of 32-bit
floats that is stored in parquet format (each row is about 1.7GB
uncompressed). 

The following code loads this matrix as a Spark IndexedRowMatrix and
multiplies it by a random vector (the rows are stored with an associated
string label, and the floats have to be converted to doubles because
IndexedRows can only use doubles):

val rows = sqlContext.read.parquet(datafname).rdd.map {
  case SQLRow(rowname: String, values: WrappedArray[Float]) =>
    // DenseVectors have to be doubles
    val vector = new DenseVector(values.toArray.map(v => v.toDouble))
    new IndexedRow(indexLUT(rowname), vector)
}

val nrows : Long = 46752
val ncols = 54843120
val A = new IndexedRowMatrix(rows, nrows, ncols)
A.rows.unpersist() // doesn't help avoid OOM

val x = new DenseMatrix(ncols, 1, BDV.rand(ncols).data)
A.multiply(x).rows.collect

I am using the following options when running

--driver-memory 220G
--num-executors 203
--executor-cores 4
--executor-memory 25G
--conf spark.storage.memoryFraction=0

There are 25573 partitions to the parquet file, so the uncompressed Float
values of each partition should be less than 4Gb; I expect this should imply
that the current executor memory is much more than sufficient (I cannot
raise the executor-memory setting).

Any ideas why this is running into OOM errors, and how to fix it? The only
thought I've had is that it could be related to the fact that there are only
about 4.5K physical part- files for the Parquet dataset, but Spark splits it
into 25K partitions when loading the dataframe, so some rows must be
distributed across partitions on different executors; maybe it is caching
portions of rows to aid with shuffling. If that is the case, any suggestions
on how to ameliorate the situation?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-runs-out-of-memory-when-reading-in-a-huge-matrix-tp25590.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



controlling parquet file sizes for faster transfer to S3 from HDFS

2015-11-26 Thread AlexG
Is there a way to control how large the part- files are for a parquet
dataset? I'm currently using e.g.

results.toDF.coalesce(60).write.mode("append").parquet(outputdir)

to manually reduce the number of parts, but this doesn't map linearly to
fewer parts: I noticed that coalescing to 30 actually gives smaller parts.
I'd like to be able to specify the size of the part files directly rather
than guess and check which coalesce value to use.
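
A rough sketch of what I mean: derive the number of parts from a target part
size. The total size is hard-coded here as an assumption; in practice it
would come from measuring the input:

// Derive the coalesce factor from a target part size instead of guessing.
val totalBytes = 3L * 1024 * 1024 * 1024 * 1024   // ~3 TB, assumed
val targetPartBytes = 1700L * 1024 * 1024         // aim for ~1.7 GB parts
val numParts = math.max(1, math.ceil(totalBytes.toDouble / targetPartBytes).toInt)
results.toDF.coalesce(numParts).write.mode("append").parquet(outputdir)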

Why I care: my data is ~3 TB in Parquet form, in about 16 thousand files of
around 200 MB each. Based on the transfer rate I calculated from the YARN
web UI's progress indicator, transferring this from HDFS on EC2 to S3 will
take more than 4 hours. By way of comparison, when I transferred 3.8 TB of
data from S3 to HDFS on EC2, it only took about 1.5 hours; there the files
were 1.7 GB each.

Minimizing the transfer time is important because I'll be taking the dataset
out of S3 many times.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/controlling-parquet-file-sizes-for-faster-transfer-to-S3-from-HDFS-tp25490.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Why does a 3.8 T dataset take up 11.59 Tb on HDFS

2015-11-24 Thread AlexG
I downloaded a 3.8 T dataset from S3 to a freshly launched spark-ec2 cluster
with 16.73 Tb storage, using
distcp. The dataset is a collection of tar files of about 1.7 Tb each.
Nothing else was stored in HDFS, but after the download completed, the
NameNode page says that 11.59 TB are in use. When I run hdfs du -h -s, I see
that the dataset only takes up 3.8 TB as expected. I navigated through the
entire HDFS hierarchy from /, and don't see where the missing space is. Any
ideas what is going on and how to rectify it?
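
In case it helps pin down where the space is going, this is the check I can
run from the spark-shell to compare the logical size against the raw space
consumed (which, as I understand it, includes block replication); the path
is assumed to be the dataset's HDFS directory:

import org.apache.hadoop.fs.{FileSystem, Path}

// Compare logical bytes vs. raw bytes consumed (the latter counts replication).
val fs = FileSystem.get(sc.hadoopConfiguration)
val summary = fs.getContentSummary(new Path("/CFSRArawtars"))  // assumed path
println(s"logical bytes:  ${summary.getLength}")
println(s"raw bytes used: ${summary.getSpaceConsumed}")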

I'm using the spark-ec2 script to launch, with the command

spark-ec2 -k key -i ~/.ssh/key.pem -s 29 --instance-type=r3.8xlarge
--placement-group=pcavariants --copy-aws-credentials
--hadoop-major-version=yarn --spot-price=2.8 --region=us-west-2 launch
conversioncluster

and am not modifying any configuration files for Hadoop.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-does-a-3-8-T-dataset-take-up-11-59-Tb-on-HDFS-tp25471.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



out of memory error with Parquet

2015-11-13 Thread AlexG
I'm using Spark to read in data from many files and write it back out in
Parquet format for ease of use later on. Currently, I'm using this code:

val fnamesRDD = sc.parallelize(fnames,
  ceil(fnames.length.toFloat / numfilesperpartition).toInt)
val results = fnamesRDD.mapPartitionsWithIndex((index, fnames) =>
  extractData(fnames, variablenames, index))
results.toDF.saveAsParquetFile(outputdir)

extractData returns an array of tuples of (filename, array of floats)
corresponding to all the files in the partition. Each partition results in
about 0.6 GB of data, corresponding to just 3 files per partition.

The problem is that I have hundreds of files to convert, and apparently
saveAsParquetFile tries to pull down the data from too many of the
conversion tasks onto the driver at a time, which causes an OOM. For
example, I got an error about it trying to pull more than 4 GB of data,
corresponding to 9 tasks, onto the driver. I could increase the driver
memory, but this wouldn't help if saveAsParquetFile then decided to pull in
100 tasks at a time.

Is there a way to avoid this OOM error?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/out-of-memory-error-with-Parquet-tp25381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: out of memory error with Parquet

2015-11-13 Thread AlexG
Never mind; when I switched to Spark 1.5.0, my code works as written and is
pretty fast! Looking at some Parquet-related Spark JIRAs, it seems that
Parquet is known to have had some memory issues with buffering and writing,
and that at least some of them were resolved in Spark 1.5.0.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/out-of-memory-error-with-Parquet-tp25381p25382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark on yarn is slower than spark-ec2 standalone, how to tune?

2015-08-15 Thread AlexG
I'm using a manual installation of Spark on YARN to run a 30-node r3.8xlarge
EC2 cluster (each node has 244 GB RAM and 600 GB SSD). All my code runs much
faster on a cluster launched with the spark-ec2 script, but there is a
mysterious problem with nodes becoming inaccessible, so I switched to
running Spark on YARN, figuring YARN wouldn't let Spark eat up all the
resources and render a machine inaccessible. So far, that seems to be the
case. Now my code runs to completion, but much more slowly, so I'm wondering
how I can tune my Spark-on-YARN installation to make it as fast as the
standalone Spark install.

The current code I'm interested in speeding up just loads a dense 1 TB
matrix from Parquet format and then computes a low-rank approximation by
essentially doing a bunch of distributed matrix multiplies. Before, my code
completed in half an hour from loading to writing the output; now I expect
it to take 4 or so hours to complete.

My spark-submit options are 
  --master yarn \
  --num-executors 29 \
  --driver-memory 180G \
  --executor-memory 180G \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=$LOGDIR \
  --conf spark.driver.maxResultSize=50G \
  --conf spark.task.maxFailures=4 \
  --conf spark.worker.timeout=120 \
  --conf spark.network.timeout=120 \
The huge timeouts were necessary on EC2 standalone to avoid losing
executors; I'm not sure they remain necessary after switching to YARN.

My yarn-site.xml has the following settings:

  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>236000</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>59000</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>22</value>
  </property>
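
For reference, this is the kind of alternative sizing I've been considering,
on the assumption that several smaller executors per node plus an explicit
memory overhead budget behave better under YARN than one 180G executor per
node; the numbers are guesses I haven't validated:

  --master yarn \
  --num-executors 87 \
  --executor-cores 10 \
  --executor-memory 64G \
  --conf spark.yarn.executor.memoryOverhead=6144 \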

Any suggestions?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-on-yarn-is-slower-than-spark-ec2-standalone-how-to-tune-tp24282.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



what is cause of, and how to recover from, unresponsive nodes w/ spark-ec2 script

2015-08-12 Thread AlexG
I'm using the spark-ec2 script to launch a 30 node r3.8xlarge cluster.
Occasionally several nodes will become unresponsive: I will notice that hdfs
complains it can't find some blocks, then when I go to restart hadoop, the
messages indicate that the connection to some nodes timed out, then when I
check, I can't ssh into those nodes at all.

Is this a problem others have experienced? What is causing this random
failure--- or where can I look to find relevant logs---, and how can I
recover from this other than to destroy the cluster and start anew
(time-consuming, tedious, and requiring that I pull down my large dataset
from S3 to HDFS once again, but this is what I've been doing currently)?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-cause-of-and-how-to-recover-from-unresponsive-nodes-w-spark-ec2-script-tp24235.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark hangs at broadcasting during a filter

2015-08-05 Thread AlexG
I'm trying to load a 1 TB file whose lines i,j,v represent the values of a
matrix given as A_{ij} = v, so I can convert it to a Parquet file. Only some
of the rows of A are relevant, so the following code first loads the
triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets
whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
Double]]] for each row (if I'm judging the datatypes correctly).

val valsrows = sc.textFile(valsinpath).map(_.split(",")).
                  map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
                  filter(x => !droprows.contains(x._1)).
                  groupByKey.
                  map(x => (x._1, x._2.toSeq.sortBy(_._1)))

Spark hangs during a broadcast that occurs during the filter step (according
to the Spark UI). The last two lines in the log before it pauses are:

15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

I've left Spark running for up to 17 minutes at a time, and it never
continues past this point. I'm using a cluster of 30 r3.8xlarge EC2
instances (244 GB RAM, 32 cores each), with Spark in standalone mode, 220G
executor and driver memory, and the Kryo serializer.

Any ideas on what could be causing this hang?
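
In case it's relevant, here is the same pipeline rewritten with
aggregateByKey instead of groupByKey; this is just a sketch of an
alternative I could try, not code I've run:

// Alternative sketch: accumulate each row's (col, value) pairs incrementally
// with aggregateByKey rather than grouping them all at once.
val valsrowsAlt = sc.textFile(valsinpath)
  .map(_.split(","))
  .map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble)))
  .filter(x => !droprows.contains(x._1))
  .aggregateByKey(List.empty[(Int, Double)])(
    (acc, kv) => kv :: acc,
    (a, b) => a ::: b)
  .map(x => (x._1, x._2.sortBy(_._1)))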



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



small accumulator gives out of memory error

2015-07-15 Thread AlexG
When I run the following minimal working example, the accumulator matrix is
32-by-100K and each executor has 64G, but I get an out-of-memory error:

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size
exceeds VM limit

Here BDM is a Breeze DenseMatrix

object BDMAccumulatorParam extends AccumulatorParam[BDM[Double]] {
  def zero(initialValue: BDM[Double]): BDM[Double] = {
    BDM.zeros[Double](initialValue.rows, initialValue.cols)
  }

  def addInPlace(m1: BDM[Double], m2: BDM[Double]): BDM[Double] = {
    m1 += m2
  }
}

def testfun(mat: IndexedRowMatrix, lhs: DenseMatrix): DenseMatrix = {
  val accum =
    mat.rows.context.accumulator(BDM.zeros[Double](lhs.numRows.toInt,
      mat.numCols.toInt))(BDMAccumulatorParam)
  mat.rows.foreach(row => accum += BDM.ones[Double](lhs.numRows.toInt,
    mat.numCols.toInt))
  fromBreeze(accum.value)
}

Any ideas or suggestions on how to avoid this error?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/small-accumulator-gives-out-of-memory-error-tp23864.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java heap error

2015-07-15 Thread AlexG
I'm trying to compute the Frobenius norm error in approximating an
IndexedRowMatrix A with the product L*R where L and R are Breeze
DenseMatrices. 

I've written the following function that computes the squared error over
each partition of rows then sums up to get the total squared error (ignore
the mean argument, it is not used). It works on a smaller dataset that I've
been using to test my code, but fails on the full-sized A with an error
about the Java heap running out of space.

A here is an 8mil-by-100K matrix partitioned into 5015 parts, L is
8mil-by-16, and R is 16-by-100K. Together L and R take up way less memory
than I have on each executor (64 Gb), so I don't understand the cause of the
error.

def calcCenteredFrobNormErr(mat: IndexedRowMatrix, lhsTall: BDM[Double],
rhsFat: BDM[Double], mean: BDV[Double] ) : Double = {
val lhsFactor = mat.rows.context.broadcast(lhsTall)
val rhsFactor = mat.rows.context.broadcast(rhsFat)

def partitionDiffFrobNorm2(rowiter : Iterator[IndexedRow], lhsFactor:
Broadcast[BDM[Double]], rhsFactor: Broadcast[BDM[Double]]) :
Iterator[Double] = {

  val lhsTall = lhsFactor.value
  val rhsFat = rhsFactor.value

  val rowlist = rowiter.toList
  val numrows = rowlist.length
  val matSubMat = BDM.zeros[Double](numrows, mat.numCols.toInt)
  val lhsSubMat = BDM.zeros[Double](numrows, lhsTall.cols)

  var currowindex = 0
  rowlist.foreach(
    (currow: IndexedRow) => {
      currow.vector.foreachActive { case (j, v) =>
        matSubMat(currowindex, j) = v }
  lhsSubMat(currowindex, ::) := lhsTall(currow.index.toInt, ::)
  currowindex += 1
}
  )

  val diffmat = matSubMat - lhsSubMat * rhsFat
  List(sum(diffmat :* diffmat)).iterator
}

    report("Beginning to compute Frobenius norm", true)
    val res = mat.rows.mapPartitions(rowiter =>
      partitionDiffFrobNorm2(rowiter, lhsFactor, rhsFactor)).reduce(_ + _)
    report("Finished computing Frobenius norm", true)
math.sqrt(res)
  }



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-heap-error-tp23856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



out of memory error in treeAggregate

2015-07-15 Thread AlexG
I'm using the following function to compute B*A, where B is a 32-by-8mil
Breeze DenseMatrix and A is an 8mil-by-100K IndexedRowMatrix.

// computes BA where B is a local matrix and A is distributed: let b_i
denote the
// ith col of B and a_i denote the ith row of A, then BA = sum(b_i a_i)

def leftMultiplyCenteredMatrixBy(mat: IndexedRowMatrix, lhs: DenseMatrix,
    avg: BDV[Double]): DenseMatrix = {
  val lhsBrz = lhs.toBreeze.asInstanceOf[BDM[Double]]
  val result =
    mat.rows.treeAggregate(BDM.zeros[Double](lhs.numRows.toInt,
      mat.numCols.toInt))(
      seqOp = (U: BDM[Double], row: IndexedRow) => {
        val rowBrz = row.vector.toBreeze.asInstanceOf[BSV[Double]] - avg
        U += lhsBrz(::, row.index.toInt) * rowBrz.t
      },
      combOp = (U1, U2) => U1 += U2
    )
  fromBreeze(result)
}

The accumulator used by the treeAggregate call is only 32-by-100K, and B is
less than a GB. The executors have 64 GB of RAM, yet the call fails with the
error:

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size
exceeds VM limit
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
  at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
  at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
  at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
  at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
  at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1072)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1067)
  at
org.apache.spark.mllib.linalg.distributed.SVDVariants$.leftMultiplyCenteredMatrixBy(SVDVariants.scala:120)

Any idea what's going on/how to fix it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/out-of-memory-error-in-treeAggregate-tp23859.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: breeze.linalg.DenseMatrix not found

2015-06-29 Thread AlexG
I get the same error even when I define covOperator not to use a matrix at
all:

def covOperator(v: BDV[Double]): BDV[Double] = { v }




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/breeze-linalg-DenseMatrix-not-found-tp23537p23538.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



breeze.linalg.DenseMatrix not found

2015-06-29 Thread AlexG
I'm trying to compute the eigendecomposition of a matrix in a portion of my
code, using mllib.linalg.EigenValueDecomposition 
(https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
)
as follows:

  val tol = 1e-10
  val maxIter = 300
  var testmat = DenseMatrix.ones(50,50).toBreeze.asInstanceOf[BDM[Double]]
  testmat *= testmat.t

  def covOperator(v: BDV[Double]): BDV[Double] = { testmat * v }
  val (lambda2, u2) = EigenValueDecomposition.symmetricEigs(covOperator, 50,
rank, tol, maxIter)

The code compiles, but fails when I spark-submit the jar with a
java.lang.NoClassDefFoundError: breeze/linalg/DenseMatrix
error at the line where I call EigenValueDecomposition.

Any ideas what the issue might be? I use breeze.linalg.DenseMatrix as BDM
elsewhere in the code and had no runtime issues until I inserted the call to
EigenValueDecomposition.
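
For reference, one thing I plan to check is whether breeze actually ends up
in my application assembly; a sketch of the build.sbt fragment I have in
mind, with version numbers that are assumptions rather than what's in my
actual build:

// build.sbt sketch: keep Spark "provided", but bundle breeze into the fat jar
// so breeze/linalg/DenseMatrix is on the runtime classpath.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.4.0" % "provided",
  "org.scalanlp"     %% "breeze"      % "0.11.2"
)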





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/breeze-linalg-DenseMatrix-not-found-tp23537.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org