Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?

2015-06-09 Thread drarse
Why? I tried this solution and it works fine.

On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List]
ml-node+s1001560n23218...@n3.nabble.com wrote:

 Hi drarse, thanks for replying. The singleton-object approach you
 suggested does not work.

 On 2015-06-09 16:24:25, drarse [via Apache Spark User List] wrote:

 The best way is to create a singleton object like:

 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext

 object SQLContextSingleton {
   @transient private var instance: SQLContext = null

   // Instantiate the SQLContext on demand
   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
     if (instance == null) {
       instance = new SQLContext(sparkContext)
     }
     instance
   }
 }

 There is more information in the programming guide:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
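
 For completeness, a minimal sketch of how the singleton is typically used
 inside foreachRDD, in the style of that guide, assuming a DStream[String]
 of JSON records named lines. Since getInstance returns the same SQLContext
 on every batch, the temporary table survives across batch intervals:

 import org.apache.spark.sql.SQLContext

 lines.foreachRDD { rdd =>
   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
   sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
   sqlContext.sql("select count(*) from topic_name").collect().foreach(println)
 }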



 On 2015-06-09 9:27 GMT+02:00, codingforfun [via Apache Spark User List] wrote:

 I used SQLContext in a spark streaming application as below:

 

 case class topic_name(f1: Int, f2: Int)

 val sqlContext = new SQLContext(sc)
 @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
 ssc.checkpoint(".")
 val theDStream = KafkaUtils.createDirectStream[String, String,
   StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))

 theDStream.map(x => x._2).foreach { rdd =>
   sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
   sqlContext.sql("select count(*) from topic_name").foreach { x =>
     WriteToFile(file_path, x(0).toString)
   }
 }

 ssc.start()
 ssc.awaitTermination()


 I found I could only get the message count for each 5-second batch, because
 the lifetime of a temporary table is tied to the SQLContext that was used
 to create it. I guess a new SQLContext is created every 5 seconds, so the
 temporary table only stays alive for 5 seconds. I want the SQLContext and
 the temporary table to stay alive for the whole streaming application's
 life cycle. How can I do that?

 Thanks~


RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Shao, Saisai
The shuffle data can be deleted through the weak-reference mechanism; you
could check the code of ContextCleaner. You could also trigger a full GC
manually with JVisualVM or some other tool to see if the shuffle files are
deleted.
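
A minimal sketch of doing that check from code instead of JVisualVM,
assuming a SparkContext sc (the shuffle operation and sizes here are
arbitrary illustrations). ContextCleaner only removes shuffle files after
the driver-side references become weakly reachable, so dropping the
reference and hinting a full GC should let you observe the deletion:

var pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))
pairs.reduceByKey(_ + _).count()  // forces a shuffle, writing shuffle files
pairs = null                      // drop the strong reference to the RDD
System.gc()                       // hint a full GC so the weak references can be processed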

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl


Jerry, I agree with you.



However, in my case I kept monitoring the blockmanager folder. I do see the
number of files decrease sometimes, but the folder's size keeps increasing.



And below is a screenshot of the folder. You can see some old files are not 
deleted somehow.



[screenshot of the blockmanager folder omitted]



-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl



From the stack trace I think this problem may be due to the deletion of the
broadcast variable: since you set spark.cleaner.ttl, the old broadcast
variable is deleted after that timeout, and you will hit this exception when
you try to use it again after the time limit.

Basically I think you don't need this configuration. Spark Streaming
automatically deletes old, unused data, and Spark itself deletes the
metadata using weak references. Also, this configuration will be deprecated
in the coming release.





Thanks

Jerry



-Original Message-

From: Haopu Wang [mailto:hw...@qilinsoft.com]

Sent: Tuesday, June 9, 2015 3:30 PM

To: user

Subject: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl



When I ran a spark streaming application for longer, I noticed the local
directory's size kept increasing.

I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

The spark streaming batch duration is 10 seconds and the checkpoint duration
is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!







15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at


Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]

2015-06-09 Thread amit tewari
Thanks Akhil, Mark, for your valuable comments.
Problem resolved.

AT


On Tue, Jun 9, 2015 at 2:17 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I think yes; as the documentation says, it "Creates tuples of the elements
 in this RDD by applying f."


 Thanks
 Best Regards

 On Tue, Jun 9, 2015 at 1:54 PM, amit tewari amittewar...@gmail.com
 wrote:

 Actually the question was: will keyBy() accept multiple fields (e.g. x(0),
 x(1)) as the key?


 On Tue, Jun 9, 2015 at 1:07 PM, amit tewari amittewar...@gmail.com
 wrote:

 Thanks Akhil. As you suggested, I have to go the keyBy() route, as I need
 the columns intact.
 But will keyBy() accept multiple fields (e.g. x(0), x(1))?

 Thanks
 Amit

 On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Try this way:

 scala> val input1 = sc.textFile("/test7").map(line =>
 line.split(",").map(_.trim));
 scala> val input2 = sc.textFile("/test8").map(line =>
 line.split(",").map(_.trim));
 scala> val input11 = input1.map(x => ((x(0) + x(1)), x(2), x(3)))
 scala> val input22 = input2.map(x => ((x(0) + x(1)), x(2), x(3)))

 scala> input11.join(input22).take(10)


 PairRDDFunctions basically requires an RDD[K, V], and in your case it's
 ((String, String), String, String). You can also look at keyBy if you don't
 want to concatenate your keys.
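
 A small sketch of both options (a hedged illustration, assuming input1 and
 input2 are RDD[Array[String]] as above). join is only defined on pair RDDs,
 so the key and the value parts must form a two-element tuple:

 scala> val pairs1 = input1.map(x => ((x(0), x(1)), (x(2), x(3))))
 scala> val pairs2 = input2.map(x => ((x(0), x(1)), (x(2), x(3))))
 scala> pairs1.join(pairs2).take(10)

 scala> // keyBy keeps the full record as the value, with a composite tuple key
 scala> val keyed1 = input1.keyBy(x => (x(0), x(1)))
 scala> val keyed2 = input2.keyBy(x => (x(0), x(1)))
 scala> keyed1.join(keyed2).take(10)

 So yes, keyBy() can take multiple fields, as long as they are wrapped in a
 single tuple key.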

 Thanks
 Best Regards

 On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com
 wrote:

 Hi Dear Spark Users

 I am very new to Spark/Scala.

 Am using Datastax (4.7/Spark 1.2.1) and struggling with following
 error/issue.

 Already tried options like import org.apache.spark.SparkContext._ or
 explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions.
 But error not resolved.

 Help much appreciated.

 Thanks
 AT

 scala> val input1 = sc.textFile("/test7").map(line =>
 line.split(",").map(_.trim));
 scala> val input2 = sc.textFile("/test8").map(line =>
 line.split(",").map(_.trim));
 scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
 scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))

 scala> input11.join(input22).take(10)

 <console>:66: error: value join is not a member of
 org.apache.spark.rdd.RDD[((String, String), String, String)]

   input11.join(input22).take(10)













Re: Re: Re: How to decrease the time of storing block in memory

2015-06-09 Thread Akhil Das
Hi 罗辉,

I think you are interpreting the logs wrong.

Your program actually runs from this point (the rest is just startup and
connection work):

15/06/08 16:14:22 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 0
15/06/08 16:14:23 INFO storage.MemoryStore: ensureFreeSpace(1561) called
with curMem=0, maxMem=370503843
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 1561.0 B, free 353.3 MB)
15/06/08 16:14:23 INFO storage.BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/06/08 16:14:23 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 0 took 967 ms
15/06/08 16:14:23 INFO storage.MemoryStore: ensureFreeSpace(2168) called
with curMem=1561, maxMem=370503843
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 2.1 KB, free 353.3 MB)

=> At this point it has already stored the broadcast piece in memory, and
starts your Task 0:

15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0
(TID 0). 693 bytes result sent to driver

=> It took 19s to finish your Task 0; Task 1 starts from this point:

15/06/08 16:14:42 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 1
15/06/08 16:14:42 INFO executor.Executor: Running task 1.0 in stage 0.0
(TID 1)
15/06/08 16:14:56 INFO executor.Executor: Finished task 1.0 in stage 0.0
(TID 1). 693 bytes result sent to driver
15/06/08 16:14:56 INFO executor.CoarseGrainedExecutorBackend: Driver
commanded a shutdown


Now, to speed things up you need more parallelism (at least 2-3 times the
number of cores you have); right now your sort.sh is likely running on a
single core. Instead of triggering an external command, you could perhaps do
the operation within Spark itself; that way you always control the
parallelism. Hope it helps.
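
For example, if sort.sh essentially sorts a text file by one numeric column,
a sketch of the pure-Spark equivalent could look like the following (the
separator and the key column are assumptions, since the script's contents
aren't shown):

val lines = sc.textFile("/opt/data/shellcompare/db/chr" + i + ".txt", 32)  // 32 partitions
val sorted = lines.sortBy(line => line.split("\t")(1).toLong, ascending = true)
sorted.saveAsTextFile("/opt/data/shellcompare/sorted/chr" + i)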



Thanks
Best Regards

On Tue, Jun 9, 2015 at 3:00 PM, luohui20...@sina.com wrote:

 hi akhil

 Not exactly; the task took 54s to finish, starting at 16:14:02 and ending
 at 16:14:56.

 Within these 54s, it needs 19s to store a value in memory, from 16:14:23
 to 16:14:42. I think this is the most time-consuming part of this task,
 and also unreasonable.

 You may check the log attached in the previous mail.


 and here is my code:


 import org.apache.spark._

 object GeneCompare3 {
   def main(args: Array[String]) {
     // i: piece number, j: user number
     val i = args(0).toInt
     val j = args(1)
     val conf = new SparkConf()
       .setAppName("CompareGenePiece " + i + " of User " + j)
       .setMaster("spark://slave3:7077")
       .set("spark.executor.memory", "2g")

     val sc = new SparkContext(conf)

     println("start to compare gene")
     val runmodifyshell2 = List("run", "sort.sh")
     val runmodifyshellRDD2 = sc.makeRDD(runmodifyshell2)
     val pipeModify2 = runmodifyshellRDD2.pipe("sh /opt/sh/bin/sort.sh" +
       " /opt/data/shellcompare/db/chr" + i + ".txt" +
       " /opt/data/shellcompare/data/user" + j + "/pgs/sample/samplechr" + i + ".txt" +
       " /opt/data/shellcompare/data/user" + j + "/pgs/intermediateResult/result" + i + ".txt 600")

     pipeModify2.collect()

     sc.stop()
   }
 }


 

 Thanks & Best regards!
 San.Luo

 ----- Original Message -----
 From: Akhil Das ak...@sigmoidanalytics.com
 To: 罗辉 luohui20...@sina.com
 Cc: user user@spark.apache.org
 Subject: Re: Re: How to decrease the time of storing block in memory
 Date: 2015-06-09 16:51

 Is it that task taking 19s? It won't simply be taking 19s to store 2 KB of
 data into memory; there could be other operations happening too (the
 transformations that you are doing). It would be good if you could paste
 the code snippet you are running, to get a better understanding.

 Thanks
 Best Regards

 On Tue, Jun 9, 2015 at 2:09 PM, luohui20...@sina.com wrote:

 Only 1 minor GC, 0.07s.


 

 Thanks & Best regards!
 San.Luo

 ----- Original Message -----
 From: Akhil Das ak...@sigmoidanalytics.com
 To: 罗辉 luohui20...@sina.com
 Cc: user user@spark.apache.org
 Subject: Re: How to decrease the time of storing block in memory
 Date: 2015-06-09 15:02

 Maybe you should check your driver UI and see if there's any GC time
 involved, etc.

 Thanks
 Best Regards

 On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote:

 hi there

   I am trying to decrease my app's running time on the worker node. I
 checked the log and found the most time-consuming part is below:

 15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 2.1 KB, free 353.3 MB)
 15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0
 (TID 0). 693 bytes result sent to driver


 I don't know why it needs 19s to store 2.1 KB of data in memory. Is there
 any tuning method?



 The attachment is the full log; here it is:

 15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered
 signal handlers for [TERM, HUP, INT]
 15/06/08 16:14:07 WARN 

Re: Re: Re: How to decrease the time of storing block in memory

2015-06-09 Thread luohui20001
hi akhil

Not exactly; the task took 54s to finish, starting at 16:14:02 and ending at
16:14:56.

Within these 54s, it needs 19s to store a value in memory, from 16:14:23 to
16:14:42. I think this is the most time-consuming part of this task, and also
unreasonable. You may check the log attached in the previous mail.

and here is my code:

import org.apache.spark._

object GeneCompare3 {
  def main(args: Array[String]) {
    // i: piece number, j: user number
    val i = args(0).toInt
    val j = args(1)
    val conf = new SparkConf()
      .setAppName("CompareGenePiece " + i + " of User " + j)
      .setMaster("spark://slave3:7077")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    println("start to compare gene")
    val runmodifyshell2 = List("run", "sort.sh")
    val runmodifyshellRDD2 = sc.makeRDD(runmodifyshell2)
    val pipeModify2 = runmodifyshellRDD2.pipe("sh /opt/sh/bin/sort.sh" +
      " /opt/data/shellcompare/db/chr" + i + ".txt" +
      " /opt/data/shellcompare/data/user" + j + "/pgs/sample/samplechr" + i + ".txt" +
      " /opt/data/shellcompare/data/user" + j + "/pgs/intermediateResult/result" + i + ".txt 600")
    pipeModify2.collect()
    sc.stop()
  }
}



 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 16:51

Is it that task taking 19s? It won't simply be taking 19s to store 2 KB of
data into memory; there could be other operations happening too (the
transformations that you are doing). It would be good if you could paste the
code snippet you are running, to get a better understanding.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 2:09 PM,  luohui20...@sina.com wrote:
Only 1 minor GC, 0.07s. 




 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02

Maybe you should check your driver UI and see if there's any GC time
involved, etc.

Thanks
Best Regards

On Mon, Jun 8, 2015 at 5:45 PM,  luohui20...@sina.com wrote:
hi there

  I am trying to decrease my app's running time on the worker node. I checked
the log and found the most time-consuming part is below:

15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values
in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID
0). 693 bytes result sent to driver

I don't know why it needs 19s to store 2.1 KB of data in memory. Is there any
tuning method?

The attachment is the full log; here it is:

15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered
signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
15/06/08 16:14:10 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/06/08 16:14:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:14 INFO Remoting: Starting remoting
15/06/08 16:14:15 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://driverPropsFetcher@slave5:54684]
15/06/08 16:14:15 INFO util.Utils: Successfully started service 
'driverPropsFetcher' on port 54684.
15/06/08 16:14:16 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
15/06/08 16:14:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:16 INFO Remoting: Starting remoting
15/06/08 16:14:17 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.
15/06/08 16:14:17 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkExecutor@slave5:49169]
15/06/08 16:14:17 INFO util.Utils: Successfully started service 'sparkExecutor' 
on port 49169.
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to MapOutputTracker: 
akka.tcp://sparkDriver@slave5:58630/user/MapOutputTracker
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to BlockManagerMaster: 
akka.tcp://sparkDriver@slave5:58630/user/BlockManagerMaster
15/06/08 

RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Haopu Wang
Jerry, I agree with you.

 

However, in my case I kept monitoring the blockmanager folder. I do see the
number of files decrease sometimes, but the folder's size keeps increasing.

 

And below is a screenshot of the folder. You can see some old files are not
deleted somehow.

[screenshot of the blockmanager folder omitted]

-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting
spark.cleaner.ttl

 

From the stack trace I think this problem may be due to the deletion of the
broadcast variable: since you set spark.cleaner.ttl, the old broadcast
variable is deleted after that timeout, and you will hit this exception when
you try to use it again after the time limit.

Basically I think you don't need this configuration. Spark Streaming
automatically deletes old, unused data, and Spark itself deletes the
metadata using weak references. Also, this configuration will be deprecated
in the coming release.

 

Thanks

Jerry

 

-Original Message-

From: Haopu Wang [mailto:hw...@qilinsoft.com] 

Sent: Tuesday, June 9, 2015 3:30 PM

To: user

Subject: [SparkStreaming 1.3.0] Broadcast failure after setting
spark.cleaner.ttl

 

When I ran a spark streaming application for longer, I noticed the local
directory's size kept increasing.

I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

The spark streaming batch duration is 10 seconds and the checkpoint duration
is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br


Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Daniel Darabos
It would be even faster to load the data on the driver and sort it there
without using Spark :). Using reduce() is cheating, because it only works
as long as the data fits on one machine. That is not the targeted use case
of a distributed computation system. You can repeat your test with more
data (that doesn't fit on one machine) to see what I mean.
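
A short sketch of the contrast in Scala (the code in the question below is
Java, but the shape is the same), assuming rdd is an RDD[String] of census
lines and using the CSV field index from the question:

// stays distributed: range-partitions by key and sorts within partitions
val distributed = rdd.sortBy(_.split(",")(24).toDouble, ascending = true, numPartitions = 30)

// "cheats": each reduce step concatenates and re-sorts whole lists, and the
// final merged list must fit in a single JVM
val merged = rdd
  .mapPartitions(it => Iterator(it.toVector.sortBy(_.split(",")(24).toDouble)))
  .reduce((a, b) => (a ++ b).sortBy(_.split(",")(24).toDouble))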

On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote:

 For a research project, I tried sorting the elements in an RDD. I did this
 using two different approaches.

 In the first method, I applied a mapPartitions() function on the RDD, so
 that it would sort the contents of the RDD, and provide a result RDD that
 contains the sorted list as the only record in the RDD. Then, I applied a
 reduce function which basically merges sorted lists.

 I ran these experiments on an EC2 cluster containing 30 nodes. I set it up
 using the spark ec2 script. The data file was stored in HDFS.

 In the second approach I used the sortBy method in Spark.

 I performed these operations on the US census data (100 MB) found here.

 A single line looks like this:

 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not
 in universe or children, Not in universe, White, All other, Female, Not in
 universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not
 in universe, Not in universe, Child 18 never marr not in subfamily, Child
 under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in
 universe, 0, Both parents present, United-States, United-States,
 United-States, Native- Born in the United States, 0, Not in universe, 0, 0,
 94, - 5.
 I sorted based on the 25th value in the CSV. In this line that is 1758.14.

 I noticed that sortBy performs worse than the other method. Is this the
 expected scenario? If it is, why wouldn't mapPartitions() and reduce()
 be the default sorting approach?

 Here is my implementation

 public static void sortBy(JavaSparkContext sc){
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
     long start = System.currentTimeMillis();
     rdd.sortBy(new Function<String, Double>(){

         @Override
         public Double call(String v1) throws Exception {
             String[] arr = v1.split(",");
             return Double.parseDouble(arr[24]);
         }
     }, true, 9).collect();
     long end = System.currentTimeMillis();
     System.out.println("SortBy: " + (end - start));
 }

 public static void sortList(JavaSparkContext sc){
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
     long start = System.currentTimeMillis();
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
         rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
             LinkedList<Tuple2<Double, String>>>(){

         @Override
         public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                 throws Exception {
             LinkedList<Tuple2<Double, String>> lines =
                 new LinkedList<Tuple2<Double, String>>();
             while(t.hasNext()){
                 String s = t.next();
                 String arr1[] = s.split(",");
                 Tuple2<Double, String> t1 =
                     new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                 lines.add(t1);
             }
             Collections.sort(lines, new IncomeComparator());
             LinkedList<LinkedList<Tuple2<Double, String>>> list =
                 new LinkedList<LinkedList<Tuple2<Double, String>>>();
             list.add(lines);
             return list;
         }







Re: RDD of RDDs

2015-06-09 Thread kiran lonikar
Possibly in the future, if and when the Spark architecture allows workers to
launch Spark jobs (the functions passed to the transformation or action APIs
of an RDD), it will be possible to have an RDD of RDDs.

On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote:

 Simillar question was asked before:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

 Here is one of the reasons why I think RDD[RDD[T]] is not possible:

- RDD is only a handle to the actual data partitions. It has a
reference/pointer to the *SparkContext* object (*sc*) and a list of
partitions.
- The *SparkContext *is an object in the Spark Application/Driver
Program's JVM. Similarly, the list of partitions is also in the JVM of the
driver program. Each partition contains kind of remote references to the
partition data on the worker JVMs.
- The functions passed to RDD's transformations and actions execute in
the worker's JVMs on different nodes. For example, in rdd.map { x =>
x*x }, the function performing x*x runs on the JVMs of the
worker nodes where the partitions of the RDD reside. These JVMs do not have
access to the sc since it's only on the driver's JVM.
- Thus, in the case of your RDD of RDD: outerRDD.map { innerRdd =>
innerRDD.filter { x => x*x } }, the worker nodes will not be able to
execute the filter on innerRDD, as the code in the worker does not
have access to sc and cannot launch a Spark job.


 Hope it helps. You need to consider List[RDD] or some other collection.

 -Kiran

 On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as a RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs and seems
 like it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupByKey() and then write each group into a
 file, and read them back as individual RDDs to process.

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721





Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException

2015-06-09 Thread Jeroen Vlek
Hi,

I posted a question with regards to Phoenix and Spark Streaming on 
StackOverflow [1]. Please find a copy of the question to this email below the 
first stack trace. I also already contacted the Phoenix mailing list and tried 
the suggestion of setting spark.driver.userClassPathFirst. Unfortunately that 
only pushed me further into the dependency hell, which I tried to resolve 
until I hit a wall with an UnsatisfiedLinkError on Snappy.

What I am trying to achieve: To save a stream from Kafka into  Phoenix/Hbase 
via Spark Streaming. I'm using MapR as a platform and the original exception 
happens both on a 3-node cluster, as on the MapR Sandbox (a VM for 
experimentation), in YARN and stand-alone mode. Further experimentation (like 
the saveAsNewHadoopApiFile below), was done only on the sandbox in standalone 
mode.

Phoenix only supports Spark from 4.4.0 onwards, but I thought I could 
use a naive implementation that creates a new connection for 
every RDD from the DStream in 4.3.1.  This resulted in the 
ClassNotFoundException described in [1], so I switched to 4.4.0.

Unfortunately the saveToPhoenix method is only available in Scala. So I did 
find the suggestion to try it via the saveAsNewHadoopApiFile method [2] and an 
example implementation [3], which I adapted to my own needs. 

However, 4.4.0 + saveAsNewHadoopApiFile raises the same
ClassNotFoundException, just with a slightly different stack trace:

  java.lang.RuntimeException: java.sql.SQLException: ERROR 103 
(08004): Unable to establish connection.
at 
org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to 
establish connection.
at 
org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
at 
org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
at 
org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
at 
org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at 
org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
at 
org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at 
java.sql.DriverManager.getConnection(DriverManager.java:571)
at 
java.sql.DriverManager.getConnection(DriverManager.java:187)
at 
org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
at 
org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
at 
org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
at 
org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
at 
org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
... 8 more
Caused by: java.io.IOException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
at 
org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
at 
org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
... 23 more
Caused by: java.lang.reflect.InvocationTargetException
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method)
at 

Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]

2015-06-09 Thread Akhil Das
I think yes; as the documentation says, it "Creates tuples of the elements in
this RDD by applying f."


Thanks
Best Regards

On Tue, Jun 9, 2015 at 1:54 PM, amit tewari amittewar...@gmail.com wrote:

 Actually the question was: will keyBy() accept multiple fields (e.g. x(0),
 x(1)) as the key?


 On Tue, Jun 9, 2015 at 1:07 PM, amit tewari amittewar...@gmail.com
 wrote:

 Thanks Akhil. As you suggested, I have to go the keyBy() route, as I need
 the columns intact.
 But will keyBy() accept multiple fields (e.g. x(0), x(1))?

 Thanks
 Amit

 On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Try this way:

 scala> val input1 = sc.textFile("/test7").map(line =>
 line.split(",").map(_.trim));
 scala> val input2 = sc.textFile("/test8").map(line =>
 line.split(",").map(_.trim));
 scala> val input11 = input1.map(x => ((x(0) + x(1)), x(2), x(3)))
 scala> val input22 = input2.map(x => ((x(0) + x(1)), x(2), x(3)))

 scala> input11.join(input22).take(10)


 PairRDDFunctions basically requires an RDD[K, V], and in your case it's
 ((String, String), String, String). You can also look at keyBy if you don't
 want to concatenate your keys.

 Thanks
 Best Regards

 On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com
 wrote:

 Hi Dear Spark Users

 I am very new to Spark/Scala.

 Am using Datastax (4.7/Spark 1.2.1) and struggling with following
 error/issue.

 Already tried options like import org.apache.spark.SparkContext._ or
 explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions.
 But error not resolved.

 Help much appreciated.

 Thanks
 AT

 scala> val input1 = sc.textFile("/test7").map(line =>
 line.split(",").map(_.trim));
 scala> val input2 = sc.textFile("/test8").map(line =>
 line.split(",").map(_.trim));
 scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
 scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))

 scala> input11.join(input22).take(10)

 <console>:66: error: value join is not a member of
 org.apache.spark.rdd.RDD[((String, String), String, String)]

   input11.join(input22).take(10)












Re: RDD of RDDs

2015-06-09 Thread kiran lonikar
Simillar question was asked before:
http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

Here is one of the reasons why I think RDD[RDD[T]] is not possible:

   - RDD is only a handle to the actual data partitions. It has a
   reference/pointer to the *SparkContext* object (*sc*) and a list of
   partitions.
   - The *SparkContext *is an object in the Spark Application/Driver
   Program's JVM. Similarly, the list of partitions is also in the JVM of the
   driver program. Each partition contains kind of remote references to the
   partition data on the worker JVMs.
   - The functions passed to RDD's transformations and actions execute in
   the worker's JVMs on different nodes. For example, in rdd.map { x =>
   x*x }, the function performing x*x runs on the JVMs of the worker
   nodes where the partitions of the RDD reside. These JVMs do not have access
   to the sc since it's only on the driver's JVM.
   - Thus, in the case of your RDD of RDD: outerRDD.map { innerRdd =>
   innerRDD.filter { x => x*x } }, the worker nodes will not be able to
   execute the filter on innerRDD, as the code in the worker does not
   have access to sc and cannot launch a Spark job.


Hope it helps. You need to consider List[RDD] or some other collection.
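
A hedged sketch of that alternative for the use case in the question below
(splitting a log RDD by userId), assuming the set of user ids is small
enough to collect on the driver; parseToUserIdAndEvent and processEachUser
are hypothetical stand-ins for your own code:

import org.apache.spark.rdd.RDD

val logs: RDD[(String, String)] =
  sc.textFile("/logs").map(parseToUserIdAndEvent)  // (userId, event)
logs.cache()  // each per-user RDD below re-scans logs otherwise

val userIds = logs.keys.distinct().collect()
val perUser: Map[String, RDD[(String, String)]] =
  userIds.map(id => id -> logs.filter(_._1 == id)).toMap

perUser.foreach { case (id, rdd) => processEachUser(id, rdd) }

Note that this runs one filter pass per distinct key, so it only makes sense
for a modest number of users.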

-Kiran

On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as a RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs and seems like
 it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupByKey() and then write each group into a
 file, and read them back as individual RDDs to process.

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721




Re: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Benjamin Fradet
Hi,

Are you restarting your Spark streaming context through getOrCreate?
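
A minimal sketch of that pattern (paths, app name, and batch interval are
placeholders). The DStream definitions must happen inside the creating
function so they can be recovered from the checkpoint:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-streaming-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("/checkpoints/my-streaming-app")
  // ... define the DStream graph here ...
  ssc
}

val ssc = StreamingContext.getOrCreate("/checkpoints/my-streaming-app", createContext _)
ssc.start()
ssc.awaitTermination()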
On 9 Jun 2015 09:30, Haopu Wang hw...@qilinsoft.com wrote:

 When I ran a spark streaming application for longer, I noticed the local
 directory's size kept increasing.

 I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

 The spark streaming batch duration is 10 seconds and checkpoint duration
 is 10 minutes.

 The setting took effect, but after that the exception below happened.

 Do you have any idea about this error? Thank you!

 

 15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
 org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
         at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
         at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
         at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
         at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
         at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
         at org.apache.spark.scheduler.Task.run(Task.scala:64)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
         at scala.Option.getOrElse(Option.scala:120)
         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
         at scala.collection.immutable.List.foreach(List.scala:318)
         at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
         ... 25 more

 15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4
 times; aborting job
 15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job
 143382585 ms.0





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Rdd of Rdds

2015-06-09 Thread lonikar
Replicating my answer to another question asked today:

Here is one of the reasons why I think RDD[RDD[T]] is not possible:
   * RDD is only a handle to the actual data partitions. It has a
reference/pointer to the SparkContext object (sc) and a list of
partitions.
   * The SparkContext is an object in the Spark Application/Driver Program's
JVM. Similarly, the list of partitions is also in the JVM of the driver
program. Each partition contains kind of remote references to the
partition data on the worker JVMs.
   * The functions passed to RDD's transformations and actions execute in
the worker's JVMs on different nodes. For example, in rdd.map { x => x*x
}, the function performing x*x runs on the JVMs of the worker nodes
where the partitions of the RDD reside. These JVMs do not have access to the
sc since it's only on the driver's JVM.
   * Thus, in the case of your RDD of RDD: outerRDD.map { innerRdd =>
innerRDD.filter { x => x*x } }, the worker nodes will not be able to
execute the filter on innerRDD, as the code in the worker does not have
access to sc and cannot launch a Spark job.

Hope it helps. You need to consider List[RDD] or some other collection.

Possibly in the future, if and when the Spark architecture allows workers to
launch Spark jobs (the functions passed to the transformation or action APIs
of an RDD), it will be possible to have an RDD of RDDs.








RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Shao, Saisai
From the stack trace I think this problem may be due to the deletion of the
broadcast variable: since you set spark.cleaner.ttl, the old broadcast
variable is deleted after that timeout, and you will hit this exception when
you try to use it again after the time limit.

Basically I think you don't need this configuration. Spark Streaming
automatically deletes old, unused data, and Spark itself deletes the
metadata using weak references. Also, this configuration will be deprecated
in the coming release.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl

When I ran a spark streaming application for longer, I noticed the local
directory's size kept increasing.

I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

The spark streaming batch duration is 10 seconds and the checkpoint duration
is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        ... 25 more

15/06/09 12:57:30 ERROR 

Re: Re: How to decrease the time of storing block in memory

2015-06-09 Thread luohui20001
Only 1 minor GC, 0.07s. 




 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02

Maybe you should check your driver UI and see if there's any GC time
involved, etc.

Thanks
Best Regards

On Mon, Jun 8, 2015 at 5:45 PM,  luohui20...@sina.com wrote:
hi there

  I am trying to decrease my app's running time on the worker node. I checked
the log and found the most time-consuming part is below:

15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values
in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID
0). 693 bytes result sent to driver

I don't know why it needs 19s to store 2.1 KB of data in memory. Is there any
tuning method?

The attachment is the full log; here it is:

15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered
signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
15/06/08 16:14:10 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/06/08 16:14:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:14 INFO Remoting: Starting remoting
15/06/08 16:14:15 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://driverPropsFetcher@slave5:54684]
15/06/08 16:14:15 INFO util.Utils: Successfully started service 
'driverPropsFetcher' on port 54684.
15/06/08 16:14:16 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
15/06/08 16:14:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:16 INFO Remoting: Starting remoting
15/06/08 16:14:17 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.
15/06/08 16:14:17 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkExecutor@slave5:49169]
15/06/08 16:14:17 INFO util.Utils: Successfully started service 'sparkExecutor' 
on port 49169.
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to MapOutputTracker: 
akka.tcp://sparkDriver@slave5:58630/user/MapOutputTracker
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to BlockManagerMaster: 
akka.tcp://sparkDriver@slave5:58630/user/BlockManagerMaster
15/06/08 16:14:17 INFO storage.DiskBlockManager: Created local directory at 
/tmp/spark-548b4618-4aba-4b63-9467-381fbfea8d5b/spark-83737dd6-46b0-47ee-82a5-5afee46bdbf5/spark-0fe9d8ba-2910-44a2-bf4f-80d179f5d58b/blockmgr-b4884e7a-2527-447a-9fc5-1823d923c2f1
15/06/08 16:14:17 INFO storage.MemoryStore: MemoryStore started with capacity 
353.3 MB
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to OutputCommitCoordinator: 
akka.tcp://sparkDriver@slave5:58630/user/OutputCommitCoordinator
15/06/08 16:14:17 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
driver: akka.tcp://sparkDriver@slave5:58630/user/CoarseGrainedScheduler
15/06/08 16:14:18 INFO worker.WorkerWatcher: Connecting to worker 
akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:18 INFO executor.CoarseGrainedExecutorBackend: Successfully 
registered with driver
15/06/08 16:14:18 INFO executor.Executor: Starting executor ID 0 on host slave5
15/06/08 16:14:18 INFO worker.WorkerWatcher: Successfully connected to 
akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:21 WARN internal.ThreadLocalRandom: Failed to generate a seed 
from SecureRandom within 3 seconds. Not enough entrophy?
15/06/08 16:14:21 INFO netty.NettyBlockTransferService: Server created on 53449
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Registered BlockManager
15/06/08 16:14:21 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: 
akka.tcp://sparkDriver@slave5:58630/user/HeartbeatReceiver
15/06/08 16:14:21 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
15/06/08 16:14:21 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/08 16:14:21 INFO executor.Executor: Fetching 

Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

2015-06-09 Thread Akhil Das
You can put a Thread.sleep(10) in the code to keep the UI available for
quite some time (put it just before starting any of your transformations).
Or you can enable the Spark history server
https://spark.apache.org/docs/latest/monitoring.html too. I believe --jars
https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
would download the dependency jars on all your worker machines (they can be
found in the Spark work dir of your application, along with the stderr/stdout files).
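
One thing worth ruling out, if the command was typed exactly as quoted further
down: --jars and --files each take a single comma-separated list, so a space
after the comma makes the shell split the list, and spark-submit would then
treat hdfs://localhost/2.jar as the application jar rather than a dependency.
A corrected sketch of the submit command (same hypothetical paths as in the
original question):

    spark-submit \
      --class myClass \
      --master spark://localhost:7077 \
      --deploy-mode cluster \
      --jars hdfs://localhost/1.jar,hdfs://localhost/2.jar \
      --files hdfs://localhost/1.txt,hdfs://localhost/2.txt \
      hdfs://localhost/main.jar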

Thanks
Best Regards

On Tue, Jun 9, 2015 at 1:29 PM, Dong Lei dong...@microsoft.com wrote:

  Thanks Akhil:



  The driver fails too fast for me to get a look at port 4040. Is there any
  other way to see the download-and-ship process of the files?

  Is the driver supposed to download these jars from HDFS to some location,
  then ship them to executors?

  I can see from the log that the driver downloaded the application jar but
  not the other jars specified by "--jars".

  Or do I misunderstand the usage of "--jars", and the jars should already be
  on every worker, so the driver will not download them?

  Is there some useful doc?



 Thanks

 Dong Lei





 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Tuesday, June 9, 2015 3:24 PM
 *To:* Dong Lei
 *Cc:* user@spark.apache.org
 *Subject:* Re: ClassNotDefException when using spark-submit with multiple
 jars and files located on HDFS



 Once you submit the application, you can check the driver UI (running on
 port 4040), Environment tab, to see whether the jars you added got shipped
 or not. If they were shipped and you are still getting NoClassDef
 exceptions, it means you have a jar conflict, which you can resolve by
 putting the jar containing the class at the top of your classpath.


   Thanks

 Best Regards



 On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote:

  Hi, spark-users:



 I'm using spark-submit to submit multiple jars and files (all in HDFS) to
 run a job, with the following command:



 Spark-submit

   --class myClass

  --master spark://localhost:7077/

   --deploy-mode cluster

   --jars hdfs://localhost/1.jar, hdfs://localhost/2.jar

   --files hdfs://localhost/1.txt, hdfs://localhost/2.txt

  hdfs://localhost/main.jar



 the stderr in the driver showed java.lang.ClassNotDefException for a class
 in 1.jar.



 I checked the log and saw that Spark had added these jars:

  INFO SparkContext: Added JAR hdfs:// …1.jar

  INFO SparkContext: Added JAR hdfs:// …2.jar



 In the folder of the driver, I only saw that main.jar was copied to that
 place, but the other jars and files were not there.



 Could someone explain how I should pass the jars and files needed by the
 main jar to Spark?



 If my class in main.jar refers to these files with a relative path, will
 Spark copy these files into one folder?



 BTW, my class works in client mode with all jars and files local.



 Thanks

 Dong Lei





Re: Re: How to decrease the time of storing block in memory

2015-06-09 Thread Akhil Das
Is it that task that is taking 19s? It won't simply be taking 19s to store
2 KB of data in memory; there could be other operations happening too (the
transformations that you are doing). It would be good if you could paste the
code snippet that you are running, to get a better understanding.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 2:09 PM, luohui20...@sina.com wrote:

 Only 1 minor GC, 0.07s.


 

 Thanks & Best regards!
 San.Luo

 ----- Original Message -----
 From: Akhil Das ak...@sigmoidanalytics.com
 To: 罗辉 luohui20...@sina.com
 Cc: user user@spark.apache.org
 Subject: Re: How to decrease the time of storing block in memory
 Date: 2015-06-09 15:02

 Maybe you should check your driver UI and see if there's any GC time
 involved, etc.

 Thanks
 Best Regards


RE: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

2015-06-09 Thread Dong Lei
Thanks Akhil:

The driver fails too fast for me to get a look at port 4040. Is there any
other way to see the download-and-ship process of the files?

Is the driver supposed to download these jars from HDFS to some location, then
ship them to executors?
I can see from the log that the driver downloaded the application jar but not
the other jars specified by "--jars".

Or do I misunderstand the usage of "--jars", and the jars should already be on
every worker, so the driver will not download them?
Is there some useful doc?

Thanks
Dong Lei





Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]

2015-06-09 Thread amit tewari
Actually the question was: will keyBy() accept multiple fields (e.g.
x(0), x(1)) as the key?
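
For what it's worth, keyBy() does accept a composite key: the function result
becomes the key and the whole record stays as the value, so the columns remain
intact. A minimal sketch, assuming the same four-column CSV input as in the
quoted code:

    // keyBy turns RDD[Array[String]] into RDD[((String, String), Array[String])],
    // a pair RDD, so join/cogroup become available without concatenating fields.
    val input11 = input1.keyBy(x => (x(0), x(1)))
    val input22 = input2.keyBy(x => (x(0), x(1)))
    input11.join(input22).take(10)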

On Tue, Jun 9, 2015 at 1:07 PM, amit tewari amittewar...@gmail.com wrote:

 Thanks Akhil; as you suggested, I have to go the keyBy() route, as I need
 the columns intact.
 But will keyBy() accept multiple fields (e.g. x(0), x(1))?

 Thanks
 Amit

 On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Try this way:

 scala> val input1 = sc.textFile("/test7").map(line =>
 line.split(",").map(_.trim));
 scala> val input2 = sc.textFile("/test8").map(line =>
 line.split(",").map(_.trim));
 scala> val input11 = input1.map(x => ((x(0) + x(1)), x(2), x(3)))
 scala> val input22 = input2.map(x => ((x(0) + x(1)), x(2), x(3)))

 scala> input11.join(input22).take(10)


 PairRDDFunctions basically requires an RDD[(K, V)], and in your case it's
 ((String, String), String, String). You can also look at keyBy if you don't
 want to concatenate your keys.

 Thanks
 Best Regards

 On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com
 wrote:

 Hi Dear Spark Users

 I am very new to Spark/Scala.

 I am using DataStax (4.7 / Spark 1.2.1) and struggling with the following
 error/issue.

 I already tried options like import org.apache.spark.SparkContext._ or an
 explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions, but the
 error was not resolved.

 Help much appreciated.

 Thanks
 AT

 scala> val input1 = sc.textFile("/test7").map(line =>
 line.split(",").map(_.trim));
 scala> val input2 = sc.textFile("/test8").map(line =>
 line.split(",").map(_.trim));
 scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
 scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))

 scala> input11.join(input22).take(10)

 <console>:66: error: value join is not a member of
 org.apache.spark.rdd.RDD[((String, String), String, String)]

   input11.join(input22).take(10)











Re: Cassandra Submit

2015-06-09 Thread Yasemin Kaya
I couldn't find any solution. I can write but I can't read from Cassandra.

2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Thanks a lot Mohammed, Gerard and Yana.
 I can write to the table, but I get an exception. It says: Exception in
 thread "main" java.io.IOException: Failed to open thrift connection to
 Cassandra at 127.0.0.1:9160

 In the yaml file:
 rpc_address: localhost
 rpc_port: 9160

 And in the project:

 .set("spark.cassandra.connection.host", "127.0.0.1")
 .set("spark.cassandra.connection.rpc.port", "9160");

 or

 .set("spark.cassandra.connection.host", "localhost")
 .set("spark.cassandra.connection.rpc.port", "9160");

 Whichever setting I write, I get the same exception. Any help?


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 yes, whatever you put for listen_address in cassandra.yaml. Also, you
 should try to connect to your cassandra cluster via bin/cqlsh to make sure
 you have connectivity before you try to make a connection via spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,
 I run my project locally. How can I find the IP address of my Cassandra
 host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should I change?
 Should I change cassandra.yaml?

 The error says: Exception in thread "main" java.io.IOException: Failed to
 open native connection to Cassandra at {127.0.1.1}:9042

 What should I add? SparkConf sparkConf = new
 SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts",
 "true").set("spark.cassandra.connection.host", ?);

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com:

  Check your spark.cassandra.connection.host setting. It should be
 pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using Cassandra in my project. I had this error: Exception in thread
 "main" java.io.IOException: Failed to open native connection to Cassandra
 at {127.0.1.1}:9042



 I think I have to modify the submit line. What should I add or remove
 when I submit my project?



 Best,

 yasemin





 --

 hiç ender hiç




 --
 hiç ender hiç





 --
 hiç ender hiç





 --
 hiç ender hiç




-- 
hiç ender hiç


Re: FileOutputCommitter deadlock 1.3.1

2015-06-09 Thread Steve Loughran


On 8 Jun 2015, at 15:55, Richard Marscher rmarsc...@localytics.com wrote:

Hi,

we've been seeing occasional issues in production with the FileOutputCommitter
reaching a deadlock situation.

We are writing our data to S3 and currently have speculation enabled. What we
see is that Spark gets a file-not-found error trying to access a temporary
part file that it wrote (it seems to be the part-#2 file every time?), so the
task fails. But the file actually exists in S3, so subsequent speculations and
task retries all fail because the committer tells them the file exists. This
persists until human intervention kills the application. Usually rerunning the
application succeeds on the next try, so it is not deterministic with the
dataset or anything.

It seems like there isn't a good story yet for file writing and speculation
(https://issues.apache.org/jira/browse/SPARK-4879), although our error here
seems worse than the reports in that issue, since I believe ours deadlocks and
those don't?

S3 isn't a real filesystem, which is why the hadoop docs say don't turn 
speculation on if your destination is s3n/s3a or swift

http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html

1. renames of directories aren't atomic, but rename is exactly what
speculation uses to ensure an atomic commit of one task's output
2. eventual consistency means that things aren't immediately visible
3. when you create an output file for writing, it's not there until the final
close(), so other code that looks for the file's existence in order to
implement any locking algorithm is in trouble.

These are fundamental issues which hurt all the bits of Hadoop code that expect 
basic POSIX-ish filesystem behaviours of: files immediately visible on create, 
atomic and O(1) mv and rmdir, efficient seek(), and consistency of Create, 
Update and Delete across the entire cluster.

What to do?
- keep speculation on for output to HDFS; turn it off for s3 (a config sketch
follows below)
- lift the amazon hadoop-aws JAR from EMR (hadoop 2.6+; for earlier versions
you need the whole of hadoop-common)
- look at netflix s3mper
http://techblog.netflix.com/2014/01/s3mper-consistency-in-cloud.html
- don't use US East, as it has the worst consistency (it doesn't even support
create consistency). That may help, but it still won't address non-atomic
directory rename
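
A minimal sketch of toggling this per job; spark.speculation is the stock
Spark property and the app names are placeholders:

    import org.apache.spark.SparkConf

    // Speculation is safe (and useful) when output goes to a real filesystem.
    val hdfsJobConf = new SparkConf().setAppName("writes-to-hdfs")
      .set("spark.speculation", "true")

    // For s3n/s3a/swift destinations, leave it at its default of false.
    val s3JobConf = new SparkConf().setAppName("writes-to-s3")
      .set("spark.speculation", "false")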

I don't know about SPARK-4879, so can't comment on what's happening there, but 
if you are using s3 as your destination it's irrelevant: you will have problems 
if speculation is turned on. That's the same as if you use classic Hadoop MR, 
Tez or others. It's also why s3 isn't supported as a filesystem for things like 
HBase. Hopefully the work on HADOOP-9565 will eventually add some operations to 
enable speculative commits for code written explicitly against it.

-Steve




RE: Spark SQL with Thrift Server is very very slow and finally failing

2015-06-09 Thread Cheng, Hao
Is it a large result set returned from the Thrift Server? And can you paste
the SQL and the physical plan?

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, June 9, 2015 12:01 PM
To: Sourav Mazumder
Cc: user
Subject: Re: Spark SQL with Thrift Server is very very slow and finally failing

Which Spark release are you using ?

Can you pastebin the stack trace w.r.t. ExecutorLostFailure ?

Thanks

On Mon, Jun 8, 2015 at 8:52 PM, Sourav Mazumder
sourav.mazumde...@gmail.com wrote:
Hi,
I am trying to run a SQL query from a JDBC driver using Spark's Thrift Server.
I'm doing a join between a Hive table of around 100 GB and another Hive table
of 10 KB, with a filter on a particular column.
The query takes more than 45 minutes and then I get ExecutorLostFailure. That
seems to be because of memory: once I increase the memory, the failure still
happens, but after a longer time.
I'm using executor memory 20 GB, Spark driver memory 2 GB, 2 executor
instances and 2 executor cores.
I'm running the job using YARN with master 'yarn-client'.
Any idea if I'm missing any other configuration?
Regards,
Sourav
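
One avenue worth ruling out here (an assumption, since the plan isn't posted):
whether the 10 KB table is actually broadcast rather than shuffled. A sketch,
where small_table is a placeholder for the 10 KB table's name:

    // Spark SQL broadcasts a join side only when it knows that side is small;
    // Hive tables need statistics for that, so compute them first:
    sqlContext.sql("ANALYZE TABLE small_table COMPUTE STATISTICS noscan")
    // Tables below this many bytes (~10 MB here) are broadcast to every
    // executor, so the 100 GB side is never shuffled for the join.
    sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=10485760")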



Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]

2015-06-09 Thread Akhil Das
Try this way:

scala> val input1 = sc.textFile("/test7").map(line =>
line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line =>
line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0) + x(1)), x(2), x(3)))
scala> val input22 = input2.map(x => ((x(0) + x(1)), x(2), x(3)))

scala> input11.join(input22).take(10)


PairRDDFunctions basically requires an RDD[(K, V)], and in your case it's
((String, String), String, String). You can also look at keyBy if you don't
want to concatenate your keys.

Thanks
Best Regards









Re: Saving compressed textFiles from a DStream in Scala

2015-06-09 Thread Akhil Das
like this?

myDStream.foreachRDD(rdd => rdd.saveAsTextFile("/sigmoid/", codec))


Thanks
Best Regards

On Mon, Jun 8, 2015 at 8:06 PM, Bob Corsaro rcors...@gmail.com wrote:

 It looks like saveAsTextFiles doesn't support the compression parameter of
 RDD.saveAsTextFile. Is there a way to add the functionality in my client
 code without patching Spark? I tried making my own saveFunc function and
 calling DStream.foreachRDD, but ran into trouble with invoking rddToFileName
 and making the RDD type parameter work properly. It's probably just due to
 my lack of Scala knowledge. Can anyone give me a hand?

   def saveAsTextFiles(prefix: String, suffix: String = ""): Unit =
   ssc.withScope {
     val saveFunc = (rdd: RDD[T], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsTextFile(file)
     }
     this.foreachRDD(saveFunc)
   }
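
A client-side variant of the same idea that also compresses, sketched under
the assumption that Hadoop's GzipCodec is wanted; since rddToFileName is
private to Spark Streaming, its prefix-<time>.suffix naming is inlined:

    import org.apache.hadoop.io.compress.GzipCodec

    val prefix = "/output/batch"   // placeholder output prefix
    val suffix = "gz"              // placeholder suffix
    dstream.foreachRDD { (rdd, time) =>
      // mirror rddToFileName: "<prefix>-<timeInMillis>[.<suffix>]"
      val file = s"$prefix-${time.milliseconds}" +
        (if (suffix.isEmpty) "" else s".$suffix")
      rdd.saveAsTextFile(file, classOf[GzipCodec])
    }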




RE: SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-09 Thread Haopu Wang
Cheng,

yes, it works; I set the property in SparkConf before initiating the
SparkContext.
The property name is spark.hadoop.dfs.replication
Thanks for the help!
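
A minimal sketch of that setting, assuming a replication factor of 2 is
wanted for this application's output:

    import org.apache.spark.{SparkConf, SparkContext}

    // Every "spark.hadoop.*" property is copied into the job's Hadoop
    // Configuration, so dfs.replication applies only to files this
    // application writes, not to the cluster-wide default.
    val conf = new SparkConf()
      .setAppName("parquet-writer")
      .set("spark.hadoop.dfs.replication", "2")
    val sc = new SparkContext(conf)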

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: Monday, June 08, 2015 6:41 PM
To: Haopu Wang; user
Subject: Re: SparkSQL: How to specify replication factor on the
persisted parquet files?

Then one possible workaround is to set dfs.replication in 
sc.hadoopConfiguration.

However, this configuration is shared by all Spark jobs issued within 
the same application. Since different Spark jobs can be issued from 
different threads, you need to pay attention to synchronization.

Cheng

On 6/8/15 2:46 PM, Haopu Wang wrote:
 Cheng, thanks for the response.

 Yes, I was using HiveContext.setConf() to set dfs.replication.
 However, I cannot change the value in Hadoop core-site.xml because
that
 will change every HDFS file.
 I only want to change the replication factor of some specific files.

 -Original Message-
 From: Cheng Lian [mailto:lian.cs@gmail.com]
 Sent: Sunday, June 07, 2015 10:17 PM
 To: Haopu Wang; user
 Subject: Re: SparkSQL: How to specify replication factor on the
 persisted parquet files?

 Were you using HiveContext.setConf()?

 dfs.replication is a Hadoop configuration, but setConf() is only used to
 set Spark SQL specific configurations. You may set it in your Hadoop
 core-site.xml instead.

 Cheng


 On 6/2/15 2:28 PM, Haopu Wang wrote:
 Hi,

 I'm trying to save SparkSQL DataFrame to a persistent Hive table
using
 the default parquet data source.

 I don't know how to change the replication factor of the generated
 parquet files on HDFS.

 I tried to set dfs.replication on HiveContext but that didn't work.
 Any suggestions are appreciated very much!










Re: Error in using saveAsParquetFile

2015-06-09 Thread Bipin Nag
Cheng you were right. It works when I remove the field from either one. I
should have checked the types beforehand. What confused me is that Spark
attempted to join it and midway threw the error. It isn't quite there yet.
Thanks for the help.
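
A quick way to catch this kind of mismatch up front is to compare the two
schemas before joining; a sketch using the same load calls as the code quoted
below, with a cast as one possible fix (assuming string is the type to
standardize on):

    val bookings = sqlContext.load("/home/administrator/stageddata/Bookings")
    val details  = sqlContext.load("/home/administrator/stageddata/Customerdetails")
    bookings.printSchema()   // look for PolicyType: string on one side...
    details.printSchema()    // ...and integer on the other
    // One possible fix: cast the conflicting column on one side, e.g.
    // details("PolicyType").cast("string").as("PolicyType") in a select list,
    // before joining and saving.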

On Mon, Jun 8, 2015 at 8:29 PM Cheng Lian lian.cs@gmail.com wrote:

  I suspect that Bookings and Customerdetails both have a PolicyType field,
 one is string and the other is an int.


 Cheng


 On 6/8/15 9:15 PM, Bipin Nag wrote:

  Hi Jeetendra, Cheng

  I am using the following code for joining:

 val Bookings = sqlContext.load("/home/administrator/stageddata/Bookings")
 val Customerdetails =
 sqlContext.load("/home/administrator/stageddata/Customerdetails")

 val CD = Customerdetails.
   where($"CreatedOn" > "2015-04-01 00:00:00.0").
   where($"CreatedOn" < "2015-05-01 00:00:00.0")

 //Bookings by CD
 val r1 = Bookings.
   withColumnRenamed("ID", "ID2")
 val r2 = CD.
   join(r1, CD.col("CustomerID") === r1.col("ID2"), "left")

 r2.saveAsParquetFile("/home/administrator/stageddata/BOOKING_FULL");

  @Cheng I am not appending the joined table to an existing parquet file,
 it is a new file.
  @Jitender I have a rather large parquet file, and it also contains some
 confidential data. Can you tell me what you need to check in it?

  Thanks


 On 8 June 2015 at 16:47, Jeetendra Gangele gangele...@gmail.com wrote:

 When are you loading these parquet files?
 Can you please share the code where you are passing the parquet files to
 Spark?

 On 8 June 2015 at 16:39, Cheng Lian lian.cs@gmail.com wrote:

 Are you appending the joined DataFrame whose PolicyType is string to an
 existing Parquet file whose PolicyType is int? The exception indicates that
 Parquet found a column with conflicting data types.

 Cheng


 On 6/8/15 5:29 PM, bipin wrote:

 Hi I get this error message when saving a table:

 parquet.io.ParquetDecodingException: The requested schema is not
 compatible
 with the file schema. incompatible types: optional binary PolicyType
 (UTF8)
 != optional int32 PolicyType
 at

 parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:105)
 at

 parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:97)
 at parquet.schema.PrimitiveType.accept(PrimitiveType.java:386)
 at

 parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:87)
 at

 parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:61)
 at parquet.schema.MessageType.accept(MessageType.java:55)
 at
 parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:148)
 at
 parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:137)
 at
 parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:157)
 at

 parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:107)
 at

 parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
 at
 parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
 at

 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
 at

 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
 at
 org.apache.spark.sql.parquet.ParquetRelation2.org
 $apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 I joined two tables, both loaded from parquet files; the joined table
 throws this error when saved. I could not find anything about this error.
 Could this be a bug?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-using-saveAsParquetFile-tp23204.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.










Re: How to decrease the time of storing block in memory

2015-06-09 Thread Akhil Das
Maybe you should check your driver UI and see if there's any GC time
involved, etc.

Thanks
Best Regards


[SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Haopu Wang
When I ran a Spark Streaming application for longer, I noticed the local
directory's size kept increasing.

I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration
is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        ... 25 more

15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0
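
For what it's worth, spark.cleaner.ttl is purely time-based: anything older
than the TTL is dropped even if it is still referenced, and a broadcast
variable created once and then reused by every batch is a typical casualty,
which would match the Failed to get broadcast_82_piece0 error above. A sketch
of the alternative (this is an assumption about the cause, not a confirmed
diagnosis):

    import org.apache.spark.SparkConf

    // Prefer the default reference-tracking ContextCleaner over a wall-clock
    // TTL: simply do not set spark.cleaner.ttl, and shuffle/broadcast data is
    // cleaned when it goes out of scope rather than after a fixed age.
    val conf = new SparkConf().setAppName("streaming-app")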








Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]

2015-06-09 Thread amit tewari
Thanks Akhil; as you suggested, I have to go the keyBy() route, as I need the
columns intact.
But will keyBy() accept multiple fields (e.g. x(0), x(1))?

Thanks
Amit











Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

2015-06-09 Thread Akhil Das
Once you submit the application, you can check the driver UI (running
on port 4040), Environment tab, to see whether the jars you added got
shipped or not. If they were shipped and you are still getting NoClassDef
exceptions, it means you have a jar conflict, which you can resolve by
putting the jar containing the class at the top of your classpath.

Thanks
Best Regards




Re: SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-09 Thread ayan guha
Hi

I am a little confused here. If I am writing to HDFS, shouldn't the HDFS
replication factor automatically kick in? In other words, how is the Spark
writer different from an hdfs -put command (from the perspective of HDFS, of
course)?

Best
Ayan





-- 
Best Regards,
Ayan Guha


Re: Error in using saveAsParquetFile

2015-06-09 Thread Cheng Lian
Yeah, this does look confusing. We are trying to improve the error
reporting by catching similar issues at the end of the analysis phase
and giving more descriptive error messages. Part of the work can be found
here:
https://github.com/apache/spark/blob/0902a11940e550e85a53e110b490fe90e16ddaf4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala


Cheng



Different Sorting RDD methods in Apache Spark

2015-06-09 Thread raggy
For a research project, I tried sorting the elements in an RDD. I did this
using two different approaches.

In the first method, I applied a mapPartitions() function on the RDD, so
that it would sort the contents of the RDD, and provide a result RDD that
contains the sorted list as the only record in the RDD. Then, I applied a
reduce function which basically merges sorted lists.

I ran these experiments on an EC2 cluster containing 30 nodes. I set it up
using the spark ec2 script. The data file was stored in HDFS.

In the second approach I used the sortBy method in Spark.

I performed these operations on the US census data (100 MB) found here.

A single line looks like this:

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not
in universe or children, Not in universe, White, All other, Female, Not in
universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not
in universe, Not in universe, Child 18 never marr not in subfamily, Child
under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in
universe, 0, Both parents present, United-States, United-States,
United-States, Native- Born in the United States, 0, Not in universe, 0, 0,
94, - 5.
I sorted based on the 25th value in the CSV. In this line that is 1758.14.

I noticed that sortBy performs worse than the other method. Is this the
expected scenario? If it is, why wouldn't the mapPartitions() and reduce()
be the default sorting approach?

Here is my implementation

public static void sortBy(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>() {
        @Override
        public Double call(String v1) throws Exception {
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}

public static void sortList(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
    long start = System.currentTimeMillis();
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
        rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
                LinkedList<Tuple2<Double, String>>>() {

            @Override
            public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                    throws Exception {
                LinkedList<Tuple2<Double, String>> lines =
                        new LinkedList<Tuple2<Double, String>>();
                while (t.hasNext()) {
                    String s = t.next();
                    String[] arr1 = s.split(",");
                    Tuple2<Double, String> t1 =
                            new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                    lines.add(t1);
                }
                Collections.sort(lines, new IncomeComparator());
                LinkedList<LinkedList<Tuple2<Double, String>>> list =
                        new LinkedList<LinkedList<Tuple2<Double, String>>>();
                list.add(lines);
                return list;
            }



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Cassandra Submit

2015-06-09 Thread Yasemin Kaya
Yes, my Cassandra is listening on 9160, I think; I know it from the yaml
file. The file includes:

rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160

I checked the port with nc -z localhost 9160; echo $?, and it returns 0. I
think it is closed; should I open this port?

2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Is your cassandra installation actually listening on 9160?

 lsof -i :9160
 COMMAND   PID USER       FD   TYPE   DEVICE SIZE/OFF NODE NAME
 java    29232 ykadiysk   69u  IPv4 42152497      0t0  TCP localhost:9160 (LISTEN)
 I am running an out-of-the-box cassandra conf, where

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160




-- 
hiç ender hiç


Re: BigDecimal problem in parquet file

2015-06-09 Thread Cheng Lian
Would you please provide a snippet that reproduces this issue? What
version of Spark were you using?


Cheng

On 6/9/15 8:18 PM, bipin wrote:

Hi,
When I try to save my data frame as a parquet file I get the following
error:

java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
org.apache.spark.sql.types.Decimal
at
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
at
org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

How do I fix this problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



traverse a graph based on edge properties whilst counting matching vertex attributes

2015-06-09 Thread MA2
Hi All, 

I was hoping somebody might be able to help out, 

I currently have a network built using graphx which looks like the following
(only with a much larger number of vertices and edges) 

Vertices 
ID, Attribute1, Attribute2 
1001 2 0 
1002 1 0 
1003 2 1 
1004 3 2 
1006 4 0 
1007 5 1 

Edges 
Source, Destination, Attribute 
1001 1002 7 
1002 1003 7 
1003 1004 7 
1004 1005 3 
1002 1006 5 
1006 1007 5 

For each vertex I need to send a message down a chain to each connected
component based on the edge attribute and count how many matches there are
of the vertex attribute to another vertex attribute along the chain. 

So for example: 
For vertex 1004 the connecting edge attribute is 7, so I want to identify
each component which is connected to 1004 by edge attribute 7, in this case
it would be 1001-1002-1003-1004, then pattern match the second vertex
attribute from 1004 (in this case 2) to any matching first vertex attributes
along the chain (in this case it would match with 1003 and 1001, giving me a
total count of 2). 


The two bits of code I have at the moment are firstly to subgraph by vertex
ID (although I have to specify the edge property) 


  val edgeProperty = 7
  val fGraph = graph.subgraph(epred = e => e.attr == edgeProperty)


and currently I have code which counts the nearest neighbour, but does not
count down the chain of connected nodes. 

val counts = {
  fGraph.collectNeighbors(EdgeDirection.In).join(graph.vertices).map {
    case (id, t) => {
      val neighbors: Array[(VertexId, Array[String])] = t._1
      val nodeAttr = t._2
      neighbors.map(_._2).filter(x => x(0) == nodeAttr(2)).size
    }
  }
}

The problem I’m having is making this process automatic and pattern
matching down all connected components, so for each vertex:

1. Subgraph by all edge properties which connect to it 
2. Count all matching vertex properties along each of these subgraphs 
3. Produce a count at the end for each vertex 


I’m still pretty new to Spark and Scala, so I may be approaching this very
inefficiently. Any suggestions on how best to achieve this task would be
most welcome; for example, would this be possible using Pregel?
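
Pregel can express this. Below is a rough, untested sketch under simplifying
assumptions: vertex attributes are an (attr1, attr2) pair of Ints rather than
the Array[String] above, and chains are traversed for a single fixed edge
attribute. Each vertex seeds a "query" carrying its attr2; queries flood
upstream along matching edges, and every vertex that sees a query increments
the originator's count when its own attr1 matches:

import org.apache.spark.graphx._

// A query is (origin vertex id, the attr2 value it wants matched).
// Vertex state: (attr1, attr2, queries seen so far, match counts per origin).
type Query = (VertexId, Int)
type State = (Int, Int, Set[Query], Map[VertexId, Int])

val edgeProperty = 7
val fGraph = graph.subgraph(epred = e => e.attr == edgeProperty)

val init: Graph[State, Int] = fGraph.mapVertices {
  case (_, (a1, a2)) => (a1, a2, Set.empty[Query], Map.empty[VertexId, Int])
}

val result = init.pregel(Set.empty[Query])(
  // vprog: absorb unseen queries, counting those whose target matches our
  // attr1, and seed our own query so it starts travelling up the chain.
  (id, state, msgs) => {
    val (a1, a2, seen, counts) = state
    val fresh = msgs -- seen
    val updated = fresh.foldLeft(counts) { case (c, (origin, target)) =>
      if (a1 == target) c.updated(origin, c.getOrElse(origin, 0) + 1) else c
    }
    (a1, a2, seen ++ fresh + ((id, a2)), updated)
  },
  // sendMsg: forward queries upstream (dst -> src) that src has not yet seen.
  triplet => {
    val toSend = triplet.dstAttr._3 -- triplet.srcAttr._3
    if (toSend.isEmpty) Iterator.empty else Iterator((triplet.srcId, toSend))
  },
  // mergeMsg: union of query sets.
  _ ++ _
)

// Total matches found along each vertex's chain, keyed by the origin vertex.
val chainCounts = result.vertices
  .flatMap { case (_, (_, _, _, counts)) => counts.toSeq }
  .reduceByKey(_ + _)

The iteration stops on its own once no vertex has unseen queries to forward.
Running it once per distinct edge attribute (or folding the edge attribute
into the query itself) would cover step 1 of the list above.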




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/traverse-a-graph-based-on-edge-properties-whilst-counting-matching-vertex-attributes-tp23224.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cassandra Submit

2015-06-09 Thread Yasemin Kaya
Sorry, to answer your question: I ran lsof -i:9160 in a terminal; the result is

lsof -i:9160
COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    7597 inosens  101u  IPv4  85754      0t0  TCP localhost:9160 (LISTEN)

so is port 9160 available or not?

2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Yes, my cassandra is listening on 9160, I think. Actually, I know it from
 the yaml file. The file includes:

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160

 I checked the port with "nc -z localhost 9160; echo $?" and it returns 0.
 I think it is closed; should I open this port?

 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Is your cassandra installation actually listening on 9160?

 lsof -i :9160
 COMMAND   PID     USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
 java    29232 ykadiysk   69u  IPv4 42152497      0t0  TCP localhost:9160 (LISTEN)

 I am running an out-of-the box cassandra conf where

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160



 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote:

 I couldn't find any solution. I can write but I can't read from
 Cassandra.

 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Thanks a lot Mohammed, Gerard and Yana.
 I can write to the table, but I get an exception back. It says *Exception in
 thread "main" java.io.IOException: Failed to open thrift connection to
 Cassandra at 127.0.0.1:9160*

 In yaml file :
 rpc_address: localhost
 rpc_port: 9160

 And at project :

 .set("spark.cassandra.connection.host", "127.0.0.1")
 .set("spark.cassandra.connection.rpc.port", "9160");

 or

 .set("spark.cassandra.connection.host", "localhost")
 .set("spark.cassandra.connection.rpc.port", "9160");

 Whatever setting I write, I get the same exception. Any help??


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 yes, whatever you put for listen_address in cassandra.yaml. Also, you
 should try to connect to your cassandra cluster via bin/cqlsh to make sure
 you have connectivity before you try to make a connection via Spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi,
 I run my project locally. How can I find the IP address of my cassandra
 host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should I
 change? Should I change cassandra.yaml?

 The error says *Exception in thread "main" java.io.IOException:
 Failed to open native connection to Cassandra at {127.0.1.1}:9042*

 What should I add? *SparkConf sparkConf = new
 SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts",
 "true").set("spark.cassandra.connection.host", "?");*

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com:

  Check your spark.cassandra.connection.host setting. It should be
 pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using cassandraDB in my project. I had that error *Exception
 in thread main java.io.IOException: Failed to open native 
 connection to
 Cassandra at {127.0.1.1}:9042*



 I think I have to modify the submit line. What should I add or
 remove when I submit my project?



 Best,

 yasemin





--
hiç ender hiç


Issue running Spark 1.4 on Yarn

2015-06-09 Thread Matt Kapilevich
Hi all,

I'm manually building Spark from source against 1.4 branch and submitting
the job against Yarn. I am seeing very strange behavior. The first 2 or 3
times I submit the job, it runs fine, computes Pi, and exits. The next time
I run it, it gets stuck in the ACCEPTED state.

I'm kicking off a job using yarn-client mode like this:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
yarn-client --num-executors 3 --driver-memory 4g --executor-memory
2g --executor-cores 1 --queue thequeue
examples/target/scala-2.10/spark-examples*.jar 10

Here's what ResourceManager shows: [image: Yarn ResourceManager UI]

In Yarn ResourceManager logs, all I'm seeing is this:

2015-06-08 14:49:57,166 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
Added Application Attempt appattempt_1433789077942_0004_01 to scheduler
from user: root
2015-06-08 14:49:57,166 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1433789077942_0004_01 State change from SUBMITTED to
SCHEDULED

There's nothing in the NodeManager logs (though it's up and running); the
job isn't getting that far.

It seems to me that there's an issue somewhere between Spark 1.4 and Yarn
integration. Hadoop runs without any issues. I've run the below multiple
times.

yarn jar
/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.2.jar pi
16 100

For reference, I'm compiling the source against 1.4 branch, and running it
on a single-node cluster with CDH5.4 and Hadoop 2.6, distributed mode. I am
using the following to compile: mvn -Phadoop-2.6 -Dhadoop.version=2.6.0
-Pyarn -Phive -Phive-thriftserver -DskipTests clean package

Any help appreciated.

Thanks,
-Matt


Join between DStream and Periodically-Changing-RDD

2015-06-09 Thread Ilove Data
Hi,

I'm trying to join a DStream with, let's say, a 20s batch interval against an
RDD loaded from an HDFS folder whose contents change periodically, say with a
new file arriving every 10 minutes.

How should this be done, considering that new files are periodically added to
the HDFS folder? Does an RDD automatically detect changes in its HDFS source
folder and reload itself?

Thanks!
Rendy
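
In short, no: an RDD is an immutable snapshot of whatever files existed when
it was created, so it will not pick up new files by itself. One common
driver-side workaround is to reload the reference RDD on a timer and join
against it inside transform(), which is re-evaluated on the driver for every
batch. A rough sketch (it assumes a Scala app with a StreamingContext `ssc`,
a DStream `stream` of (String, String) pairs, and a hypothetical parse()
helper; the path is a placeholder):

import java.util.{Timer, TimerTask}

import org.apache.spark.rdd.RDD

// Hypothetical parser: key each line of the reference files for the join.
def parse(line: String): (String, String) = {
  val fields = line.split(",")
  (fields(0), line)
}

@volatile var reference: RDD[(String, String)] =
  ssc.sparkContext.textFile("hdfs:///path/to/ref").map(parse).cache()

// Re-snapshot the folder every 10 minutes from a driver-side timer thread.
new Timer(true).schedule(new TimerTask {
  override def run(): Unit = {
    val fresh = ssc.sparkContext.textFile("hdfs:///path/to/ref").map(parse).cache()
    fresh.count() // materialize the new snapshot before swapping it in
    val old = reference
    reference = fresh
    old.unpersist()
  }
}, 10 * 60 * 1000, 10 * 60 * 1000)

// Each batch joins against whichever snapshot `reference` points at.
val joined = stream.transform(rdd => rdd.join(reference))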


Costs of transformations

2015-06-09 Thread Vijayasarathy Kannan
Is it possible to bound the costs of operations such as flatMap() and
collect() based on the size of the RDDs?


Re: Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException

2015-06-09 Thread Josh Mahonin
This may or may not be helpful for your classpath issues, but I wanted to
verify that basic functionality worked, so I made a sample app here:

https://github.com/jmahonin/spark-streaming-phoenix

This consumes events off a Kafka topic using spark streaming, and writes
out event counts to Phoenix using the new phoenix-spark functionality:
http://phoenix.apache.org/phoenix_spark.html

It's definitely overkill, and would probably be more efficient to use the
JDBC driver directly, but it serves as a proof-of-concept.
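
For readers unfamiliar with the phoenix-spark write path referenced above, it
looks roughly like the sketch below (based on the linked documentation; the
table name, columns, and Zookeeper quorum are made-up placeholders):

import org.apache.phoenix.spark._

// Hypothetical (event, count) pairs, e.g. the output of a streaming count.
val counts = sc.parallelize(Seq(("click", 42L), ("view", 7L)))

// saveToPhoenix upserts each tuple into an existing Phoenix table, with
// columns matched positionally to the tuple fields.
counts.saveToPhoenix(
  "EVENT_COUNTS",
  Seq("EVENT", "COUNT"),
  zkUrl = Some("zookeeper-host:2181")
)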

I've only tested this in local mode. To convert it to a full job JAR, I
suspect that keeping all of the Spark and Phoenix dependencies marked as
'provided', and including the Phoenix client JAR in the Spark classpath,
would work as well.

Good luck,

Josh

On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek j.v...@anchormen.nl wrote:

 Hi,

 I posted a question with regards to Phoenix and Spark Streaming on
 StackOverflow [1]. Please find a copy of the question below the first stack
 trace in this email. I also already contacted the Phoenix mailing list and
 tried the suggestion of setting spark.driver.userClassPathFirst.
 Unfortunately that only pushed me further into dependency hell, which I
 tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.

 What I am trying to achieve: To save a stream from Kafka into
 Phoenix/Hbase
 via Spark Streaming. I'm using MapR as a platform and the original
 exception
 happens both on a 3-node cluster, as on the MapR Sandbox (a VM for
 experimentation), in YARN and stand-alone mode. Further experimentation
 (like
 the saveAsNewHadoopApiFile below), was done only on the sandbox in
 standalone
 mode.

 Phoenix only supports Spark from 4.4.0 onwards, but I thought I could
 use a naive implementation that creates a new connection for
 every RDD from the DStream in 4.3.1.  This resulted in the
 ClassNotFoundException described in [1], so I switched to 4.4.0.

 Unfortunately the saveToPhoenix method is only available in Scala. So I did
 find the suggestion to try it via the saveAsNewHadoopApiFile method [2]
 and an
 example implementation [3], which I adapted to my own needs.

 However, 4.4.0 + saveAsNewHadoopApiFile raises the same
 ClassNotFoundException, just with a slightly different stacktrace:

   java.lang.RuntimeException: java.sql.SQLException: ERROR 103
 (08004): Unable to establish connection.
 at

 org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
 at

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
 at

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to
 establish connection.
 at

 org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
 at

 org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
 at

 org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
 at

 org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
 at

 org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
 at

 org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
 at

 org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
 at

 org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
 at

 org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
 at

 org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
 at
 org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
 at
 java.sql.DriverManager.getConnection(DriverManager.java:571)
 at
 java.sql.DriverManager.getConnection(DriverManager.java:187)
 at

 org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
 at

 org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
 at

 org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
 at

 org.apache.phoenix.mapreduce.PhoenixRecordWriter.init(PhoenixRecordWriter.java:49)
 

Re: Spark 1.3.1 SparkSQL metastore exceptions

2015-06-09 Thread Cheng Lian
Seems that you're using a DB2 Hive metastore? I'm not sure whether Hive 
0.12.0 officially supports DB2, but probably not? (Since I didn't find 
DB2 scripts under the metastore/scripts/upgrade folder in Hive source tree.)


Cheng

On 6/9/15 8:28 PM, Needham, Guy wrote:

Hi,
I’m using Spark 1.3.1 to insert into a Hive 0.12 table from a SparkSQL 
query. The query is a very simple select from a dummy Hive table used 
for benchmarking.
I’m using a “create table as” statement to do the insert. No matter if I 
do that or an “insert overwrite”, I get the same Hive exception, “unable 
to alter table”, with some Hive metastore issues.
The data is inserted into the Hive table as expected, however I get a 
very long stacktrace. Does anyone know the meaning of the stacktrace 
and how I can avoid generating it every time I insert into a table?
scala> hiveContext.sql("create table 
benchmarking.spark_logins_benchmark as select * from 
benchmarking.logins_benchmark limit 10")

org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table.
at 
org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:387)
at 
org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1448)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:235)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:123)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:255)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
at 
org.apache.spark.sql.hive.execution.CreateTableAsSelect.run(CreateTableAsSelect.scala:70)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
at 
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)

at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at 
org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at 

Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Marcelo Vanzin
If your application is stuck in that state, it generally means your cluster
doesn't have enough resources to start it.

In the RM logs you can see how many vcores / memory the application is
asking for, and then you can check your RM configuration to see if that's
currently available on any single NM.

On Tue, Jun 9, 2015 at 7:56 AM, Matt Kapilevich matve...@gmail.com wrote:

 Hi all,

 I'm manually building Spark from source against 1.4 branch and submitting
 the job against Yarn. I am seeing very strange behavior. The first 2 or 3
 times I submit the job, it runs fine, computes Pi, and exits. The next time
 I run it, it gets stuck in the ACCEPTED state.

 I'm kicking off a job using yarn-client mode like this:

 ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-client --num-executors 3 --driver-memory 4g --executor-memory
 2g --executor-cores 1 --queue thequeue
 examples/target/scala-2.10/spark-examples*.jar 10

 Here's what ResourceManager shows: [image: Yarn ResourceManager UI]

 In Yarn ResourceManager logs, all I'm seeing is this:

 2015-06-08 14:49:57,166 INFO
 org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
 Added Application Attempt appattempt_1433789077942_0004_01 to scheduler
 from user: root
 2015-06-08 14:49:57,166 INFO
 org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
 appattempt_1433789077942_0004_01 State change from SUBMITTED to
 SCHEDULED

 There's nothing in the NodeManager logs (though it's up and running); the
 job isn't getting that far.

 It seems to me that there's an issue somewhere between Spark 1.4 and Yarn
 integration. Hadoop runs without any issues. I've run the below multiple
 times.

 yarn jar
 /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.2.jar pi
 16 100

 For reference, I'm compiling the source against 1.4 branch, and running it
 on a single-node cluster with CDH5.4 and Hadoop 2.6, distributed mode. I am
 using the following to compile: mvn -Phadoop-2.6 -Dhadoop.version=2.6.0
 -Pyarn -Phive -Phive-thriftserver -DskipTests clean package

 Any help appreciated.

 Thanks,
 -Matt




-- 
Marcelo


Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Raghav Shankar
Thank you for you responses!

You mention that it only works as long as the data fits on a single
machine. What I am trying to do is retrieve the sorted contents of my
dataset. For this to be possible, the entire dataset should be able to fit
on a single machine. Are you saying that sorting the entire data and
collecting it on the driver node is not a typical use case? If I want to do
this using sortBy(), I would first call sortBy() followed by a collect().
Collect() would involve gathering all the data on a single machine as well.

Thanks,
Raghav

On Tuesday, June 9, 2015, Mark Hamstra m...@clearstorydata.com wrote:

 Correct.  Trading away scalability for increased performance is not an
 option for the standard Spark API.

 On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 It would be even faster to load the data on the driver and sort it there
 without using Spark :). Using reduce() is cheating, because it only works
 as long as the data fits on one machine. That is not the targeted use case
 of a distributed computation system. You can repeat your test with more
 data (that doesn't fit on one machine) to see what I mean.

 On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote:

 For a research project, I tried sorting the elements in an RDD. I did this
 using two different approaches.

 In the first method, I applied a mapPartitions() function on the RDD, so
 that it would sort the contents of the RDD, and provide a result RDD that
 contains the sorted list as the only record in the RDD. Then, I applied a
 reduce function which basically merges sorted lists.

 I ran these experiments on an EC2 cluster containing 30 nodes. I set it
 up
 using the spark ec2 script. The data file was stored in HDFS.

 In the second approach I used the sortBy method in Spark.

 I performed these operations on the US census data (100MB) found here

 A single line looks like this

 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married,
 Not
 in universe or children, Not in universe, White, All other, Female, Not
 in
 universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler,
 Not
 in universe, Not in universe, Child 18 never marr not in subfamily,
 Child
 under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not
 in
 universe, 0, Both parents present, United-States, United-States,
 United-States, Native- Born in the United States, 0, Not in universe, 0,
 0,
 94, - 5.
 I sorted based on the 25th value in the CSV. In this line that is
 1758.14.

 I noticed that sortBy performs worse than the other method. Is this the
 expected scenario? If it is, why wouldn't the mapPartitions() and
 reduce()
 be the default sorting approach?

 Here is my implementation

 public static void sortBy(JavaSparkContext sc) {
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
     long start = System.currentTimeMillis();
     rdd.sortBy(new Function<String, Double>() {

         @Override
         public Double call(String v1) throws Exception {
             String[] arr = v1.split(",");
             return Double.parseDouble(arr[24]);
         }
     }, true, 9).collect();
     long end = System.currentTimeMillis();
     System.out.println("SortBy: " + (end - start));
 }

 public static void sortList(JavaSparkContext sc) {
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
     long start = System.currentTimeMillis();
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
         rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
                 LinkedList<Tuple2<Double, String>>>() {

             @Override
             public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                     throws Exception {
                 LinkedList<Tuple2<Double, String>> lines =
                     new LinkedList<Tuple2<Double, String>>();
                 while (t.hasNext()) {
                     String s = t.next();
                     String[] arr1 = s.split(",");
                     Tuple2<Double, String> t1 =
                         new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                     lines.add(t1);
                 }
                 Collections.sort(lines, new IncomeComparator());
                 LinkedList<LinkedList<Tuple2<Double, String>>> list =
                     new LinkedList<LinkedList<Tuple2<Double, String>>>();
                 list.add(lines);
                 return list;
             }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 

Implementing top() using treeReduce()

2015-06-09 Thread raggy
I am trying to implement top-k in Scala within Apache Spark. I am aware that
Spark has a top() action. But top() uses reduce(). Instead, I would like to
use treeReduce(). I am trying to compare the performance of reduce() and
treeReduce().

The main issue I have is that I cannot use these 2 lines of code which are
used in the top() action within my Spark application.

val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)

How can I go about implementing top() using treeReduce()?
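
One way around this (BoundedPriorityQueue is private[spark]) is to keep a
sorted top-k array per partition and let treeReduce() merge those arrays
pairwise. A rough sketch; for simplicity it sorts whole partitions, which is
O(n log n) per partition rather than the O(n log k) heap Spark uses
internally:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// Reduce each partition to its own sorted top-k array, then let treeReduce
// merge those arrays two at a time, keeping only k elements at every step.
def treeTop[T: ClassTag](rdd: RDD[T], k: Int, depth: Int = 2)
                        (implicit ord: Ordering[T]): Array[T] = {
  rdd
    .mapPartitions { it =>
      Iterator.single(it.toArray.sorted(ord.reverse).take(k))
    }
    .treeReduce((a, b) => (a ++ b).sorted(ord.reverse).take(k), depth)
}

// e.g. treeTop(sc.parallelize(1 to 1000000), 10) returns the 10 largest values.

Note that treeReduce() throws on an empty RDD, just as reduce() does.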



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cassandra Submit

2015-06-09 Thread Yana Kadiyska
hm. Yeah, your port is good... have you seen this thread:
http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector
? It seems that you might be running into version mismatch issues?

What versions of Spark/Cassandra-connector are you trying to use?

On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote:

 Sorry, to answer your question: I ran lsof -i:9160 in a terminal; the result is

 lsof -i:9160
 COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
 java    7597 inosens  101u  IPv4  85754      0t0  TCP localhost:9160 (LISTEN)

 so is port 9160 available or not?

 2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Yes, my cassandra is listening on 9160, I think. Actually, I know it from
 the yaml file. The file includes:

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160

 I checked the port with "nc -z localhost 9160; echo $?" and it returns 0.
 I think it is closed; should I open this port?

 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Is your cassandra installation actually listening on 9160?

 lsof -i :9160
 COMMAND   PID     USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
 java    29232 ykadiysk   69u  IPv4 42152497      0t0  TCP localhost:9160 (LISTEN)

 I am running an out-of-the box cassandra conf where

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160



 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote:

 I couldn't find any solution. I can write but I can't read from
 Cassandra.

 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Thanks a lot Mohammed, Gerard and Yana.
 I can write to the table, but I get an exception back. It says *Exception
 in thread "main" java.io.IOException: Failed to open thrift connection to
 Cassandra at 127.0.0.1:9160*

 In yaml file :
 rpc_address: localhost
 rpc_port: 9160

 And at project :

 .set("spark.cassandra.connection.host", "127.0.0.1")
 .set("spark.cassandra.connection.rpc.port", "9160");

 or

 .set("spark.cassandra.connection.host", "localhost")
 .set("spark.cassandra.connection.rpc.port", "9160");

 Whatever setting I write, I get the same exception. Any help??


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 yes, whatever you put for listen_address in cassandra.yaml. Also, you
 should try to connect to your cassandra cluster via bin/cqlsh to make sure
 you have connectivity before you try to make a connection via Spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi,
 I run my project locally. How can I find the IP address of my cassandra
 host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should I
 change? Should I change cassandra.yaml?

 The error says *Exception in thread "main" java.io.IOException:
 Failed to open native connection to Cassandra at {127.0.1.1}:9042*

 What should I add? *SparkConf sparkConf = new
 SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts",
 "true").set("spark.cassandra.connection.host", "?");*

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com
 :

  Check your spark.cassandra.connection.host setting. It should
 be pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using cassandraDB in my project. I had that error *Exception
 in thread main java.io.IOException: Failed to open native 
 connection to
 Cassandra at {127.0.1.1}:9042*



 I think I have to modify the submit line. What should I add or
 remove when I submit my project?



 Best,

 yasemin





--
hiç ender hiç



Re: RDD of RDDs

2015-06-09 Thread Mark Hamstra
That would constitute a major change in Spark's architecture.  It's not
happening anytime soon.

On Tue, Jun 9, 2015 at 1:34 AM, kiran lonikar loni...@gmail.com wrote:

 Possibly in the future, if and when the Spark architecture allows workers to
 launch Spark jobs (the functions passed to the transformation or action APIs
 of an RDD), it will be possible to have an RDD of RDDs.

 On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote:

 A similar question was asked before:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

 Here is one of the reasons why I think RDD[RDD[T]] is not possible:

- RDD is only a handle to the actual data partitions. It has a
reference/pointer to the *SparkContext* object (*sc*) and a list of
partitions.
- The *SparkContext *is an object in the Spark Application/Driver
Program's JVM. Similarly, the list of partitions is also in the JVM of the
driver program. Each partition contains kind of remote references to the
partition data on the worker JVMs.
- The functions passed to RDD's transformations and actions execute
in the worker's JVMs on different nodes. For example, in *rdd.map {
x => x*x }*, the function performing *x*x* runs on the JVMs of the
worker nodes where the partitions of the RDD reside. These JVMs do not have
access to the *sc* since it's only on the driver's JVM.
- Thus, in the case of your *RDD of RDD*: *outerRDD.map { innerRdd =>
innerRDD.filter { x => x*x } }*, the worker nodes will not be able to
execute the *filter* on *innerRDD* as the code in the worker does not
have access to sc and cannot launch a Spark job.


 Hope it helps. You need to consider List[RDD] or some other collection.

 -Kiran
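
 As an illustration of the List[RDD]-style alternative suggested above, a
 sketch (assuming the userId is the first comma-separated field of each log
 line and the set of distinct users is small enough to collect):

 // Collect the distinct keys, then build one filtered RDD per user.
 val logs = sc.textFile("hdfs:///logs")
 val byUser = logs.map(line => (line.split(",")(0), line))

 val userIds = byUser.keys.distinct().collect()
 val perUser: Map[String, org.apache.spark.rdd.RDD[String]] =
   userIds.map(id => id -> byUser.filter(_._1 == id).values).toMap

 // perUser("user42") is a plain RDD[String] that processEachUser() can consume.

 Note this scans the data once per user, so for many users groupByKey() (and
 working with the Iterable it produces) is usually the better trade-off.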

 On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as a RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs, and it seems
 like it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupByKey() and then write each group into a
 file, and read them back as individual RDDs to process...

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721






Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Mark Hamstra

 Are you saying that sorting the entire data and collecting it on the
 driver node is not a typical use case?


It most definitely is not.  Spark is designed and intended to be used with
very large datasets.  Far from being typical, collecting hundreds of
gigabytes, terabytes or petabytes to the driver node is not feasible.

On Tue, Jun 9, 2015 at 10:01 AM, Raghav Shankar raghav0110...@gmail.com
wrote:

 Thank you for you responses!

 You mention that it only works as long as the data fits on a single
 machine. What I am trying to do is retrieve the sorted contents of my
 dataset. For this to be possible, the entire dataset should be able to fit
 on a single machine. Are you saying that sorting the entire data and
 collecting it on the driver node is not a typical use case? If I want to do
 this using sortBy(), I would first call sortBy() followed by a collect().
 Collect() would involve gathering all the data on a single machine as well.

 Thanks,
 Raghav

 On Tuesday, June 9, 2015, Mark Hamstra m...@clearstorydata.com wrote:

 Correct.  Trading away scalability for increased performance is not an
 option for the standard Spark API.

 On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 It would be even faster to load the data on the driver and sort it there
 without using Spark :). Using reduce() is cheating, because it only works
 as long as the data fits on one machine. That is not the targeted use case
 of a distributed computation system. You can repeat your test with more
 data (that doesn't fit on one machine) to see what I mean.

 On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote:

 For a research project, I tried sorting the elements in an RDD. I did this
 using two different approaches.

 In the first method, I applied a mapPartitions() function on the RDD, so
 that it would sort the contents of the RDD, and provide a result RDD
 that
 contains the sorted list as the only record in the RDD. Then, I applied
 a
 reduce function which basically merges sorted lists.

 I ran these experiments on an EC2 cluster containing 30 nodes. I set it
 up
 using the spark ec2 script. The data file was stored in HDFS.

 In the second approach I used the sortBy method in Spark.

 I performed these operations on the US census data (100MB) found here

 A single line looks like this

 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married,
 Not
 in universe or children, Not in universe, White, All other, Female, Not
 in
 universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler,
 Not
 in universe, Not in universe, Child 18 never marr not in subfamily,
 Child
 under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not
 in
 universe, 0, Both parents present, United-States, United-States,
 United-States, Native- Born in the United States, 0, Not in universe,
 0, 0,
 94, - 5.
 I sorted based on the 25th value in the CSV. In this line that is
 1758.14.

 I noticed that sortBy performs worse than the other method. Is this the
 expected scenario? If it is, why wouldn't the mapPartitions() and
 reduce()
 be the default sorting approach?

 Here is my implementation

 public static void sortBy(JavaSparkContext sc) {
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
     long start = System.currentTimeMillis();
     rdd.sortBy(new Function<String, Double>() {

         @Override
         public Double call(String v1) throws Exception {
             String[] arr = v1.split(",");
             return Double.parseDouble(arr[24]);
         }
     }, true, 9).collect();
     long end = System.currentTimeMillis();
     System.out.println("SortBy: " + (end - start));
 }

 public static void sortList(JavaSparkContext sc) {
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
     long start = System.currentTimeMillis();
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
         rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
                 LinkedList<Tuple2<Double, String>>>() {

             @Override
             public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                     throws Exception {
                 LinkedList<Tuple2<Double, String>> lines =
                     new LinkedList<Tuple2<Double, String>>();
                 while (t.hasNext()) {
                     String s = t.next();
                     String[] arr1 = s.split(",");
                     Tuple2<Double, String> t1 =
                         new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                     lines.add(t1);
                 }
                 Collections.sort(lines, new IncomeComparator());
                 LinkedList<LinkedList<Tuple2<Double, String>>> list =
                     new LinkedList<LinkedList<Tuple2<Double, String>>>();
                 list.add(lines);
                 return list;
             }



 --
 View this message in context:
 

Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Mark Hamstra
Correct.  Trading away scalability for increased performance is not an
option for the standard Spark API.

On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 It would be even faster to load the data on the driver and sort it there
 without using Spark :). Using reduce() is cheating, because it only works
 as long as the data fits on one machine. That is not the targeted use case
 of a distributed computation system. You can repeat your test with more
 data (that doesn't fit on one machine) to see what I mean.

 On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote:

 For a research project, I tried sorting the elements in an RDD. I did this
 using two different approaches.

 In the first method, I applied a mapPartitions() function on the RDD, so
 that it would sort the contents of the RDD, and provide a result RDD that
 contains the sorted list as the only record in the RDD. Then, I applied a
 reduce function which basically merges sorted lists.

 I ran these experiments on an EC2 cluster containing 30 nodes. I set it up
 using the spark ec2 script. The data file was stored in HDFS.

 In the second approach I used the sortBy method in Spark.

 I performed these operations on the US census data (100MB) found here

 A single line looks like this

 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not
 in universe or children, Not in universe, White, All other, Female, Not in
 universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler,
 Not
 in universe, Not in universe, Child 18 never marr not in subfamily, Child
 under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in
 universe, 0, Both parents present, United-States, United-States,
 United-States, Native- Born in the United States, 0, Not in universe, 0,
 0,
 94, - 5.
 I sorted based on the 25th value in the CSV. In this line that is 1758.14.

 I noticed that sortBy performs worse than the other method. Is this the
 expected scenario? If it is, why wouldn't the mapPartitions() and reduce()
 be the default sorting approach?

 Here is my implementation

 public static void sortBy(JavaSparkContext sc) {
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
     long start = System.currentTimeMillis();
     rdd.sortBy(new Function<String, Double>() {

         @Override
         public Double call(String v1) throws Exception {
             String[] arr = v1.split(",");
             return Double.parseDouble(arr[24]);
         }
     }, true, 9).collect();
     long end = System.currentTimeMillis();
     System.out.println("SortBy: " + (end - start));
 }

 public static void sortList(JavaSparkContext sc) {
     JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
     long start = System.currentTimeMillis();
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
         rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
                 LinkedList<Tuple2<Double, String>>>() {

             @Override
             public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                     throws Exception {
                 LinkedList<Tuple2<Double, String>> lines =
                     new LinkedList<Tuple2<Double, String>>();
                 while (t.hasNext()) {
                     String s = t.next();
                     String[] arr1 = s.split(",");
                     Tuple2<Double, String> t1 =
                         new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                     lines.add(t1);
                 }
                 Collections.sort(lines, new IncomeComparator());
                 LinkedList<LinkedList<Tuple2<Double, String>>> list =
                     new LinkedList<LinkedList<Tuple2<Double, String>>>();
                 list.add(lines);
                 return list;
             }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Problem getting program to run on 15TB input

2015-06-09 Thread Arun Luthra
I found that the problem was due to garbage collection in filter(). Using
Hive to do the filter solved the problem.

A lot of other problems went away when I upgraded to Spark 1.2.0, which
compresses various task overhead data (HighlyCompressedMapStatus etc.).

It has been running very very smoothly with these two changes.

I'm fairly sure that I tried coalesce(); it resulted in tasks that were
too big. The code has evolved too much to easily double-check it now.

On Sat, Jun 6, 2015 at 12:50 AM, Kapil Malik kma...@adobe.com wrote:

  Very interesting and relevant thread for production-level usage of Spark.



 @Arun, can you kindly confirm if Daniel’s suggestion helped your use case?



 Thanks,



 Kapil Malik | kma...@adobe.com | 33430 / 8800836581



 *From:* Daniel Mahler [mailto:dmah...@gmail.com]
 *Sent:* 13 April 2015 15:42
 *To:* Arun Luthra
 *Cc:* Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
 *Subject:* Re: Problem getting program to run on 15TB input



 Sometimes a large number of partitions leads to memory problems.

 Something like



 val rdd1 = sc.textFile(file1).coalesce(500). ...

 val rdd2 = sc.textFile(file2).coalesce(500). ...



 may help.





 On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Everything works smoothly if I do the 99%-removal filter in Hive first.
 So, all the baggage from garbage collection was breaking it.



 Is there a way to filter() out 99% of the data without having to garbage
 collect 99% of the RDD?



 On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter, simpler version of the program, with just 1 RDD;
 essentially it is:



 sc.textFile(..., N).map().filter().map( blah => (id,
 1L)).reduceByKey().saveAsTextFile(...)



 Here is a typical GC log trace from one of the yarn container logs:



 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]

 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]

 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]

 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]

 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]

 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]



 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.



 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?



 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.



 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):



 --num-executors 150 \

 --driver-memory 15g \

 --executor-memory 110g \

 --executor-cores 32 \



 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space









 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.



 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.
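
 A sketch of that skew check (the threshold, key type, and the `pairs` RDD
 are hypothetical; `pairs` stands for the pair RDD fed into groupByKey):

 // Count records per key cheaply, log the heavy hitters, and drop them.
 val keyCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)

 val maxPerKey = 1000000L // hypothetical skew threshold
 val heavyKeys = keyCounts.filter { case (_, n) => n > maxPerKey }
   .keys.collect().toSet
 heavyKeys.foreach(k => println(s"dropping skewed key: $k"))

 val trimmed = pairs.filter { case (k, _) => !heavyKeys.contains(k) }
 val grouped = trimmed.groupByKey()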



 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).



 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

But groupByKey will repartition according to the number of keys, as I
understand how it works. How do you know that you haven't reached the
groupByKey phase? Are you using a profiler, or do you base that assumption
only on logs?



On Sat, 28 Feb 2015, 8:12 PM, Arun Luthra arun.lut...@gmail.com
wrote:



 A correction to my first post:




Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Matt Kapilevich
Hi Marcelo,

Thanks. I think something more subtle is happening.

I'm running a single-node cluster, so there's only 1 NM. When I executed
the exact same job the 4th time, the cluster was idle, and there was
nothing else being executed. RM currently reports that I have 6.5GB of
memory and 4 cpus available. However, the job is still stuck in the
ACCEPTED state a day later. Like I mentioned earlier, I'm able to execute
Hadoop jobs fine even now - this problem is specific to Spark.

Thanks,
-Matt

On Tue, Jun 9, 2015 at 12:32 PM, Marcelo Vanzin van...@cloudera.com wrote:

 If your application is stuck in that state, it generally means your
 cluster doesn't have enough resources to start it.

 In the RM logs you can see how many vcores / memory the application is
 asking for, and then you can check your RM configuration to see if that's
 currently available on any single NM.

 On Tue, Jun 9, 2015 at 7:56 AM, Matt Kapilevich matve...@gmail.com
 wrote:

 Hi all,

 I'm manually building Spark from source against 1.4 branch and submitting
 the job against Yarn. I am seeing very strange behavior. The first 2 or 3
 times I submit the job, it runs fine, computes Pi, and exits. The next time
 I run it, it gets stuck in the ACCEPTED state.

 I'm kicking off a job using yarn-client mode like this:

 ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-client --num-executors 3 --driver-memory 4g --executor-memory
 2g --executor-cores 1 --queue thequeue
 examples/target/scala-2.10/spark-examples*.jar 10

 Here's what ResourceManager shows: [image: Yarn ResourceManager UI]

 In Yarn ResourceManager logs, all I'm seeing is this:

 2015-06-08 14:49:57,166 INFO
 org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
 Added Application Attempt appattempt_1433789077942_0004_01 to scheduler
 from user: root
 2015-06-08 14:49:57,166 INFO
 org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
 appattempt_1433789077942_0004_01 State change from SUBMITTED to
 SCHEDULED

 There's nothing in the NodeManager logs (though it's up and running); the
 job isn't getting that far.

 It seems to me that there's an issue somewhere between Spark 1.4 and Yarn
 integration. Hadoop runs without any issues. I've run the below multiple
 times.

 yarn jar
 /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.2.jar pi
 16 100

 For reference, I'm compiling the source against 1.4 branch, and running
 it on a single-node cluster with CDH5.4 and Hadoop 2.6, distributed mode. I
 am using the following to compile: mvn -Phadoop-2.6 -Dhadoop.version=2.6.0
 -Pyarn -Phive -Phive-thriftserver -DskipTests clean package

 Any help appreciated.

 Thanks,
 -Matt




 --
 Marcelo



Re: Cassandra Submit

2015-06-09 Thread Yasemin Kaya
My jar files are:

cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
guava-18.jar
jsr166e-1.1.0.jar
spark-assembly-1.3.0.jar
spark-cassandra-connector_2.10-1.3.0-M1.jar
spark-cassandra-connector-java_2.10-1.3.0-M1.jar
spark-core_2.10-1.3.1.jar
spark-streaming_2.10-1.3.1.jar

And my code from datastax spark-cassandra-connector
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java
.

Thanks a lot.
yasemin

2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 hm. Yeah, your port is good... have you seen this thread:
 http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector
 ? It seems that you might be running into version mismatch issues?

 What versions of Spark/Cassandra-connector are you trying to use?

 On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote:

 Sorry, to answer your question: I ran lsof -i:9160 in a terminal; the result is

 lsof -i:9160
 COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
 java    7597 inosens  101u  IPv4  85754      0t0  TCP localhost:9160 (LISTEN)

 so is port 9160 available or not?

 2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Yes, my cassandra is listening on 9160, I think. Actually, I know it from
 the yaml file. The file includes:

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160

 I checked the port with "nc -z localhost 9160; echo $?" and it returns 0.
 I think it is closed; should I open this port?

 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Is your cassandra installation actually listening on 9160?

 lsof -i :9160
 COMMAND   PID     USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
 java    29232 ykadiysk   69u  IPv4 42152497      0t0  TCP localhost:9160 (LISTEN)

 I am running an out-of-the box cassandra conf where

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160



 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote:

 I couldn't find any solution. I can write but I can't read from
 Cassandra.

 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Thanks a lot Mohammed, Gerard and Yana.
 I can write to the table, but I get an exception back. It says *Exception
 in thread "main" java.io.IOException: Failed to open thrift connection to
 Cassandra at 127.0.0.1:9160*

 In yaml file :
 rpc_address: localhost
 rpc_port: 9160

 And at project :

 .set("spark.cassandra.connection.host", "127.0.0.1")
 .set("spark.cassandra.connection.rpc.port", "9160");

 or

 .set("spark.cassandra.connection.host", "localhost")
 .set("spark.cassandra.connection.rpc.port", "9160");

 Whatever setting I write, I get the same exception. Any help??


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 yes, whatever you put for listen_address in cassandra.yaml. Also,
 you should try to connect to your cassandra cluster via bin/cqlsh to make
 sure you have connectivity before you try to make a connection via
 Spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi,
 I run my project locally. How can I find the IP address of my cassandra
 host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should I
 change? Should I change cassandra.yaml?

 The error says *Exception in thread "main" java.io.IOException:
 Failed to open native connection to Cassandra at {127.0.1.1}:9042*

 What should I add? *SparkConf sparkConf = new
 SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts",
 "true").set("spark.cassandra.connection.host", "?");*

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com
 :

  Check your spark.cassandra.connection.host setting. It should
 be pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using cassandraDB in my project. I had that error *Exception
 in thread main java.io.IOException: Failed to open native 
 connection to
 Cassandra at {127.0.1.1}:9042*



 I think I have to modify the submit line. What should I add or
 remove when I submit my project?



 Best,

 yasemin





--
hiç ender hiç


RE: Cassandra Submit

2015-06-09 Thread Mohammed Guller
It is strange that writes work but reads do not. If it were a Cassandra 
connectivity issue, then neither writes nor reads would work. Perhaps the 
problem is somewhere else.

Can you send the complete exception trace?

Also, just to make sure that there is no DNS issue, try this:
~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h 127.0.0.1 -p 9160

Mohammed

From: Yasemin Kaya [mailto:godo...@gmail.com]
Sent: Tuesday, June 9, 2015 11:32 AM
To: Yana Kadiyska
Cc: Gerard Maas; Mohammed Guller; user@spark.apache.org
Subject: Re: Cassandra Submit

I removed the core and streaming jars, and the exception is still the same.

I tried what you said then results:

~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5

The CLI is deprecated and will be removed in Cassandra 3.0.  Consider migrating 
to cqlsh.
CQL is fully backwards compatible with Thrift data; see 
http://www.datastax.com/dev/blog/thrift-to-cql3

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown]

and

~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh

Thank you for your kind responses ...


2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
Hm, the jars look ok, although it's a bit of a mess -- you have spark-assembly 
1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix 
versions. Spark-assembly bundles all spark packages, so either use them 
separately or use spark-assembly, but don't mix them like you've shown.

As to the port issue -- what about this:

$bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5


On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote:
My jar files are:

cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
guava-18.jar
jsr166e-1.1.0.jar
spark-assembly-1.3.0.jar
spark-cassandra-connector_2.10-1.3.0-M1.jar
spark-cassandra-connector-java_2.10-1.3.0-M1.jar
spark-core_2.10-1.3.1.jar
spark-streaming_2.10-1.3.1.jar

And my code is from the datastax spark-cassandra-connector: 
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java

Thanx alot.
yasemin

2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
hm. Yeah, your port is good...have you seen this thread: 
http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector
 ? It seems that you might be running into version mis-match issues?

What versions of Spark/Cassandra-connector are you trying to use?

On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote:
Sorry, to answer: I ran lsof -i:9160 in the terminal; the result is

lsof -i:9160
COMMAND   PID  USER     FD    TYPE DEVICE SIZE/OFF NODE NAME
java     7597  inosens  101u  IPv4  85754      0t0  TCP localhost:9160 (LISTEN)

so is port 9160 open or not?

2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com:
Yes, I think my cassandra is listening on 9160. Actually I know it from the yaml 
file. The file includes:

rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160

I checked the port with nc -z localhost 9160; echo $?, and it returns 0. I think 
it is closed; should I open this port?

2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
Is your cassandra installation actually listening on 9160?

lsof -i :9160
COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    29232 ykadiysk   69u  IPv4 42152497  0t0  TCP localhost:9160 (LISTEN)
I am running an out-of-the box cassandra conf where

rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160



On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote:
I couldn't find any solution. I can write but I can't read from Cassandra.

2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:
Thanks a lot Mohammed, Gerard and Yana.
I can write to the table, but I get an exception. It says Exception in thread 
main java.io.IOException: Failed to open thrift connection to Cassandra at 
127.0.0.1:9160

In yaml file :
rpc_address: localhost
rpc_port: 9160

And at project :

.set(spark.cassandra.connection.host, 127.0.0.1)
.set(spark.cassandra.connection.rpc.port, 9160);

or

.set(spark.cassandra.connection.host, localhost)
.set(spark.cassandra.connection.rpc.port, 9160);

Whichever setting I use, I get the same exception. Any help?


2015-06-08 18:23 GMT+03:00 Yana Kadiyska 

Re: Running SparkSql against Hive tables

2015-06-09 Thread James Pirz
Thanks Ayan, I used beeline in Spark to connect to the Hiveserver2 that I
started from my Hive. So as you said, it is really interacting with Hive as
a typical 3rd-party application, and it is NOT using the Spark execution
engine. I was thinking that it gets metastore info from Hive, but uses
Spark to execute the query.

I already have created and loaded tables in Hive, and now I want to use Spark
to run SQL queries against those tables. I just want to submit SQL queries
in Spark, against the data in Hive, without writing an application (just
similar to the way that one would pass SQL scripts to Hive or Shark). Going
through the Spark documentation, I realized Spark SQL is the component that
I need to use. But do you mean I have to write a client Spark application
to do that? Is there any way that one could pass SQL scripts directly
through the command line so that Spark runs them in distributed mode on the
cluster, against the already existing data in Hive?
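
For reference, a minimal sketch of the application route (assuming a Spark
build with Hive support and hive-site.xml on the driver's classpath; the
table name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveQuery {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HiveQuery"))
    // HiveContext reads table metadata from the Hive metastore configured
    // in hive-site.xml, but executes the query with Spark, not MapReduce.
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SELECT count(*) FROM some_hive_table").collect().foreach(println)
    sc.stop()
  }
}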

On Mon, Jun 8, 2015 at 5:53 PM, ayan guha guha.a...@gmail.com wrote:

 I am afraid you are going the other way around :) If you want to use Hive in
 spark, you'd need a HiveContext, with hive config files on every node of the
 spark cluster. This way spark can talk to the hive metastore. Then you can
 write queries on a hive table using hiveContext's sql method, and spark will
 run it (either by reading from hive and creating an RDD, or letting hive run
 the query using MR). The final result will be a spark dataFrame.

 What you are currently doing is using beeline to connect to hive, which should
 work even without spark.

 Best
 Ayan

 On Tue, Jun 9, 2015 at 10:42 AM, James Pirz james.p...@gmail.com wrote:

 Thanks for the help!
 I am actually trying Spark SQL to run queries against tables that I've
 defined in Hive.

 I follow theses steps:
 - I start hiveserver2 and in Spark, I start Spark's Thrift server by:
 $SPARK_HOME/sbin/start-thriftserver.sh --master
 spark://spark-master-node-ip:7077

 - and I start beeline:
 $SPARK_HOME/bin/beeline

 - In my beeline session, I connect to my running hiveserver2
 !connect jdbc:hive2://hive-node-ip:1

 and I can run queries successfully. But based on hiveserver2 logs, it
 seems it actually uses Hadoop's MR to run queries,  *not* Spark's
 workers. My goal is to access Hive's tables' data, but run queries through
 Spark SQL using Spark workers (not Hadoop).

 Is it possible to do that via Spark SQL (its CLI) or through its thrift
 server ? (I tried to find some basic examples in the documentation, but I
 was not able to) - Any suggestion or hint on how I can do that would be
 highly appreciated.

 Thnx

 On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote:



 On 6/6/15 9:06 AM, James Pirz wrote:

 I am pretty new to Spark. Using Spark 1.3.1, I am trying to use
 'Spark SQL' to run some SQL scripts on the cluster. I realized that for
 better performance, it is a good idea to use Parquet files. I have 2
 questions regarding that:

  1) If I want to use Spark SQL against *partitioned and bucketed* tables
 with Parquet format in Hive, does the provided spark binary on the apache
 website support that, or do I need to build a new spark binary with some
 additional flags? (I found a note
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
 in the documentation about enabling Hive support, but I could not fully work
 out what the correct way of building is, if I need to build.)

 Yes, Hive support is enabled by default now for the binaries on the
 website. However, currently Spark SQL doesn't support buckets yet.


  2) Does running Spark SQL against tables in Hive degrade performance? Is it
 better to load parquet files directly into HDFS, or is having Hive in the
 picture harmless?

 If you're using Parquet, then it should be fine since by default Spark
 SQL uses its own native Parquet support to read Parquet Hive tables.


  Thnx






 --
 Best Regards,
 Ayan Guha



Re: Cassandra Submit

2015-06-09 Thread Yana Kadiyska
Hm, jars look ok, although it's a bit of a mess -- you have spark-assembly
1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix
versions. Spark-assembly bundles all spark packages, so either use them
separately or use spark-assembly, but don't mix them like you've shown.

As to the port issue -- what about this:

$bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5


On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote:

 My jar files are:

 cassandra-driver-core-2.1.5.jar
 cassandra-thrift-2.1.3.jar
 guava-18.jar
 jsr166e-1.1.0.jar
 spark-assembly-1.3.0.jar
 spark-cassandra-connector_2.10-1.3.0-M1.jar
 spark-cassandra-connector-java_2.10-1.3.0-M1.jar
 spark-core_2.10-1.3.1.jar
 spark-streaming_2.10-1.3.1.jar

 And my code is from the datastax spark-cassandra-connector:
 https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java

 Thanx alot.
 yasemin

 2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 hm. Yeah, your port is good...have you seen this thread:
 http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector
 ? It seems that you might be running into version mis-match issues?

 What versions of Spark/Cassandra-connector are you trying to use?

 On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote:

 Sorry, to answer: I ran lsof -i:9160 in the terminal; the result is

 lsof -i:9160
 COMMAND   PID  USER     FD    TYPE DEVICE SIZE/OFF NODE NAME
 java     7597  inosens  101u  IPv4  85754      0t0  TCP localhost:9160 (LISTEN)

 so is port 9160 open or not?

 2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Yes, I think my cassandra is listening on 9160. Actually I know it from the
 yaml file. The file includes:

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160

 I checked the port with nc -z localhost 9160; echo $?, and it returns 0. I
 think it is closed; should I open this port?

 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Is your cassandra installation actually listening on 9160?

 lsof -i :9160
 COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
 java    29232 ykadiysk   69u  IPv4 42152497  0t0  TCP localhost:9160 (LISTEN)

 I am running an out-of-the box cassandra conf where

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160



 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 I couldn't find any solution. I can write but I can't read from
 Cassandra.

 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Thanks a lot Mohammed, Gerard and Yana.
 I can write to the table, but I get an exception. It says *Exception
 in thread main java.io.IOException: Failed to open thrift connection to
 Cassandra at 127.0.0.1:9160*

 In yaml file :
 rpc_address: localhost
 rpc_port: 9160

 And at project :

 .set(spark.cassandra.connection.host, 127.0.0.1)
 .set(spark.cassandra.connection.rpc.port, 9160);

 or

 .set(spark.cassandra.connection.host, localhost)
 .set(spark.cassandra.connection.rpc.port, 9160);

 Whichever setting I use, I get the same exception. Any help?


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 yes, whatever you put for listen_address in cassandra.yaml. Also,
 you should try to connect to your cassandra cluster via bin/cqlsh to make
 sure you have connectivity before you try to make a connection via spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi,
 I run my project locally. How can I find the ip address of my
 cassandra host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should
 I change? Should I change cassandra.yaml?

 The error says *Exception in thread main java.io.IOException:
 Failed to open native connection to Cassandra at {127.0.1.1}:9042*

 What should I add *SparkConf sparkConf = new
 SparkConf().setAppName(JavaApiDemo).set(**spark.driver.allowMultipleContexts,
 true).set(spark.cassandra.connection.host, ?);*

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller 
 moham...@glassbeam.com:

  Check your spark.cassandra.connection.host setting. It should
 be pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using cassandraDB in my project. I had that error *Exception
 in thread main java.io.IOException: Failed to open native 
 connection to
 Cassandra at {127.0.1.1}:9042*



 I think I have to 

Re: Running SparkSql against Hive tables

2015-06-09 Thread James Pirz
I am trying to use Spark 1.3 (Standalone) against Hive 1.2 running on
Hadoop 2.6.
I looked at the ThriftServer2 logs, and I realized that the server was not
starting properly, because of a failure in creating a server socket. In fact,
I had passed the URI of my Hiveserver2 service, launched from Hive, and the
beeline in Spark was directly talking to Hive's hiveserver2, just using it
as a Hive service.

I could fix starting the Thriftserver2 in Spark (by changing the port), but I
guess the missing puzzle piece for me is: how does Spark SQL re-use the
already created tables in Hive? I mean, do I have to write an application
that uses HiveContext to do that and submit it to Spark for execution, or
is there a way to run SQL scripts directly via the command line (in
distributed mode and on the cluster), similar to the way that one would use
the Hive (or Shark) command line by passing a query file with the -f flag?
Looking at the Spark SQL documentation, it seems that it is possible. Please
correct me if I am wrong.
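
For reference, Spark also ships a SQL CLI that can run a script file
directly: $SPARK_HOME/bin/spark-sql --master spark://spark-master-node-ip:7077 -f /path/to/queries.sql
(assuming a Hive-enabled build with hive-site.xml in Spark's conf directory,
and that the -f option behaves as in Hive's CLI; the script path is a
placeholder).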

On Mon, Jun 8, 2015 at 6:56 PM, Cheng Lian lian.cs@gmail.com wrote:


 On 6/9/15 8:42 AM, James Pirz wrote:

 Thanks for the help!
 I am actually trying Spark SQL to run queries against tables that I've
 defined in Hive.

  I follow theses steps:
 - I start hiveserver2 and in Spark, I start Spark's Thrift server by:
 $SPARK_HOME/sbin/start-thriftserver.sh --master
 spark://spark-master-node-ip:7077

  - and I start beeline:
 $SPARK_HOME/bin/beeline

  - In my beeline session, I connect to my running hiveserver2
 !connect jdbc:hive2://hive-node-ip:1

  and I can run queries successfully. But based on hiveserver2 logs, it
 seems it actually uses Hadoop's MR to run queries,  *not* Spark's
 workers. My goal is to access Hive's tables' data, but run queries through
 Spark SQL using Spark workers (not Hadoop).

 Hm, interesting. HiveThriftServer2 should never issue MR jobs to perform
 queries. I did receive two reports in the past which also say MR jobs
 instead of Spark jobs were issued to perform the SQL query. However, I only
 reproduced this issue in a rare corner case, which uses HTTP mode to
 connect to Hive 0.12.0. Apparently this isn't your case. Would you mind
 providing more details so that I can dig in? The following information would
 be very helpful:

 1. Hive version
 2. A copy of your hive-site.xml
 3. Hadoop version
 4. Full HiveThriftServer2 log (which can be found in $SPARK_HOME/logs)

 Thanks in advance!


  Is it possible to do that via Spark SQL (its CLI) or through its thrift
 server ? (I tried to find some basic examples in the documentation, but I
 was not able to) - Any suggestion or hint on how I can do that would be
 highly appreciated.

  Thnx

 On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote:



 On 6/6/15 9:06 AM, James Pirz wrote:

  I am pretty new to Spark. Using Spark 1.3.1, I am trying to use
  'Spark SQL' to run some SQL scripts on the cluster. I realized that for
  better performance, it is a good idea to use Parquet files. I have 2
  questions regarding that:

   1) If I want to use Spark SQL against *partitioned and bucketed* tables
  with Parquet format in Hive, does the provided spark binary on the apache
  website support that, or do I need to build a new spark binary with some
  additional flags? (I found a note
  https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
  in the documentation about enabling Hive support, but I could not fully work
  out what the correct way of building is, if I need to build.)

  Yes, Hive support is enabled by default now for the binaries on the
 website. However, currently Spark SQL doesn't support buckets yet.


   2) Does running Spark SQL against tables in Hive degrade performance? Is it
  better to load parquet files directly into HDFS, or is having Hive in the
  picture harmless?

  If you're using Parquet, then it should be fine since by default Spark
 SQL uses its own native Parquet support to read Parquet Hive tables.


  Thnx







Kafka Spark Streaming: ERROR EndpointWriter: dropping message

2015-06-09 Thread karma243
Hello,

While trying to link kafka to spark, I'm not able to get data from kafka.
This is the error that I'm getting from spark logs:

ERROR EndpointWriter: dropping message [class
akka.actor.ActorSelectionMessage] for non-local recipient
[Actor[akka.tcp://sparkMaster@localhost:7077/]] arriving at
[akka.tcp://sparkMaster@localhost:7077] inbound addresses are
[akka.tcp://sparkMaster@karma-HP-Pavilion-g6-Notebook-PC:7077]
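
The mismatch in the error itself is the likely clue: the master registered
as sparkMaster@karma-HP-Pavilion-g6-Notebook-PC:7077, but the client
connected to localhost:7077, and Akka drops messages whose target address
does not match the registered one exactly. A minimal sketch of the usual
remedy (the app name is a placeholder):

val conf = new org.apache.spark.SparkConf()
  .setAppName("KafkaStream")
  // Use the exact spark:// URL shown at the top of the master's web UI;
  // "localhost" is not interchangeable with the advertised host name.
  .setMaster("spark://karma-HP-Pavilion-g6-Notebook-PC:7077")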




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-ERROR-EndpointWriter-dropping-message-tp23228.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark eventLog and history server

2015-06-09 Thread Richard Marscher
Hi,

I don't have a complete answer to your questions but:

Removing the suffix does not solve the problem - unfortunately this is
true, the master web UI only tries to build out a Spark UI from the event
logs once, at the time the context is closed. If the event logs are
in-progress at this time, then you basically missed the opportunity.

Does it mean I don't need to start history server if I only use spark in
standalone mode? - Yes, you don't need to start the history server.
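
A small sketch of the implication, assuming the original code keeps the
SparkContext alive across restarts (an assumption; the snippet was not
posted): the event log is only finalized when the SparkContext stops, so
stopping just the StreamingContext leaves it .inprogress.

// If ssc.stop(stopSparkContext = false) is used so the SparkContext can be
// reused, the event log keeps its .inprogress suffix; stopping the
// SparkContext as well closes the log out.
ssc.stop(stopSparkContext = true, stopGracefully = true)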

On Mon, Jun 8, 2015 at 7:57 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 Event log is enabled in my spark streaming app. My code runs in standalone
 mode and the spark version is 1.3.1. I periodically stop and restart the
 streaming context by calling ssc.stop(). However, from the web UI, when
 clicking on a past job, it says the job is still in progress and does not
 show the event log. The event log files have suffix .inprogress. Removing
 the suffix does not solve the problem. Do I need to do anything here in
 order to view the event logs of finished jobs? Or do I need to stop ssc
 differently?

 In addition, the documentation seems to suggest history server is used for
 Mesos or YARN mode. Does it mean I don't need to start history server if I
 only use spark in standalone mode?

 Thanks,
 Du



Re: Cassandra Submit

2015-06-09 Thread Yasemin Kaya
I removed the core and streaming jars, and the exception is still the same.

I tried what you said then results:

~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5

The CLI is deprecated and will be removed in Cassandra 3.0.  Consider
migrating to cqlsh.
CQL is fully backwards compatible with Thrift data; see
http://www.datastax.com/dev/blog/thrift-to-cql3

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown]

and

~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh

Thank you for your kind responses ...


2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Hm, jars look ok, although it's a bit of a mess -- you have spark-assembly
 1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix
 versions. Spark-assembly bundles all spark packages, so either use them
 separately or use spark-assembly, but don't mix them like you've shown.

 As to the port issue -- what about this:

 $bin/cassandra-cli -h localhost -p 9160
 Connected to: Test Cluster on localhost/9160
 Welcome to Cassandra CLI version 2.1.5


 On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote:

 My jar files are:

 cassandra-driver-core-2.1.5.jar
 cassandra-thrift-2.1.3.jar
 guava-18.jar
 jsr166e-1.1.0.jar
 spark-assembly-1.3.0.jar
 spark-cassandra-connector_2.10-1.3.0-M1.jar
 spark-cassandra-connector-java_2.10-1.3.0-M1.jar
 spark-core_2.10-1.3.1.jar
 spark-streaming_2.10-1.3.1.jar

  And my code is from the datastax spark-cassandra-connector:
  https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java

 Thanx alot.
 yasemin

 2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 hm. Yeah, your port is good...have you seen this thread:
 http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector
 ? It seems that you might be running into version mis-match issues?

 What versions of Spark/Cassandra-connector are you trying to use?

 On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote:

  Sorry, to answer: I ran lsof -i:9160 in the terminal; the result is

 lsof -i:9160
  COMMAND   PID  USER     FD    TYPE DEVICE SIZE/OFF NODE NAME
  java     7597  inosens  101u  IPv4  85754      0t0  TCP localhost:9160 (LISTEN)

  so is port 9160 open or not?

 2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com:

  Yes, I think my cassandra is listening on 9160. Actually I know it from the
  yaml file. The file includes:

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160

  I checked the port with nc -z localhost 9160; echo $?, and it returns 0. I
  think it is closed; should I open this port?

 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Is your cassandra installation actually listening on 9160?

  lsof -i :9160
  COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
  java    29232 ykadiysk   69u  IPv4 42152497  0t0  TCP localhost:9160 (LISTEN)

 I am running an out-of-the box cassandra conf where

 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: 9160



 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 I couldn't find any solution. I can write but I can't read from
 Cassandra.

 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

  Thanks a lot Mohammed, Gerard and Yana.
  I can write to the table, but I get an exception. It says *Exception
  in thread main java.io.IOException: Failed to open thrift connection to
  Cassandra at 127.0.0.1:9160*

 In yaml file :
 rpc_address: localhost
 rpc_port: 9160

 And at project :

 .set(spark.cassandra.connection.host, 127.0.0.1)
 .set(spark.cassandra.connection.rpc.port, 9160);

 or

 .set(spark.cassandra.connection.host, localhost)
 .set(spark.cassandra.connection.rpc.port, 9160);

  Whichever setting I use, I get the same exception. Any help?


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

  yes, whatever you put for listen_address in cassandra.yaml. Also,
  you should try to connect to your cassandra cluster via bin/cqlsh to make
  sure you have connectivity before you try to make a connection via spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi,
  I run my project locally. How can I find the ip address of my
  cassandra host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
  wrote:

 Hi ,

  How can I find spark.cassandra.connection.host? And what should
  I change? Should I change cassandra.yaml?

  The error says *Exception 

Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out

2015-06-09 Thread Snehal Nagmote
Hi Dibyendu,

Thank you for your reply.

I am using the Kafka consumer https://github.com/dibbhatt/kafka-spark-consumer,
which uses spark-core and spark-streaming *1.2.2*.

The Spark cluster on which I am running the application is *1.3.1*. I will test
it with the latest changes.

Yes, the underlying BlockManager gives an error, tries 3 times, and then dies.
The problem is that even if it dies, I can't restart it, because the application
status still says running.

Looks like its related to the issue
https://issues.apache.org/jira/browse/SPARK-5220


Thanks,
Snehal
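
For what it's worth, the 30 seconds in the trace matches Spark's default Akka
ask timeout; raising it is a hedged mitigation at best, since it may only mask
the underlying BlockManager problem:

// Sketch only: a larger ask timeout (in seconds) gives the BlockManager RPCs
// more room, but does not address why they are slow in the first place.
val conf = new org.apache.spark.SparkConf()
  .set("spark.akka.askTimeout", "120")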

On 8 June 2015 at 20:56, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Seems to be related to this JIRA :
 https://issues.apache.org/jira/browse/SPARK-3612 ?



 On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Snehal

  Are you running the latest kafka consumer from github/spark-packages? If
  not, can you take the latest changes? This low-level receiver will keep
  trying if the underlying BlockManager gives an error. Do you see those
  retry cycles in the log? If yes, then there is an issue writing blocks to
  the blockmanager, and spark is not able to recover from this failure, but
  the Receiver keeps trying...

  Which version of Spark are you using?

 Dibyendu
 On Jun 9, 2015 5:14 AM, Snehal Nagmote nagmote.sne...@gmail.com
 wrote:

 All,

 I am using Kafka Spark Consumer
 https://github.com/dibbhatt/kafka-spark-consumer  in  spark streaming
 job .

  After the spark streaming job runs for a few hours, all executors exit and I
  still see the status of the application on the SPARK UI as running.

  Does anyone know the cause of this exception and how to fix it?


  WARN  [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - 
 Error reported by receiver for stream 7: Error While Store for Partition 
 Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - 
 org.apache.spark.SparkException: Error sending message [message = 
 UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 
 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 
 1),10492,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
 at 
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at 
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at 
 org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
 at 
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
 at 
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
 at 
 org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
 at 
 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
 at 
 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
 at 
 org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
 at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
 at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
 at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
 [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

 ... 14 more WARN  
 [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error 
 sending message [message = UpdateBlockInfo(BlockManagerId(driver, 
 dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, 
 false, false, 1),0,0,0)] in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
 at 
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
 at scala.concurrent.Await$.result(package.scala:107)
 at 

Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Matt Kapilevich
Yes! If I either specify a different queue or don't specify a queue at all,
it works.

On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Does it work if you don't specify a queue?

 On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com
 wrote:

 Hi Marcelo,

 Yes, restarting YARN fixes this behavior and it again works the first few
 times. The only thing that's consistent is that once Spark job submissions
 stop working, it's broken for good.

 On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Apologies, I see you already posted everything from the RM logs that
 mention your stuck app.

 Have you tried restarting the YARN cluster to see if that changes
 anything? Does it go back to the first few tries work behaviour?

 I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like
 this.


 On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com
 wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even
 now - this problem is specific to Spark.


 That doesn't necessarily mean anything. Spark apps have different
 resource requirements than Hadoop apps.

 Check your RM logs for any line that mentions your Spark app id. That
 may give you some insight into what's happening or not.

 --
 Marcelo




 --
 Marcelo





 --
 Marcelo



Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Matt Kapilevich
From the RM scheduler, I see 3 applications currently stuck in the
root.thequeue queue.

Used Resources: memory:0, vCores:0
Num Active Applications: 0
Num Pending Applications: 3
Min Resources: memory:0, vCores:0
Max Resources: memory:6655, vCores:4
Steady Fair Share: memory:1664, vCores:0
Instantaneous Fair Share: memory:6655, vCores:0

On Tue, Jun 9, 2015 at 4:30 PM, Matt Kapilevich matve...@gmail.com wrote:

 Yes! If I either specify a different queue or don't specify a queue at
 all, it works.

 On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Does it work if you don't specify a queue?

 On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com
 wrote:

 Hi Marcelo,

 Yes, restarting YARN fixes this behavior and it again works the first
 few times. The only thing that's consistent is that once Spark job
 submissions stop working, it's broken for good.

 On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Apologies, I see you already posted everything from the RM logs that
 mention your stuck app.

 Have you tried restarting the YARN cluster to see if that changes
 anything? Does it go back to the first few tries work behaviour?

 I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like
 this.


 On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com
 wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even
 now - this problem is specific to Spark.


 That doesn't necessarily mean anything. Spark apps have different
 resource requirements than Hadoop apps.

 Check your RM logs for any line that mentions your Spark app id. That
 may give you some insight into what's happening or not.

 --
 Marcelo




 --
 Marcelo





 --
 Marcelo





[SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or

2015-06-09 Thread Shuai Zheng
Hi All,

 

I have some code to access s3 from Spark. The code is as simple as:

 

  JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  Configuration hadoopConf = ctx.hadoopConfiguration();
  // aws.secretKey=Zqhjim3GB69hMBvfjh+7NX84p8sMF39BHfXwO3Hs
  // aws.accessKey=AKIAI4YXBAJTJ77VKS4A

  hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
  hadoopConf.set("fs.s3n.awsAccessKeyId", "---");
  hadoopConf.set("fs.s3n.awsSecretAccessKey", "--");

  SQLContext sql = new SQLContext(ctx);
  DataFrame grid_lookup = sql.parquetFile("s3n://---");
  grid_lookup.count();
  ctx.stop();

 

The code works in 1.3.1, but in 1.4.0 and the latest 1.5.0 it always gives me
the exception below:

 

Exception in thread main java.lang.IllegalArgumentException: AWS Access
Key ID and Secret Access Key must be specified as the username or password
(respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
fs.s3.awsSecretAccessKey properties (respectively).

 

I don't know why; I remember this was a known issue in 1.3.0:
https://issues.apache.org/jira/browse/SPARK-6330, and solved in 1.3.1.

But now it is not working again in a newer version?

 

I remember that when I switched to 1.4.0 it worked for a while (when I was
working with the master branch of the latest source code), but I just refreshed
to the latest code, and I am getting this error again.

 

Does anyone have an idea?

 

Regards,

 

Shuai
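
(One workaround worth trying, offered as an assumption rather than a confirmed
fix for this regression: entries prefixed with spark.hadoop. in SparkConf are
copied into the Hadoop Configuration when the context is created, which avoids
depending on when ctx.hadoopConfiguration() is mutated. A sketch, key values
elided:)

val sparkConf = new org.apache.spark.SparkConf()
  .setAppName("S3Read")
  // Copied into the Hadoop Configuration before any FileSystem is resolved.
  .set("spark.hadoop.fs.s3n.awsAccessKeyId", "---")
  .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "---")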



Linear Regression with SGD

2015-06-09 Thread Stephen Carman
Hi User group,

We are using spark Linear Regression with SGD as the optimization technique and 
we are achieving very sub-optimal results.

Can anyone shed some light on why this implementation seems to produce such 
poor results vs our own implementation?

We are using a very small dataset, but we have to use a very large number of 
iterations to achieve similar results to our implementation. We've tried 
normalizing the data, not normalizing the data, and tuning every param. Our 
implementation is a closed-form solution, so we should be guaranteed 
convergence; the spark one is not, which is understandable, but why is it so 
far off?

Has anyone experienced this?

Steve Carman, M.S.
Artificial Intelligence Engineer
Coldlight-PTC
scar...@coldlight.com
This e-mail is intended solely for the above-mentioned recipient and it may 
contain confidential or privileged information. If you have received it in 
error, please notify us immediately and delete the e-mail. You must not copy, 
distribute, disclose or take any action in reliance on it. In addition, the 
contents of an attachment to this e-mail may contain software viruses which 
could damage your own computer system. While ColdLight Solutions, LLC has taken 
every reasonable precaution to minimize this risk, we cannot accept liability 
for any damage which you sustain as a result of software viruses. You should 
perform your own virus checks before opening the attachment.


Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Marcelo Vanzin
Apologies, I see you already posted everything from the RM logs that
mention your stuck app.

Have you tried restarting the YARN cluster to see if that changes anything?
Does it go back to the first few tries work behaviour?

I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like
this.


On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com
 wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now
 - this problem is specific to Spark.


 That doesn't necessarily mean anything. Spark apps have different resource
 requirements than Hadoop apps.

 Check your RM logs for any line that mentions your Spark app id. That may
 give you some insight into what's happening or not.

 --
 Marcelo




-- 
Marcelo


Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Marcelo Vanzin
Does it work if you don't specify a queue?

On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com wrote:

 Hi Marcelo,

 Yes, restarting YARN fixes this behavior and it again works the first few
 times. The only thing that's consistent is that once Spark job submissions
 stop working, it's broken for good.

 On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Apologies, I see you already posted everything from the RM logs that
 mention your stuck app.

 Have you tried restarting the YARN cluster to see if that changes
 anything? Does it go back to the first few tries work behaviour?

 I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like
 this.


 On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com
 wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even
 now - this problem is specific to Spark.


 That doesn't necessarily mean anything. Spark apps have different
 resource requirements than Hadoop apps.

 Check your RM logs for any line that mentions your Spark app id. That
 may give you some insight into what's happening or not.

 --
 Marcelo




 --
 Marcelo





-- 
Marcelo


Re: which database for gene alignment data ?

2015-06-09 Thread roni
Hi Frank,
Thanks for the reply. I downloaded ADAM and built it but it does not seem
to list this function for command line options.
Are these exposed as a public API so that I can call them from code?

Also, I need to save all my intermediate data. It seems ADAM stores
data in Parquet on HDFS.
I want to save something in an external database, so that the saved data
can be re-used in multiple ways by multiple people.
Any suggestions on the DB selection, or on keeping data centralized for use by
multiple distinct groups?
Thanks
-Roni



On Mon, Jun 8, 2015 at 12:47 PM, Frank Austin Nothaft fnoth...@berkeley.edu
 wrote:

 Hi Roni,

 We have a full suite of genomic feature parsers that can read BED,
 narrowPeak, GATK interval lists, and GTF/GFF into Spark RDDs in ADAM
 https://github.com/bigdatagenomics/adam  Additionally, we have support
 for efficient overlap joins (query 3 in your email below). You can load the
 genomic features with ADAMContext.loadFeatures
 https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala#L438.
 We have two tools for the overlap computation: you can use a
 BroadcastRegionJoin
 https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala
  if
 one of the datasets you want to overlap is small or a ShuffleRegionJoin
 https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala
  if
 both datasets are large.
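
 (A very rough sketch of what calling this from code might look like; the
 method name comes from the links above, but the exact import and signature
 have not been verified here, and the path is a placeholder:)

 // Unverified sketch: names from the ADAM links above, path hypothetical.
 import org.bdgenomics.adam.rdd.ADAMContext._

 val features = sc.loadFeatures("hdfs:///data/peaks.bed")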

 Regards,

 Frank Austin Nothaft
 fnoth...@berkeley.edu
 fnoth...@eecs.berkeley.edu
 202-340-0466

 On Jun 8, 2015, at 9:39 PM, roni roni.epi...@gmail.com wrote:

 Sorry for the delay.
 The files (called .bed files) have format like -

 Chromosome  start   end     feature  score  strand

 chr1        713776  714375  peak.1   599    +
 chr1        752401  753000  peak.2   599    +

 The mandatory fields are


1. chrom - The name of the chromosome (e.g. chr3, chrY, chr2_random) or 
 scaffold (e.g. scaffold10671).
2. chromStart - The starting position of the feature in the chromosome or 
 scaffold. The first base in a chromosome is numbered 0.
3. chromEnd - The ending position of the feature in the chromosome or 
 scaffold. The *chromEnd* base is not included in the display of the feature. 
 For example, the first 100 bases of a chromosome are defined as 
 *chromStart=0, chromEnd=100*, and span the bases numbered 0-99.

 There can be more data as described - 
 https://genome.ucsc.edu/FAQ/FAQformat.html#format1
 Many times the use cases are like:
 1. Find the features between given start and end positions.
 2. Find features which have overlapping start and end points with another
 feature.
 3. Read external (reference) data which will have a similar format (chr10
 48514785   49604641   MAPK8   49514785   +) and find all the
 data points which overlap with the other .bed files.

 The data is huge; .bed files can range from 0.5 GB to 5 GB (or more).
 I was thinking of using cassandra, but I am not sure if the overlapping
 queries can be supported and will be fast enough.

 Thanks for the help
 -Roni


 On Sat, Jun 6, 2015 at 7:03 AM, Ted Yu yuzhih...@gmail.com wrote:

 Can you describe your use case in a bit more detail since not all people
 on this mailing list are familiar with gene sequencing alignments data ?

 Thanks

 On Fri, Jun 5, 2015 at 11:42 PM, roni roni.epi...@gmail.com wrote:

 I want to use spark to read compressed .bed files containing gene
 sequencing alignment data.
 I want to store bed file data in db and then use external gene
 expression data to find overlaps etc, which database is best for it ?
 Thanks
 -Roni







Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Matt Kapilevich
Hi Marcelo,

Yes, restarting YARN fixes this behavior and it again works the first few
times. The only thing that's consistent is that once Spark job submissions
stop working, it's broken for good.

On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Apologies, I see you already posted everything from the RM logs that
 mention your stuck app.

 Have you tried restarting the YARN cluster to see if that changes
 anything? Does it go back to the first few tries work behaviour?

 I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like
 this.


 On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com
 wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now
 - this problem is specific to Spark.


 That doesn't necessarily mean anything. Spark apps have different
 resource requirements than Hadoop apps.

 Check your RM logs for any line that mentions your Spark app id. That may
 give you some insight into what's happening or not.

 --
 Marcelo




 --
 Marcelo



spark on yarn

2015-06-09 Thread Neera
In my test data, I have a JavaRDD with a single String (the size of this RDD
is 1).

On a 3-node Yarn cluster, the mapToPair function on this RDD sends the same
input String to 2 different nodes. Container logs on these nodes show the
same string as input.

Overriding the default partition count by

JavaRDD<String> input = sparkContext.textFile(hdfsPath, 0);

didn't change anything, and the same input string is being processed twice.
Is there a way to make sure that each string in an RDD is processed exactly
once?

Thanks,
Neera 
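
(Two things may help narrow this down. First, textFile's second argument is a
minimum number of partitions, not an exact count, so it cannot force a single
copy. Second, a diagnostic sketch like the one below, with the printing purely
illustrative, shows whether the string really occurs in two partitions or
whether one task was re-run, e.g. by speculative execution:)

// Tag each element with the index of the partition it lives in.
input.rdd.mapPartitionsWithIndex { (idx, it) =>
  it.map(s => s"partition $idx: $s")
}.collect().foreach(println)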



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-on-yarn-tp23230.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Marcelo Vanzin
On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now -
 this problem is specific to Spark.


That doesn't necessarily mean anything. Spark apps have different resource
requirements than Hadoop apps.

Check your RM logs for any line that mentions your Spark app id. That may
give you some insight into what's happening or not.

-- 
Marcelo


Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message

2015-06-09 Thread nsalian
1) Could you share your command?

2) Are the kafka brokers on the same host?

3) Could you run a --describe on the topic to see if the topic is setup
correctly (just to be sure)?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-ERROR-EndpointWriter-dropping-message-tp23228p23235.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Determining number of executors within RDD

2015-06-09 Thread maxdml
You should try, from the SparkConf object, to issue a get.

I don't have the exact name for the matching key, but from reading the code
in SparkSubmit.scala, it should be something like:

conf.get(spark.executor.instances)
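
A short sketch of that, with the caveat that the key is only populated when
executor instances are set explicitly (e.g. --num-executors on YARN), and that
sc is assumed to be the active SparkContext:

// Returns None if the setting was never supplied.
val numExecutors = sc.getConf.getOption("spark.executor.instances")
println(numExecutors.getOrElse("not set"))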



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Determining-number-of-executors-within-RDD-tp15554p23234.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can a Spark App run with spark-submit write pdf files to HDFS

2015-06-09 Thread nsalian
By writing PDF files, do you mean something equivalent to a hadoop fs -put
/path?

I'm not sure how Pdfbox works, though; have you tried writing files
individually, without spark?

We can potentially look if you have established that as a starting point to
see how Spark can be interfaced to write to HDFS.

Moreover, is there a specific need to use Spark in this case?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-Spark-App-run-with-spark-submit-write-pdf-files-to-HDFS-tp23233p23237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread nsalian
I see the other jobs SUCCEEDED without issues.

Could you snapshot the FairScheduler activity as well? 
My guess is that, with the single core, it is reaching a NodeManager that is
still busy with other jobs, and the job ends up in a waiting state.

Does the job eventually complete?

Could you potentially add another node to the cluster to see if my guess is
right? I just see one Active NM.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-running-Spark-1-4-on-Yarn-tp23211p23236.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can a Spark App run with spark-submit write pdf files to HDFS

2015-06-09 Thread Richard Catlin
I would like to write pdf files using pdfbox to HDFS from my Spark
application.  Can this be done?
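
(In principle yes; a hedged sketch of one approach, assuming the PDFs are
first rendered to byte arrays on the executors -- PDDocument itself is not
serializable -- and with the output directory as a placeholder:)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// docs: RDD[(String, Array[Byte])] of file name -> rendered PDF bytes,
// built elsewhere (e.g. with PDFBox's PDDocument.save(OutputStream)).
docs.foreachPartition { part =>
  val fs = FileSystem.get(new Configuration())
  part.foreach { case (name, bytes) =>
    val out = fs.create(new Path(s"/pdfs/$name.pdf"))
    try out.write(bytes) finally out.close()
  }
}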



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-Spark-App-run-with-spark-submit-write-pdf-files-to-HDFS-tp23233.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue running Spark 1.4 on Yarn

2015-06-09 Thread Matt Kapilevich
I've tried running a Hadoop app pointing to the same queue. Same thing now:
the job doesn't get accepted. I've cleared out the queue and killed all the
pending jobs, and the queue is still unusable.

It seems like an issue with YARN, but it's specifically Spark that leaves
the queue in this state. I've run a Hadoop job in a for loop 10x, while
specifying the queue explicitly, just to double-check.

On Tue, Jun 9, 2015 at 4:45 PM, Matt Kapilevich matve...@gmail.com wrote:

 From the RM scheduler, I see 3 applications currently stuck in the
 root.thequeue queue.

 Used Resources: memory:0, vCores:0
 Num Active Applications: 0
 Num Pending Applications: 3
 Min Resources: memory:0, vCores:0
 Max Resources: memory:6655, vCores:4
 Steady Fair Share: memory:1664, vCores:0
 Instantaneous Fair Share: memory:6655, vCores:0

 On Tue, Jun 9, 2015 at 4:30 PM, Matt Kapilevich matve...@gmail.com
 wrote:

 Yes! If I either specify a different queue or don't specify a queue at
 all, it works.

 On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Does it work if you don't specify a queue?

 On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com
 wrote:

 Hi Marcelo,

 Yes, restarting YARN fixes this behavior and it again works the first
 few times. The only thing that's consistent is that once Spark job
 submissions stop working, it's broken for good.

 On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Apologies, I see you already posted everything from the RM logs that
 mention your stuck app.

 Have you tried restarting the YARN cluster to see if that changes
 anything? Does it go back to the first few tries work behaviour?

 I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything
 like this.


 On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com
 wrote:

  Like I mentioned earlier, I'm able to execute Hadoop jobs fine even
 now - this problem is specific to Spark.


 That doesn't necessarily mean anything. Spark apps have different
 resource requirements than Hadoop apps.

 Check your RM logs for any line that mentions your Spark app id. That
 may give you some insight into what's happening or not.

 --
 Marcelo




 --
 Marcelo





 --
 Marcelo






Re: spark-submit working differently than pyspark when trying to find external jars

2015-06-09 Thread Walt Schlender
I figured it out, in case anyone else has this problem in the future:

spark-submit --driver-class-path lib/postgresql-9.4-1201.jdbc4.jar \
  --packages com.databricks:spark-csv_2.10:1.0.3 path/to/my/script.py

What I found is that you MUST put the path to your script at the end of the
spark-submit command. Also, wildcards in the --driver-class-path work when
using pyspark but don't work when using spark-submit.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-working-differently-than-pyspark-when-trying-to-find-external-jars-tp23231p23232.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Cassandra Submit

2015-06-09 Thread Mohammed Guller
Looks like the real culprit is a library version mismatch:

Caused by: java.lang.NoSuchMethodError: 
org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(Ljava/lang/String;I)Lorg/apache/thrift/transport/TTransport;
 at 
com.datastax.spark.connector.cql.DefaultConnectionFactory$.createThriftClient(CassandraConnectionFactory.scala:41)
 at 
com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:134)
 ... 28 more

The Spark Cassandra Connector is trying to use a method which does not exist. 
That means your assembly jar has the wrong version of the library that SCC is 
trying to use. Welcome to jar hell!

Mohammed
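
(One way out of the jar hell, sketched under the assumption of an sbt build:
let the build tool resolve the connector's own thrift and driver dependencies
instead of hand-matching jars. The versions below are the ones from this
thread:)

// build.sbt sketch -- spark-core is provided by the cluster at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-M1"
)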

From: Yasemin Kaya [mailto:godo...@gmail.com]
Sent: Tuesday, June 9, 2015 12:24 PM
To: Mohammed Guller
Cc: Yana Kadiyska; Gerard Maas; user@spark.apache.org
Subject: Re: Cassandra Submit

My code: https://gist.github.com/yaseminn/d77dd9baa6c3c43c7594 and 
exception: https://gist.github.com/yaseminn/fdd6e5a6efa26219b4d3

and
~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> use test;
cqlsh:test> select * from people;

 id | name
+-
  5 |   eslem
  1 | yasemin
  8 | ali
  2 |   busra
  4 |   ilham
  7 |   kubra
  6 |tuba
  9 |aslı
  3 |  Andrew

(9 rows)
cqlsh:test>

bin/cassandra-cli -h 127.0.0.1 -p 9160
Connected to: Test Cluster on 127.0.0.1/9160
Welcome to Cassandra CLI version 2.1.5

The CLI is deprecated and will be removed in Cassandra 3.0.  Consider migrating 
to cqlsh.
CQL is fully backwards compatible with Thrift data; see 
http://www.datastax.com/dev/blog/thrift-to-cql3

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown]


yasemin

2015-06-09 22:03 GMT+03:00 Mohammed Guller moham...@glassbeam.com:
It is strange that writes work but reads do not. If it were a Cassandra 
connectivity issue, then neither write nor read would work. Perhaps the problem 
is somewhere else.

Can you send the complete exception trace?

Also, just to make sure that there is no DNS issue, try this:
~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h 127.0.0.1 -p 9160

Mohammed

From: Yasemin Kaya [mailto:godo...@gmail.com]
Sent: Tuesday, June 9, 2015 11:32 AM
To: Yana Kadiyska
Cc: Gerard Maas; Mohammed Guller; user@spark.apache.org
Subject: Re: Cassandra Submit

I removed the core and streaming jars, and the exception is still the same.

I tried what you said then results:

~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5

The CLI is deprecated and will be removed in Cassandra 3.0.  Consider migrating 
to cqlsh.
CQL is fully backwards compatible with Thrift data; see 
http://www.datastax.com/dev/blog/thrift-to-cql3

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown]

and

~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh

Thank you for your kind responses ...


2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
Hm, jars look ok, although it's a bit of a mess -- you have spark-assembly 
1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix 
versions. Spark-assembly bundles all spark packages, so either use them 
separately or use spark-assembly, but don't mix them like you've shown.

As to the port issue -- what about this:

$bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5


On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote:
My jar files are:

cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
guava-18.jar
jsr166e-1.1.0.jar
spark-assembly-1.3.0.jar
spark-cassandra-connector_2.10-1.3.0-M1.jar
spark-cassandra-connector-java_2.10-1.3.0-M1.jar
spark-core_2.10-1.3.1.jar
spark-streaming_2.10-1.3.1.jar

And my code is from the datastax spark-cassandra-connector: 
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java

Thanx alot.
yasemin

2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
hm. Yeah, your port is good...have you seen this thread: 
http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector
 ? It seems that you might be running into version mis-match issues?

What versions of Spark/Cassandra-connector are you trying to use?

On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya 

Re: Linear Regression with SGD

2015-06-09 Thread Robin East
Hi Stephen

How many is a very large number of iterations? SGD is notorious for requiring 
100s or 1000s of iterations, and you may also need to spend some time tweaking 
the step size. In 1.4 there is an implementation of ElasticNet Linear Regression 
which is supposed to compare favourably with an equivalent R implementation.
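
For reference, a sketch of the two knobs in question on the MLlib API; the
values are illustrative placeholders, not recommendations:

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

// trainingData: RDD[LabeledPoint]. SGD often needs many iterations and a
// carefully tuned step size before it approaches a closed-form solution.
val model = LinearRegressionWithSGD.train(trainingData, 1000, 0.01)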
 On 9 Jun 2015, at 22:05, Stephen Carman scar...@coldlight.com wrote:
 
 Hi User group,
 
 We are using spark Linear Regression with SGD as the optimization technique 
 and we are achieving very sub-optimal results.
 
 Can anyone shed some light on why this implementation seems to produce such 
 poor results vs our own implementation?
 
  We are using a very small dataset, but we have to use a very large number of 
  iterations to achieve similar results to our implementation. We've tried 
  normalizing the data, not normalizing the data, and tuning every param. Our 
  implementation is a closed-form solution, so we should be guaranteed 
  convergence; the spark one is not, which is understandable, but why is it 
  so far off?
 
 Has anyone experienced this?
 
 Steve Carman, M.S.
 Artificial Intelligence Engineer
 Coldlight-PTC
 scar...@coldlight.com
 This e-mail is intended solely for the above-mentioned recipient and it may 
 contain confidential or privileged information. If you have received it in 
 error, please notify us immediately and delete the e-mail. You must not copy, 
 distribute, disclose or take any action in reliance on it. In addition, the 
 contents of an attachment to this e-mail may contain software viruses which 
 could damage your own computer system. While ColdLight Solutions, LLC has 
 taken every reasonable precaution to minimize this risk, we cannot accept 
 liability for any damage which you sustain as a result of software viruses. 
 You should perform your own virus checks before opening the attachment.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



BigDecimal problem in parquet file

2015-06-09 Thread bipin
Hi,
When I try to save my data frame as a parquet file I get the following
error:

java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
org.apache.spark.sql.types.Decimal
at
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
at
org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

How do I fix this problem?
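
One hedged workaround, assuming the ClassCastException means a non-Decimal
(unit/null) value reached a column whose schema declares Decimal: force the
column through an explicit cast before writing. Here df stands for the
DataFrame being saved, and the column names "id" and "amount" are hypothetical.

import org.apache.spark.sql.types.DecimalType

// Drop missing values and cast, so only real Decimals reach the parquet writer.
val cleaned = df
  .filter(df("amount").isNotNull)
  .select(df("id"), df("amount").cast(DecimalType(18, 2)).as("amount"))

cleaned.saveAsParquetFile("/tmp/cleaned.parquet")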



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3.1 SparkSQL metastore exceptions

2015-06-09 Thread Needham, Guy
Hi,

I'm using Spark 1.3.1 to insert into a Hive 0.12 table from a SparkSQL query. 
The query is a very simple select from a dummy Hive table used for benchmarking.
I'm using a 'create table as select' statement to do the insert. Whether I do 
that or an 'insert overwrite', I get the same Hive exception, unable to alter 
table, with some Hive metastore issues.

The data is inserted into the Hive table as expected; however, I get a very long 
stack trace. Does anyone know the meaning of the stack trace and how I can avoid 
generating it every time I insert into a table?

scala> hiveContext.sql("create table benchmarking.spark_logins_benchmark as 
select * from benchmarking.logins_benchmark limit 10")
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table.
at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:387)
at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1448)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:235)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:123)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:255)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
at 
org.apache.spark.sql.hive.execution.CreateTableAsSelect.run(CreateTableAsSelect.scala:70)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
at 
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 

Re: Implementing top() using treeReduce()

2015-06-09 Thread DB Tsai
Having the following code in RDD.scala works for me. PS, in the following
code, I merge the smaller queue into the larger one. I wonder if this will help
performance. Let me know when you do the benchmark.

def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // The priority queue keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.treeReduce { (queue1, queue2) =>
        // Merge the smaller queue into the larger one.
        if (queue1.size >= queue2.size) {
          queue1 ++= queue2
          queue1
        } else {
          queue2 ++= queue1
          queue2
        }
      }.toArray.sorted(ord)
    }
  }
}

def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  treeTakeOrdered(num)(ord.reverse)
}
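
For reference, a quick usage sketch once the two methods above are compiled into
RDD.scala:

val rdd = sc.parallelize(1 to 1000000, 100)

val largest10  = rdd.treeTop(10)          // analogous to rdd.top(10)
val smallest10 = rdd.treeTakeOrdered(10)  // analogous to rdd.takeOrdered(10)

Merging the smaller queue into the larger one means fewer elements are
re-inserted at each combine step; whether the tree reduction beats the plain
reduce should show up in the benchmark mainly when there are many partitions.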



Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D
https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D

On Tue, Jun 9, 2015 at 10:09 AM, raggy raghav0110...@gmail.com wrote:

 I am trying to implement top-k in scala within apache spark. I am aware
 that
 spark has a top action. But, top() uses reduce(). Instead, I would like to
 use treeReduce(). I am trying to compare the performance of reduce() and
 treeReduce().

 The main issue I have is that I cannot use these 2 lines of code which are
 used in the top() action within my Spark application.

 val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
 queue ++= util.collection.Utils.takeOrdered(items, num)(ord)

 How can I go about implementing top() using treeReduce()?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




how to clear state in Spark Streaming based on emitting

2015-06-09 Thread Robert Towne
With Spark Streaming, I am maintaining state (updateStateByKey every 30s) and 
emitting to file the parts of that state that have been closed every 5 minutes, 
but I only care about the last state collected.
In 5m, there will be 10 updateStateByKey iterations called.

For example:
…
  val ssc =  new StreamingContext(sc, Seconds(30))

val expiredState = state
.filter(_._2.expired == true)
.window(windowDuration = Seconds(30), slideDuration)
…

When I go to emit, I want to update a Boolean flag in my collection of state 
that says it has been collected, so that the next time state is updated I can 
remove what has been emitted.

Is there a way to do this or maybe a better pattern or approach to solve this 
problem?
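
One pattern that may fit (a sketch only; the state class, field names and
stream are hypothetical): fold the emitted flag into the state itself and let
the updateStateByKey function return None for state that was already emitted,
which removes that key entirely on the next batch.

// Hypothetical per-key state; requires ssc.checkpoint(...) to be set.
case class TrackedState(events: Seq[String], expired: Boolean, emitted: Boolean)

def update(newEvents: Seq[String], prev: Option[TrackedState]): Option[TrackedState] =
  prev match {
    // Returning None drops the key's state, so anything flagged as emitted
    // in an earlier batch disappears on the next update.
    case Some(s) if s.emitted => None
    case Some(s)              => Some(s.copy(events = s.events ++ newEvents))
    case None                 => Some(TrackedState(newEvents, expired = false, emitted = false))
  }

val state = events.updateStateByKey(update _)

The open question remains how to flip the emitted flag from the emitting side;
one option is to set it inside the same update function once expired is true
and the window that writes the state out has already passed.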

Hopefully I have given enough information to explain the use case.

Thanks,
Robert




Re: Can a Spark App run with spark-submit write pdf files to HDFS

2015-06-09 Thread William Briggs
I don't know anything about your use case, so take this with a grain of
salt, but typically if you are operating at a scale that benefits from
Spark, then you likely will not want to write your output records as
individual files into HDFS. Spark has built-in support for the Hadoop
SequenceFile container format, which is a more scalable way to handle
writing out your results; you could write your Spark RDD transformations in
such a way that your final RDD is a PairRDD with a unique key (possibly
what would normally have been the standalone file name) and the value (in
this case, likely the byte array of the PDF you generated).

It looks like PDFBox's PDDocument class allows you to save the document to an
OutputStream
(https://pdfbox.apache.org/docs/1.8.9/javadocs/org/apache/pdfbox/pdmodel/PDDocument.html#save(java.io.OutputStream)),
so you could probably get away with saving to a ByteArrayOutputStream, and
snagging the bytes that comprise the final document. You can see more about
how to write SequenceFiles from Spark here:
https://spark.apache.org/docs/latest/programming-guide.html#actions.
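
Putting the two together, a sketch of the idea (renderPdf and nameFor are
hypothetical helpers, records is an assumed RDD[String], and the PDF-building
code is elided):

import java.io.ByteArrayOutputStream
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.spark.SparkContext._

// Hypothetical: build the PDF for one input record and return its bytes.
def renderPdf(record: String): Array[Byte] = {
  val doc = new PDDocument()
  try {
    // ... add pages and content derived from `record` here ...
    val buf = new ByteArrayOutputStream()
    doc.save(buf)
    buf.toByteArray
  } finally {
    doc.close()
  }
}

// Key each PDF by what would have been its file name, then write one
// SequenceFile of (name, bytes) pairs instead of many small files.
val pdfs = records.map(r => (nameFor(r), renderPdf(r)))
pdfs.saveAsSequenceFile("hdfs:///output/pdfs")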

As an aside, one hint that I have found helpful since I starting working
with Spark is that if your transformation requires classes that are
expensive to instantiate, you may want to look into mapPartitions, which
allows you to do the setup once per partition instead of once per record. I
haven't used PDFBox, but it wouldn't surprise me to learn that there's some
non-negligible overhead involved.

Hope that helps,
Will

On Tue, Jun 9, 2015 at 5:57 PM, Richard Catlin richard.m.cat...@gmail.com
wrote:

 I would like to write pdf files using pdfbox to HDFS from my Spark
 application.  Can this be done?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-Spark-App-run-with-spark-submit-write-pdf-files-to-HDFS-tp23233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: flatMap output on disk / flatMap memory overhead

2015-06-09 Thread Imran Rashid
I agree with Richard.  It looks like the issue here is shuffling, and
shuffle data is always written to disk, so the issue is definitely not that
all the output of flatMap has to be stored in memory.

If at all possible, I'd first suggest upgrading to a newer version of Spark
-- even in 1.2, there were big improvements to shuffle, with sort-based
shuffle as the default.
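
On top of that, one hedged tweak for the code quoted below: pass an explicit
(larger) partition count to reduceByKey so each reduce task holds less state.
The 2000 here is illustrative, not a recommendation, and outputPairsOfWords is
the poster's own function.

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x))
  // More, smaller reduce tasks lower the per-task memory footprint.
  .reduceByKey((a, b) => (a + b).toShort, 2000)
  .filter { case ((w1, w2), count) => count >= 5 }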

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com
wrote:

 Are you sure it's memory related? What is the disk utilization and IO
 performance on the workers? The error you posted looks to be related to
 shuffle trying to obtain block data from another worker node and failing to
 do so in reasonable amount of time. It may still be memory related, but I'm
 not sure that other resources are ruled out yet.

 On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch
  wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey .
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur in a set
 of
 documents at least 5 times. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs that are
 spilled by flatMap might need to be stored on the disk, but I don't
 understand why the persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter { case ((x, y), count) => count >= 5 }


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
 node I keep for the master, 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





spark-submit does not use hive-site.xml

2015-06-09 Thread James Pirz
I am using Spark (standalone) to run queries (from a remote client) against
data in tables that are already defined/loaded in Hive.

I have started metastore service in Hive successfully, and by putting
hive-site.xml, with proper metastore.uri, in $SPARK_HOME/conf directory, I
tried to share its config with spark.

When I start spark-shell, it gives me a default sqlContext, and I can use
that to access my Hive's tables with no problem.

But once I submit a similar query via Spark application through
'spark-submit', it does not see the tables and it seems it does not pick
hive-site.xml which is under conf directory in Spark's home. I tried to use
'--files' argument with spark-submit to pass hive-site.xml' to the
workers, but it did not change anything.

Here is how I try to run the application:

$SPARK_HOME/bin/spark-submit --class SimpleClient --master
spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml
 simple-sql-client-1.0.jar

Here is the simple example code that I try to run (in Java):

SparkConf conf = new SparkConf().setAppName("Simple SQL Client");

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame res = sqlContext.sql("show tables");

res.show();


Here are the SW versions:
Spark: 1.3
Hive: 1.2
Hadoop: 2.6

Thanks in advance for any suggestion.
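
One thing worth checking (an assumption, since the thread does not confirm it):
a plain SQLContext never talks to the Hive metastore, whereas spark-shell's
default sqlContext is a HiveContext when Spark is built with Hive support. A
sketch of the equivalent application code using HiveContext, shown in Scala:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("Simple SQL Client")
val sc = new SparkContext(conf)

// HiveContext picks up hive-site.xml from the driver classpath and talks to
// the metastore, so tables defined in Hive become visible.
val hiveContext = new HiveContext(sc)
hiveContext.sql("show tables").show()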


Re: Linear Regression with SGD

2015-06-09 Thread DB Tsai
As Robin suggested, you may try the following new implementation.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef

Thanks.

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D
https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D





Re: Cassandra Submit

2015-06-09 Thread Yana Kadiyska
Is your cassandra installation actually listening on 9160?

$ lsof -i :9160
COMMAND   PID     USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    29232 ykadiysk   69u  IPv4 42152497      0t0  TCP localhost:9160 (LISTEN)

I am running an out-of-the-box cassandra conf where

rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160



On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote:

 I couldn't find any solution. I can write but I can't read from Cassandra.

 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:

 Thanks a lot, Mohammed, Gerard and Yana.
 I can write to the table, but I get an exception. It says *Exception in
 thread main java.io.IOException: Failed to open thrift connection to
 Cassandra at 127.0.0.1:9160*

 In yaml file :
 rpc_address: localhost
 rpc_port: 9160

 And at project :

 .set("spark.cassandra.connection.host", "127.0.0.1")
 .set("spark.cassandra.connection.rpc.port", "9160");

 or

 .set("spark.cassandra.connection.host", "localhost")
 .set("spark.cassandra.connection.rpc.port", "9160");

 Whatever setting I write, I get the same exception. Any help?


 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:

 yes, whatever you put for listen_address in cassandra.yaml. Also, you
 should try to connect to your cassandra cluster via bin/cqlsh to make sure
 you have connectivity before you try to make a connection via spark.

 On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,
 I run my project locally. How can I find the IP address of my cassandra
 host? From cassandra.yaml, or somewhere else?

 yasemin

 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:

 ? = ip address of your cassandra host

 On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com
 wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should I
 change? Should I change cassandra.yaml?

 Error says me *Exception in thread main java.io.IOException:
 Failed to open native connection to Cassandra at {127.0.1.1}:9042*

 What should I add: *SparkConf sparkConf = new
 SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts",
 "true").set("spark.cassandra.connection.host", ?);*

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com:

  Check your spark.cassandra.connection.host setting. It should be
 pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using cassandraDB in my project. I got this error: *Exception
 in thread main java.io.IOException: Failed to open native connection to
 Cassandra at {127.0.1.1}:9042*



 I think I have to modify the submit line. What should I add or
 remove when I submit my project?



 Best,

 yasemin





 --

 hiç ender hiç




 --
 hiç ender hiç





 --
 hiç ender hiç





 --
 hiç ender hiç




 --
 hiç ender hiç



Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-09 Thread Dmitry Goldenberg
At which point would I call cache()?  I just want the runtime to spill to
disk when necessary, without me having to know when that is necessary.


On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger c...@koeninger.org wrote:

 direct stream isn't a receiver, it isn't required to cache data anywhere
 unless you want it to.

 If you want it, just call cache.
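
A sketch of what that looks like with the direct stream (ssc, kafkaParams and
topics are assumed to be defined as in the Kafka integration guide):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// createDirectStream takes no StorageLevel argument; persisting the DStream
// itself keeps its RDDs around, spilling to disk when memory runs short.
stream.persist(StorageLevel.MEMORY_AND_DISK)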

 On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 set the storage policy for the DStream RDDs to MEMORY AND DISK - it
 appears the storage level can be specified in the createStream methods but
 not createDirectStream...


 On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted spark streaming will resort to keeping new RDDs on disk, which
 will prevent it from crashing and hence losing them. Then some memory will
 get freed and it will resort back to RAM, and so on and so forth.





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
 growth in Kafka or Spark's metrics?



 You can always spin new boxes in the background and bring them into the
 cluster fold when fully operational and time that with job relaunch and
 param change



 Kafka offsets are managed automatically for you by the Kafka clients,
 which keep them in ZooKeeper; don't worry about that as long as you shut down
 your job gracefully. Besides, managing the offsets explicitly is not a big
 deal if necessary.





 Sent from Samsung Mobile



  Original message 

 From: Dmitry Goldenberg

 Date:2015/05/28 13:16 (GMT+00:00)

 To: Evo Eftimov

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
 growth in Kafka or Spark's metrics?



 Thanks, Evo.  Per the last part of your comment, it sounds like we will
 need to implement a job manager which will be in control of starting the
 jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
 marking them as ones to relaunch, scaling the cluster up/down by
 adding/removing machines, and relaunching the 'suspended' (shut down) jobs.



 I suspect that relaunching the jobs may be tricky, since that means
 keeping track of the starting offsets in the Kafka topic(s) that the jobs
 were working from.



 Ideally, we'd want to avoid a re-launch.  The 'suspension' and
 relaunching of jobs, coupled with the wait for the new machines to come
 online may turn out quite time-consuming which will make for lengthy
 request times, and our requests are not asynchronous.  Ideally, the
 currently running jobs would continue to run on the machines currently
 available in the cluster.



 In the scale-down case, the job manager would want to signal to Spark's
 job scheduler not to send work to the node being taken out, find out when
 the last job has finished running on the node, then take the node out.



 This is somewhat like changing the number of cylinders in a car engine
 while the car is running...



 Sounds like a great candidate for a set of enhancements in Spark...



 On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 @DG; The key metrics should be



 -  Scheduling delay – its ideal state is to remain constant
 over time and ideally be less than the time of the microbatch window

 -  The average job processing time should remain less than the
 micro-batch window

 -  Number of Lost Jobs – even if there is a single Job lost
 that means that you have lost all messages for the DStream RDD processed by
 that job due to the previously described spark streaming memory leak
 condition and subsequent crash – described in previous postings submitted
 by me



 You can even go one step further and periodically issue “get/check free
 memory” to see whether it is decreasing relentlessly at a constant rate –
 if it touches a predetermined RAM threshold that should be your third
 metric



 Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
 you can implement one on your own without waiting for Jiras and new
 features whenever they might be implemented by the Spark dev team –
 moreover you can avoid using slow mechanisms such as ZooKeeper and even
 incorporate some Machine Learning in your Feedback Loop to make it handle
 the message consumption rate more intelligently and benefit from ongoing
 online learning – BUT this is STILL about voluntarily sacrificing your
 performance in the name of keeping your system stable – it is not 

Re: Spark SQL with Thrift Server is very very slow and finally failing

2015-06-09 Thread Sourav Mazumder
From the log file I noticed that the ExecutorLostFailure happens after the
memory used by the executor exceeds the executor memory value.
However, even if I increase the value of executor memory, the executor still
fails - it only takes longer.

I'm wondering why, for joining 2 Hive tables, one with 100 MB of data (around
1 M rows) and another with 20 KB of data (around 100 rows), an executor
consumes so much memory. Even if I increase the memory to 20 GB, the
same failure happens.

Regards,
Sourav

On Tue, Jun 9, 2015 at 12:58 PM, Sourav Mazumder 
sourav.mazumde...@gmail.com wrote:

 Hi,

 I'm just doing a select statement which is supposed to return 10 MB data
 maximum. The driver memory is 2G and executor memory is 20 G.

 The query I'm trying to run is something like

 SELECT PROJECT_LIVE_DT, FLOORPLAN_NM, FLOORPLAN_DB_KEY
 FROM POG_PRE_EXT P, PROJECT_CALENDAR_EXT C
 WHERE PROJECT_TYPE = 'CR'
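
One observation on the query as pasted (only a fragment is shown, so this is an
assumption): there is no join predicate between P and C, so Spark computes a
Cartesian product of roughly 1 M x 100 rows, which by itself can explain the
executor memory use. A sketch with an explicit join condition, expressed via
HiveContext for concreteness; PROJECT_DB_KEY is a hypothetical key column:

val res = hiveContext.sql(
  """SELECT PROJECT_LIVE_DT, FLOORPLAN_NM, FLOORPLAN_DB_KEY
    |FROM POG_PRE_EXT P
    |JOIN PROJECT_CALENDAR_EXT C ON P.PROJECT_DB_KEY = C.PROJECT_DB_KEY
    |WHERE C.PROJECT_TYPE = 'CR'""".stripMargin)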

 Not sure what exactly you mean by physical plan.

 Here is the stack trace from the machine where the thrift process is
 running.

 Regards,
 Sourav

 On Mon, Jun 8, 2015 at 11:18 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Is it the large result set returned from the Thrift Server? And can you
 paste the SQL and physical plan?



 *From:* Ted Yu [mailto:yuzhih...@gmail.com]
 *Sent:* Tuesday, June 9, 2015 12:01 PM
 *To:* Sourav Mazumder
 *Cc:* user
 *Subject:* Re: Spark SQL with Thrift Server is very very slow and
 finally failing



 Which Spark release are you using ?



 Can you pastebin the stack trace w.r.t. ExecutorLostFailure ?



 Thanks



 On Mon, Jun 8, 2015 at 8:52 PM, Sourav Mazumder 
 sourav.mazumde...@gmail.com wrote:

  Hi,

 I am trying to run a SQL form a JDBC driver using Spark's Thrift Server.

 I'm doing a join between a Hive Table of size around 100 GB and another
 Hive Table with 10 KB, with a filter on a particular column

 The query takes more than 45 minutes and then I get ExecutorLostFailure.
 That seems to be because of memory: once I increase the memory, the failure
 still happens, but after a longer time.

 I'm using executor memory 20 GB, Spark driver memory 2 GB, executor
 instances 2 and executor cores 2.

 Running the job using Yarn with master as 'yarn-client'.

 Any idea if I'm missing any other configuration ?

 Regards,

 Sourav







Spark's Scala shell killing itself

2015-06-09 Thread Chandrashekhar Kotekar
Hi,

I have configured Spark to run on YARN. Whenever I start spark shell using
'spark-shell' command, it automatically gets killed. Output looks like
below:

ubuntu@dev-cluster-gateway:~$ ls shekhar/
edx-spark
ubuntu@dev-cluster-gateway:~$ spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0-SNAPSHOT
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/06/10 05:20:45 WARN Utils: Your hostname, dev-cluster-gateway resolves
to a loopback address: 127.0.0.1; using 10.182.149.171 instead (on
interface eth0)
15/06/10 05:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
15/06/10 05:21:20 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
/usr/lib/spark/bin/spark-shell: line 48: 15573 Killed
 $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main
"${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"


Any clue why spark shell gets killed? Please let me know if other
configuration/information is required.

Thanks,
Chandrash3khar Kotekar


append file on hdfs

2015-06-09 Thread Pa Rö
hi community,

I want to append results to one file. If I work locally, my function works all
right; if I run this on a YARN cluster, I lose some rows.

Here is my function to write:

points.foreach(
    new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {

        private static final long serialVersionUID = 2459995649387229261L;

        public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
            try {
                FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");

                if (fs.exists(pt)) {
                    FSDataInputStream in = fs.open(pt);
                    Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
                    backup(fs.getConf(), fs, in, pt_temp);
                    in.close();

                    FSDataOutputStream out = fs.create(pt, true);
                    FSDataInputStream backup = fs.open(pt_temp);

                    int offset = 0;
                    int bufferSize = 4096;
                    int result = 0;

                    byte[] buffer = new byte[bufferSize];
                    // pre-read a part of the content from the input stream
                    result = backup.read(offset, buffer, 0, bufferSize);
                    // keep reading until a read no longer fills the whole buffer
                    while (result == bufferSize) {
                        out.write(buffer);
                        // read the next segment by moving the offset pointer
                        offset += bufferSize;
                        result = backup.read(offset, buffer, 0, bufferSize);
                    }

                    if (result > 0 && result < bufferSize) {
                        for (int i = 0; i < result; i++) {
                            out.write(buffer[i]);
                        }
                    }
                    out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    out.close();
                }
                else {
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
                    bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    bw.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
            FSDataOutputStream out = fs.create(pt_temp, true);
            IOUtils.copyBytes(sourceContent, out, 4096, false);
            out.close();
        }
    });

Where is my fault? Or is there a function to write (append) to the Hadoop
hdfs?

best regards,
paul
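
A note on the likely cause (inferred from the code above, not confirmed in the
thread): foreach runs concurrently on many executors, and every call rewrites
the same HDFS file, so writers race and overwrite each other, losing rows. The
usual fix is to let Spark write the output itself; a sketch in Scala, with
outputDir hypothetical:

// Format each record as one line and let Spark write the parts. coalesce(1)
// yields a single output file, at the cost of funnelling all data through
// one task.
points
  .map { case (cluster, point) => s"Cluster: $cluster, Point: $point" }
  .coalesce(1)
  .saveAsTextFile(outputDir + "/results")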


Re: RDD of RDDs

2015-06-09 Thread kiran lonikar
Yes true. That's why I said if and when.

But hopefully I have given a correct explanation of why an RDD of RDDs is not
possible.
On 09-Jun-2015 10:22 pm, Mark Hamstra m...@clearstorydata.com wrote:

 That would constitute a major change in Spark's architecture.  It's not
 happening anytime soon.

 On Tue, Jun 9, 2015 at 1:34 AM, kiran lonikar loni...@gmail.com wrote:

 Possibly in the future, if and when the Spark architecture allows workers to
 launch Spark jobs (the functions passed to transformation or action APIs of
 an RDD), it will be possible to have an RDD of RDDs.

 On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote:

 A similar question was asked before:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

 Here is one of the reasons why I think RDD[RDD[T]] is not possible:

- RDD is only a handle to the actual data partitions. It has a
reference/pointer to the *SparkContext* object (*sc*) and a list of
partitions.
- The *SparkContext *is an object in the Spark Application/Driver
Program's JVM. Similarly, the list of partitions is also in the JVM of 
 the
driver program. Each partition contains kind of remote references to 
 the
partition data on the worker JVMs.
- The functions passed to RDD's transformations and actions execute
in the worker's JVMs on different nodes. For example, in *rdd.map {
x => x * x }*, the function performing *x * x* runs on the JVMs of
the worker nodes where the partitions of the RDD reside. These JVMs do not
have access to the *sc* since it's only on the driver's JVM.
- Thus, in the case of your *RDD of RDD*: *outerRDD.map { innerRdd =>
innerRDD.filter { x => x * x } }*, the worker nodes will not be able
to execute the *filter* on *innerRDD* as the code in the worker does
not have access to sc and cannot launch a spark job.


 Hope it helps. You need to consider List[RDD] or some other collection.
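
A sketch of that alternative (LogRecord, userId and processEachUser are
hypothetical; this is reasonable for tens or hundreds of keys, not millions,
since it launches one Spark job per key):

// Collect the distinct keys on the driver, then build one lazy filtered RDD
// per key. Each map entry is just a handle; nothing runs until an action.
val userIds = logs.map(_.userId).distinct().collect()

val perUser: Map[String, org.apache.spark.rdd.RDD[LogRecord]] =
  userIds.map(id => id -> logs.filter(_.userId == id)).toMap

perUser.foreach { case (id, rdd) => processEachUser(id, rdd) }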

 -Kiran

 On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as an RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs, and it seems
 like it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupByKey() and then write each group into a
 file, and read them back as individual rdds to process...

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721







RE: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

2015-06-09 Thread Dong Lei
Thanks So much!

I did put sleep on my code to have the UI available.

Now from the UI, I can see:

· In the “SparkProperty” Section, the spark.jars and spark.files are 
set to what I want.

· In the “Classpath Entries” Section, my jar and file paths are 
there (with an HDFS path).

And I checked the HTTP file server directory; the structure is like:
 D:\data\temp
   \-- spark-UUID
        \-- httpd-UUID
             \-- jars  [empty]
             \-- files [empty]

So I guess the files and jars are not properly downloaded from HDFS to these 
folders?

I’m using standalone mode.

Any ideas?

Thanks
Dong Lei

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, June 9, 2015 4:46 PM
To: Dong Lei
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars 
and files located on HDFS

You can put a Thread.sleep(10) in the code to have the UI available for 
quite some time. (Put it just before starting any of your transformations.) Or 
you can enable the spark history server 
(https://spark.apache.org/docs/latest/monitoring.html) too. I believe --jars 
(https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) 
would download the dependency jars on all your worker machines (they can be found 
in the spark work dir of your application along with the stderr stdout files).

Thanks
Best Regards

On Tue, Jun 9, 2015 at 1:29 PM, Dong Lei dong...@microsoft.com wrote:
Thanks Akhil:

The driver fails too fast to get a look at 4040. Is there any other way to see 
the download and ship process of the files?

Is the driver supposed to download these jars from HDFS to some location, then ship 
them to executors?
I can see from log that the driver downloaded the application jar but not the 
other jars specified by “—jars”.

Or do I misunderstand the usage of “--jars”, and the jars should already be in 
every worker, so the driver will not download them?
Are there some useful docs?

Thanks
Dong Lei


From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, June 9, 2015 3:24 PM
To: Dong Lei
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars 
and files located on HDFS

Once you submit the application, you can check in the driver UI (running on 
port 4040), Environment tab, to see whether those jars you added got shipped or 
not. If they are shipped and you are still getting NoClassDef exceptions, then 
it means that you have a jar conflict, which you can resolve by putting 
the jar with the class in it at the top of your classpath.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote:
Hi, spark-users:

I’m using spark-submit to submit multiple jars and files (all in HDFS) to run a 
job, with the following command:

spark-submit
  --class myClass
  --master spark://localhost:7077
  --deploy-mode cluster
  --jars hdfs://localhost/1.jar,hdfs://localhost/2.jar
  --files hdfs://localhost/1.txt,hdfs://localhost/2.txt
  hdfs://localhost/main.jar

the stderr in the driver showed java.lang.ClassNotDefException for a class in 
1.jar.

I checked the log that spark has added these jars:
 INFO SparkContext: Added JAR hdfs:// …1.jar
 INFO SparkContext: Added JAR hdfs:// …2.jar

In the folder of the driver, I only saw that main.jar was copied to that place, 
but the other jars and files were not there.

Could someone explain how I should pass the jars and files needed by the main 
jar to spark?

If my class in main.jar refers to these files with a relative path, will spark 
copy these files into one folder?

BTW, my class works in client mode with all jars and files local.

Thanks
Dong Lei




Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

2015-06-09 Thread Jörn Franke
I am not sure they work with HDFS paths. You may want to look at the
source code. Alternatively you can create a fat jar containing all jars
(let your build tool set up META-INF correctly). This always works.


RE: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

2015-06-09 Thread Dong Lei
Hi Jörn:

I started to check the code, and sadly it seems it does not work with HDFS paths:

In HTTPFileServer.scala:
def addFileToDir:
….
Files.copy
….

It looks like it only copies files from local disk to the http server directory 
(then distributes the files to workers).

Even if I make addFileToDir recognize an hdfs prefix and download from hdfs, 
there still will be another problem:
the classpath is set to the file path on hdfs, and I don't think that works.

I will try a fat jar,

BTW, do you know how I can ask for this scenario to be supported? Should I send to 
dev@spark?

Does yarn-cluster mode support dependency jars and files on HDFS?

Thanks
Dong Lei

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Wednesday, June 10, 2015 12:48 PM
To: Dong Lei; Akhil Das
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars 
and files located on HDFS


I am not sure they work with HDFS paths. You may want to look at the source 
code. Alternatively you can create a fat jar containing all jars (let your 
build tool set up META-INF correctly). This always works.
