Re: NoClassDefFoundError with ZonedDateTime

2016-07-21 Thread Ilya Ganelin
What's the easiest way to get the classpath for the Spark application
itself?
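For what it's worth, a quick way to dump the effective classpath of both the
driver and the executor JVMs (a spark-shell sketch, not from the original thread):

  import java.lang.management.ManagementFactory

  // driver-side classpath
  println(ManagementFactory.getRuntimeMXBean.getClassPath)

  // executor-side classpaths: run a tiny job and collect one string per distinct JVM classpath
  sc.parallelize(1 to 100, 10)
    .map(_ => ManagementFactory.getRuntimeMXBean.getClassPath)
    .distinct()
    .collect()
    .foreach(println)
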
On Thu, Jul 21, 2016 at 9:37 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Might be classpath issue.
>
> Mind pastebin'ning the effective class path ?
>
> Stack trace of NoClassDefFoundError may also help provide some clue.
>
> On Thu, Jul 21, 2016 at 8:26 PM, Ilya Ganelin <ilgan...@gmail.com> wrote:
>
>> Hello - I'm trying to deploy the Spark TimeSeries library in a new
>> environment. I'm running Spark 1.6.1 submitted through YARN in a cluster
>> with Java 8 installed on all nodes but I'm getting the NoClassDef at
>> runtime when trying to create a new TimeSeriesRDD. Since ZonedDateTime is
>> part of Java 8 I feel like I shouldn't need to do anything else. The weird
>> thing is I get it on the data nodes, not the driver. Any thoughts on what's
>> causing this or how to track it down? Would appreciate the help.
>>
>> Thanks!
>>
>
>


NoClassDefFoundError with ZonedDateTime

2016-07-21 Thread Ilya Ganelin
Hello - I'm trying to deploy the Spark TimeSeries library in a new
environment. I'm running Spark 1.6.1 submitted through YARN on a cluster
with Java 8 installed on all nodes, but I'm getting the NoClassDefFoundError at
runtime when trying to create a new TimeSeriesRDD. Since ZonedDateTime is
part of Java 8, I feel like I shouldn't need to do anything else. The weird
thing is I get it on the data nodes, not the driver. Any thoughts on what's
causing this or how to track it down? Would appreciate the help.

Thanks!


Re: Access to broadcasted variable

2016-02-20 Thread Ilya Ganelin
It gets serialized once per physical container, instead of once per task of
every stage that uses it.
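A minimal sketch of the pattern under discussion (spark-shell; the lookup table is illustrative):

  val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")
  val lookupB = sc.broadcast(lookup)                     // serialized once on the driver

  sc.parallelize(1 to 1000, 10)
    .map(x => lookupB.value.getOrElse(x % 3 + 1, "?"))   // deserialized once per executor JVM, shared by its tasks
    .count()
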
On Sat, Feb 20, 2016 at 8:15 AM jeff saremi  wrote:

> Is the broadcasted variable distributed to every executor or every worker?
> Now I'm more confused
> I thought it was supposed to save memory by distributing it to every
> worker and the executors would share that copy
>
>
> --
> Date: Fri, 19 Feb 2016 16:48:59 -0800
> Subject: Re: Access to broadcasted variable
> From: shixi...@databricks.com
> To: jeffsar...@hotmail.com
> CC: user@spark.apache.org
>
>
> The broadcasted object is serialized in driver and sent to the executors.
> And in the executor, it will deserialize the bytes to get the broadcasted
> object.
>
> On Fri, Feb 19, 2016 at 5:54 AM, jeff saremi 
> wrote:
>
> could someone please comment on this? thanks
>
> --
> From: jeffsar...@hotmail.com
> To: user@spark.apache.org
> Subject: Access to broadcasted variable
> Date: Thu, 18 Feb 2016 14:44:07 -0500
>
>
>
> I'd like to know if the broadcasted object gets serialized when accessed
> by the executor during the execution of a task?
> I know that it gets serialized from the driver to the worker. This
> question is inside worker when executor JVM's are accessing it
>
> thanks
> Jeff
>
>
>


Re: How to efficiently Scan (not filter nor lookup) part of Paird RDD or Ordered RDD

2016-01-24 Thread Ilya Ganelin
The solution I normally use is to zipWithIndex() and then use the filter
operation. Each filter task only scans its own partition - an O(m) operation
where m is that partition's size - and the partitions are processed in
parallel, so it is much cheaper than a single serial O(N) scan.
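A minimal sketch of that approach (spark-shell; the data and range bounds are illustrative):

  val rdd = sc.parallelize(1 to 1000000).map(i => (i, s"row$i"))   // stand-in for the real pair RDD
  val rangeStart = 100L
  val rangeEnd   = 200L

  val indexed = rdd.zipWithIndex().cache()                          // (element, index), indexed once and reused
  val slice   = indexed
    .filter { case (_, i) => i >= rangeStart && i < rangeEnd }
    .keys                                                           // drop the index again
  slice.count()
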

-Ilya Ganelin

On Sat, Jan 23, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Problem is I have an RDD of about 10M rows and it keeps growing. Every time
> we want to perform a query and compute on a subset of the data we have to use
> filter and then some aggregation. Here I know filter goes through each
> partition and every row of the RDD, which may not be efficient at all.
>
> Spark having Ordered RDD functions, I don't see why it's so difficult to
> implement such a function. Cassandra/HBase have had it for years, where they can
> fetch data only from certain partitions based on your rowkey. Scala's TreeMap
> has a range function to do the same.
>
> I think people have been looking for this for a while. I see several posts
> asking for this.
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-td20170.html#a26048
>
> By the way, I assume there
> Thanks
> Nirav
>
>
>
>


Spark LDA

2016-01-22 Thread Ilya Ganelin
Hi all - I'm running the Spark LDA algorithm on a dataset of roughly 3
million terms with a resulting RDD of approximately 20 GB on a 5 node
cluster with 10 executors (3 cores each) and 14gb of memory per executor.

As the application runs, I'm seeing progressively longer execution times
for the mapPartitions stage (18s - 56s - 3.4min) being caused by
progressively longer shuffle read times. Is there any way to tune this out or
otherwise speed it up? My configs are below.

screen spark-shell --driver-memory 15g --num-executors 10 --executor-cores 3
--conf "spark.executor.memory=14g"
--conf "spark.io.compression.codec=lz4"
--conf "spark.shuffle.consolidateFiles=true"
--conf "spark.dynamicAllocation.enabled=false"
--conf "spark.shuffle.manager=tungsten-sort"
--conf "spark.akka.frameSize=1028"
--conf "spark.executor.extraJavaOptions=-Xss256k -XX:MaxPermSize=128m
-XX:PermSize=96m -XX:MaxTenuringThreshold=2 -XX:SurvivorRatio=6
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
-XX:+AggressiveOpts -XX:+UseCompressedOops" --master yarn-client

-Ilya Ganelin


Re: Why does a 3.8 T dataset take up 11.59 Tb on HDFS

2015-11-25 Thread Ilya Ganelin
Turning off replication sacrifices durability of your data, so if a node
goes down the data is lost - in case that's not obvious.
On Wed, Nov 25, 2015 at 8:43 AM Alex Gittens  wrote:

> Thanks, the issue was indeed the dfs replication factor. To fix it without
> entirely clearing out HDFS and rebooting, I first ran
> hdfs dfs -setrep -R -w 1 /
> to reduce all the current files' replication factor to 1 recursively from
> the root, then I changed the dfs.replication factor in
> ephemeral-hdfs/conf/hdfs-site.xml and ran ephemeral-hdfs/sbin/stop-all.sh
> and start-all.sh
>
> Alex
>
> On Tue, Nov 24, 2015 at 10:43 PM, Ye Xianjin  wrote:
>
>> Hi AlexG:
>>
>> Files (blocks, more specifically) have 3 copies on HDFS by default, so 3.8 *
>> 3 = 11.4 TB.
>>
>> --
>> Ye Xianjin
>> Sent with Sparrow 
>>
>> On Wednesday, November 25, 2015 at 2:31 PM, AlexG wrote:
>>
>> I downloaded a 3.8 T dataset from S3 to a freshly launched spark-ec2
>> cluster
>> with 16.73 Tb storage, using
>> distcp. The dataset is a collection of tar files of about 1.7 Tb each.
>> Nothing else was stored in the HDFS, but after completing the download,
>> the
>> namenode page says that 11.59 Tb are in use. When I use hdfs du -h -s, I
>> see
>> that the dataset only takes up 3.8 Tb as expected. I navigated through the
>> entire HDFS hierarchy from /, and don't see where the missing space is.
>> Any
>> ideas what is going on and how to rectify it?
>>
>> I'm using the spark-ec2 script to launch, with the command
>>
>> spark-ec2 -k key -i ~/.ssh/key.pem -s 29 --instance-type=r3.8xlarge
>> --placement-group=pcavariants --copy-aws-credentials
>> --hadoop-major-version=yarn --spot-price=2.8 --region=us-west-2 launch
>> conversioncluster
>>
>> and am not modifying any configuration files for Hadoop.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-does-a-3-8-T-dataset-take-up-11-59-Tb-on-HDFS-tp25471.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>>
>


Re: Spark Job is getting killed after certain hours

2015-11-16 Thread Ilya Ganelin
Your Kerberos ticket is likely expiring. Check your ticket expiration and renewal settings.

-Ilya Ganelin

On Mon, Nov 16, 2015 at 9:20 PM, Vipul Rai <vipulrai8...@gmail.com> wrote:

> Hi Nikhil,
> It seems you have Kerberos enabled cluster and it is unable to
> authenticate using the ticket.
> Please check the Kerberos settings, it could also be because of Kerberos
> version mismatch on nodes.
>
> Thanks,
> Vipul
>
> On Tue 17 Nov, 2015 07:31 Nikhil Gs <gsnikhil1432...@gmail.com> wrote:
>
>> Hello Team,
>>
>> Below is the error which we are facing in our cluster after 14 hours of
>> starting the spark-submit job. We are not able to understand the issue and why it
>> hits the below error after a certain time.
>>
>> If any of you have faced the same scenario or if you have any idea, then
>> please guide us. To identify the issue, if you need any other info, then
>> please let me know what you need. Thanks a lot in advance.
>>
>> *Log Error:  *
>>
>> 15/11/16 04:54:48 ERROR ipc.AbstractRpcClient: SASL authentication
>> failed. The most likely cause is missing or invalid credentials. Consider
>> 'kinit'.
>>
>> javax.security.sasl.SaslException: *GSS initiate failed [Caused by
>> GSSException: No valid credentials provided (Mechanism level: Failed to
>> find any Kerberos tgt)]*
>>
>> at
>> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>>
>> at
>> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
>>
>> at java.security.AccessController.doPrivileged(Native
>> Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1174)
>>
>> at
>> org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
>>
>> at
>> org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
>>
>> at
>> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
>>
>> at
>> org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
>>
>> at
>> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
>>
>> at
>> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
>>
>> at
>> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:369)
>>
>> at
>> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
>>
>> at
>> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
>>
>> at
>> org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183)
>>
>> at
>> org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1482)
>>
>> at
>> org.apache.hadoop.hbase.client.HTable.put(HTable.java:1095)
>>
>> at
>> com.suxk.bigdata.pulse.consumer.ModempollHbaseLoadHelper$1.run(ModempollHbaseLoadHelper.java:89)
>>
>> at java.security.AccessController.

Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ilya Ganelin
Ayman - it's really a question of recommending users to products vs. products
to users. There will only be a difference if you're not doing all-to-all.
For example, if you're producing only the top N recommendations, then
you may recommend only the top N products or the top N users, which would be
different.
On Sun, Jun 28, 2015 at 8:34 AM Ayman Farahat ayman.fara...@yahoo.com
wrote:

 Thanks Ilya
 Is there an advantage of say partitioning by users /products when you
 train ?
 Here are two alternatives I have

 #Partition by user or Product
 tot = newrdd.map(lambda l:
 (l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
 ratings = tot.values()
 model = ALS.train(ratings, rank, numIterations)

 #use zipwithIndex

 tot = newrdd.map(lambda l: (l[1],Rating(int(l[1]),int(l[2]),l[4])))
 bob = tot.zipWithIndex().map(lambda x : (x[1] ,x[0])).partitionBy(30)
 ratings = bob.values()
 model = ALS.train(ratings, rank, numIterations)


 On Jun 28, 2015, at 8:24 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 You can also select pieces of your RDD by first doing a zipWithIndex and
 then doing a filter operation on the second element of the RDD.

 For example to select the first 100 elements :

 Val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
 On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 How do you partition by product in Python?
 the only API is partitionBy(50)

 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also in my experiments, it's much faster to blocked BLAS through
 cartesian rather than doing sc.union. Here are the details on the
 experiments:

 https://issues.apache.org/jira/browse/SPARK-4823

 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also not sure how threading helps here because Spark puts a partition to
 each core. On each core may be there are multiple threads if you are using
 intel hyperthreading but I will let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks 
 of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there lots of 
 product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is avalailable on 1.4 is a
 recommendProducts method that blockifies the factors to make use of 
 level 3
 BLAS (ie matrix-matrix multiply). I am not sure if this is available in 
 The
 Python api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper 
 caching
 and use of broadcast variables this will take a while depending on the 
 size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in 
 parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have
 about 30
 ,000 products and 90

Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ilya Ganelin
Oops - code should be :

val a = rdd.zipWithIndex().filter(s => 1 < s._2 && s._2 < 100)

On Sun, Jun 28, 2015 at 8:24 AM Ilya Ganelin ilgan...@gmail.com wrote:

 You can also select pieces of your RDD by first doing a zipWithIndex and
 then doing a filter operation on the second element of the RDD.

 For example to select the first 100 elements :

 Val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
 On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat
 ayman.fara...@yahoo.com.invalid wrote:

 How do you partition by product in Python?
 the only API is partitionBy(50)

 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also in my experiments, it's much faster to blocked BLAS through
 cartesian rather than doing sc.union. Here are the details on the
 experiments:

 https://issues.apache.org/jira/browse/SPARK-4823

 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also not sure how threading helps here because Spark puts a partition to
 each core. On each core may be there are multiple threads if you are using
 intel hyperthreading but I will let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks 
 of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there lots of 
 product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is avalailable on 1.4 is a
 recommendProducts method that blockifies the factors to make use of 
 level 3
 BLAS (ie matrix-matrix multiply). I am not sure if this is available in 
 The
 Python api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper 
 caching
 and use of broadcast variables this will take a while depending on the 
 size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in 
 parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have
 about 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix
 multiplication where
 I first get the product features, broadcast them and then do a dot
 product.
 Its still very slow. Any reason why
 here is a sample code

 def doMultiply(x):
     a = []
     # multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com

Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ilya Ganelin
You can also select pieces of your RDD by first doing a zipWithIndex and
then doing a filter operation on the second element of the RDD.

For example to select the first 100 elements :

Val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat
ayman.fara...@yahoo.com.invalid wrote:

 How do you partition by product in Python?
 the only API is partitionBy(50)

 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also in my experiments, it's much faster to blocked BLAS through cartesian
 rather than doing sc.union. Here are the details on the experiments:

 https://issues.apache.org/jira/browse/SPARK-4823

 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also not sure how threading helps here because Spark puts a partition to
 each core. On each core may be there are multiple threads if you are using
 intel hyperthreading but I will let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there lots of product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is avalailable on 1.4 is a recommendProducts
 method that blockifies the factors to make use of level 3 BLAS (ie
 matrix-matrix multiply). I am not sure if this is available in The Python
 api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper 
 caching
 and use of broadcast variables this will take a while depending on the 
 size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about
 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix
 multiplication where
 I first get the product features, broadcast them and then do a dot
 product.
 Its still very slow. Any reason why
 here is a sample code

 def doMultiply(x):
     a = []
     # multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com http://nabble.com

Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-16 Thread Ilya Ganelin
All - this issue showed up when I was tearing down a spark context and
creating a new one. Often, I was unable to then write to HDFS due to this
error. I subsequently switched to a different implementation: instead of
tearing down and re-initializing the Spark context, I'd submit a separate
request to YARN.
On Fri, May 15, 2015 at 2:35 PM Puneet Kapoor puneet.cse.i...@gmail.com
wrote:

 I am seeing this on hadoop 2.4.0 version.

 Thanks for your suggestions, i will try those and let you know if they
 help !

 On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com
 wrote:

  What version of Hadoop are you seeing this on?


  On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com
 wrote:

  Hey,

  Did you find any solution for this issue, we are seeing similar logs in
 our Data node logs. Appreciate any help.





  2015-05-15 10:51:43,615 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
 java.net.SocketTimeoutException: 6 millis timeout while waiting for
 channel to be ready for read. ch :
 java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
 remote=/192.168.112.190:46253]
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
 at java.io.BufferedInputStream.fill(Unknown Source)
 at java.io.BufferedInputStream.read1(Unknown Source)
 at java.io.BufferedInputStream.read(Unknown Source)
 at java.io.DataInputStream.read(Unknown Source)
 at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
 at java.lang.Thread.run(Unknown Source)


  That's being logged @ error level in DN. It doesn't mean the DN has
 crashed, only that it timed out waiting for data: something has gone wrong
 elsewhere.

  https://issues.apache.org/jira/browse/HDFS-693


 there are a couple of properties you can set to extend the timeouts

   <property>
     <name>dfs.socket.timeout</name>
     <value>2</value>
   </property>

   <property>
     <name>dfs.datanode.socket.write.timeout</name>
     <value>2</value>
   </property>



 You can also increase the number of data node transceiver threads to
 handle data IO across the network


 <property>
   <name>dfs.datanode.max.xcievers</name>
   <value>4096</value>
 </property>

 Yes, that property has that explicit spelling; it's easy to get wrong





Re: Broadcast variables can be rebroadcast?

2015-05-15 Thread Ilya Ganelin
The broadcast variable is like a pointer. If the underlying data changes
then the changes will be visible throughout the cluster.
On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(), can it
 ever be updated again? The use case is for something like the underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Broadcast variables can be rebroadcast?

2015-05-15 Thread Ilya Ganelin
Nope. It will just work when you call x.value.
On Fri, May 15, 2015 at 5:39 PM N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote:

 The broadcast variable is like a pointer. If the underlying data changes
 then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(), can
 it
 ever be updated again? The use case is for something like the underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-27 Thread Ilya Ganelin
I believe the typical answer is that Spark is actually a bit slower.
On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com bit1...@163.com wrote:

 Hi,

 I am frequently asked why Spark is also much faster than Hadoop MapReduce
 on disk (without the use of the memory cache). I have no convincing answer for
 this question; could you guys elaborate on this? Thanks!

 --




Re: loads of memory still GC overhead limit exceeded

2015-02-20 Thread Ilya Ganelin
No problem, Antony. MLlib is tricky! I'd love to chat with you about your
use case - sounds like we're working on similar problems/scales.
On Fri, Feb 20, 2015 at 1:55 PM Xiangrui Meng men...@gmail.com wrote:

 Hi Antony, Is it easy for you to try Spark 1.3.0 or master? The ALS
 performance should be improved in 1.3.0. -Xiangrui

 On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi
 antonym...@yahoo.com.invalid wrote:
  Hi Ilya,
 
  thanks for your insight, this was the right clue. I had default
 parallelism
  already set but it was quite low (hundreds) and moreover the number of
  partitions of the input RDD was low as well so the chunks were really too
  big. Increased parallelism and repartitioning seems to be helping...
 
  Thanks!
  Antony.
 
 
  On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com
  wrote:
 
 
 
  Hi Anthony - you are seeing a problem that I ran into. The underlying
 issue
  is your default parallelism setting. What's happening is that within ALS
  certain RDD operations end up changing the number of partitions you have
 of
  your data. For example if you start with an RDD of 300 partitions, unless
  default parallelism is set while the algorithm executes you'll eventually
  get an RDD with something like 20 partitions. Consequently, your giant
 data
  set is now stored across a much smaller number of partitions so each
  partition is huge. Then, when a shuffle requires serialization you run
 out
  of heap space trying to serialize it. The solution should be as simple as
  setting the default parallelism setting.
 
  This is referenced in a JIRA I can't find at the moment.
  On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid
 
  wrote:
 
  now with reverted spark.shuffle.io.preferDirectBufs (to true) getting
 again
  GC overhead limit exceeded:
 
  === spark stdout ===
  15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage
 18.0
  (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit
  exceeded
  at
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
  at
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
  at
  java.io.ObjectInputStream.readOrdinaryObject(
 ObjectInputStream.java:1801)
  at
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.
 java:371)
  at
  org.apache.spark.serializer.JavaDeserializationStream.
 readObject(JavaSerializer.scala:62)
 
  === yarn log (same) ===
  15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage
  18.0 (TID 5329)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
  at
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
  at
  java.io.ObjectInputStream.readOrdinaryObject(
 ObjectInputStream.java:1801)
  at
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.
 java:371)
  at
  org.apache.spark.serializer.JavaDeserializationStream.
 readObject(JavaSerializer.scala:62)
 
  === yarn nodemanager ===
  2015-02-19 12:08:13,758 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 19014 for container-id
  container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory
  used; 31.7 GB of 67.2 GB virtual memory used
  2015-02-19 12:08:13,778 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 monitor.ContainersMonitorImpl:
  Memory usage of ProcessTree 19013 for container-id
  container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory
  used; 103.6 MB of 67.2 GB virtual memory used
  2015-02-19 12:08:14,455 WARN
  org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
  code from container container_1424204221358_0013_01_08 is : 143
  2015-02-19 12:08:14,455 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 container.Container:
  Container container_1424204221358_0013_01_08 transitioned from
 RUNNING
  to EXITED_WITH_FAILURE
  2015-02-19 12:08:14,455 INFO
  org.apache.hadoop.yarn.server.nodemanager.containermanager.
 launcher.ContainerLaunch:
  Cleaning up container container_1424204221358_0013_01_08
 
  Antony.
 
 
 
 
  On Thursday, 19 February 2015, 11:54, Antony Mayi
  antonym...@yahoo.com.INVALID wrote:
 
 
 
  it is from within the ALS.trainImplicit() call. btw. the exception varies
  between this GC overhead limit exceeded and Java heap space (which I
  guess is just different outcome of same problem).
 
  just tried another run and here are the logs (filtered) - note I tried
 this
  run with spark.shuffle.io.preferDirectBufs=false so this might be
 slightly
  different issue from my previous case (going

Re: storing MatrixFactorizationModel (pyspark)

2015-02-19 Thread Ilya Ganelin
Yep. The matrix factorization model has two RDDs of factor vectors
representing the decomposed matrix. You can save these to disk and reuse them.
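A Scala sketch of that idea (the question is about pyspark, but the shape is the
same; the paths are illustrative and `model` is assumed to come from
ALS.train/trainImplicit):

  model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
  model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

  // later, in another job: reload the factors and score a (user, product) pair by hand
  val userFeatures    = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
  val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")

  def score(userVec: Array[Double], productVec: Array[Double]): Double =
    userVec.zip(productVec).map { case (a, b) => a * b }.sum
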
On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi antonym...@yahoo.com.invalid
wrote:

 Hi,

 when getting the model out of ALS.train it would be beneficial to store it
 (to disk) so the model can be reused later for any following predictions. I
 am using pyspark and I had no luck pickling it either using standard pickle
 module or even dill.

 does anyone have a solution for this (note it is pyspark)?

 thank you,
 Antony.



Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Ilya Ganelin
Hi Antony - you are seeing a problem that I ran into. The underlying issue
is your default parallelism setting. What's happening is that within ALS
certain RDD operations end up changing the number of partitions you have of
your data. For example if you start with an RDD of 300 partitions, unless
default parallelism is set while the algorithm executes you'll eventually
get an RDD with something like 20 partitions. Consequently, your giant data
set is now stored across a much smaller number of partitions so each
partition is huge. Then, when a shuffle requires serialization you run out
of heap space trying to serialize it. The solution should be as simple as
setting the default parallelism setting.

This is referenced in a JIRA I can't find at the moment.
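For concreteness, a sketch of the two knobs described above (the parallelism
value and input path are illustrative, not recommendations):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  val conf = new SparkConf()
    .setAppName("als-parallelism-example")
    .set("spark.default.parallelism", "600")   // keeps shuffles inside ALS from collapsing to a few huge partitions
  val sc = new SparkContext(conf)

  val ratings = sc.textFile("hdfs:///data/ratings.csv")
    .map(_.split(","))
    .map(f => Rating(f(0).toInt, f(1).toInt, f(2).toDouble))
    .repartition(600)                          // match the input partitioning to the chosen parallelism
    .cache()

  val model = ALS.trainImplicit(ratings, 10, 20)
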
On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid
wrote:

 now with reverted spark.shuffle.io.preferDirectBufs (to true) getting
 again GC overhead limit exceeded:

 === spark stdout ===
 15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage
 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead
 limit exceeded
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 === yarn log (same) ===
 15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage
 18.0 (TID 5329)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 === yarn nodemanager ===
 2015-02-19 12:08:13,758 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 19014 for container-id
 container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory
 used; 31.7 GB of 67.2 GB virtual memory used
 2015-02-19 12:08:13,778 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 19013 for container-id
 container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory
 used; 103.6 MB of 67.2 GB virtual memory used
 2015-02-19 12:08:14,455 WARN
 org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
 code from container container_1424204221358_0013_01_08 is : 143
 2015-02-19 12:08:14,455 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
 Container container_1424204221358_0013_01_08 transitioned from RUNNING
 to EXITED_WITH_FAILURE
 2015-02-19 12:08:14,455 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container container_1424204221358_0013_01_08

 Antony.




   On Thursday, 19 February 2015, 11:54, Antony Mayi
 antonym...@yahoo.com.INVALID wrote:



 it is from within the ALS.trainImplicit() call. btw. the exception varies
 between this GC overhead limit exceeded and Java heap space (which I
 guess is just different outcome of same problem).

 just tried another run and here are the logs (filtered) - note I tried
 this run with spark.shuffle.io.preferDirectBufs=false so this might be
 slightly different issue from my previous case (going to revert now):

 === spark stdout ===
 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart
 beats: 50221ms exceeds 45000ms
 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart
 beats: 54749ms exceeds 45000ms
 15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor
 6 on 192.168.1.92: remote Akka client disassociated
 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage
 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage
 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage
 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
 15/02/19 10:16:44 WARN 

Re: Why is RDD lookup slow?

2015-02-19 Thread Ilya Ganelin
Hi Shahab - if your data structures are small enough a broadcasted Map is
going to provide faster lookup. Lookup within an RDD is an O(m) operation
where m is the size of the partition. For RDDs with multiple partitions,
executors can operate on it in parallel so you get some improvement for
larger RDDs.
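A small sketch of the two options (spark-shell; sizes are illustrative):

  val pairs = sc.parallelize(1 to 100000).map(i => (i, s"value$i")).cache()

  // option 1: lookup on the RDD - each call runs a job that scans partitions on the cluster
  val fromRdd = pairs.lookup(42)

  // option 2: collect once into a broadcast map - subsequent lookups are local hash lookups
  val mapB = sc.broadcast(pairs.collectAsMap())
  val fromMap = mapB.value.get(42)
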
On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am doing lookup on cached RDDs [(Int,String)], and I noticed that the
 lookup is relatively slow 30-100 ms ?? I even tried this on one machine
 with single partition, but no difference!

 The RDDs are not large at all, 3-30 MB.

 Is this expected behaviour? should I use other data structures, like
 HashMap to keep data and look up it there and use Broadcast to send a copy
 to all machines?

 best,
 /Shahab





Re: RDD Partition number

2015-02-19 Thread Ilya Ganelin
By default you will have roughly (file size in MB / 64) partitions - one per
HDFS block. You can also set the number of partitions when you read in a file
with sc.textFile via an optional second parameter.
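A one-line sketch of that second parameter (the path is illustrative):

  val byBlocks  = sc.textFile("hdfs:///data/input.txt")        // roughly one partition per HDFS block
  val moreParts = sc.textFile("hdfs:///data/input.txt", 200)   // ask for at least 200 partitions
  println(moreParts.partitions.length)
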
On Thu, Feb 19, 2015 at 8:07 AM Alessandro Lulli lu...@di.unipi.it wrote:

 Hi All,

 Could you please help me understanding how Spark defines the number of
 partitions of the RDDs if not specified?

 I found the following in the documentation for file loaded from HDFS:
 *The textFile method also takes an optional second argument for
 controlling the number of partitions of the file. By default, Spark creates
 one partition for each block of the file (blocks being 64MB by default in
 HDFS), but you can also ask for a higher number of partitions by passing a
 larger value. Note that you cannot have fewer partitions than blocks*

 What is the rule for file loaded from the file systems?
 For instance, i have a file X replicated on 4 machines. If i load the file
 X in a RDD how many partitions are defined and why?

 Thanks for your help on this
 Alessandro



Re: Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Ilya Ganelin
The stupid question: are you deleting the file from HDFS on the right node?
On Thu, Feb 19, 2015 at 11:31 AM Pavel Velikhov pavel.velik...@gmail.com
wrote:

 Yeah, I do manually delete the files, but it still fails with this error.

 On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

  When writing to hdfs Spark will not overwrite existing files or
 directories. You must either manually delete these or use Java's Hadoop
 FileSystem class to remove them.



 Sent with Good (www.good.com)


 -Original Message-
 *From: *Pavel Velikhov [pavel.velik...@gmail.com]
 *Sent: *Thursday, February 19, 2015 11:32 AM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Spark job fails on cluster but works fine on a single machine

 I have a simple Spark job that goes out to Cassandra, runs a pipe and
 stores results:

  val sc = new SparkContext(conf)
  val rdd = sc.cassandraTable("keyspace", "table")
    .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
    .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
    .map(r => convertStr(r))
    .coalesce(1, true)
    .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
    //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

 When run on a single machine, everything is fine if I save to an hdfs file
 or save to Cassandra.
 When run in cluster neither works:

  - When saving to file, I get an exception: User class threw exception:
 Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt
 already exists
  - When saving to Cassandra, only 4 rows are updated with empty data (I
 test on a 4-machine Spark cluster)

 Any hints on how to debug this and where the problem could be?

 - I delete the hdfs file before running
 - Would really like the output to hdfs to work, so I can debug
 - Then it would be nice to save to Cassandra






Re: spark ignoring all memory settings and defaulting to 512MB?

2014-12-31 Thread Ilya Ganelin
Welcome to Spark. What's more fun is that this setting controls memory on the
executors, but if you want to set a memory limit on the driver you need to
configure it as a parameter of the spark-submit script. You also set
num-executors and executor-cores on the spark-submit call.

See both the Spark tuning guide and the Spark configuration page for more
discussion of stuff like this.

W.r.t. the Spark memory option, my understanding is that the parameter
(SPARK_EXE_MEM) has been deprecated and the documentation is probably stale.
A good starting point for cleanup would probably be to update that :-).
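To make the split concrete, a sketch (values illustrative): executor settings can
go on the SparkConf before the context is created, while driver memory has to be
passed to spark-submit (e.g. --driver-memory) because the driver JVM is already
running by the time this code executes.

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("memory-example")
    .set("spark.executor.memory", "2g")   // per-executor heap
    .set("spark.executor.cores", "3")     // cores per executor
  val sc = new SparkContext(conf)
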
On Thu, Jan 1, 2015 at 1:45 AM Kevin Burton bur...@spinn3r.com wrote:

 wow. Just figured it out:

  conf.set("spark.executor.memory", "2g");

 I have to set it in the Job… that’s really counter intuitive.  Especially
 because the documentation in spark-env.sh says the exact opposite.

 What’s the resolution here.  This seems like a mess. I’d propose a
 solution to clean it up but I don’t know where to begin.

 On Wed, Dec 31, 2014 at 10:35 PM, Kevin Burton bur...@spinn3r.com wrote:

 This is really weird and I’m surprised no one has found this issue yet.

 I’ve spent about an hour or more trying to debug this :-(

 My spark install is ignoring ALL my memory settings.  And of course my
 job is running out of memory.

 The default is 512MB so pretty darn small.

 The worker and master start up and both use 512M

 This alone is very weird and poor documentation IMO because:

  SPARK_WORKER_MEMORY, to set how much total memory workers have to give
 executors (e.g. 1000m, 2g)”

 … so if it’s giving it to executors, AKA the memory executors run with,
 then it should be SPARK_EXECUTOR_MEMORY…

 … and the worker actually uses SPARK_DAEMON memory.

 but actually I’m right.  It IS SPARK_EXECUTOR_MEMORY… according to
 bin/spark-class

 … but, that’s not actually being used :-(

  that setting is just flat out being ignored and it's just using 512MB.
 So all my jobs fail.

 … and I write an ‘echo’ so I could trace the spark-class script to see
 what the daemons are actually being run with and spark-class wasn’t being
 called with and nothing is logged for the coarse grained executor.  I guess
 it’s just inheriting the JVM opts from it’s parent and Java is launching
 the process directly?

 This is a nightmare :(

 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




Re: Long-running job cleanup

2014-12-28 Thread Ilya Ganelin
Hi Patrick - is that cleanup present in 1.1?

The overhead I am talking about is what I believe is shuffle-related
metadata. If I watch the execution log I see small broadcast variables
created for every stage of execution, a few KB at a time, and a certain
number of MB of available memory remaining on the driver. As I run, this
available memory goes down, and these variables are never erased.
cached. The RDDs that are generated iteratively are not retained or
referenced, so I would expect things to get cleaned up but they do not. The
items consuming memory are not RDDs but what appears to be shuffle
metadata.

I have a script that parses the logs to show memory consumption over time,
and it shows memory being consumed steadily, one small bit at a time, over
many hours without ever being cleared.

The specific computation I am doing is the generation of dot products
between two RDDs of vectors. I need to generate this product for every
combination of products between the two RDDs but both RDDs are too big to
fit in memory. Consequently, I iteratively generate this product across one
entry from the first RDD and all entries from the second and retain the
pared-down result within an accumulator (by retaining the top N results it
is possible to actually store the Cartesian which is otherwise too large to
fit on disk). After a certain number of iterations these intermediate
results are then written to disk. Each of these steps is tractable in
itself but due to the accumulation of memory, the overall job becomes
intractable.

I would appreciate any suggestions as to how to clean up these intermediate
broadcast variables. Thank you.


On Sun, Dec 28, 2014 at 1:56 PM Patrick Wendell pwend...@gmail.com wrote:

 What do you mean when you say the overhead of spark shuffles start to
 accumulate? Could you elaborate more?

 In newer versions of Spark shuffle data is cleaned up automatically
 when an RDD goes out of scope. It is safe to remove shuffle data at
 this point because the RDD can no longer be referenced. If you are
 seeing a large build up of shuffle data, it's possible you are
 retaining references to older RDDs inadvertently. Could you explain
 what your job actually doing?

 - Patrick

 On Mon, Dec 22, 2014 at 2:36 PM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hi all, I have a long running job iterating over a huge dataset. Parts of
  this operation are cached. Since the job runs for so long, eventually the
  overhead of spark shuffles starts to accumulate culminating in the driver
  starting to swap.
 
  I am aware of the spark.cleaner.ttl parameter that allows me to configure
  when cleanup happens but the issue with doing this is that it isn't done
  safely, e.g. I can be in the middle of processing a stage when this
 cleanup
  happens and my cached RDDs get cleared. This ultimately causes a
  KeyNotFoundException when I try to reference the now cleared cached RDD.
  This behavior doesn't make much sense to me, I would expect the cached
 RDD
  to either get regenerated or at the very least for there to be an option
 to
  execute this cleanup without deleting those RDDs.
 
  Is there a programmatically safe way of doing this cleanup that doesn't
  break everything?
 
  If I instead tear down the spark context and bring up a new context for
  every iteration (assuming that each iteration is sufficiently
 long-lived),
  would memory get released appropriately?
 
  
 
  The information contained in this e-mail is confidential and/or
 proprietary
  to Capital One and/or its affiliates. The information transmitted
 herewith
  is intended only for use by the individual or entity to which it is
  addressed.  If the reader of this message is not the intended recipient,
 you
  are hereby notified that any review, retransmission, dissemination,
  distribution, copying or other use of, or taking of any action in
 reliance
  upon this information is strictly prohibited. If you have received this
  communication in error, please contact the sender and delete the material
  from your computer.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Problem with StreamingContext - getting SPARK-2243

2014-12-27 Thread Ilya Ganelin
Are you trying to do this in the shell? The shell is instantiated with a
SparkContext named sc.
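In the shell you can build the StreamingContext on top of that existing context
instead of a conf (a minimal sketch):

  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(sc, Seconds(1))
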

-Ilya Ganelin

On Sat, Dec 27, 2014 at 5:24 PM, tfrisk tfris...@gmail.com wrote:


 Hi,

 Doing:
val ssc = new StreamingContext(conf, Seconds(1))

 and getting:
Only one SparkContext may be running in this JVM (see SPARK-2243). To
 ignore this error, set spark.driver.allowMultipleContexts = true.


 But I dont think that I have another SparkContext running. Is there any way
 I can check this or force kill ?  I've tried restarting the server as I'm
 desperate but still I get the same issue.  I was not getting this earlier
 today.

 Any help much appreciated .

 Thanks,

 Thomas




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-StreamingContext-getting-SPARK-2243-tp20869.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Long-running job cleanup

2014-12-25 Thread Ilya Ganelin
Hello all - can anyone please offer any advice on this issue?

-Ilya Ganelin

On Mon, Dec 22, 2014 at 5:36 PM, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

 Hi all, I have a long running job iterating over a huge dataset. Parts of
 this operation are cached. Since the job runs for so long, eventually the
 overhead of spark shuffles starts to accumulate culminating in the driver
 starting to swap.

 I am aware of the spark.cleaner.ttl parameter that allows me to configure
 when cleanup happens but the issue with doing this is that it isn’t done
 safely, e.g. I can be in the middle of processing a stage when this cleanup
 happens and my cached RDDs get cleared. This ultimately causes a
 KeyNotFoundException when I try to reference the now cleared cached RDD.
 This behavior doesn’t make much sense to me, I would expect the cached RDD
 to either get regenerated or at the very least for there to be an option to
 execute this cleanup without deleting those RDDs.

 Is there a programmatically safe way of doing this cleanup that doesn’t
 break everything?

 If I instead tear down the spark context and bring up a new context for
 every iteration (assuming that each iteration is sufficiently long-lived),
 would memory get released appropriately?




Re: what is the best way to implement mini batches?

2014-12-11 Thread Ilya Ganelin
Hi all. I've been working on a similar problem. One solution that is
straightforward (if suboptimal) is to do the following.

A.zipWithIndex().filter(x => x._2 >= range_start && x._2 < range_end). Lastly
just put that in a for loop. I've found that this approach scales very
well.
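
Spelled out as a loop, a rough sketch of that approach (the RDD a and the
batch size are illustrative):

// Pair every element with a stable index, then carve out fixed-size ranges.
val a = sc.parallelize(1 to 100000).map(_.toDouble)
val indexed = a.zipWithIndex().cache()      // RDD[(Double, Long)]
val total = indexed.count()
val batchSize = 10000L

for (start <- 0L until total by batchSize) {
  val end = start + batchSize
  val batch = indexed
    .filter { case (_, idx) => idx >= start && idx < end }
    .map(_._1)
  // ... run one training / processing step on `batch` here
}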

As Matei said another option is to define a custom partitioner and then use
mapPartitions. Hope that helps!


On Thu, Dec 11, 2014 at 6:16 PM Imran Rashid im...@therashids.com wrote:

 Minor correction:  I think you want iterator.grouped(10) for
 non-overlapping mini batches
 On Dec 11, 2014 1:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 You can just do mapPartitions on the whole RDD, and then called sliding()
 on the iterator in each one to get a sliding window. One problem is that
 you will not be able to slide forward into the next partition at
 partition boundaries. If this matters to you, you need to do something more
 complicated to get those, such as the repartition that you said (where you
 map each record to the partition it should be in).

 Matei

  On Dec 11, 2014, at 10:16 AM, ll duy.huynh@gmail.com wrote:
 
  any advice/comment on this would be much appreciated.
 
 
 
  --
  View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini
 -batches-tp20264p20635.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 




Re: GC Issues with randomSplit on large dataset

2014-10-30 Thread Ilya Ganelin
The split is something like 30 million into 2 million partitions. The reason
that it becomes tractable is that after I perform the Cartesian on the
split data and operate on it I don't keep the full results - I actually
only keep a tiny fraction of that generated dataset - making the overall
dataset tractable ( I neglected to mention this in the first email).

The way the code is structured I have forced linear execution until this
point so at the time of execution of the split it is the only thing
happening. In terms of memory I have assigned 23gb of memory and 17gb of
heap.
On Oct 30, 2014 3:32 AM, Sean Owen so...@cloudera.com wrote:

 Can you be more specific about numbers?
 I am not sure that splitting helps so much in the end, in that it has
 the same effect as executing a smaller number at a time of the large
 number of tasks that the full cartesian join would generate.
 The full join is probably intractable no matter what in this case?
 The OOM is not necessarily directly related. It depends on where it
 happened, what else you are doing, how much memory you gave, etc.

 On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hey all – not writing to necessarily get a fix but more to get an
  understanding of what’s going on internally here.
 
  I wish to take a cross-product of two very large RDDs (using cartesian),
 the
  product of which is well in excess of what can be stored on disk .
 Clearly
  that is intractable, thus my solution is to do things in batches -
  essentially I can take the cross product of a small piece of the first
 data
  set with the entirety of the other. To do this, I calculate how many
 items
  can fit into 1 gig of memory. Next, I use RDD.randomSplit() to partition
  the first data set. The issue is that I am trying to partition an RDD of
  several million items into several million partitions. This throws the
  following error:
 
  I would like to understand the internals of what’s going on here so that
 I
  can adjust my approach accordingly. Thanks in advance.
 
 
  14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread
  [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem
  [sparkDriver]
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
  at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
  at
 
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
  at
 
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  Exception in thread main java.lang.OutOfMemoryError: GC overhead limit
  exceeded
  at java.util.Arrays.copyOfRange(Arrays.java:2694)
  at java.lang.String.init(String.java:203)
  at java.lang.String.substring(String.java:1913)
  at java.lang.String.subSequence(String.java:1946)
  at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
  at java.util.regex.Matcher.group(Matcher.java:490)
  at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675)
  at java.util.Formatter.parse(Formatter.java:2528)
  at java.util.Formatter.format(Formatter.java:2469)
  at java.util.Formatter.format(Formatter.java:2423)
  at java.lang.String.format(String.java:2790)
  at
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
  at scala.collection.immutable.StringOps.format(StringOps.scala:31)
  at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944)
  at org.apache.spark.rdd.RDD.init(RDD.scala:1227)
  at org.apache.spark.rdd.RDD.init(RDD.scala:83)
  at
 
 org.apache.spark.rdd.PartitionwiseSampledRDD.init(PartitionwiseSampledRDD.scala:47)
  at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378)
  at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at

Re: Questions about serialization and SparkConf

2014-10-29 Thread Ilya Ganelin
Hello Steve .

1) When you call new SparkConf you should get an object with the default
config values. You can reference the spark configuration and tuning pages
for details on what those are.

2) Yes. Properties set in this configuration will be pushed down to worker
nodes actually executing the spark job. The way this is done is through the
instance of a SparkContext which accepts the SparkConf as a parameter. This
shared config is what will be used by all RDDs and processes spawned as a
function of this context.

E.g. when creating a new RDD with sc.parallelize() or reading a text file
in with sc.textFile() .

I think that to address 3-4 you should reason in terms of the SparkContext.

In short, you shouldn't need to worry about explicitly controlling what is
happening on the slave nodes. Spark should abstract away that layer so that
you can write parallelizable code that the resource manager i.e. YARN
pushes out to your cluster.
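
A minimal sketch of that flow (the custom property name "mysparc.data" is
taken from the question quoted below and is purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Properties set on the SparkConf travel with the SparkContext that wraps it;
// every RDD and task created from this context sees the same settings.
val conf = new SparkConf()
  .setAppName("Some App")
  .set("mysparc.data", "Some user Data")

val sc = new SparkContext(conf)
println(sc.getConf.get("mysparc.data"))           // visible on the driver
val doubled = sc.parallelize(1 to 100).map(_ * 2) // work shipped to executors runs under this config
println(doubled.sum())
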
On Oct 29, 2014 2:58 PM, Steve Lewis lordjoe2...@gmail.com wrote:

  Assume in my executor I say

 SparkConf sparkConf = new SparkConf();
 sparkConf.set("spark.kryo.registrator",
 "com.lordjoe.distributed.hydra.HydraKryoSerializer");
 sparkConf.set("mysparc.data", "Some user Data");
 sparkConf.setAppName("Some App");

 Now
 1) Are there default values set in some system file which are populated
 if I call new SparkConf - if not, how do I get those? I think I see
 defaults for the master, the serializer...
 2) If I set a property in the SparkConf for my SparkContext, will I see
 that property on a slave machine?
 3) If I set a property and then call showSparkProperties(), do I see
 that property set? If not, how can I see the property set - say, from
 another thread on the executor, as in
 showSparkPropertiesInAnotherThread();
 4) How can a slave machine access properties set on the executor?

 I am really interested in sparkConf.set("spark.kryo.registrator",
 "com.lordjoe.distributed.hydra.HydraKryoSerializer");
 which needs to be used by the slave.


/**
 * dump all spark properties to System.err
 */
 public static void showSparkProperties()
 {
     SparkConf sparkConf = new SparkConf();
     Tuple2<String, String>[] all = sparkConf.getAll();
     for (Tuple2<String, String> prp : all) {
         System.err.println(prp._1().toString() + "=" + prp._2());
     }
 }

 public static void  showSparkPropertiesInAnotherThread()
 {
 new Thread(new Runnable() {
 @Override
 public void run() {
 showSparkProperties();
 }
 }).start();
 }




Re: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0

2014-10-28 Thread Ilya Ganelin
Hi all - I've simplified the code so now I'm literally feeding in 200
million ratings directly to ALS.train. Nothing else is happening in the
program.
I've also tried with both the regular serializer and the KryoSerializer.
With Kryo, I get the same ArrayIndex exceptions.

With the regular serializer I get the following error stack:

14/10/28 10:43:14 WARN TaskSetManager: Lost task 119.0 in stage 10.0 (TID
2282, innovationdatanode07.cof.ds.capitalone.com):
java.io.FileNotFoundException:
/opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1414016059040_0715/spark-local-20141028102246-dde7/06/shuffle_7_119_8
(No such file or directory)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)

org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)

org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)

org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)

org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
14/10/28 10:43:14 INFO TaskSetManager: Starting task 119.1 in stage 10.0
(TID 2303, innovationdatanode07.cof.ds.capitalone.com, PROCESS_LOCAL, 5642
bytes)
14/10/28 10:43:14 WARN TaskSetManager: Lost task 119.1 in stage 10.0 (TID
2303, innovationdatanode07.cof.ds.capitalone.com):
java.io.FileNotFoundException:
/opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1414016059040_0715/spark-local-20141028102246-dde7/23/shuffle_8_90_119
(No such file or directory)
java.io.RandomAccessFile.open(Native Method)
java.io.RandomAccessFile.init(RandomAccessFile.java:241)
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:93)
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:116)

org.apache.spark.shuffle.FileShuffleBlockManager.getBytes(FileShuffleBlockManager.scala:190)

org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:361)

org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1$$anonfun$apply$10.apply(BlockFetcherIterator.scala:208)

org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1$$anonfun$apply$10.apply(BlockFetcherIterator.scala:208)

org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:258)

org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
.

This is an issue I referenced in the past here:
https://www.google.com/url?sa=trct=jq=esrc=ssource=webcd=1cad=rjauact=8ved=0CB4QFjAAurl=https%3A%2F%2Fmail-archives.apache.org%2Fmod_mbox%2Fincubator-spark-user%2F201410.mbox%2F%253CCAM-S9zS-%2B-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd%3DHw%40mail.gmail.com%253Eei=97FPVIfyCsbgsASL94CoDQusg=AFQjCNEQ6gUlwpr6KzlcZVd0sQeCSdjQgQsig2=Ne7pL_Z94wN4g9BwSutsXQ

-Ilya Ganelin

On Mon, Oct 27, 2014 at 6:12 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you save the data before ALS and try to reproduce the problem?
 You might try reducing the number of partitions and not using Kryo
 serialization, just to narrow down the issue. -Xiangrui

 On Mon, Oct 27, 2014 at 1:29 PM, Ilya Ganelin ilgan...@gmail.com wrote:
  Hi Burak.
 
  I always see this error. I'm running the CDH 5.2 version of Spark 1.1.0.
 I
  load my data from HDFS. By the time it hits the recommender it had gone
  through many spark operations.
 
  On Oct 27, 2014 4:03 PM, Burak Yavuz bya...@stanford.edu wrote:
 
  Hi,
 
  I've come across this multiple times, but not in a consistent manner. I
  found it hard to reproduce. I have a jira for it: SPARK-3080
 
  Do you observe this error every single time? Where do you load your data
  from? Which version of Spark are you running?
  Figuring out the similarities may help in pinpointing the bug.
 
  Thanks,
  Burak
 
  - Original Message -
  From: Ilya Ganelin ilgan...@gmail.com
  To: user user@spark.apache.org
  Sent: Monday, October 27, 2014 11:36:46 AM
  Subject: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0
 
  Hello all - I am attempting to run MLLib's ALS algorithm on a
 substantial

Re: How can number of partitions be set in spark-env.sh?

2014-10-28 Thread Ilya Ganelin
In Spark, certain functions have an optional parameter to set the
number of partitions (distinct, textFile, etc.). You can also use the
coalesce() or repartition() functions to change the number of partitions
of your RDD. Thanks.
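
For example (paths and counts are illustrative):

// Ask for a partition count up front where the API accepts one...
val lines = sc.textFile("hdfs:///some/input", 200)   // at least 200 partitions

// ...or adjust it afterwards.
val fewer = lines.coalesce(50)       // shrink without a full shuffle
val more  = lines.repartition(400)   // grow or rebalance (does a shuffle)
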
On Oct 28, 2014 9:58 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks for the useful comment. But I guess this setting applies only when
 I use Spark SQL, right? Is there any similar setting for Spark?

 best,
 /Shahab

 On Tue, Oct 28, 2014 at 2:38 PM, Wanda Hawk wanda_haw...@yahoo.com
 wrote:

 Is this what you are looking for?

 In Shark, default reducer number is 1 and is controlled by the property
 mapred.reduce.tasks. Spark SQL deprecates this property in favor of
 spark.sql.shuffle.partitions, whose default value is 200. Users may
 customize this property via SET:

 SET spark.sql.shuffle.partitions=10;
 SELECT page, count(*) c
 FROM logs_last_month_cached
 GROUP BY page ORDER BY c DESC LIMIT 10;


 Spark SQL Programming Guide - Spark 1.1.0 Documentation
 http://spark.apache.org/docs/latest/sql-programming-guide.html








   --
  *From:* shahab shahab.mok...@gmail.com
 *To:* user@spark.apache.org
 *Sent:* Tuesday, October 28, 2014 3:20 PM
 *Subject:* How can number of partitions be set in spark-env.sh?

 I am running a standalone Spark cluster, 2 workers each with 2 cores.
 Apparently, I am loading and processing a relatively large chunk of data, so
 I receive task failures. As I read from some posts and discussions on the
 mailing list, the failures could be related to the large size of the data
 being processed in each partition, and if I have understood correctly I
 should have smaller partitions (but many of them)?!

 Is there any way that I can set the number of partitions dynamically in
 spark-env.sh or in the submitted Spark application?


 best,
 /Shahab






MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0

2014-10-27 Thread Ilya Ganelin
Hello all - I am attempting to run MLLib's ALS algorithm on a substantial
test vector - approx. 200 million records.

I have resolved a few issues I've had with regards to garbage collection,
KryoSeralization, and memory usage.

I have not been able to get around this issue I see below however:


 java.lang.
 ArrayIndexOutOfBoundsException: 6106

 org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.
 scala:543)
 scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 org.apache.spark.mllib.recommendation.ALS.org
 $apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:537)

 org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:505)

 org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:504)

 org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)

 org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)


I do not have any negative indices or indices that exceed Int-Max.

I have partitioned the input data into 300 partitions and my Spark config
is below:

.set("spark.executor.memory", "14g")
  .set("spark.storage.memoryFraction", "0.8")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.frameSize", "50")
  .set("spark.yarn.executor.memoryOverhead", "1024")

Does anyone have any suggestions as to why i'm seeing the above error or
how to get around it?
It may be possible to upgrade to the latest version of Spark but the
mechanism for doing so in our environment isn't obvious yet.

-Ilya Ganelin


Re: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0

2014-10-27 Thread Ilya Ganelin
Hi Burak.

I always see this error. I'm running the CDH 5.2 version of Spark 1.1.0. I
load my data from HDFS. By the time it hits the recommender it had gone
through many spark operations.
On Oct 27, 2014 4:03 PM, Burak Yavuz bya...@stanford.edu wrote:

 Hi,

 I've come across this multiple times, but not in a consistent manner. I
 found it hard to reproduce. I have a jira for it: SPARK-3080

 Do you observe this error every single time? Where do you load your data
 from? Which version of Spark are you running?
 Figuring out the similarities may help in pinpointing the bug.

 Thanks,
 Burak

 - Original Message -
 From: Ilya Ganelin ilgan...@gmail.com
 To: user user@spark.apache.org
 Sent: Monday, October 27, 2014 11:36:46 AM
 Subject: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0

 Hello all - I am attempting to run MLLib's ALS algorithm on a substantial
 test vector - approx. 200 million records.

 I have resolved a few issues I've had with regards to garbage collection,
 KryoSeralization, and memory usage.

 I have not been able to get around this issue I see below however:


  java.lang.
  ArrayIndexOutOfBoundsException: 6106
 
 
 org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.
  scala:543)
  scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  org.apache.spark.mllib.recommendation.ALS.org
  $apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:537)
 
 
 org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:505)
 
 
 org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:504)
 
 
 org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
 
 
 org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 
 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
 
 
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
 
 
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
 
 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 
 
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
  org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)


 I do not have any negative indices or indices that exceed Int-Max.

 I have partitioned the input data into 300 partitions and my Spark config
 is below:

  .set("spark.executor.memory", "14g")
    .set("spark.storage.memoryFraction", "0.8")
    .set("spark.serializer",
  "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "MyRegistrator")
    .set("spark.core.connection.ack.wait.timeout", "600")
    .set("spark.akka.frameSize", "50")
    .set("spark.yarn.executor.memoryOverhead", "1024")

 Does anyone have any suggestions as to why i'm seeing the above error or
 how to get around it?
 It may be possible to upgrade to the latest version of Spark but the
 mechanism for doing so in our environment isn't obvious yet.

 -Ilya Ganelin




Num-executors and executor-cores overwritten by defaults

2014-10-21 Thread Ilya Ganelin
Hi all. Just upgraded our cluster to CDH 5.2 (with Spark 1.1) but now I can
no longer set the number of executors or executor-cores. No matter what
values I pass on the command line to spark they are overwritten by the
defaults. Does anyone have any idea what could have happened here? Running
on Spark 1.02 before I had no trouble.

Also I am able to launch the spark shell without these parameters being
overwritten.


Re: How do you write a JavaRDD into a single file

2014-10-20 Thread Ilya Ganelin
Hey Steve - the way to do this is to use the coalesce() function to
coalesce your RDD into a single partition. Then you can do a saveAsTextFile
and you'll wind up with outputDir/part-00000 containing all the data.
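
A minimal sketch, with an illustrative RDD and output path:

val rdd = sc.parallelize(Seq("line 1", "line 2", "line 3"))

// Collapse everything into one partition so exactly one part file is written.
rdd.coalesce(1).saveAsTextFile("hdfs:///tmp/outputDir")
// -> outputDir/part-00000 holds all the data.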

-Ilya Ganelin

On Mon, Oct 20, 2014 at 11:01 PM, jay vyas jayunit100.apa...@gmail.com
wrote:

 sounds more like a use case for using collect... and writing out the
 file in your program?

 On Mon, Oct 20, 2014 at 6:53 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:

 Sorry I missed the discussion - although it did not answer the question.
 In my case (and I suspect the asker's), the 100 slaves are doing a lot of
 useful work, but the generated output is small enough to be handled by a
 single process.
 Many of the large data problems I have worked on process a lot of data but
 end up with a single report file - frequently in a format specified by
 preexisting downstream code.
   I do not want a separate  hadoop merge step for a lot of reasons
 starting with
 better control of the generation of the file.
 However toLocalIterator is exactly what I need.
 Somewhat off topic - I am being overwhelmed by the volume of emails
 from the list - is there a way to get a daily summary, which might be a lot
 easier to keep up with?


 On Mon, Oct 20, 2014 at 3:23 PM, Sean Owen so...@cloudera.com wrote:

 This was covered a few days ago:


 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-write-a-RDD-into-One-Local-Existing-File-td16720.html

 The multiple output files is actually essential for parallelism, and
 certainly not a bad idea. You don't want 100 distributed workers
 writing to 1 file in 1 place, not if you want it to be fast.

 RDD and  JavaRDD already expose a method to iterate over the data,
 called toLocalIterator. It does not require that the RDD fit entirely
 in memory.
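
A short sketch of that approach (the RDD and output file are illustrative):

import java.io.PrintWriter

val rdd = sc.parallelize(1 to 100000).map(_.toString)

// toLocalIterator pulls one partition at a time to the driver, so the full
// RDD never needs to fit in driver memory the way collect() would require.
val writer = new PrintWriter("report.txt")
try {
  rdd.toLocalIterator.foreach(line => writer.println(line))
} finally {
  writer.close()
}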

 On Mon, Oct 20, 2014 at 6:13 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:
   At the end of a set of computations I have a JavaRDD<String>. I want a
  single file where each string is printed in order. The data is small
 enough
  that it is acceptable to handle the printout on a single processor. It
 may
  be large enough that using collect to generate a list might be
 unacceptable.
  the saveAsTextFile command creates multiple files with names like part-00000,
  part-00001... This was bad behavior in Hadoop for final output and is also
  bad for Spark.
    A more general issue is whether it is possible to convert a JavaRDD into
  an iterator or iterable over the entire data set without using collect or
  holding all data in memory.
 In many problems where it is desirable to parallelize intermediate
 steps
  but use a single process for handling the final result this could be
 very
  useful.




 --
 Steven M. Lewis PhD
 4221 105th Ave NE
 Kirkland, WA 98033
 206-384-1340 (cell)
 Skype lordjoe_com




 --
 jay vyas



Re: What's wrong with my spark filter? I get org.apache.spark.SparkException: Task not serializable

2014-10-19 Thread Ilya Ganelin
Check for any variables you've declared in your class. Even if you're not
calling them from the function, they are passed to the worker nodes as part
of the closure. Consequently, if you have something without a default
serializer (like an imported class) it will also get passed.

To fix this you can either move that variable out of the class (make it
global) or you can implement Kryo serialization (see the Spark tuning guide
for this).
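
A sketch of the first workaround (object and field names are illustrative):

// Putting the predicate in a standalone object keeps the enclosing class -
// and any non-serializable members it holds (SparkConf, connections, etc.) -
// out of the closure that gets shipped to the workers.
object RowFilters {
  def keep(row: String): Boolean = row.nonEmpty
}

val rdd = sc.parallelize(Seq("a", "", "b"))
val kept = rdd.filter(row => RowFilters.keep(row))
println(kept.count())
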
On Oct 17, 2014 6:37 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Probably I am missing very simple principle , but something is wrong with
 my filter,
 i get org.apache.spark.SparkException: Task not serializable expetion.

 here is my filter function:
 object OBJ {
    def f1(): Boolean = {
      var i = 1;
      for (j <- 1 to 10) i = i + 1;
      true;
    }
 }

 rdd.filter(row => OBJ.f1())


 And when I run, I get the following exception:

 org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
 ...
 Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 ...



 best,
 /Shahab




Re: input split size

2014-10-18 Thread Ilya Ganelin
Also - if you're doing a text file read you can pass the number of
resulting partitions as the second argument.
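
For example (path and count are illustrative):

// The second argument is the minimum number of partitions to create,
// whether the file lives on HDFS or on the local filesystem.
val lines = sc.textFile("file:///data/input.txt", 8)
println(lines.partitions.length)
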
On Oct 17, 2014 9:05 PM, Larry Liu larryli...@gmail.com wrote:

 Thanks, Andrew. What about reading out of local?

 On Fri, Oct 17, 2014 at 5:38 PM, Andrew Ash and...@andrewash.com wrote:

 When reading out of HDFS it's the HDFS block size.

 On Fri, Oct 17, 2014 at 5:27 PM, Larry Liu larryli...@gmail.com wrote:

 What is the default input split size? How to change it?






Re: IOException and appcache FileNotFoundException in Spark 1.02

2014-10-14 Thread Ilya Ganelin
Hello all . Does anyone else have any suggestions? Even understanding what
this error is from would help a lot.
On Oct 11, 2014 12:56 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi Akhil - I tried your suggestions and tried varying my partition sizes.
 Reducing the number of partitions led to memory errors (presumably - I saw
 IOExceptions much sooner).

 With the settings you provided the program ran for longer but ultimately
 crashes in the same way. I would like to understand what is going on
 internally leading to this.

 Could this be related to garbage collection?
 On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can
 try the following workarounds:

 sc.set("spark.core.connection.ack.wait.timeout", "600")
 sc.set("spark.akka.frameSize", "50")
 Also reduce the number of partitions, you could be hitting the kernel's
 ulimit. I faced this issue and it was gone when i dropped the partitions
 from 1600 to 200.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi all – I could use some help figuring out a couple of exceptions I’ve
 been getting regularly.

 I have been running on a fairly large dataset (150 gigs). With smaller
 datasets I don't have any issues.

 My sequence of operations is as follows – unless otherwise specified, I
 am not caching:

 Map a 30 million row x 70 col string table to approx 30 mil x  5 string
 (For read as textFile I am using 1500 partitions)

 From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

 Then, extract distinct values for A and distinct values for B. (I cache
 the output of distinct), , numPartitions = 180

 Zip with index for A and for B (to remap strings to int)

 Join remapped ids with original table

 This is then fed into MLLIBs ALS algorithm.

 I am running with:

 Spark version 1.02 with CDH5.1

 numExecutors = 8, numCores = 14

 Memory = 12g

 MemoryFration = 0.7

 KryoSerialization

 My issue is that the code runs fine for a while but then will
 non-deterministically crash with either file IOExceptions or the following
 obscure error:

 14/10/08 13:29:59 INFO TaskSetManager: Loss was due to
 java.io.IOException: Filesystem closed [duplicate 10]

 14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException

 java.io.FileNotFoundException:
 /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
 (No such file or directory)

 Looking through the logs, I see the IOException in other places but it
 appears to be non-catastrophic. The FileNotFoundException, however, is. I
 have found the following stack overflow that at least seems to address the
 IOException:


 http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

 But I have not found anything useful at all with regards to the app
 cache error.

 Any help would be much appreciated.





Re: Breaking the previous large-scale sort record with Spark

2014-10-13 Thread Ilya Ganelin
Thank you for the details! Would you mind speaking to what tools proved
most useful as far as identifying bottlenecks or bugs? Thanks again.
On Oct 13, 2014 5:36 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 The biggest scaling issue was supporting a large number of reduce tasks
 efficiently, which the JIRAs in that post handle. In particular, our
 current default shuffle (the hash-based one) has each map task open a
 separate file output stream for each reduce task, which wastes a lot of
 memory (since each stream has its own buffer).

 A second thing that helped efficiency tremendously was Reynold's new
 network module (https://issues.apache.org/jira/browse/SPARK-2468). Doing
 I/O on 32 cores, 10 Gbps Ethernet and 8+ disks efficiently is not easy, as
 can be seen when you try to scale up other software.

 Finally, with 30,000 tasks even sending info about every map's output size
 to each reducer was a problem, so Reynold has a patch that avoids that if
 the number of tasks is large.

 Matei

 On Oct 10, 2014, at 10:09 PM, Ilya Ganelin ilgan...@gmail.com wrote:

  Hi Matei - I read your post with great interest. Could you possibly
 comment in more depth on some of the issues you guys saw when scaling up
 spark and how you resolved them? I am interested specifically in
 spark-related problems. I'm working on scaling up spark to very large
 datasets and have been running into a variety of issues. Thanks in advance!
 
  On Oct 10, 2014 10:54 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
  Hi folks,
 
  I interrupt your regularly scheduled user / dev list to bring you some
 pretty cool news for the project, which is that we've been able to use
 Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x
 faster on 10x fewer nodes. There's a detailed writeup at
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
 Summary: while Hadoop MapReduce held last year's 100 TB world record by
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on
 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.
 
  I want to thank Reynold Xin for leading this effort over the past few
 weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali
 Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for
 providing the machines to make this possible. Finally, this result would of
 course not be possible without the many many other contributions, testing
 and feature requests from throughout the community.
 
  For an engine to scale from these multi-hour petabyte batch jobs down to
 100-millisecond streaming and interactive queries is quite uncommon, and
 it's thanks to all of you folks that we are able to make this happen.
 
  Matei
 




Re: How To Implement More Than One Subquery in Scala/Spark

2014-10-11 Thread Ilya Ganelin
Because of how closures work in Scala, there is no support for nested
map/rdd-based operations. Specifically, if you have

Context a {
Context b {

}
}

Operations within context b, when distributed across nodes, will no longer
have visibility of variables specific to context a because that context is
not distributed alongside that operation!

To get around this you need to sequence your operations. For example, run
a map job, take the output of that, and run a second map job to filter.
Another option is to run two separate map jobs and join their results. Keep
in mind that another useful technique is to execute the groupByKey routine,
particularly if you want to operate on a particular variable.
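
A rough sketch of the join-then-filter idea, using illustrative case classes
rather than the full TPC-H schema below:

import org.apache.spark.SparkContext._

case class Item(orderKey: Long, suppKey: Long)

val l1 = sc.parallelize(Seq(Item(1, 10), Item(1, 20), Item(2, 30)))
val l2 = sc.parallelize(Seq(Item(1, 20), Item(2, 30)))

// Key both sides by order key and join once, instead of re-scanning the
// inner table per outer row the way a correlated subquery would.
val joined = l1.map(i => (i.orderKey, i)).join(l2.map(i => (i.orderKey, i)))

// Mimics the EXISTS (... L2.L_SUPPKEY <> L1.L_SUPPKEY) condition.
val withOtherSupplier = joined
  .filter { case (_, (a, b)) => a.suppKey != b.suppKey }
  .map { case (_, (a, _)) => a }
  .distinct()

withOtherSupplier.collect().foreach(println)
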
On Oct 11, 2014 11:09 AM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

 Hi,

 My Spark version is v1.1.0 and Hive is 0.12.0, I need to use more than 1
 subquery in my Spark SQL, below are my sample table structures and a SQL
 that contains more than 1 subquery.

 Question 1:  How to load a HIVE table into Scala/Spark?
 Question 2:  How to implement a SQL_WITH_MORE_THAN_ONE_SUBQUERY  in
 SCALA/SPARK?
 Question 3:  What is the DATEADD function in Scala/Spark? Or, how can I
 implement DATEADD(MONTH, 3, '2013-07-01') and DATEADD(YEAR, 1, '2014-01-01')
 in Spark or Hive?
 I can find Hive's date_add(string startdate, int days), but it works in days,
 not MONTH / YEAR.

 Thanks.

 Regards
 Arthur

 ===
 My sample SQL with more than 1 subquery:
 SELECT S_NAME,
COUNT(*) AS NUMWAIT
 FROM   SUPPLIER,
LINEITEM L1,
ORDERS
 WHERE  S_SUPPKEY = L1.L_SUPPKEY
AND O_ORDERKEY = L1.L_ORDERKEY
AND O_ORDERSTATUS = 'F'
   AND L1.L_RECEIPTDATE > L1.L_COMMITDATE
AND EXISTS (SELECT *
FROM   LINEITEM L2
WHERE  L2.L_ORDERKEY = L1.L_ORDERKEY
   AND L2.L_SUPPKEY <> L1.L_SUPPKEY)
AND NOT EXISTS (SELECT *
FROM   LINEITEM L3
WHERE  L3.L_ORDERKEY = L1.L_ORDERKEY
   AND L3.L_SUPPKEY <> L1.L_SUPPKEY
   AND L3.L_RECEIPTDATE > L3.L_COMMITDATE)
 GROUP  BY S_NAME
 ORDER  BY NUMWAIT DESC, S_NAME
 limit 100;


 ===
 Supplier Table:
 CREATE TABLE IF NOT EXISTS SUPPLIER (
 S_SUPPKEY INTEGER PRIMARY KEY,
 S_NAME  CHAR(25),
 S_ADDRESS VARCHAR(40),
 S_NATIONKEY BIGINT NOT NULL,
 S_PHONE CHAR(15),
 S_ACCTBAL DECIMAL,
 S_COMMENT VARCHAR(101)
 )

 ===
 Order Table:
 CREATE TABLE IF NOT EXISTS ORDERS (
 O_ORDERKEY INTEGER PRIMARY KEY,
 O_CUSTKEY BIGINT NOT NULL,
 O_ORDERSTATUS   CHAR(1),
 O_TOTALPRICE    DECIMAL,
 O_ORDERDATE CHAR(10),
 O_ORDERPRIORITY CHAR(15),
 O_CLERK CHAR(15),
 O_SHIPPRIORITY  INTEGER,
 O_COMMENT VARCHAR(79)
 )

 ===
 LineItem Table:
 CREATE TABLE IF NOT EXISTS LINEITEM (
 L_ORDERKEY  BIGINT not null,
 L_PARTKEY   BIGINT,
 L_SUPPKEY   BIGINT,
 L_LINENUMBERINTEGER not null,
 L_QUANTITY  DECIMAL,
 L_EXTENDEDPRICE DECIMAL,
 L_DISCOUNT  DECIMAL,
 L_TAX   DECIMAL,
 L_SHIPDATE  CHAR(10),
 L_COMMITDATECHAR(10),
 L_RECEIPTDATE   CHAR(10),
 L_RETURNFLAGCHAR(1),
 L_LINESTATUSCHAR(1),
 L_SHIPINSTRUCT  CHAR(25),
 L_SHIPMODE  CHAR(10),
 L_COMMENT   VARCHAR(44),
 CONSTRAINT pk PRIMARY KEY (L_ORDERKEY, L_LINENUMBER )
 )




Re: IOException and appcache FileNotFoundException in Spark 1.02

2014-10-10 Thread Ilya Ganelin
Thank you - I will try this. If I drop the partition count am I not more
likely to hit memory issues? Especially if the dataset is rather large?
On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can
 try the following workarounds:

 sc.set("spark.core.connection.ack.wait.timeout", "600")
 sc.set("spark.akka.frameSize", "50")
 Also reduce the number of partitions, you could be hitting the kernel's
 ulimit. I faced this issue and it was gone when i dropped the partitions
 from 1600 to 200.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi all – I could use some help figuring out a couple of exceptions I’ve
 been getting regularly.

 I have been running on a fairly large dataset (150 gigs). With smaller
 datasets I don't have any issues.

 My sequence of operations is as follows – unless otherwise specified, I
 am not caching:

 Map a 30 million row x 70 col string table to approx 30 mil x  5 string
 (For read as textFile I am using 1500 partitions)

 From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

 Then, extract distinct values for A and distinct values for B. (I cache
 the output of distinct), , numPartitions = 180

 Zip with index for A and for B (to remap strings to int)

 Join remapped ids with original table

 This is then fed into MLLIBs ALS algorithm.

 I am running with:

 Spark version 1.02 with CDH5.1

 numExecutors = 8, numCores = 14

 Memory = 12g

 MemoryFration = 0.7

 KryoSerialization

 My issue is that the code runs fine for a while but then will
 non-deterministically crash with either file IOExceptions or the following
 obscure error:

 14/10/08 13:29:59 INFO TaskSetManager: Loss was due to
 java.io.IOException: Filesystem closed [duplicate 10]

 14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException

 java.io.FileNotFoundException:
 /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
 (No such file or directory)

 Looking through the logs, I see the IOException in other places but it
 appears to be non-catastrophic. The FileNotFoundException, however, is. I
 have found the following stack overflow that at least seems to address the
 IOException:


 http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

 But I have not found anything useful at all with regards to the app cache
 error.

 Any help would be much appreciated.





Re: IOException and appcache FileNotFoundException in Spark 1.02

2014-10-10 Thread Ilya Ganelin
Hi Akhil - I tried your suggestions and tried varying my partition sizes.
Reducing the number of partitions led to memory errors (presumably - I saw
IOExceptions much sooner).

With the settings you provided the program ran for longer but ultimately
crashes in the same way. I would like to understand what is going on
internally leading to this.

Could this be related to garbage collection?
On Oct 10, 2014 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-3633 (or similar). You can
 try the following workarounds:

 sc.set("spark.core.connection.ack.wait.timeout", "600")
 sc.set("spark.akka.frameSize", "50")
 Also reduce the number of partitions, you could be hitting the kernel's
 ulimit. I faced this issue and it was gone when i dropped the partitions
 from 1600 to 200.

 Thanks
 Best Regards

 On Fri, Oct 10, 2014 at 5:58 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Hi all – I could use some help figuring out a couple of exceptions I’ve
 been getting regularly.

 I have been running on a fairly large dataset (150 gigs). With smaller
 datasets I don't have any issues.

 My sequence of operations is as follows – unless otherwise specified, I
 am not caching:

 Map a 30 million row x 70 col string table to approx 30 mil x  5 string
 (For read as textFile I am using 1500 partitions)

 From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

 Then, extract distinct values for A and distinct values for B. (I cache
 the output of distinct), , numPartitions = 180

 Zip with index for A and for B (to remap strings to int)

 Join remapped ids with original table

 This is then fed into MLLIBs ALS algorithm.

 I am running with:

 Spark version 1.02 with CDH5.1

 numExecutors = 8, numCores = 14

 Memory = 12g

 MemoryFration = 0.7

 KryoSerialization

 My issue is that the code runs fine for a while but then will
 non-deterministically crash with either file IOExceptions or the following
 obscure error:

 14/10/08 13:29:59 INFO TaskSetManager: Loss was due to
 java.io.IOException: Filesystem closed [duplicate 10]

 14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException

 java.io.FileNotFoundException:
 /opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
 (No such file or directory)

 Looking through the logs, I see the IOException in other places but it
 appears to be non-catastrophic. The FileNotFoundException, however, is. I
 have found the following stack overflow that at least seems to address the
 IOException:


 http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

 But I have not found anything useful at all with regards to the app cache
 error.

 Any help would be much appreciated.





Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Ilya Ganelin
Hi Matei - I read your post with great interest. Could you possibly comment
in more depth on some of the issues you guys saw when scaling up spark and
how you resolved them? I am interested specifically in spark-related
problems. I'm working on scaling up spark to very large datasets and have
been running into a variety of issues. Thanks in advance!
On Oct 10, 2014 10:54 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi folks,

 I interrupt your regularly scheduled user / dev list to bring you some
 pretty cool news for the project, which is that we've been able to use
 Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x
 faster on 10x fewer nodes. There's a detailed writeup at
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
 Summary: while Hadoop MapReduce held last year's 100 TB world record by
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on
 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.

 I want to thank Reynold Xin for leading this effort over the past few
 weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali
 Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for
 providing the machines to make this possible. Finally, this result would of
 course not be possible without the many many other contributions, testing
 and feature requests from throughout the community.

 For an engine to scale from these multi-hour petabyte batch jobs down to
 100-millisecond streaming and interactive queries is quite uncommon, and
 it's thanks to all of you folks that we are able to make this happen.

 Matei




Re: Debug Spark in Cluster Mode

2014-10-10 Thread Ilya Ganelin
I would also be interested in knowing more about this. I have used the
cloudera manager and the spark resource interface (clientnode:4040) but
would love to know if there are other tools out there - either for post
processing or better observation during execution.
On Oct 9, 2014 4:50 PM, Rohit Pujari rpuj...@hortonworks.com wrote:

 Hello Folks:

 What're some best practices to debug Spark in cluster mode?


 Thanks,
 Rohit



IOException and appcache FileNotFoundException in Spark 1.02

2014-10-09 Thread Ilya Ganelin
On Oct 9, 2014 10:18 AM, Ilya Ganelin ilgan...@gmail.com wrote:


Hi all – I could use some help figuring out a couple of exceptions I’ve
been getting regularly.
I have been running on a fairly large dataset (150 gigs). With smaller
datasets I don't have any issues.

My sequence of operations is as follows – unless otherwise specified, I am
not caching:
Map a 30 million row x 70 col string table to approx 30 mil x  5 string
(For read as textFile I am using 1500 partitions)


From that, map to ((a,b), score) and reduceByKey, numPartitions = 180
Then, extract distinct values for A and distinct values for B. (I cache the
output of distinct), , numPartitions = 180
Zip with index for A and for B (to remap strings to int)
Join remapped ids with original table
This is then fed into MLLIBs ALS algorithm.

I am running with:
Spark version 1.02 with CDH5.1
numExecutors = 8, numCores = 14
Memory = 12g
MemoryFration = 0.7
KryoSerialization

My issue is that the code runs fine for a while but then will
non-deterministically crash with either file IOExceptions or the following
obscure error:

14/10/08 13:29:59 INFO TaskSetManager: Loss was due to java.io.IOException:
Filesystem closed [duplicate 10]
14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
(No such file or directory)

Looking through the logs, I see the IOException in other places but it
appears to be non-catastrophic. The FileNotFoundException, however, is. I
have found the following stack overflow that at least seems to address the
IOException:
http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
But I have not found anything useful at all with regards to the app cache
error.

Any help would be much appreciated.



-Ilya Ganelin


IOException and appcache FileNotFoundException in Spark 1.02

2014-10-09 Thread Ilya Ganelin
Hi all – I could use some help figuring out a couple of exceptions I’ve
been getting regularly.

I have been running on a fairly large dataset (150 gigs). With smaller
datasets I don't have any issues.

My sequence of operations is as follows – unless otherwise specified, I am
not caching:

Map a 30 million row x 70 col string table to approx 30 mil x  5 string
(For read as textFile I am using 1500 partitions)

From that, map to ((a,b), score) and reduceByKey, numPartitions = 180

Then, extract distinct values for A and distinct values for B. (I cache the
output of distinct), , numPartitions = 180

Zip with index for A and for B (to remap strings to int)

Join remapped ids with original table

This is then fed into MLLIBs ALS algorithm.
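
For reference, a condensed sketch of those remapping steps (the input rows and
field names are illustrative):

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, Rating}

case class Row3(a: String, b: String, score: Double)
val rows = sc.parallelize(Seq(Row3("u1", "i1", 1.0), Row3("u1", "i2", 2.0), Row3("u2", "i1", 3.0)))

// ((a, b), score) with reduceByKey, then dense integer ids via zipWithIndex.
val scored = rows.map(r => ((r.a, r.b), r.score)).reduceByKey(_ + _, 180)
val aIds = scored.map(_._1._1).distinct(180).zipWithIndex()   // (aString, aId)
val bIds = scored.map(_._1._2).distinct(180).zipWithIndex()

// Join the integer ids back onto the original table and feed ALS.
val ratings = scored
  .map { case ((a, b), s) => (a, (b, s)) }
  .join(aIds)
  .map { case (_, ((b, s), aId)) => (b, (aId, s)) }
  .join(bIds)
  .map { case (_, ((aId, s), bId)) => Rating(aId.toInt, bId.toInt, s) }

val model = ALS.train(ratings, 10, 10, 0.01)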

I am running with:

Spark version 1.02 with CDH5.1

numExecutors = 8, numCores = 14

Memory = 12g

MemoryFration = 0.7

KryoSerialization

My issue is that the code runs fine for a while but then will
non-deterministically crash with either file IOExceptions or the following
obscure error:

14/10/08 13:29:59 INFO TaskSetManager: Loss was due to java.io.IOException:
Filesystem closed [duplicate 10]

14/10/08 13:30:08 WARN TaskSetManager: Loss was due to
java.io.FileNotFoundException

java.io.FileNotFoundException:
/opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1412717093951_0024/spark-local-20141008131827-c082/1c/shuffle_3_117_354
(No such file or directory)

Looking through the logs, I see the IOException in other places but it
appears to be non-catastrophic. The FileNotFoundException, however, is. I
have found the following stack overflow that at least seems to address the
IOException:

http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed

But I have not found anything useful at all with regards to the app cache
error.

Any help would be much appreciated.