Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos

2015-06-28 Thread Iulian Dragoș
This is something we (at Typesafe) also thought about, but didn't start
yet. It would be good to pool efforts.

On Sat, Jun 27, 2015 at 12:44 AM, Dave Ariens dari...@blackberry.com
wrote:

  Fair. I will look into an alternative with a generated delegation token.
  However, the same issue exists: how can I have the executor run some
  arbitrary code when it gets a task assignment and before it proceeds to
  process its resources?

 *From: *Marcelo Vanzin
 *Sent: *Friday, June 26, 2015 6:20 PM
 *To: *Dave Ariens
 *Cc: *Tim Chen; Olivier Girardot; user@spark.apache.org
 *Subject: *Re: Accessing Kerberos Secured HDFS Resources from Spark on
 Mesos

   On Fri, Jun 26, 2015 at 3:09 PM, Dave Ariens dari...@blackberry.com
 wrote:

  Would there be any way to have the task instances in the slaves call
 the UGI login with a principal/keytab provided to the driver?


  That would only work with a very small number of executors. If you have
 many login requests in a short period of time with the same principal, the
 KDC will start to deny logins. That's why delegation tokens are used
 instead of explicit logins.

  --
 Marcelo




-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Spark-Submit / Spark-Shell Error Standalone cluster

2015-06-28 Thread Tomas Hudik
/usr/bin/ looks like a strange directory. Did you copy some files to
/usr/bin yourself?
If you download (or possibly compile) Spark, it will never be placed into
/usr/bin.

On Sun, Jun 28, 2015 at 9:19 AM, Wojciech Pituła w.pit...@gmail.com wrote:

 I assume that /usr/bin/load-spark-env.sh exists. Have you got the rights
 to execute it?

 niedz., 28.06.2015 o 04:53 użytkownik Ashish Soni asoni.le...@gmail.com
 napisał:

 Not sure what the issue is, but when I run spark-submit or spark-shell
 I am getting the error below:

 /usr/bin/spark-class: line 24: /usr/bin/load-spark-env.sh: No such file
 or directory

 Can someone please help?

 Thanks,




Re: Spark-Submit / Spark-Shell Error Standalone cluster

2015-06-28 Thread Wojciech Pituła
I assume that /usr/bin/load-spark-env.sh exists. Have you got the rights to
execute it?

niedz., 28.06.2015 o 04:53 użytkownik Ashish Soni asoni.le...@gmail.com
napisał:

 Not sure what the issue is, but when I run spark-submit or spark-shell
 I am getting the error below:

 /usr/bin/spark-class: line 24: /usr/bin/load-spark-env.sh: No such file or
 directory

 Can someone please help?

 Thanks,



problem for submitting job

2015-06-28 Thread 郭谦
HI,

I'm a junior user of spark from China.

I have a problem with submitting a Spark job right now. I want to submit the
job from code.

In other words, how can I submit a Spark job to a YARN cluster from within a
Java program, without using spark-submit?


   I've learnt from the official site
http://spark.apache.org/docs/latest/submitting-applications.html

that submitting a job to the cluster with the bin/spark-submit script is easy.


   Because the script may do lots of complex work, such as setting up
the classpath with Spark and its dependencies.

If I don't use the script, I have to deal with all of that complex work
myself, which makes me feel really frustrated.


   I have searched for this problem on Google, but the answers don't quite
suit my case.


   In Hadoop development, I know that after setting up the Configuration,
the Job, and the resources,

we can submit a Hadoop job in code like this:

job.waitForCompletion

It is convenient for users to submit jobs programmatically.
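
For reference, the Hadoop pattern referred to above looks roughly like this
minimal sketch (the job name and input/output paths are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

val conf = new Configuration()
val job = Job.getInstance(conf, "identity copy")   // identity map/reduce by default
job.setOutputKeyClass(classOf[LongWritable])
job.setOutputValueClass(classOf[Text])
FileInputFormat.addInputPath(job, new Path("/tmp/in"))     // placeholder input path
FileOutputFormat.setOutputPath(job, new Path("/tmp/out"))  // placeholder output path
job.waitForCompletion(true)   // submits the job and blocks until it finishes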


I want to know whether there is a plan (maybe in Spark 1.5+?) to provide
users with a similar programmatic way of submitting jobs, like Hadoop does.

Just like monitoring: in the recent Spark release (1.4.0) we can already get
information about Spark applications through the REST API.


Thanks & Regards

GUO QIAN


Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ilya Ganelin
Ayman - it's really a question of recommending users to products vs. products
to users. There will only be a difference if you're not doing all-to-all.
For example, if you're producing only the top-N recommendations, then
recommending only the top N products or the top N users would give different
results.
On Sun, Jun 28, 2015 at 8:34 AM Ayman Farahat ayman.fara...@yahoo.com
wrote:

 Thanks Ilya
 Is there an advantage of say partitioning by users /products when you
 train ?
 Here are two alternatives I have

 #Partition by user or Product
 tot = newrdd.map(lambda l:
 (l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
 ratings = tot.values()
 model = ALS.train(ratings, rank, numIterations)

 #use zipwithIndex

 tot = newrdd.map(lambda l: (l[1],Rating(int(l[1]),int(l[2]),l[4])))
 bob = tot.zipWithIndex().map(lambda x : (x[1] ,x[0])).partitionBy(30)
 ratings = bob.values()
 model = ALS.train(ratings, rank, numIterations)


 On Jun 28, 2015, at 8:24 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 You can also select pieces of your RDD by first doing a zipWithIndex and
 then doing a filter operation on the second element of the RDD.

 For example to select the first 100 elements :

 val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
 On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 How do you partition by product in Python?
 the only API is partitionBy(50)

 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also, in my experiments it's much faster to do blocked BLAS through
 cartesian rather than doing sc.union. Here are the details on the
 experiments:

 https://issues.apache.org/jira/browse/SPARK-4823

 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also, I am not sure how threading helps here, because Spark puts a partition
 on each core. On each core there may be multiple hardware threads if you are
 using Intel hyper-threading, but I would let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented it this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks of
 products to userFeatures.mapPartitions() and BLAS-multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there are lots of
 product blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.
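
 A toy illustration of that idea, using MLlib's local matrix type (this
 assumes the local linear algebra API in recent releases; the numbers are
 made up): stack the item factors as the rows of one matrix and score every
 item for a user with a single matrix-vector multiply.

 import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector}

 val rank = 2
 // 3 items x rank 2, stored column-major as MLlib expects
 val itemFactorMatrix = new DenseMatrix(3, rank, Array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6))
 val userFactor = new DenseVector(Array(0.7, 0.9))

 // One matrix-vector multiply gives a score for every item at once
 val scores = itemFactorMatrix.multiply(userFactor)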

 Another optimisation that is available in 1.4 is a
 recommendProducts method that blockifies the factors to make use of level 3
 BLAS (i.e. matrix-matrix multiply). I am not sure if this is available in
 the Python API yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.
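
 As a rough sketch of that do-it-yourself route (in Scala; the top-K value
 and model path are placeholders, all product factors are broadcast at once
 since 30,000 items fit comfortably in memory, and plain dot products stand
 in for a real BLAS gemm call):

 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

 def blockedScores(sc: SparkContext, modelPath: String, topK: Int) = {
   val model = MatrixFactorizationModel.load(sc, modelPath)

   // Broadcast the (productId, factor) pairs once to every executor.
   val products = sc.broadcast(model.productFeatures.collect())

   model.userFeatures.mapPartitions { users =>
     val prods = products.value
     users.map { case (userId, uf) =>
       // Score this user against the whole broadcast product block.
       val scored = prods.map { case (productId, pf) =>
         var dot = 0.0
         var i = 0
         while (i < uf.length) { dot += uf(i) * pf(i); i += 1 }
         (productId, dot)
       }
       (userId, scored.sortBy(-_._2).take(topK))
     }
   }
 }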

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

  I actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper caching
 and use of broadcast variables this will take a while depending on the size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space, and definitely look into
 spinning up multiple threads to process your product features in parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have
 about 30
 ,000 products and 90 

Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos

2015-06-28 Thread Steve Loughran

On 27 Jun 2015, at 07:56, Tim Chen 
t...@mesosphere.iomailto:t...@mesosphere.io wrote:

Does YARN provide the token through that env variable you mentioned? Or how 
does YARN do this?



Roughly:

1. The client-side launcher creates the delegation tokens and adds them as byte[] 
data to the request (a sketch of this step follows the list).
2. The YARN RM uses the HDFS token for the localisation, so the node managers 
can access the content the user has the rights to.
3. There's some other stuff related to token refresh of restarted app masters, 
essentially guaranteeing that even an AM restarted 3 days after the first 
launch will still have current credentials.
4. It's the duty of the launched App master to download those delegated tokens 
and make use of them - partly through the UGI stuff, and also through other 
mechanisms (for example, a subset of the tokens is usually passed to the launched 
containers).
5. It's also the duty of the launched AM to deal with token renewal and expiry. 
Short-lived (< 72h) apps don't have to worry about this - making the jump to 
long-lived services adds a lot of extra work (which is in Spark 1.4).
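
For step 1, a minimal client-side sketch (in Scala, against the plain Hadoop
security API; the principal, keytab, renewer and file path are placeholders,
not anything Spark-specific):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val conf = new Configuration()
UserGroupInformation.setConfiguration(conf)
// Kerberos login on the submitting machine.
UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab")

// Ask the NameNode for HDFS delegation tokens, renewable by the RM principal.
val creds = new Credentials()
FileSystem.get(conf).addDelegationTokens("rm/_HOST@EXAMPLE.COM", creds)

// The launcher would ship these credentials in the application request; here
// they are just written to a local file for illustration.
creds.writeTokenStorageFile(new Path("file:///tmp/hdfs.tokens"), conf)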


Tim

On Fri, Jun 26, 2015 at 3:51 PM, Marcelo Vanzin 
van...@cloudera.commailto:van...@cloudera.com wrote:
On Fri, Jun 26, 2015 at 3:44 PM, Dave Ariens 
dari...@blackberry.commailto:dari...@blackberry.com wrote:
Fair. I will look into an alternative with a generated delegation token.   
However the same issue exists.   How can I have the executor run some arbitrary 
code when it gets a task assignment and before it proceeds to process it's 
resources?

Hmm, good question. If it doesn't already, Mesos could have its own 
implementation of CoarseGrainedExecutorBackend that provides that 
functionality. The only difference is that you'd run something before the 
executor starts up, not before each task.

YARN actually doesn't do it that way; YARN provides the tokens to the executor 
before the process starts, so that when you call 
UserGroupInformation.getCurrentUser() the tokens are already there.

One way of doing that is by writing the tokens to a file and setting the 
KRB5CCNAME env variable when starting the process. You can check the Hadoop 
sources for details. Not sure if there's another way.
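
On the reading side, a process handed such a token file could load it with
something like this sketch (again plain Hadoop API, with the file path as a
placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val conf = new Configuration()
// Read the serialized tokens and attach them to the current user's UGI,
// so subsequent HDFS calls authenticate with the delegation token.
val creds = Credentials.readTokenStorageFile(new Path("file:///tmp/hdfs.tokens"), conf)
UserGroupInformation.getCurrentUser.addCredentials(creds)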



From: Marcelo Vanzin
Sent: Friday, June 26, 2015 6:20 PM
To: Dave Ariens
Cc: Tim Chen; Olivier Girardot; 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos


On Fri, Jun 26, 2015 at 3:09 PM, Dave Ariens 
dari...@blackberry.commailto:dari...@blackberry.com wrote:
Would there be any way to have the task instances in the slaves call the UGI 
login with a principal/keytab provided to the driver?

That would only work with a very small number of executors. If you have many 
login requests in a short period of time with the same principal, the KDC will 
start to deny logins. That's why delegation tokens are used instead of explicit 
logins.

--
Marcelo



--
Marcelo




Re: required: org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

2015-06-28 Thread Ted Yu
Can you show us your code around line 100 ?

Which Spark release are you compiling against ?

Cheers

On Sun, Jun 28, 2015 at 5:49 AM, Arthur Chan arthur.hk.c...@gmail.com
wrote:

 Hi,

 I am trying Spark with some sample programs,


 In my code, the following items are imported:

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD,
 LabeledPoint}

 import org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD}

 import org.apache.spark.streaming.{Seconds, StreamingContext}

 import scala.util.Random

 I got following error:

 [error] StreamingModel.scala:100: type mismatch;

 [error]  found   :
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]

 [error]  required:
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

 [error] model.predictOn(labeledStream).print()

 [error] ^

 [error] one error found

 [error] (compile:compile) Compilation failed


 Any idea?


 Regards



Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ilya Ganelin
Oops - code should be :

val a = rdd.zipWithIndex().filter(s => 1 < s._2 && s._2 < 100)

On Sun, Jun 28, 2015 at 8:24 AM Ilya Ganelin ilgan...@gmail.com wrote:

 You can also select pieces of your RDD by first doing a zipWithIndex and
 then doing a filter operation on the second element of the RDD.

 For example to select the first 100 elements :

 val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
 On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat
 ayman.fara...@yahoo.com.invalid wrote:

 How do you partition by product in Python?
 the only API is partitionBy(50)

 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also in my experiments, it's much faster to blocked BLAS through
 cartesian rather than doing sc.union. Here are the details on the
 experiments:

 https://issues.apache.org/jira/browse/SPARK-4823

 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also not sure how threading helps here because Spark puts a partition to
 each core. On each core may be there are multiple threads if you are using
 intel hyperthreading but I will let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks 
 of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there lots of 
 product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is avalailable on 1.4 is a
 recommendProducts method that blockifies the factors to make use of 
 level 3
 BLAS (ie matrix-matrix multiply). I am not sure if this is available in 
 The
 Python api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper 
 caching
 and use of broadcast variables this will take a while defending on the 
 size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in 
 parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation, I have
 about 30,000 products and 90 million users.
 When I try to predict all of them, it fails.
 I have been trying to formulate the problem as a matrix
 multiplication, where
 I first get the product features, broadcast them and then do a dot
 product.
 It's still very slow. Any reason why?
 Here is a sample code:

 def doMultiply(x):
     a = []
     #multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 

What does Spark is not just MapReduce mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread YaoPau
I've heard "Spark is not just MapReduce" mentioned during Spark talks, but it
seems like every method that Spark has is really doing something like (Map
-> Reduce) or (Map -> Map -> Map -> Reduce) etc. behind the scenes, with the
performance benefit of keeping RDDs in memory between stages.

Am I wrong about that?  Is Spark doing anything more efficiently than a
series of Maps followed by a Reduce in memory?  What methods does Spark have
that can't easily be mapped (with somewhat similar efficiency) to Map and
Reduce in memory?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ilya Ganelin
You can also select pieces of your RDD by first doing a zipWithIndex and
then doing a filter operation on the second element of the RDD.

For example to select the first 100 elements :

val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat
ayman.fara...@yahoo.com.invalid wrote:

 How do you partition by product in Python?
 the only API is partitionBy(50)

 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also in my experiments, it's much faster to blocked BLAS through cartesian
 rather than doing sc.union. Here are the details on the experiments:

 https://issues.apache.org/jira/browse/SPARK-4823

 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Also not sure how threading helps here because Spark puts a partition to
 each core. On each core may be there are multiple threads if you are using
 intel hyperthreading but I will let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there lots of product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is avalailable on 1.4 is a recommendProducts
 method that blockifies the factors to make use of level 3 BLAS (ie
 matrix-matrix multiply). I am not sure if this is available in The Python
 api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper 
 caching
 and use of broadcast variables this will take a while defending on the 
 size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about
 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix
 multiplication where
 I first get the product features, broadcast them and then do a dot
 product.
 Its still very slow. Any reason why
 here is a sample code

 def doMultiply(x):
 a = []
 #multiply by
 mylen = len(pf.value)
 for i in range(mylen) :
   myprod = numpy.dot(x,pf.value[i][1])
   a.append(myprod)
 return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com http://nabble.com/.

 

Re: Matrix Multiplication and mllib.recommendation

2015-06-28 Thread Ayman Farahat
Thanks Ilya
Is there an advantage to, say, partitioning by users/products when you train?
Here are the two alternatives I have:

#Partition by user or Product 
tot = newrdd.map(lambda l: 
(l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
ratings = tot.values()
model = ALS.train(ratings, rank, numIterations)

#use zipwithIndex

tot = newrdd.map(lambda l: (l[1],Rating(int(l[1]),int(l[2]),l[4])))
bob = tot.zipWithIndex().map(lambda x : (x[1] ,x[0])).partitionBy(30)
ratings = bob.values()
model = ALS.train(ratings, rank, numIterations)


On Jun 28, 2015, at 8:24 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 You can also select pieces of your RDD by first doing a zipWithIndex and then 
 doing a filter operation on the second element of the RDD. 
 
 For example to select the first 100 elements :
 
 val a = rdd.zipWithIndex().filter(s => 1 < s < 100)
 On Sat, Jun 27, 2015 at 11:04 AM Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:
 How do you partition by product in Python?
 the only API is partitionBy(50)
 
 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote:
 
 Also in my experiments, it's much faster to blocked BLAS through cartesian 
 rather than doing sc.union. Here are the details on the experiments:
 
 https://issues.apache.org/jira/browse/SPARK-4823
 
 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Also not sure how threading helps here because Spark puts a partition to 
 each core. On each core may be there are multiple threads if you are using 
 intel hyperthreading but I will let Spark handle the threading.  
 
 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS 
 dgemm based calculation.
 
 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:
 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share. 
 Best
 Ayman
 
 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:
 
 Nick is right. I too have implemented this way and it works just fine. In 
 my case, there can be even more products. You simply broadcast blocks of 
 products to userFeatures.mapPartitions() and BLAS multiply in there to get 
 recommendations. In my case 10K products form one block. Note that you 
 would then have to union your recommendations. And if there lots of product 
 blocks, you might also want to checkpoint once every few times.
 
 Regards
 Sab
 
 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com 
 wrote:
 One issue is that you broadcast the product vectors and then do a dot 
 product one-by-one with the user vector.
 
 You should try forming a matrix of the item vectors and doing the dot 
 product as a matrix-vector multiply which will make things a lot faster.
 
 Another optimisation that is avalailable on 1.4 is a recommendProducts 
 method that blockifies the factors to make use of level 3 BLAS (ie 
 matrix-matrix multiply). I am not sure if this is available in The Python 
 api yet. 
 
 But you can do a version yourself by using mapPartitions over user factors, 
 blocking the factors into sub-matrices and doing matrix multiply with item 
 factor matrix to get scores on a block-by-block basis.
 
 Also as Ilya says more parallelism can help. I don't think it's so 
 necessary to do LSH with 30,000 items.
 
 —
 Sent from Mailbox
 
 
 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:
 
 Actually talk about this exact thing in a blog post here 
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
  Keep in mind, you're actually doing a ton of math. Even with proper 
 caching and use of broadcast variables this will take a while defending on 
 the size of your cluster. To get real results you may want to look into 
 locality sensitive hashing to limit your search space and definitely look 
 into spinning up multiple threads to process your product features in 
 parallel to increase resource utilization on the cluster.
 
 
 
 Thank you,
 Ilya Ganelin
 
 
 
 -Original Message-
 From: afarahat [ayman.fara...@yahoo.com]
 Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Matrix Multiplication and mllib.recommendation
 
 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication where
 I first get the product features, broadcast them and then do a dot product.
 Its still very slow. Any reason why
 here is a sample code
 
 def doMultiply(x):
 a = []
 #multiply by
 mylen = len(pf.value)
 for i in range(mylen) :
   

required: org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

2015-06-28 Thread Arthur Chan
Hi,

I am trying Spark with some sample programs,


In my code, the following items are imported:

import org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD,
LabeledPoint}

import org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

I got following error:

[error] StreamingModel.scala:100: type mismatch;

[error]  found   :
org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]

[error]  required:
org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

[error] model.predictOn(labeledStream).print()

[error] ^

[error] one error found

[error] (compile:compile) Compilation failed


Any idea?


Regards


Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Running this now

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
-Phive -Phive-thriftserver -DskipTests clean package


Waiting for it to complete. There is no progress after initial log messages


//LOGS

$ ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
-Phive -Phive-thriftserver -DskipTests clean package

+++ dirname ./make-distribution.sh

++ cd .

++ pwd

+ SPARK_HOME=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0

+ DISTDIR=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

+ SPARK_TACHYON=false

+ TACHYON_VERSION=0.6.4

+ TACHYON_TGZ=tachyon-0.6.4-bin.tar.gz

+ TACHYON_URL=
https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gz

+ MAKE_TGZ=false

+ NAME=none

+ MVN=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn

+ ((  9  ))

+ case $1 in

+ MAKE_TGZ=true

+ shift

+ ((  8  ))

+ case $1 in

+ break

+ '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
']'

+ '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
']'

++ command -v git

+ '[' /usr/bin/git ']'

++ git rev-parse --short HEAD

++ :

+ GITREV=

+ '[' '!' -z '' ']'

+ unset GITREV

++ command -v /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn

+ '[' '!' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn ']'

++ /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn help:evaluate
-Dexpression=project.version -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
-Phive -Phive-thriftserver -DskipTests clean package

++ grep -v INFO

++ tail -n 1

//LOGS

On Sun, Jun 28, 2015 at 12:17 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I just did that, where can i find that spark-1.4.0-bin-hadoop2.4.tgz
 file ?

 On Sun, Jun 28, 2015 at 12:15 PM, Ted Yu yuzhih...@gmail.com wrote:

 You can use the following command to build Spark after applying the pull
 request:

 mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package


 Cheers


 On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I see that block support did not make it to spark 1.4 release.

 Can you share instructions of building spark with this support for
 hadoop 2.4.x distribution.

 appreciate.

 On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 This is nice. Which version of Spark has this support ? Or do I need to
 build it.
 I have never built Spark from git, please share instructions for Hadoop
 2.4.x YARN.

 I am struggling a lot to get a join work between 200G and 2TB datasets.
 I am constantly getting this exception

 1000s of executors are failing with

 15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to
 get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
 java.io.IOException: Failed to connect to
 executor_host_name/executor_ip_address:60162
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com
 wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey
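
 a minimal sketch of what those three suggestions look like in code (the
 key/value types, partition count and executor count here are assumptions):

 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel

 val numExecutors = 100                      // assumed cluster size
 val pairs: RDD[(String, Int)] = ???         // your skewed (key, value) dataset

 val tuned = pairs
   .repartition(numExecutors * 4)            // many partitions, a multiple of num executors
   .persist(StorageLevel.MEMORY_AND_DISK)    // spill to disk rather than fail or recompute

 // Prefer reduceByKey (map-side combine) over groupByKey for aggregations.
 val counts = tuned.reduceByKey(_ + _)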

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with
 Spark. Worst of all, I am not able to figure out the reason for the failure:
 the logs
 run into millions of lines and I do not know which keywords to search for to
 find the failure reason.

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + 

Re: Unable to start Pi (hello world) application on Spark 1.4

2015-06-28 Thread ๏̯͡๏
Any thoughts on this ?

On Fri, Jun 26, 2015 at 2:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 It used to work with 1.3.1, however with 1.4.0 i get the following
 exception


 export SPARK_HOME=/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4
 export
 SPARK_JAR=/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar
 export HADOOP_CONF_DIR=/apache/hadoop/conf
 cd $SPARK_HOME
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar
 --jars
 /apache/hadoop/lib/hadoop-lzo-0.6.0.jar,/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
 --num-executors 1 --driver-memory 4g --driver-java-options
 -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue
 hdmi-express --class org.apache.spark.examples.SparkPi
 ./lib/spark-examples*.jar 10

 *Exception*

 15/06/26 14:24:42 INFO client.ConfiguredRMFailoverProxyProvider: Failing
 over to rm2

 15/06/26 14:24:42 WARN ipc.Client: Exception encountered while connecting
 to the server : java.lang.IllegalArgumentException: Server has invalid
 Kerberos principal: hadoop/x-y-rm-2.vip.cm@corp.cm.com


 I remember getting this error when working with Spark 1.2.x, and it was
 related to the way I used to get

 */apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar*

 onto the classpath. With 1.3.1, using --driver-class-path gets it running,
 but with 1.4 it does not work.

 Please suggest.

 --
 Deepak




-- 
Deepak


Re: Join highly skewed datasets

2015-06-28 Thread Ted Yu
The maven command needs to be passed through the --mvn option.

Cheers

On Sun, Jun 28, 2015 at 12:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Running this now

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
 -Phive -Phive-thriftserver -DskipTests clean package


 Waiting for it to complete. There is no progress after initial log messages


 //LOGS

 $ ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
 -Phive -Phive-thriftserver -DskipTests clean package

 +++ dirname ./make-distribution.sh

 ++ cd .

 ++ pwd

 + SPARK_HOME=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0

 + DISTDIR=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + SPARK_TACHYON=false

 + TACHYON_VERSION=0.6.4

 + TACHYON_TGZ=tachyon-0.6.4-bin.tar.gz

 + TACHYON_URL=
 https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gz

 + MAKE_TGZ=false

 + NAME=none

 + MVN=/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn

 + ((  9  ))

 + case $1 in

 + MAKE_TGZ=true

 + shift

 + ((  8  ))

 + case $1 in

 + break

 + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
 ']'

 + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/
 ']'

 ++ command -v git

 + '[' /usr/bin/git ']'

 ++ git rev-parse --short HEAD

 ++ :

 + GITREV=

 + '[' '!' -z '' ']'

 + unset GITREV

 ++ command -v /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn

 + '[' '!' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn ']'

 ++ /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/build/mvn help:evaluate
 -Dexpression=project.version -Phadoop-2.4 -Pyarn -Dhadoop.version=2.4.0
 -Phive -Phive-thriftserver -DskipTests clean package

 ++ grep -v INFO

 ++ tail -n 1

 //LOGS

 On Sun, Jun 28, 2015 at 12:17 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I just did that, where can i find that spark-1.4.0-bin-hadoop2.4.tgz
 file ?

 On Sun, Jun 28, 2015 at 12:15 PM, Ted Yu yuzhih...@gmail.com wrote:

 You can use the following command to build Spark after applying the pull
 request:

 mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package


 Cheers


 On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I see that block support did not make it to spark 1.4 release.

 Can you share instructions of building spark with this support for
 hadoop 2.4.x distribution.

 appreciate.

 On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 This is nice. Which version of Spark has this support ? Or do I need
 to build it.
 I have never built Spark from git, please share instructions for
 Hadoop 2.4.x YARN.

 I am struggling a lot to get a join work between 200G and 2TB
 datasets. I am constantly getting this exception

 1000s of executors are failing with

 15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to
 get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
 java.io.IOException: Failed to connect to
 executor_host_name/executor_ip_address:60162
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com
 wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with
 Spark. Worst is am not able to figure out the reason of failure,  the 
 logs
 run into millions of lines and i do not know the keywords to search for
 failure reason

 On 

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
 ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
-Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package


or


 ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
-Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
​Both fail with

+ echo -e 'Specify the Maven command with the --mvn flag'

Specify the Maven command with the --mvn flag

+ exit -1


Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
you need 1) to publish to inhouse maven, so your application can depend on
your version, and 2) use the spark distribution you compiled to launch your
job (assuming you run with yarn so you can launch multiple versions of
spark on same cluster)

On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 How can i import this pre-built spark into my application via maven as i
 want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak




Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I am able to use the blockJoin API and it does not throw a compilation error:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))]
= lstgItem.blockJoin(viEvents,1,1).map {

}

Here viEvents is highly skewed and both are on HDFS.

What should be the optimal values for replication? I gave 1,1.



On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote:

 you need 1) to publish to inhouse maven, so your application can depend
 on your version, and 2) use the spark distribution you compiled to launch
 your job (assuming you run with yarn so you can launch multiple versions of
 spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven as i
 want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak





 --
 Deepak




-- 
Deepak


Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
a blockJoin spreads out one side while replicating the other. i would
suggest replicating the smaller side. so if lstgItem is smaller try 3,1 or
else 1,3. this should spread the fat keys out over multiple (3)
executors...
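
A short sketch of what that change looks like, reusing the call from the
earlier snippet (this assumes the blockJoin from
https://github.com/apache/spark/pull/6883 is available in the build, and that
lstgItem is indeed the smaller side):

// Per the advice above: lstgItem is assumed to be the smaller side, hence 3,1.
val joined = lstgItem.blockJoin(viEvents, 3, 1)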


On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote:

 you need 1) to publish to inhouse maven, so your application can depend
 on your version, and 2) use the spark distribution you compiled to launch
 your job (assuming you run with yarn so you can launch multiple versions of
 spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven as
 i want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak





 --
 Deepak




 --
 Deepak




Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
You mentioned that storage levels should be memory-and-disk or disk-only, and
that the number of partitions should be large (a multiple of the number of
executors).

How do I specify that?

On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote:

 you need 1) to publish to inhouse maven, so your application can depend
 on your version, and 2) use the spark distribution you compiled to launch
 your job (assuming you run with yarn so you can launch multiple versions of
 spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven as
 i want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak





 --
 Deepak




 --
 Deepak




-- 
Deepak


Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I ran this w/o maven options

./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
-Phive-thriftserver

I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

I hope this is built with 2.4.x hadoop as i did specify -P

On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




-- 
Deepak


Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
How can i import this pre-built spark into my application via maven as i
want to use the block join API.

On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




-- 
Deepak


Re: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread Koert Kuipers
spark is partitioner aware, so it can exploit a situation where 2 datasets
are partitioned the same way (for example by doing a map-side join on
them). map-red does not expose this.
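As a rough illustration of that point (hypothetical data, not code from this thread): if both pair RDDs are pre-partitioned with the same partitioner, the subsequent join becomes a narrow dependency and avoids re-shuffling either side.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object CoPartitionedJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("co-partitioned-join"))
    val part = new HashPartitioner(8)

    // partition both sides the same way up front and cache that layout
    val left  = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(part).cache()
    val right = sc.parallelize(Seq((1, 10), (2, 20))).partitionBy(part).cache()

    // Spark sees the matching partitioners and joins without another shuffle
    left.join(right).collect().foreach(println)
    sc.stop()
  }
}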

On Sun, Jun 28, 2015 at 12:13 PM, YaoPau jonrgr...@gmail.com wrote:

 I've heard Spark is not just MapReduce mentioned during Spark talks, but
 it
 seems like every method that Spark has is really doing something like (Map
 - Reduce) or (Map - Map - Map - Reduce) etc behind the scenes, with the
 performance benefit of keeping RDDs in memory between stages.

 Am I wrong about that?  Is Spark doing anything more efficiently than a
 series of Maps followed by a Reduce in memory?  What methods does Spark
 have
 that can't easily be mapped (with somewhat similar efficiency) to Map and
 Reduce in memory?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
-Phive-thriftserver

The build was successful but the script failed. Is there a way to pass the
incremented version?


[INFO] BUILD SUCCESS

[INFO]


[INFO] Total time: 09:56 min

[INFO] Finished at: 2015-06-28T13:45:29-07:00

[INFO] Final Memory: 84M/902M

[INFO]


+ rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

+ mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

+ echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

+ echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

+ cp
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

+ cp
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

+ cp
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

+ mkdir -p
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

+ cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

+ '[' 1 == 1 ']'

+ cp
'/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

cp:
/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
No such file or directory

LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
-Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com wrote:

 you need 1) to publish to inhouse maven, so your application can depend on
 your version, and 2) use the spark distribution you compiled to launch your
 job (assuming you run with yarn so you can launch multiple versions of
 spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven as i
 want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak





-- 
Deepak


Re: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread Stephen Boesch
Vanilla map/reduce does not expose it: but hive on top of map/reduce has
superior partitioning (and bucketing) support to Spark.

2015-06-28 13:44 GMT-07:00 Koert Kuipers ko...@tresata.com:

 spark is partitioner aware, so it can exploit a situation where 2 datasets
 are partitioned the same way (for example by doing a map-side join on
 them). map-red does not expose this.

 On Sun, Jun 28, 2015 at 12:13 PM, YaoPau jonrgr...@gmail.com wrote:

 I've heard Spark is not just MapReduce mentioned during Spark talks,
 but it
 seems like every method that Spark has is really doing something like (Map
 - Reduce) or (Map - Map - Map - Reduce) etc behind the scenes, with
 the
 performance benefit of keeping RDDs in memory between stages.

 Am I wrong about that?  Is Spark doing anything more efficiently than a
 series of Maps followed by a Reduce in memory?  What methods does Spark
 have
 that can't easily be mapped (with somewhat similar efficiency) to Map and
 Reduce in memory?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





RE: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread Ashic Mahtab
Spark comes with quite a few components. At its core is... surprise... Spark Core. This provides 
the core things required to run Spark jobs. Spark provides a lot of operators out of the box... take a look at 
https://spark.apache.org/docs/latest/programming-guide.html#transformations
https://spark.apache.org/docs/latest/programming-guide.html#actions
While all of them can be implemented with variations of rdd.map().reduce(), 
there are optimisations to be gained in terms of data locality, etc., and the 
additional operators simply make life simpler.
In addition to the core stuff, Spark also brings things like Spark Streaming, 
Spark SQL and data frames, MLlib, GraphX, etc. Spark Streaming gives you 
microbatches of RDDs at periodic intervals. Think "give me the last 15 seconds 
of events every 5 seconds". You can then program towards the small collection, 
and the job will run in a fault tolerant manner on your cluster. Spark SQL 
provides Hive-like functionality that works nicely with various data sources 
and RDDs. MLlib provides a lot of out-of-the-box machine learning algorithms, and the new 
Spark ML project provides a nice, elegant pipeline API to take care of a lot of 
common machine learning tasks. GraphX allows you to represent data in graphs, 
and run graph algorithms on it, e.g. you can represent your data as RDDs of 
vertexes and edges, and run PageRank on a distributed cluster.
And there's more... so, yeah... Spark is definitely not just MapReduce. :)
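As a rough sketch of that "last 15 seconds of events every 5 seconds" idea (placeholder host/port, assuming a 5-second batch interval; not code from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("window-sketch")
val ssc = new StreamingContext(conf, Seconds(5))          // 5-second batches
val lines = ssc.socketTextStream("localhost", 9999)       // any input DStream works here
val lastFifteen = lines.window(Seconds(15), Seconds(5))   // 15s window, sliding every 5s
lastFifteen.count().print()
ssc.start()
ssc.awaitTermination()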

 Date: Sun, 28 Jun 2015 09:13:18 -0700
 From: jonrgr...@gmail.com
 To: user@spark.apache.org
 Subject: What does Spark is not just MapReduce mean?  Isn't every Spark job 
 a form of MapReduce?
 
 I've heard Spark is not just MapReduce mentioned during Spark talks, but it
 seems like every method that Spark has is really doing something like (Map
 - Reduce) or (Map - Map - Map - Reduce) etc behind the scenes, with the
 performance benefit of keeping RDDs in memory between stages.
 
 Am I wrong about that?  Is Spark doing anything more efficiently than a
 series of Maps followed by a Reduce in memory?  What methods does Spark have
 that can't easily be mapped (with somewhat similar efficiency) to Map and
 Reduce in memory?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

Use logback instead of log4j in a Spark job

2015-06-28 Thread Mario Pastorelli
Hey sparkers,

I'm trying to use Logback for logging from my Spark jobs but I noticed that
if I submit the job with spark-submit then the log4j implementation of
slf4j is loaded instead of logback. Consequently, any call to
org.slf4j.LoggerFactory.getLogger will return a log4j logger instead of a
logback one, even if my job has been developed with only logback as
dependency. I got the jobs to work by not using spark-submit but I was
wondering if there is a better way to enforce the use of logback instead of
log4j for a Spark job. Any idea?

Thanks,
Mario
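
One approach that is sometimes used (an assumption on my side, not something confirmed in this thread) is to exclude the slf4j-log4j12 binding pulled in via Spark and declare logback-classic explicitly, e.g. in an sbt build (versions are placeholders):

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "1.4.0" % "provided")
    .exclude("org.slf4j", "slf4j-log4j12"),   // keep Spark's log4j binding off the classpath
  "ch.qos.logback" % "logback-classic" % "1.1.3"
)

Whether this is enough under spark-submit depends on classpath ordering (the Spark assembly bundles its own binding), so treat it as a starting point rather than a definitive fix.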


spark-submit in deployment mode with the --jars option

2015-06-28 Thread hishamm
Hi,

I want to deploy my application on a standalone cluster. 
Spark submit acts in a strange way. When I deploy the application in
*client* mode, everything works well and my application can see the
additional jar files. 

Here is the command:
   spark-submit --master spark://1.2.3.4:7077 --deploy-mode  client
 --supervise --jars $(echo /myjars/*.jar | tr ' ' ',')  --class
 com.algorithm /my/path/algorithm.jar 

However, when I submit the command in *cluster* deployment mode, the
driver cannot see the additional jars.
I always get *java.lang.ClassNotFoundException*.

Here is the command:
   spark-submit --master spark://1.2.3.4:7077 --deploy-mode cluster
 --supervise --jars $(echo /myjars/*.jar | tr ' ' ',')  --class
 com.algorithm /my/path/algorithm.jar 


Am I missing something?

thanks,
Hisham



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-in-deployment-mode-with-the-jars-option-tp23519.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Join highly skewed datasets

2015-06-28 Thread Ted Yu
You can use the following command to build Spark after applying the pull
request:

mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package


Cheers


On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I see that block support did not make it to spark 1.4 release.

 Can you share instructions of building spark with this support for hadoop
 2.4.x distribution.

 appreciate.

 On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 This is nice. Which version of Spark has this support ? Or do I need to
 build it.
 I have never built Spark from git, please share instructions for Hadoop
 2.4.x YARN.

 I am struggling a lot to get a join work between 200G and 2TB datasets. I
 am constantly getting this exception

 1000s of executors are failing with

 15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to
 get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
 java.io.IOException: Failed to connect to
 executor_host_name/executor_ip_address:60162
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with Spark.
 Worst is am not able to figure out the reason of failure,  the logs run
 into millions of lines and i do not know the keywords to search for failure
 reason

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + MR to perform joins and we particularly use
 blockJoin() API of scoobi


 /** Perform an equijoin with another distributed list where this list
 is considerably smaller
 * than the right (but too large to fit in memory), and where the keys
 of right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A,
 B))] =
 Relational.blockJoin(left, right)


 I am trying to do a POC and what Spark join API(s) is recommended to
 achieve something similar ?

 Please suggest.

 --
 Deepak





 --
 Deepak





 --
 Deepak




 --
 Deepak




Re: spark streaming with kafka reset offset

2015-06-28 Thread Shushant Arora
A few doubts:

In 1.2 streaming, when I use a union of streams, my streaming application
sometimes hangs and nothing gets printed on the driver.


[Stage 2:

  (0 + 2) / 2]
 What does the "(0 + 2) / 2" here signify?



1. Should the number of streams in topicsMap.put("testSparkPartitioned", 3) be
the same as numStreams = 2 in the unioned stream?

2. I launched the app on the YARN RM with num-executors set to 5. It created 2
receivers and 5 executors. Since receiver nodes get fixed at the start of the
app for its whole lifetime, do executors get allocated at the start of each job
of the 1s batch interval? If yes, how is allocating resources fast enough? I
mean, if I increase num-executors to 50, it will negotiate 50 executors from the
YARN RM at the start of each job, so does allocating executors take more time
than the batch interval (here 1s, or say 500ms)? Can I also fix the processing
executors throughout the app?




SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(1000));

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ipadd:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
Map<String, Integer> topicsMap = new HashMap<String, Integer>();
topicsMap.put("testSparkPartitioned", 3);
int numStreams = 2;
List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], byte[]>>();
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
      kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
      kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
}
JavaPairDStream<byte[], byte[]> directKafkaStream =
    jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
JavaDStream<String> lines = directKafkaStream.map(new Function<Tuple2<byte[], byte[]>, String>() {
  public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
    // ...processing
    // ...return msg;
  }
});
lines.print();
jssc.start();
jssc.awaitTermination();



---
3. To avoid data loss when we use checkpointing with a factory method to create
the context: is the method name fixed, or can we use any name, and how do we
tell the app which method to use?

4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored in
ZooKeeper. Is that because ZooKeeper is not efficient for high write rates and
its reads are not strictly consistent? So

 we use simple Kafka API that does not use Zookeeper and offsets tracked
only by Spark Streaming within its checkpoints. This eliminates
inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
record is received by Spark Streaming effectively exactly once despite
failures.

So do we have to call context.checkpoint(hdfsdir)? Or is there an implicit
checkpoint location? That is, is HDFS used only for small data (just the offsets)?
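
A minimal sketch of the checkpoint factory pattern (arbitrary names and a placeholder checkpoint path, not from this thread): the factory can have any name, as long as it is handed to StreamingContext.getOrCreate together with the checkpoint directory, and ssc.checkpoint() points at that same directory.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-app")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)      // metadata (including offsets) goes here
  // ... build the DStream graph here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()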










On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi,

 There is another option to try for Receiver Based Low Level Kafka Consumer
 which is part of Spark-Packages (
 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
 can be used with WAL as well for end to end zero data loss.

 This is also Reliable Receiver and Commit offset to ZK.  Given the number
 of Kafka Partitions you have (  100) , using High Level Kafka API for
 Receiver based approach may leads to issues related Consumer Re-balancing
  which is a major issue of Kafka High Level API.

 Regards,
 Dibyendu



 On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das t...@databricks.com
 wrote:

 In the receiver based approach, If the receiver crashes for any reason
 (receiver crashed or executor crashed) the receiver should get restarted on
 another executor and should start reading data from the offset present in
 the zookeeper. There is some chance of data loss which can alleviated using
 Write Ahead Logs (see streaming programming guide for more details, or see
 my talk [Slides PDF
 http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx
 , Video
 https://www.youtube.com/watch?v=d5UJonrruHklist=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6index=4
 ] from last Spark Summit 2015). But that approach can give duplicate
 records. The direct approach gives exactly-once guarantees, so you should
 try it out.

 TD

 On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Read the spark streaming guide ad the kafka integration guide for a
 better understanding of how the receiver based stream works.

 Capacity planning is specific to your environment and what the job is
 actually doing, youll need to determine it empirically.


 On Friday, June 26, 2015, Shushant Arora shushantaror...@gmail.com
 wrote:

 In 

RE: What does "Spark is not just MapReduce" mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread Michael Malak
I would also add, from a data locality theoretic standpoint, mapPartitions() 
provides for node-local computation that plain old map-reduce does not.
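
For illustration, a small spark-shell sketch of that (assuming the usual sc): per-partition setup runs once per partition rather than once per record.

val rdd = sc.parallelize(1 to 100, 4)
val partialSums = rdd.mapPartitions { iter =>
  // expensive, node-local setup (connections, lookup tables, ...) would go here
  Iterator(iter.sum)
}
partialSums.collect()   // one partial sum per partition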


From my Android phone on T-Mobile. The first nationwide 4G network.

 Original message 
From: Ashic Mahtab as...@live.com 
Date: 06/28/2015  10:51 AM  (GMT-07:00) 
To: YaoPau jonrgr...@gmail.com,Apache Spark user@spark.apache.org 
Subject: RE: What does Spark is not just MapReduce mean?  Isn't every Spark 
job a form of MapReduce? 
 
Spark comes with quite a few components. At it's core is..surprisespark 
core. This provides the core things required to run spark jobs. Spark provides 
a lot of operators out of the box...take a look at 
https://spark.apache.org/docs/latest/programming-guide.html#transformations
https://spark.apache.org/docs/latest/programming-guide.html#actions

While all of them can be implemented with variations of rd.map().reduce(), 
there are optimisations to be gained in terms of data locality, etc., and the 
additional operators simply make life simpler.

In addition to the core stuff, spark also brings things like Spark Streaming, 
Spark Sql and data frames, MLLib, GraphX, etc. Spark Streaming gives you 
microbatches of rdds at periodic intervals.Think give me the last 15 seconds 
of events every 5 seconds. You can then program towards the small collection, 
and the job will run in a fault tolerant manner on your cluster. Spark Sql 
provides hive like functionality that works nicely with various data sources, 
and RDDs. MLLib provide a lot of oob machine learning algorithms, and the new 
Spark ML project provides a nice elegant pipeline api to take care of a lot of 
common machine learning tasks. GraphX allows you to represent data in graphs, 
and run graph algorithms on it. e.g. you can represent your data as RDDs of 
vertexes and edges, and run pagerank on a distributed cluster.

And there's moreso, yeah...Spark is definitely not just MapReduce. :)

 Date: Sun, 28 Jun 2015 09:13:18 -0700
 From: jonrgr...@gmail.com
 To: user@spark.apache.org
 Subject: What does Spark is not just MapReduce mean? Isn't every Spark job 
 a form of MapReduce?
 
 I've heard Spark is not just MapReduce mentioned during Spark talks, but it
 seems like every method that Spark has is really doing something like (Map
 - Reduce) or (Map - Map - Map - Reduce) etc behind the scenes, with the
 performance benefit of keeping RDDs in memory between stages.
 
 Am I wrong about that? Is Spark doing anything more efficiently than a
 series of Maps followed by a Reduce in memory? What methods does Spark have
 that can't easily be mapped (with somewhat similar efficiency) to Map and
 Reduce in memory?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


Share your cluster run details

2015-06-28 Thread ๏̯͡๏
I would like to get a sense of the Spark-on-YARN clusters in use out there, and
this thread can help others as well.

1. Number of nodes in cluster
2. Container memory limit
3. Typical Hardware configuration of worker nodes
4. Typical number of executors used ?
5.  Any other related info you want to share.

How do you decide on the number of executors/cores/memory, given that you know
the amount of data you will process, with or without caching enabled?


-- 
Deepak


Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I just did that. Where can I find that spark-1.4.0-bin-hadoop2.4.tgz file?

On Sun, Jun 28, 2015 at 12:15 PM, Ted Yu yuzhih...@gmail.com wrote:

 You can use the following command to build Spark after applying the pull
 request:

 mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package


 Cheers


 On Sun, Jun 28, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I see that block support did not make it to spark 1.4 release.

 Can you share instructions of building spark with this support for hadoop
 2.4.x distribution.

 appreciate.

 On Fri, Jun 26, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 This is nice. Which version of Spark has this support ? Or do I need to
 build it.
 I have never built Spark from git, please share instructions for Hadoop
 2.4.x YARN.

 I am struggling a lot to get a join work between 200G and 2TB datasets.
 I am constantly getting this exception

 1000s of executors are failing with

 15/06/26 13:05:28 ERROR storage.ShuffleBlockFetcherIterator: Failed to
 get block(s) from phxdpehdc9dn2125.stratus.phx.ebay.com:60162
 java.io.IOException: Failed to connect to
 executor_host_name/executor_ip_address:60162
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 On Fri, Jun 26, 2015 at 3:20 PM, Koert Kuipers ko...@tresata.com
 wrote:

 we went through a similar process, switching from scalding (where
 everything just works on large datasets) to spark (where it does not).

 spark can be made to work on very large datasets, it just requires a
 little more effort. pay attention to your storage levels (should be
 memory-and-disk or disk-only), number of partitions (should be large,
 multiple of num executors), and avoid groupByKey

 also see:
 https://github.com/tresata/spark-sorted (for avoiding in memory
 operations for certain type of reduce operations)
 https://github.com/apache/spark/pull/6883 (for blockjoin)


 On Fri, Jun 26, 2015 at 5:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Not far at all. On large data sets everything simply fails with Spark.
 Worst is am not able to figure out the reason of failure,  the logs run
 into millions of lines and i do not know the keywords to search for 
 failure
 reason

 On Mon, Jun 15, 2015 at 6:52 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 How far did you get?

 On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We use Scoobi + MR to perform joins and we particularly use
 blockJoin() API of scoobi


 /** Perform an equijoin with another distributed list where this
 list is considerably smaller
 * than the right (but too large to fit in memory), and where the
 keys of right may be
 * particularly skewed. */

  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A,
 B))] =
 Relational.blockJoin(left, right)


 I am trying to do a POC and what Spark join API(s) is recommended to
 achieve something similar ?

 Please suggest.

 --
 Deepak





 --
 Deepak





 --
 Deepak




 --
 Deepak





-- 
Deepak


Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Regarding # of executors.

I get 342 executors in parallel each time, and I set executor-cores to 1.
Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of partitions
while running blockJoin. Is this correct?

And are my assumptions on replication levels correct?

Did you get a chance to look at my processing?



On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers ko...@tresata.com wrote:

 regarding your calculation of executors... RAM in executor is not really
 comparable to size on disk.

 if you read from from file and write to file you do not have to set
 storage level.

 in the join or blockJoin specify number of partitions  as a multiple (say
 2 times) of number of cores available to you across all executors (so not
 just number of executors, on yarn you can have many cores per executor).

 On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Could you please suggest and help me understand further.

 This is the actual sizes

 -sh-4.1$ hadoop fs -count dw_lstg_item
1  764  2041084436189
 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
 *This is not skewed there is exactly one etntry for each item but its 2TB*
 So should its replication be set to 1 ?

 The below two datasets (RDD) are unioned and their total size is 150G.
 These can be skewed and
 hence we use block join with Scoobi + MR.
 *So should its replication be set to 3 ?*
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
1  10173796345977
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
1  10185559964549
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

 Also can you suggest the number of executors to be used in this case ,
 each executor is started with max 14G of memory?

 Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors
 ? Is this calculation correct ?

 And also please suggest on the
 (should be memory-and-disk or disk-only), number of partitions (should
 be large, multiple of num executors),


 https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

 When do i choose this setting ?  (Attached is my code for reference)



 On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:

 a blockJoin spreads out one side while replicating the other. i would
 suggest replicating the smaller side. so if lstgItem is smaller try 3,1
 or else 1,3. this should spread the fat keys out over multiple (3)
 executors...


 On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 

Re: required: org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

2015-06-28 Thread Arthur Chan
Hi,


line 99:model.trainOn(labeledStream)

line 100: model.predictOn(labeledStream).print()

line 101:ssc.start()

line 102: ssc.awaitTermination()


Regards

On Sun, Jun 28, 2015 at 10:53 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you show us your code around line 100 ?

 Which Spark release are you compiling against ?

 Cheers

 On Sun, Jun 28, 2015 at 5:49 AM, Arthur Chan arthur.hk.c...@gmail.com
 wrote:

 Hi,

 I am trying Spark with some sample programs,


 In my code, the following items are imported:

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD,
 LabeledPoint}

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD}

 import org.apache.spark.streaming.{Seconds, StreamingContext}

 import scala.util.Random

 I got following error:

 [error] StreamingModel.scala:100: type mismatch;

 [error]  found   :
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]

 [error]  required:
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

 [error] model.predictOn(labeledStream).print()

 [error] ^

 [error] one error found

 [error] (compile:compile) Compilation failed


 Any idea?


 Regards





Re: required: org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

2015-06-28 Thread Feynman Liang
You are trying to predict on a DStream[LabeledPoint] (data + labels) but
predictOn expects a DStream[Vector] (just the data without the labels).

Try doing:

val unlabeledStream = labeledStream.map { x => x.features }
model.predictOn(unlabeledStream).print()
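
If you also want to keep the labels next to the predictions, predictOnValues on a keyed stream works as well (a sketch using the same model and labeledStream):

val labeledAsPairs = labeledStream.map(lp => (lp.label, lp.features))
model.predictOnValues(labeledAsPairs).print()   // (label, prediction) pairs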

On Sun, Jun 28, 2015 at 6:03 PM, Arthur Chan arthur.hk.c...@gmail.com
wrote:

 also my Spark is 1.4

 On Mon, Jun 29, 2015 at 9:02 AM, Arthur Chan arthur.hk.c...@gmail.com
 wrote:



 Hi,


 line 99:model.trainOn(labeledStream)

 line 100: model.predictOn(labeledStream).print()

 line 101:ssc.start()

 line 102: ssc.awaitTermination()


 Regards

 On Sun, Jun 28, 2015 at 10:53 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you show us your code around line 100 ?

 Which Spark release are you compiling against ?

 Cheers

 On Sun, Jun 28, 2015 at 5:49 AM, Arthur Chan arthur.hk.c...@gmail.com
 wrote:

 Hi,

 I am trying Spark with some sample programs,


 In my code, the following items are imported:

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD,
 LabeledPoint}

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD}

 import org.apache.spark.streaming.{Seconds, StreamingContext}

 import scala.util.Random

 I got following error:

 [error] StreamingModel.scala:100: type mismatch;

 [error]  found   :
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]

 [error]  required:
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

 [error] model.predictOn(labeledStream).print()

 [error] ^

 [error] one error found

 [error] (compile:compile) Compilation failed


 Any idea?


 Regards







Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
Could you please suggest and help me understand further.

This is the actual sizes

-sh-4.1$ hadoop fs -count dw_lstg_item
   1  764  2041084436189
/sys/edw/dw_lstg_item/snapshot/2015/06/25/00
*This is not skewed; there is exactly one entry for each item, but it is 2TB.*
So should its replication be set to 1 ?

The below two datasets (RDD) are unioned and their total size is 150G.
These can be skewed and
hence we use block join with Scoobi + MR.
*So should its replication be set to 3 ?*
-sh-4.1$ hadoop fs -count
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
   1  10173796345977
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
-sh-4.1$ hadoop fs -count
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
   1  10185559964549
/apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

Also can you suggest the number of executors to be used in this case , each
executor is started with max 14G of memory?

Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors ?
Is this calculation correct ?

And also please suggest on the
(should be memory-and-disk or disk-only), number of partitions (should be
large, multiple of num executors),

https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

When do i choose this setting ?  (Attached is my code for reference)



On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:

 a blockJoin spreads out one side while replicating the other. i would
 suggest replicating the smaller side. so if lstgItem is smaller try 3,1
 or else 1,3. this should spread the fat keys out over multiple (3)
 executors...


 On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com
 wrote:

 you need 1) to publish to inhouse maven, so your application can depend
 on your version, and 2) use the spark distribution you compiled to launch
 your job (assuming you run with yarn so you can launch multiple versions of
 spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven as
 i want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, 

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
specify numPartitions or partitioner for operations that shuffle.

so use:
def join[W](other: RDD[(K, W)], numPartitions: Int)

or
def blockJoin[W](
  other: JavaPairRDD[K, W],
  leftReplication: Int,
  rightReplication: Int,
  partitioner: Partitioner)

for example:
left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))
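
Putting that together, a spark-shell sketch (paths and counts are placeholders, not from this thread) of handing an explicit partition count to a shuffling operation:

import org.apache.spark.HashPartitioner

val numPartitions = 2048   // e.g. roughly 2x the total number of executor cores
val left  = sc.textFile("hdfs:///tmp/left").map(line => (line.split("\t")(0), line))
val right = sc.textFile("hdfs:///tmp/right").map(line => (line.split("\t")(0), line))

val joined = left.join(right, numPartitions)
// or, with the blockJoin patch, for skewed keys:
// val blockJoined = left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))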



On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 You mentioned storage levels must be
 (should be memory-and-disk or disk-only), number of partitions (should be
 large, multiple of num executors),

 how do i specify that ?

 On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com
 wrote:

 you need 1) to publish to inhouse maven, so your application can depend
 on your version, and 2) use the spark distribution you compiled to launch
 your job (assuming you run with yarn so you can launch multiple versions of
 spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven as
 i want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak





 --
 Deepak




 --
 Deepak




 --
 Deepak




Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
My code:

val viEvents = details.filter(_.get(14).asInstanceOf[Long] != NULL_VALUE)
  .map { vi => (vi.get(14).asInstanceOf[Long], vi) } // AVRO (150G)

val lstgItem = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
  .filter(_.getItemId().toLong != NULL_VALUE)
  .map { lstg => (lstg.getItemId().toLong, lstg) } // SEQUENCE (2TB)


val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
Long))] = viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map
{

}



On Sun, Jun 28, 2015 at 3:03 PM, Koert Kuipers ko...@tresata.com wrote:

 specify numPartitions or partitioner for operations that shuffle.

 so use:
 def join[W](other: RDD[(K, W)], numPartitions: Int)

 or
 def blockJoin[W](
   other: JavaPairRDD[K, W],
   leftReplication: Int,
   rightReplication: Int,
   partitioner: Partitioner)

 for example:
 left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))



 On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 You mentioned storage levels must be
 (should be memory-and-disk or disk-only), number of partitions (should be
 large, multiple of num executors),

 how do i specify that ?

 On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com
 wrote:

 you need 1) to publish to inhouse maven, so your application can
 depend on your version, and 2) use the spark distribution you compiled to
 launch your job (assuming you run with yarn so you can launch multiple
 versions of spark on same cluster)

 On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 How can i import this pre-built spark into my application via maven
 as i want to use the block join API.

 On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I ran this w/o maven options

 ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.

 I hope this is built with 2.4.x hadoop as i did specify -P

 On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package


 or


  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean 
 package
 ​Both fail with

 + echo -e 'Specify the Maven command with the --mvn flag'

 Specify the Maven command with the --mvn flag

 + exit -1




 --
 Deepak




 --
 Deepak





 --
 Deepak




 --
 

Re: Join highly skewed datasets

2015-06-28 Thread ๏̯͡๏
I am unable to run my application or sample application with prebuilt spark
1.4 and with this custom 1.4. In both cases I get this error:

15/06/28 15:30:07 WARN ipc.Client: Exception encountered while connecting
to the server : java.lang.IllegalArgumentException: Server has invalid
Kerberos principal: hadoop/r...@corp.x.com


Please let me know what is the correct way to specify JARS with 1.4. The
below command used to work with 1.3.1


*Command*

*./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar
 --num-executors 9973 --driver-memory 14g --driver-java-options
-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps --executor-memory 14g --executor-cores 1 --queue
hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-06-20 endDate=2015-06-21
input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=200G *



On Sun, Jun 28, 2015 at 3:09 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 My code:

 val viEvents = details.filter(_.get(14).asInstanceOf[Long] !=
 NULL_VALUE).map { vi = (vi.get(14).asInstanceOf[Long], vi) } //AVRO
 (150G)

 val lstgItem = DataUtil.getDwLstgItem(sc,
 DateUtil.addDaysToDate(startDate, -89)).filter(_.getItemId().toLong !=
 NULL_VALUE).map { lstg = (lstg.getItemId().toLong, lstg) } // SEQUENCE
 (2TB)


 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map
 {

 }



 On Sun, Jun 28, 2015 at 3:03 PM, Koert Kuipers ko...@tresata.com wrote:

 specify numPartitions or partitioner for operations that shuffle.

 so use:
 def join[W](other: RDD[(K, W)], numPartitions: Int)

 or
 def blockJoin[W](
   other: JavaPairRDD[K, W],
   leftReplication: Int,
   rightReplication: Int,
   partitioner: Partitioner)

 for example:
 left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))



 On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 You mentioned storage levels must be
 (should be memory-and-disk or disk-only), number of partitions (should
 be large, multiple of num executors),

 how do i specify that ?

 On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 Build was successful but the script faild. Is there a way to pass the
 incremented version ?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
regarding your calculation of executors... RAM in executor is not really
comparable to size on disk.

if you read from file and write to file you do not have to set the storage
level.

in the join or blockJoin specify number of partitions  as a multiple (say 2
times) of number of cores available to you across all executors (so not
just number of executors, on yarn you can have many cores per executor).

On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Could you please suggest and help me understand further.

 This is the actual sizes

 -sh-4.1$ hadoop fs -count dw_lstg_item
1  764  2041084436189
 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
 *This is not skewed there is exactly one etntry for each item but its 2TB*
 So should its replication be set to 1 ?

 The below two datasets (RDD) are unioned and their total size is 150G.
 These can be skewed and
 hence we use block join with Scoobi + MR.
 *So should its replication be set to 3 ?*
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
1  10173796345977
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
1  10185559964549
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

 Also can you suggest the number of executors to be used in this case ,
 each executor is started with max 14G of memory?

 Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors ?
 Is this calculation correct ?

 And also please suggest on the
 (should be memory-and-disk or disk-only), number of partitions (should
 be large, multiple of num executors),


 https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

 When do i choose this setting ?  (Attached is my code for reference)



 On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com wrote:

 a blockJoin spreads out one side while replicating the other. i would
 suggest replicating the smaller side. so if lstgItem is smaller try 3,1
 or else 1,3. this should spread the fat keys out over multiple (3)
 executors...


 On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use blockjoin API and it does not throw compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication, i gave 1,1



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 The build was successful but the script failed. Is there a way to pass the
 incremented version?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/

 + '[' 1 == 1 ']'

 + cp
 '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 cp:
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
 No such file or directory

 LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
 -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver



 On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers ko...@tresata.com
 wrote:

 you need 1) to publish to inhouse maven, so your application can
 depend on your version, and 2) use the spark distribution you compiled to
 launch your job (assuming you run with yarn so you can launch multiple
 versions of spark on same cluster)

 On Sun, Jun 28, 2015 at 

Re: Join highly skewed datasets

2015-06-28 Thread Koert Kuipers
other people might disagree, but i have had better luck with a model that
looks more like traditional map-reduce if you use spark for disk-to-disk
computations: more cores per executor (and so less RAM per core/task). so i
would suggest trying --executor-cores 4 and adjusting numPartitions
accordingly.
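
as a concrete (hypothetical) sketch of that sizing -- the figures are the ones
mentioned in this thread, and the same settings can equally be passed as
spark-submit flags (--num-executors, --executor-cores, --executor-memory):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("skewed-join")              // hypothetical app name
  .set("spark.executor.instances", "342") // number of executors on YARN
  .set("spark.executor.cores", "4")       // more cores per executor, as suggested above
  .set("spark.executor.memory", "14g")
val sc = new SparkContext(conf)

// total task slots = executors * cores; use a small multiple of that for the join
val numPartitions = 342 * 4 * 2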

On Sun, Jun 28, 2015 at 6:45 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Regarding # of executors.

 I get 342 executors in parallel each time and I set executor-cores to 1.
 Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of partitions
 while running blockJoin. Is this correct?

 And are my assumptions on replication levels correct?

 Did you get a chance to look at my processing?



 On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers ko...@tresata.com wrote:

 regarding your calculation of executors... RAM in an executor is not really
 comparable to size on disk.

 if you read from file and write to file you do not have to set a
 storage level.

 in the join or blockJoin, specify the number of partitions as a multiple (say
 2 times) of the number of cores available to you across all executors (so not
 just the number of executors; on yarn you can have many cores per executor).

 On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Could you please suggest and help me understand further.

 These are the actual sizes:

 -sh-4.1$ hadoop fs -count dw_lstg_item
1  764  2041084436189
 /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
 *This is not skewed; there is exactly one entry for each item, but it is
 2TB.*
 So should its replication be set to 1?

 The below two datasets (RDD) are unioned and their total size is 150G.
 These can be skewed and
 hence we use block join with Scoobi + MR.
 *So should its replication be set to 3 ?*
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
1  10173796345977
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
 -sh-4.1$ hadoop fs -count
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
1  10185559964549
 /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21

 Also can you suggest the number of executors to be used in this case,
 given each executor is started with max 14G of memory?

 Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors
 ? Is this calculation correct ?

 And also please suggest on the storage level
 (should it be memory-and-disk or disk-only) and the number of partitions (should
 be large, a multiple of the number of executors).


 https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

 When do I choose this setting? (Attached is my code for reference.)



 On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers ko...@tresata.com
 wrote:

 a blockJoin spreads out one side while replicating the other. i would
 suggest replicating the smaller side. so if lstgItem is smaller try
 3,1 or else 1,3. this should spread the fat keys out over multiple (3)
 executors...


 On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I am able to use the blockJoin API and it does not throw a compilation error

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.blockJoin(viEvents,1,1).map {

 }

 Here viEvents is highly skewed and both are on HDFS.

 What should be the optimal values of replication? I gave 1,1.



 On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran

  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
 -Phive-thriftserver

 The build was successful but the script failed. Is there a way to pass the
 incremented version?


 [INFO] BUILD SUCCESS

 [INFO]
 

 [INFO] Total time: 09:56 min

 [INFO] Finished at: 2015-06-28T13:45:29-07:00

 [INFO] Final Memory: 84M/902M

 [INFO]
 

 + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist

 + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib

 + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'

 + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + cp
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/

 + mkdir -p
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main

 + cp -r
 /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
 

Re: Unable to start Pi (hello world) application on Spark 1.4

2015-06-28 Thread ๏̯͡๏
Figured it out.

All the jars that are specified with --driver-class-path are now exported
through SPARK_CLASSPATH and it's working now.

I thought SPARK_CLASSPATH was dead. Looks like it's flipping ON/OFF.

On Sun, Jun 28, 2015 at 12:55 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Any thoughts on this ?

 On Fri, Jun 26, 2015 at 2:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 It used to work with 1.3.1; however, with 1.4.0 I get the following
 exception


 export SPARK_HOME=/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4
 export
 SPARK_JAR=/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar
 export HADOOP_CONF_DIR=/apache/hadoop/conf
 cd $SPARK_HOME
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar
 --jars
 /apache/hadoop/lib/hadoop-lzo-0.6.0.jar,/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.4/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
 --num-executors 1 --driver-memory 4g --driver-java-options
 -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue
 hdmi-express --class org.apache.spark.examples.SparkPi
 ./lib/spark-examples*.jar 10

 *Exception*

 15/06/26 14:24:42 INFO client.ConfiguredRMFailoverProxyProvider: Failing
 over to rm2

 15/06/26 14:24:42 WARN ipc.Client: Exception encountered while connecting
 to the server : java.lang.IllegalArgumentException: Server has invalid
 Kerberos principal: hadoop/x-y-rm-2.vip.cm@corp.cm.com


 I remember getting this error when working with Spark 1.2.x, where the way I
 used to get

 */apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar*

 this library onto the classpath was different. With 1.3.1, using
 --driver-class-path gets it running, but with 1.4 it does not work.

 Please suggest.

 --
 Deepak




 --
 Deepak




-- 
Deepak


Re: Fine control with sc.sequenceFile

2015-06-28 Thread ๏̯͡๏
val hadoopConf = new Configuration(sc.hadoopConfiguration)

hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
"67108864")


sc.hadoopConfiguration(hadoopConf)

or

sc.hadoopConfiguration = hadoopConf

threw error.

On Sun, Jun 28, 2015 at 9:32 PM, Ted Yu yuzhih...@gmail.com wrote:

 sequenceFile() calls hadoopFile() where:
 val confBroadcast = broadcast(new
 SerializableConfiguration(hadoopConfiguration))

 You can set the parameter in sc.hadoopConfiguration before calling
 sc.sequenceFile().

 Cheers

 On Sun, Jun 28, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I can do this

 val hadoopConf = new Configuration(sc.hadoopConfiguration)

 *hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
 "67108864")*

 sc.newAPIHadoopFile(

   path + "/*.avro",

   classOf[AvroKeyInputFormat[GenericRecord]],

   classOf[AvroKey[GenericRecord]],

   classOf[NullWritable],

   hadoopConf)


 But I can't do the same with

 sc.sequenceFile(path, classOf[Text], classOf[Text])
 How can I achieve the same with sequenceFile?
 --
 Deepak





-- 
Deepak


Fine control with sc.sequenceFile

2015-06-28 Thread ๏̯͡๏
I can do this

val hadoopConf = new Configuration(sc.hadoopConfiguration)

*hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
"67108864")*

sc.newAPIHadoopFile(

  path + "/*.avro",

  classOf[AvroKeyInputFormat[GenericRecord]],

  classOf[AvroKey[GenericRecord]],

  classOf[NullWritable],

  hadoopConf)


But I can't do the same with

sc.sequenceFile(path, classOf[Text], classOf[Text])
How can I achieve the same with sequenceFile?
-- 
Deepak


Re: Fine control with sc.sequenceFile

2015-06-28 Thread ๏̯͡๏
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
"67108864")

sc.sequenceFile(getMostRecentDirectory(tablePath, _.startsWith(_)).get
+ "/*", classOf[Text], classOf[Text])

works
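
For reference, a self-contained sketch of the pattern above (the application
name and input path are hypothetical; the 64MB split size and the Text
key/value classes are the ones from this thread):

import org.apache.hadoop.io.Text
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileSplitSize {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sequence-file-split-size"))

    // sc.hadoopConfiguration has no setter, but it is mutable: set the split size
    // on it before calling sc.sequenceFile(), which broadcasts a copy of the config.
    // Per Ted's note, this affects all Hadoop RDDs created from this context.
    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
      "67108864") // 64MB

    val rdd = sc.sequenceFile("hdfs:///data/my_table/*", classOf[Text], classOf[Text])
    println(rdd.partitions.length) // a smaller max split size means more partitions

    sc.stop()
  }
}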

On Sun, Jun 28, 2015 at 9:46 PM, Ted Yu yuzhih...@gmail.com wrote:

 There isn't a setter for sc.hadoopConfiguration.
 You can directly change the value of a parameter in sc.hadoopConfiguration.

 However, see the note in scaladoc:
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not
 to modify it unless you
* plan to set some global configurations for all Hadoop RDDs.

 Cheers

 On Sun, Jun 28, 2015 at 9:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 val hadoopConf = new Configuration(sc.hadoopConfiguration)

 hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
 "67108864")


 sc.hadoopConfiguration(hadoopConf)

 or

 sc.hadoopConfiguration = hadoopConf

 threw error.

 On Sun, Jun 28, 2015 at 9:32 PM, Ted Yu yuzhih...@gmail.com wrote:

 sequenceFile() calls hadoopFile() where:
 val confBroadcast = broadcast(new
 SerializableConfiguration(hadoopConfiguration))

 You can set the parameter in sc.hadoopConfiguration before calling
 sc.sequenceFile().

 Cheers

 On Sun, Jun 28, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I can do this

 val hadoopConf = new Configuration(sc.hadoopConfiguration)

 *hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
 "67108864")*

 sc.newAPIHadoopFile(

   path + "/*.avro",

   classOf[AvroKeyInputFormat[GenericRecord]],

   classOf[AvroKey[GenericRecord]],

   classOf[NullWritable],

   hadoopConf)


 But I can't do the same with

 sc.sequenceFile(path, classOf[Text], classOf[Text])
 How can I achieve the same with sequenceFile?
 --
 Deepak





 --
 Deepak





-- 
Deepak


Re: Fine control with sc.sequenceFile

2015-06-28 Thread Ted Yu
There isn't a setter for sc.hadoopConfiguration.
You can directly change the value of a parameter in sc.hadoopConfiguration.

However, see the note in scaladoc:
   * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not
to modify it unless you
   * plan to set some global configurations for all Hadoop RDDs.

Cheers

On Sun, Jun 28, 2015 at 9:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 val hadoopConf = new Configuration(sc.hadoopConfiguration)

 hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
 "67108864")


 sc.hadoopConfiguration(hadoopConf)

 or

 sc.hadoopConfiguration = hadoopConf

 threw error.

 On Sun, Jun 28, 2015 at 9:32 PM, Ted Yu yuzhih...@gmail.com wrote:

 sequenceFile() calls hadoopFile() where:
 val confBroadcast = broadcast(new
 SerializableConfiguration(hadoopConfiguration))

 You can set the parameter in sc.hadoopConfiguration before calling
 sc.sequenceFile().

 Cheers

 On Sun, Jun 28, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I can do this

 val hadoopConf = new Configuration(sc.hadoopConfiguration)

 *hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
 "67108864")*

 sc.newAPIHadoopFile(

   path + "/*.avro",

   classOf[AvroKeyInputFormat[GenericRecord]],

   classOf[AvroKey[GenericRecord]],

   classOf[NullWritable],

   hadoopConf)


 But I can't do the same with

 sc.sequenceFile(path, classOf[Text], classOf[Text])
 How can I achieve the same with sequenceFile?
 --
 Deepak





 --
 Deepak




Re: Fine control with sc.sequenceFile

2015-06-28 Thread Ted Yu
sequenceFile() calls hadoopFile() where:
val confBroadcast = broadcast(new
SerializableConfiguration(hadoopConfiguration))

You can set the parameter in sc.hadoopConfiguration before calling
sc.sequenceFile().

Cheers

On Sun, Jun 28, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I can do this

 val hadoopConf = new Configuration(sc.hadoopConfiguration)

 *hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
 "67108864")*

 sc.newAPIHadoopFile(

   path + "/*.avro",

   classOf[AvroKeyInputFormat[GenericRecord]],

   classOf[AvroKey[GenericRecord]],

   classOf[NullWritable],

   hadoopConf)


 But I can't do the same with

 sc.sequenceFile(path, classOf[Text], classOf[Text])
 How can I achieve the same with sequenceFile?
 --
 Deepak




Re: Spark FP-Growth algorithm for frequent sequential patterns

2015-06-28 Thread Xiangrui Meng
Hi Ping,

FYI, we just merged Feynman's PR:
https://github.com/apache/spark/pull/6997, which adds sequential pattern
support. Please check out the master branch and help test. Thanks!
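
For anyone who wants to try it, here is a small sketch of what a test might
look like, assuming a SparkContext named sc (e.g. in spark-shell). It follows
the PrefixSpan API as it appears in MLlib 1.5, so the exact names on master at
this point may differ, and the toy dataset below is made up:

import org.apache.spark.mllib.fpm.PrefixSpan

// each record is an ordered sequence of itemsets, so A -> B and B -> A differ
val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5)),
  Array(Array(6))
), 2).cache()

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)     // keep patterns appearing in at least half the sequences
  .setMaxPatternLength(5)

val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { fs =>
  println(fs.sequence.map(_.mkString("[", ",", "]")).mkString("<", " ", ">") + ": " + fs.freq)
}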

Best,
Xiangrui

On Wed, Jun 24, 2015 at 2:16 PM, Feynman Liang fli...@databricks.com wrote:
 There is a JIRA for this which I just submitted a PR for :)

 On Tue, Jun 23, 2015 at 6:09 PM, Xiangrui Meng men...@gmail.com wrote:

 This is on the wish list for Spark 1.5. Assuming that the items from
 the same transaction are distinct, we can still follow FP-Growth's
 steps:

 1. find frequent items
 2. filter transactions and keep only frequent items
 3. do NOT order by frequency
 4. use suffix to partition the transactions (whether to use prefix or
 suffix doesn't really matter in this case)
 5. grow FP-tree locally on each partition (the data structure should
 be the same)
 6. generate frequent sub-sequences

 +Feynman

 Best,
 Xiangrui

 On Fri, Jun 19, 2015 at 10:51 AM, ping yan sharon...@gmail.com wrote:
  Hi,
 
  I have a use case where I'd like to mine frequent sequential patterns
  (consider the clickpath scenario). Transaction A - B doesn't equal
  Transaction B - A.
 
  From what I understand about FP-growth in general and the MLlib
  implementation of it, the orders are not preserved. Can anyone provide
  some
  insights or ideas on extending the algorithm to solve frequent
  sequential
  pattern mining problems?
 
  Thanks as always.
 
 
  Ping
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org