Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?
Why? I tried this solution and it works fine.

On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List] <ml-node+s1001560n23218...@n3.nabble.com> wrote:

Hi drarse, thanks for replying. The singleton-object approach you suggested does not work.

On 2015-06-09 16:24:25, drarse [via Apache Spark User List] [hidden email] wrote:

The best way is to create a singleton object like this:

object SQLContextSingleton {
  @transient private var instance: SQLContext = null

  // Instantiate SQLContext on demand
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

There is more information in the programming guide: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

On 2015-06-09 9:27 GMT+02:00, codingforfun [via Apache Spark User List] [hidden email] wrote:

I used SQLContext in a Spark Streaming application as below:

case class topic_name(f1: Int, f2: Int)

val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
theDStream.map(x => x._2).foreachRDD { rdd =>
  sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
  sqlContext.sql("select count(*) from topic_name").foreach { x =>
    WriteToFile(file_path, x(0).toString)
  }
}
ssc.start()
ssc.awaitTermination()

I found that I could only get the message count of each 5-second batch, because "the lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame". I guess a new SQLContext is created every 5 seconds, so the temporary table only lives for 5 seconds. I want the SQLContext and the temporary table to stay alive for the whole life cycle of the streaming application. How can I do that?
Thanks~
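drarse's singleton can be tried without a cluster. The sketch that follows is a local model, not Spark code: `FakeSQLContext`, `FakeSQLContextSingleton` and `BatchSimulation` are hypothetical stand-ins, and the rule that registering a temp table under an existing name replaces the previous one is our assumption about `registerTempTable`, not something confirmed in this thread. It shows two things: every batch receives the same context instance, so the table registry survives across batches; and a table re-registered each batch under the same name holds only the latest batch's rows.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SQLContext: it only models the temp-table
// registry, whose lifetime is tied to the context instance.
final class FakeSQLContext {
  private val tables = mutable.Map.empty[String, Seq[String]]

  // Assumption: registering under an existing name replaces the old rows.
  def registerTempTable(name: String, rows: Seq[String]): Unit =
    synchronized { tables(name) = rows }

  def table(name: String): Option[Seq[String]] =
    synchronized { tables.get(name) }
}

// Same shape as SQLContextSingleton in the thread: create lazily, then reuse.
object FakeSQLContextSingleton {
  @transient private var instance: FakeSQLContext = null

  def getInstance(): FakeSQLContext = synchronized {
    if (instance == null) {
      instance = new FakeSQLContext
    }
    instance
  }
}

// Hypothetical per-batch body: fetch the shared context, register rows.
object BatchSimulation {
  def runBatch(rows: Seq[String]): FakeSQLContext = {
    val ctx = FakeSQLContextSingleton.getInstance()
    ctx.registerTempTable("topic_name", rows)
    ctx
  }
}
```

With the real API, the equivalent call site would be inside `foreachRDD`, passing `rdd.sparkContext` to `SQLContextSingleton.getInstance`, so that every batch reuses one SQLContext instead of constructing a new one.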
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
Shuffle data can be deleted through the weak-reference mechanism; you could check the code of ContextCleaner. You could also trigger a full GC manually with JVisualVM or some other tool to see whether the shuffle files get deleted.

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

Jerry, I agree with you. However, in my case I kept monitoring the blockmanager folder. I do sometimes see the number of files decrease, but the folder's size keeps increasing. Below is a screenshot of the folder; you can see that some old files are somehow not deleted.

[Screenshot of the blockmanager folder]

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

From the stack trace, I think this problem may be caused by the deletion of the broadcast variable. Because you set spark.cleaner.ttl, the old broadcast variable is deleted once that timeout expires, and you will hit this exception if you try to use it again after that time limit. Basically, I think you don't need this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

When I ran a Spark Streaming application for a longer time, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint interval is 10 minutes.
The setting took effect, but after that the exception below occurred. Do you have any idea about this error? Thank you!

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at
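Jerry's explanation contrasts two cleanup policies: spark.cleaner.ttl removes metadata purely by age, while ContextCleaner removes it once nothing references it any more. The toy model that follows illustrates only the first policy, using the 1800-second TTL from Haopu's setup; `TtlBlockStore` and its methods are hypothetical and do not reflect Spark's internals. A broadcast created when the job starts and still read by later batches disappears once it is older than the TTL, and the next read fails.

```scala
import scala.collection.mutable

// Toy model, not Spark's code: a block store whose cleaner removes entries
// purely by age, the way spark.cleaner.ttl is described in this thread.
final class TtlBlockStore(ttlSeconds: Long) {
  // block id -> (creation time in seconds, payload)
  private val blocks = mutable.Map.empty[String, (Long, String)]

  def put(id: String, data: String, nowSeconds: Long): Unit =
    blocks(id) = (nowSeconds, data)

  // Periodic cleaner: drops every block older than the TTL, even one that a
  // long-running job still intends to read.
  def clean(nowSeconds: Long): Unit = {
    val expired = blocks.collect {
      case (id, (created, _)) if nowSeconds - created > ttlSeconds => id
    }.toList
    blocks --= expired
  }

  // Reading a block the cleaner already removed fails, analogous to
  // "Failed to get broadcast_82_piece0 of broadcast_82".
  def get(id: String): Either[String, String] =
    blocks.get(id).map(_._2).toRight(s"Failed to get $id")
}
```

A reference-based cleaner, as Jerry describes ContextCleaner, would instead keep the block for as long as the driver still holds the broadcast variable, which is why he suggests not setting the TTL at all.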
Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
Thanks Akhil, Mark, for your valuable comments. Problem resolved.

AT

On Tue, Jun 9, 2015 at 2:17 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

I think yes, as the documentation says: "Creates tuples of the elements in this RDD by applying f."

Thanks
Best Regards

On Tue, Jun 9, 2015 at 1:54 PM, amit tewari <amittewar...@gmail.com> wrote:

Actually, the question was: will keyBy() accept multiple fields (e.g. x(0), x(1)) as the key?

On Tue, Jun 9, 2015 at 1:07 PM, amit tewari <amittewar...@gmail.com> wrote:

Thanks Akhil. As you suggested, I have to go with keyBy(route), as I need the columns intact. But will keyBy() accept multiple fields (e.g. x(0), x(1))?

Thanks
Amit

On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Try this way:

scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => (x(0) + x(1), (x(2), x(3))))
scala> val input22 = input2.map(x => (x(0) + x(1), (x(2), x(3))))
scala> input11.join(input22).take(10)

PairRDDFunctions basically requires an RDD[(K, V)], and in your case it is ((String, String), String, String). You can also look into keyBy if you don't want to concatenate your keys.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 10:14 AM, amit tewari <amittewar...@gmail.com> wrote:

Hi dear Spark users,

I am very new to Spark/Scala. I am using DataStax (4.7/Spark 1.2.1) and struggling with the following error. I have already tried options like import org.apache.spark.SparkContext._ or an explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions, but the error is not resolved. Help much appreciated.
Thanks
AT

scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))
scala> input11.join(input22).take(10)
<console>:66: error: value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
              input11.join(input22).take(10)
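To make Akhil's point about the tuple shape concrete without a cluster, here is a local model using plain Scala collections. `join` mimics the inner-join semantics of `PairRDDFunctions.join` on `Seq[(K, V)]`, and `keyBy` mimics `RDD.keyBy`; the object name `CompositeKeyJoin`, the `parse` helper and the sample rows are hypothetical. The point is that `((k1, k2), (v1, v2))` is a key-value pair whose key happens to be a tuple, whereas `((k1, k2), v1, v2)` is a 3-tuple with no key/value split.

```scala
// Hypothetical local model; plain Seqs stand in for RDDs.
object CompositeKeyJoin {
  // Inner join on key, like PairRDDFunctions.join: one output per matching pair.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    for {
      (k, v) <- left
      w <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  // Like RDD.keyBy: the key function may return a tuple of several fields.
  def keyBy[T, K](rows: Seq[T])(f: T => K): Seq[(K, T)] =
    rows.map(r => (f(r), r))

  // Split CSV lines and trim fields, as in the thread's textFile(...).map(...).
  def parse(lines: Seq[String]): Seq[Array[String]] =
    lines.map(_.split(",").map(_.trim))
}
```

On a real RDD the corresponding fix would be `input1.map(x => ((x(0), x(1)), (x(2), x(3))))` or `input1.keyBy(x => (x(0), x(1)))`; both produce pair RDDs, which is what `join` needs.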
Re: Re: Re: How to decrease the time of storing block in memory
Hi 罗辉,

I think you are reading the logs wrongly. Your program actually starts running from this point (the rest is just start-up and connection setup):

15/06/08 16:14:22 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
15/06/08 16:14:23 INFO storage.MemoryStore: ensureFreeSpace(1561) called with curMem=0, maxMem=370503843
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1561.0 B, free 353.3 MB)
15/06/08 16:14:23 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/06/08 16:14:23 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 967 ms
15/06/08 16:14:23 INFO storage.MemoryStore: ensureFreeSpace(2168) called with curMem=1561, maxMem=370503843
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
=> At this point it has already stored the broadcast piece in memory, and it starts your Task 0.
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver
=> It took 19s to finish your Task 0, and Task 1 starts from this point:
15/06/08 16:14:42 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
15/06/08 16:14:42 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/08 16:14:56 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 693 bytes result sent to driver
15/06/08 16:14:56 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown

Now, to speed things up you need parallelism (at least 2-3 times the number of cores you have); as it stands, your sort.sh may be running on a single core. Instead of triggering an external command, you could try to do the operation within Spark itself; that way you can always control the parallelism and so on.

Hope it helps.
Thanks
Best Regards

On Tue, Jun 9, 2015 at 3:00 PM, luohui20...@sina.com wrote:

Hi Akhil,

Not exactly. The task took 54s to finish: it started at 16:14:02 and ended at 16:14:56. Within these 54s, it needed 19s to store the value in memory, from 16:14:23 to 16:14:42. I think this is the most time-wasting part of the task, and it is also unreasonable. You can check the log attached to my previous mail. Here is my code:

import org.apache.spark._

object GeneCompare3 {
  def main(args: Array[String]) {
    // i: piece number, j: user number
    val i = args(0).toInt
    val j = args(1)
    val conf = new SparkConf()
      .setAppName("CompareGenePiece " + i + " of User " + j)
      .setMaster("spark://slave3:7077")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    println("start to compare gene")
    val runmodifyshell2 = List("run", "sort.sh")
    val runmodifyshellRDD2 = sc.makeRDD(runmodifyshell2)
    val pipeModify2 = runmodifyshellRDD2.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/db/chr" + i + ".txt /opt/data/shellcompare/data/user" + j + "/pgs/sample/samplechr" + i + ".txt /opt/data/shellcompare/data/user" + j + "/pgs/intermediateResult/result" + i + ".txt 600")
    pipeModify2.collect()
    sc.stop()
  }
}

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 16:51

Is it that task taking 19s? It won't simply be taking 19s to store 2 KB of data in memory; there could be other operations happening too (the transformations you are doing). It would be good if you could paste the code snippet you are running, so we can get a better understanding.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 2:09 PM, luohui20...@sina.com wrote:

Only 1 minor GC, 0.07s.

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02

Maybe you should check your driver UI and see whether there's any GC time involved, etc.

Thanks
Best Regards

On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote:

Hi there,

I am trying to decrease my app's running time on the worker node. I checked the log and found that the most time-wasting part is below:

15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver

I don't know why it needs 19s to store 2.1 KB of data in memory. Is there any tuning method? The attachment is the full log; here it is:

15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN
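The disagreement between San.Luo and Akhil is about which interval the 19 seconds belongs to. A small helper that computes the gap between consecutive log lines makes that reading mechanical. This is a sketch that assumes every line starts with the `yy/MM/dd HH:mm:ss` timestamp seen in these logs; `LogGaps` is a hypothetical name, and the helper only uses `java.time` from the JDK.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper for reading where time goes in an executor log.
// Assumes each line starts with a "yy/MM/dd HH:mm:ss" timestamp.
object LogGaps {
  private val format = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Parse the first 17 characters of a line, e.g. "15/06/08 16:14:23".
  def timestamp(line: String): LocalDateTime =
    LocalDateTime.parse(line.take(17), format)

  // For each consecutive pair of lines: seconds elapsed before the later
  // line, paired with that later line's text.
  def gaps(lines: Seq[String]): Seq[(Long, String)] =
    lines.zip(lines.drop(1)).map { case (earlier, later) =>
      (Duration.between(timestamp(earlier), timestamp(later)).getSeconds, later)
    }
}
```

Fed the four lines around tasks 0 and 1 from Akhil's reply, `gaps` reports 19, 0 and 14 seconds. The 19 s sits between the broadcast being stored and task 0 finishing, which is consistent with Akhil's reading that the time goes to running the task (including the piped sort.sh) rather than to storing 2.1 KB.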
Reply: Re: Re: How to decrease the time of storing block in memory
Hi Akhil,

Not exactly. The task took 54s to finish: it started at 16:14:02 and ended at 16:14:56. Within these 54s, it needed 19s to store the value in memory, from 16:14:23 to 16:14:42. I think this is the most time-wasting part of the task, and it is also unreasonable. You can check the log attached to my previous mail. Here is my code:

import org.apache.spark._

object GeneCompare3 {
  def main(args: Array[String]) {
    // i: piece number, j: user number
    val i = args(0).toInt
    val j = args(1)
    val conf = new SparkConf()
      .setAppName("CompareGenePiece " + i + " of User " + j)
      .setMaster("spark://slave3:7077")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    println("start to compare gene")
    val runmodifyshell2 = List("run", "sort.sh")
    val runmodifyshellRDD2 = sc.makeRDD(runmodifyshell2)
    val pipeModify2 = runmodifyshellRDD2.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/db/chr" + i + ".txt /opt/data/shellcompare/data/user" + j + "/pgs/sample/samplechr" + i + ".txt /opt/data/shellcompare/data/user" + j + "/pgs/intermediateResult/result" + i + ".txt 600")
    pipeModify2.collect()
    sc.stop()
  }
}

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 16:51

Is it that task taking 19s? It won't simply be taking 19s to store 2 KB of data in memory; there could be other operations happening too (the transformations you are doing). It would be good if you could paste the code snippet you are running, so we can get a better understanding.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 2:09 PM, luohui20...@sina.com wrote:

Only 1 minor GC, 0.07s.

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das <ak...@sigmoidanalytics.com>
To: 罗辉 <luohui20...@sina.com>
Cc: user <user@spark.apache.org>
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02

Maybe you should check your driver UI and see whether there's any GC time involved, etc.

Thanks
Best Regards

On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote:

Hi there,

I am trying to decrease my app's running time on the worker node. I checked the log and found that the most time-wasting part is below:

15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver

I don't know why it needs 19s to store 2.1 KB of data in memory. Is there any tuning method? The attachment is the full log; here it is:

15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/08 16:14:10 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:14 INFO Remoting: Starting remoting
15/06/08 16:14:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@slave5:54684]
15/06/08 16:14:15 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 54684.
15/06/08 16:14:16 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/08 16:14:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:16 INFO Remoting: Starting remoting
15/06/08 16:14:17 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/08 16:14:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@slave5:49169]
15/06/08 16:14:17 INFO util.Utils: Successfully started service 'sparkExecutor' on port 49169.
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@slave5:58630/user/MapOutputTracker
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@slave5:58630/user/BlockManagerMaster
15/06/08
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
Jerry, I agree with you. However, in my case I kept monitoring the blockmanager folder. I do sometimes see the number of files decrease, but the folder's size keeps increasing. Below is a screenshot of the folder; you can see that some old files are somehow not deleted.

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

From the stack trace, I think this problem may be caused by the deletion of the broadcast variable. Because you set spark.cleaner.ttl, the old broadcast variable is deleted once that timeout expires, and you will hit this exception if you try to use it again after that time limit. Basically, I think you don't need this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

When I ran a Spark Streaming application for a longer time, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint interval is 10 minutes. The setting took effect, but after that the exception below occurred. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
Re: Different Sorting RDD methods in Apache Spark
It would be even faster to load the data on the driver and sort it there without using Spark :). Using reduce() is cheating, because it only works as long as the data fits on one machine. That is not the targeted use case of a distributed computation system. You can repeat your test with more data (that doesn't fit on one machine) to see what I mean.

On Tue, Jun 9, 2015 at 8:30 AM, raggy <raghav0110...@gmail.com> wrote:

For a research project, I tried sorting the elements in an RDD. I did this with two different approaches.

In the first method, I applied a mapPartitions() function to the RDD so that it would sort the contents of each partition and produce a result RDD containing each partition's sorted list as that partition's only record. Then I applied a reduce function that basically merges the sorted lists. I ran these experiments on an EC2 cluster of 30 nodes, which I set up using the spark-ec2 script. The data file was stored in HDFS.

In the second approach, I used the sortBy method in Spark.

I performed these operations on the US census data (100 MB) found here. A single line looks like this:

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 5.

I sorted based on the 25th value in the CSV; in this line that is 1758.14.

I noticed that sortBy performs worse than the other method. Is this the expected behavior? If it is, why wouldn't mapPartitions() and reduce() be the default sorting approach?
Here is my implementation:

public static void sortBy(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>() {
        @Override
        public Double call(String v1) throws Exception {
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}

public static void sortList(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32); // parallelize(l, 8);
    long start = System.currentTimeMillis();
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(
        new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>() {
            @Override
            public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t) throws Exception {
                LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
                while (t.hasNext()) {
                    String s = t.next();
                    String[] arr1 = s.split(",");
                    Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                    lines.add(t1);
                }
                Collections.sort(lines, new IncomeComparator());
                LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
                list.add(lines);
                return list;
            }
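The reply's objection can be seen in a local model of the two approaches, with plain Scala collections standing in for partitions; `SortModels` is hypothetical and nothing in it calls Spark. `sortThenReduce` mirrors the mapPartitions()/reduce() approach: its result is one list, so in Spark the entire dataset would end up merged on the driver. `rangeSort` sketches the range-partitioning idea that, as we understand it, sortBy relies on: values are routed to partitions by key range and sorted locally, so no single partition needs all the data. Here the boundaries are hand-picked; Spark derives them by sampling the input.

```scala
// Hypothetical local models of the two sorting strategies in this thread.
// Plain Seqs stand in for RDD partitions; nothing here calls Spark.
object SortModels {
  // Merge two already-sorted lists into one sorted list (the reduce step).
  def merge(a: List[Double], b: List[Double]): List[Double] = {
    val out = List.newBuilder[Double]
    var x = a
    var y = b
    while (x.nonEmpty && y.nonEmpty) {
      if (x.head <= y.head) { out += x.head; x = x.tail }
      else { out += y.head; y = y.tail }
    }
    out ++= x
    out ++= y
    out.result()
  }

  // Approach 1: sort inside each partition, then merge pairwise with reduce.
  // The result is a single list, i.e. all records end up in one place.
  def sortThenReduce(partitions: Seq[Seq[Double]]): List[Double] =
    partitions.map(_.sorted.toList).reduce((a, b) => merge(a, b))

  // Approach 2: range partitioning. bounds must be ascending; partition i
  // receives values in (bounds(i - 1), bounds(i)], with open ends at the
  // extremes. Each partition is sorted on its own, and reading partitions
  // in order yields a globally sorted sequence.
  def rangeSort(partitions: Seq[Seq[Double]], bounds: Seq[Double]): Seq[List[Double]] = {
    val all = partitions.flatten
    (0 to bounds.length).map { i =>
      val lower = if (i == 0) Double.NegativeInfinity else bounds(i - 1)
      val upper = if (i == bounds.length) Double.PositiveInfinity else bounds(i)
      all.filter(v => v > lower && v <= upper).sorted.toList
    }
  }
}
```

Both return the same ordering; the difference that matters at scale is that `rangeSort` keeps its output split across partitions while `sortThenReduce` collapses everything into one list.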
Re: RDD of RDDs
Possibly in the future, if and when the Spark architecture allows workers to launch Spark jobs (from the functions passed to RDD transformation or action APIs), it will be possible to have an RDD of RDDs.

On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar <loni...@gmail.com> wrote:

A similar question was asked before: http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

Here is one of the reasons why I think RDD[RDD[T]] is not possible:

- An RDD is only a handle to the actual data partitions. It holds a reference to the *SparkContext* object (*sc*) and a list of partitions.
- The *SparkContext* is an object in the JVM of the Spark application/driver program. Similarly, the list of partitions is also in the driver program's JVM. Each partition holds a kind of remote reference to the partition data on the worker JVMs.
- The functions passed to an RDD's transformations and actions execute in the workers' JVMs on different nodes. For example, in rdd.map { x => x * x }, the function computing x * x runs in the JVMs of the worker nodes where the RDD's partitions reside. These JVMs do not have access to sc, since it exists only in the driver's JVM.
- Thus, in the case of your RDD of RDDs, outerRDD.map { innerRDD => innerRDD.map { x => x * x } }, the worker nodes will not be able to execute the map on innerRDD, because the code running on the worker has no access to sc and cannot launch a Spark job.

Hope it helps. You need to consider List[RDD] or some other collection.

-Kiran

On Tue, Jun 9, 2015 at 2:25 AM, ping yan <sharon...@gmail.com> wrote:

Hi,

The problem I am looking at is as follows:

- I read a log file of multiple users into an RDD.
- I'd like to group the above RDD into *multiple RDDs* by user ID (the key).
- My processEachUser() function then takes each per-user RDD and calls RDD.map or DataFrame operations on it.
(I already have the function coded, so I am reluctant to work with the ResultIterable object that comes out of rdd.groupByKey() ...)

I've searched the mailing list and googled "RDD of RDDs", and it seems like it isn't a thing at all. The few choices left seem to be:

1) groupByKey() and then work with the ResultIterable object;
2) groupByKey() and then write each group to a file, and read the files back as individual RDDs to process.

Has anyone got a better idea, or had a similar problem before?

Thanks!
Ping

--
Ping Yan
Ph.D. in Management
Dept. of Management Information Systems
University of Arizona
Tucson, AZ 85721
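Kiran's "List[RDD]" suggestion keeps the per-user datasets in a driver-side collection instead of nesting them. The sketch that follows models that shape locally with plain Seqs standing in for RDDs; `PerUserSplit` and its `processEachUser` are hypothetical names, the latter only counting events. In Spark, the same shape would come from collecting the distinct keys to the driver (for example with `rdd.keys.distinct().collect()`) and creating one `rdd.filter(...)` per key. Every such filter is built on the driver, where `sc` is available, but each one is a separate pass over the input, so this is only practical for a modest number of users (caching the parent RDD may help).

```scala
// Local model of the "List[RDD] or some other collection" suggestion.
// Plain Seqs stand in for RDDs; PerUserSplit and processEachUser are
// hypothetical names used only for this illustration.
object PerUserSplit {
  // One dataset per user, built on the "driver" side: first the distinct
  // keys (in first-seen order), then one filtered view of the input per key.
  def splitByUser[K, V](records: Seq[(K, V)]): Seq[(K, Seq[V])] = {
    val users = records.map(_._1).distinct
    users.map(u => u -> records.filter(_._1 == u).map(_._2))
  }

  // Stand-in for the poster's processEachUser(); here it just counts events.
  def processEachUser(events: Seq[String]): Int = events.size
}
```

Because each per-user dataset is an ordinary value in a driver-side collection, an existing function written against a single dataset can be applied to each element unchanged.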
Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
Hi,

I posted a question regarding Phoenix and Spark Streaming on StackOverflow [1]. Please find a copy of the question below the first stack trace in this email. I have also already contacted the Phoenix mailing list and tried the suggestion of setting spark.driver.userClassPathFirst. Unfortunately, that only pushed me further into dependency hell, which I tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.

What I am trying to achieve: to save a stream from Kafka into Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the original exception happens both on a 3-node cluster and on the MapR Sandbox (a VM for experimentation), in both YARN and standalone mode. Further experimentation (like the saveAsNewAPIHadoopFile attempt below) was done only on the sandbox in standalone mode.

Phoenix only supports Spark from 4.4.0 onwards, but I thought I could use a naive implementation in 4.3.1 that creates a new connection for every RDD from the DStream. This resulted in the ClassNotFoundException described in [1], so I switched to 4.4.0. Unfortunately, the saveToPhoenix method is only available in Scala, so I followed the suggestion to try it via the saveAsNewAPIHadoopFile method [2] and an example implementation [3], which I adapted to my own needs. However, 4.4.0 + saveAsNewAPIHadoopFile raises the same ClassNotFoundException, just with a slightly different stack trace:

java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
    at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
    at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
    at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
    at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
    at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
    at java.sql.DriverManager.getConnection(DriverManager.java:571)
    at java.sql.DriverManager.getConnection(DriverManager.java:187)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
    at org.apache.phoenix.mapreduce.PhoenixRecordWriter.init(PhoenixRecordWriter.java:49)
    at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
    ... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
    at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
    ... 23 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at
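The naive approach described in this question (opening a Phoenix connection per RDD instead of calling saveToPhoenix) can be sketched per partition with plain JDBC, so that each task opens one connection on the executor. This is only a sketch under assumptions: the table EVENTS(ID, PAYLOAD), the object and method names, and the JDBC URL are hypothetical, and the Phoenix client jar still has to be on the executor classpath, so this pattern by itself does not address the ClassNotFoundException.

```scala
import java.sql.{Connection, DriverManager}
import org.apache.spark.rdd.RDD

// Serializable because its methods are called from executor-side closures.
object PhoenixPartitionWriter extends Serializable {
  // Hypothetical target table: EVENTS(ID VARCHAR PRIMARY KEY, PAYLOAD VARCHAR)
  val UpsertSql = "UPSERT INTO EVENTS (ID, PAYLOAD) VALUES (?, ?)"

  // Writes one partition over a single connection and returns the row count.
  // `connect` is a factory so the connection is opened on the executor.
  def writePartition(rows: Iterator[(String, String)], connect: () => Connection): Int = {
    val conn = connect()
    try {
      val stmt = conn.prepareStatement(UpsertSql)
      var written = 0
      rows.foreach { case (id, payload) =>
        stmt.setString(1, id)
        stmt.setString(2, payload)
        stmt.executeUpdate()
        written += 1
      }
      conn.commit() // Phoenix connections normally have autoCommit off, so commit explicitly
      stmt.close()
      written
    } finally {
      conn.close()
    }
  }

  // e.g. dstream.foreachRDD(rdd => PhoenixPartitionWriter.save(rdd, "jdbc:phoenix:zk-host:2181"))
  // (the ZooKeeper host in that URL is a placeholder)
  def save(rdd: RDD[(String, String)], jdbcUrl: String): Unit =
    rdd.foreachPartition { it =>
      writePartition(it, () => DriverManager.getConnection(jdbcUrl))
      ()
    }
}
```

Opening the connection inside foreachPartition (rather than on the driver) matters because a JDBC connection is not serializable and cannot be shipped to the executors.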
Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
I think yes. As the documentation says, keyBy "creates tuples of the elements in this RDD by applying f", and f can return a tuple, e.g. keyBy(x => (x(0), x(1))). Thanks Best Regards On Tue, Jun 9, 2015 at 1:54 PM, amit tewari amittewar...@gmail.com wrote: Actually the question was: will keyBy() accept multiple fields (e.g. x(0), x(1)) as the key? On Tue, Jun 9, 2015 at 1:07 PM, amit tewari amittewar...@gmail.com wrote: Thanks Akhil, as you suggested, I have to go with keyBy(route), as I need the columns intact. But will keyBy() accept multiple fields (e.g. x(0), x(1))? Thanks Amit On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try this way:
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0) + x(1)), (x(2), x(3))))
scala> val input22 = input2.map(x => ((x(0) + x(1)), (x(2), x(3))))
scala> input11.join(input22).take(10)
PairRDDFunctions basically requires RDD[(K, V)], and in your case it's ((String, String), String, String). You can also look into keyBy if you don't want to concatenate your keys. Thanks Best Regards On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com wrote: Hi Dear Spark Users, I am very new to Spark/Scala. I am using DataStax (4.7/Spark 1.2.1) and struggling with the following error/issue. I already tried options like import org.apache.spark.SparkContext._ or an explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions, but the error is not resolved. Help much appreciated. Thanks AT
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))
scala> input11.join(input22).take(10)
<console>:66: error: value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
              input11.join(input22).take(10)
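join, like the other pair-RDD operations, is only available on an RDD whose elements are two-element tuples (K, V). A three-element tuple such as ((String, String), String, String) therefore has no join, however the key is built; the remaining columns have to be grouped into a single value. keyBy, on the other hand, accepts a multi-field key because f may return a tuple. A minimal sketch, assuming comma-separated lines with at least four columns; the object and method names are hypothetical.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits; required on Spark 1.2, harmless later
import org.apache.spark.rdd.RDD

// Serializable because parse/toPair are used inside RDD closures.
object CompositeKeyJoin extends Serializable {
  // Split a line like "a, b, c, d" into trimmed columns.
  def parse(line: String): Array[String] = line.split(",").map(_.trim)

  // One key (first two columns) and ONE value (last two columns grouped in a tuple),
  // so each element is a Tuple2 and join becomes available.
  def toPair(cols: Array[String]): ((String, String), (String, String)) =
    ((cols(0), cols(1)), (cols(2), cols(3)))

  def joinOnFirstTwo(left: RDD[String], right: RDD[String])
      : RDD[((String, String), ((String, String), (String, String)))] =
    left.map(parse).map(toPair).join(right.map(parse).map(toPair))

  // keyBy alternative: keep the whole row intact and key it by two fields at once.
  def keyByFirstTwo(lines: RDD[String]): RDD[((String, String), Array[String])] =
    lines.map(parse).keyBy(cols => (cols(0), cols(1)))
}
```

Using a tuple key rather than concatenating x(0) + x(1) also avoids accidental collisions such as "ab" + "c" versus "a" + "bc".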
Re: RDD of RDDs
A similar question was asked before: http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html Here is one of the reasons why I think RDD[RDD[T]] is not possible:
- An RDD is only a handle to the actual data partitions. It holds a reference to the SparkContext object (sc) and a list of partitions.
- The SparkContext is an object in the JVM of the Spark application's driver program. Similarly, the list of partitions also lives in the driver program's JVM. Each partition holds a kind of remote reference to the partition data on the worker JVMs.
- The functions passed to an RDD's transformations and actions execute in the worker JVMs on different nodes. For example, in rdd.map { x => x * x }, the function computing x * x runs in the JVMs of the worker nodes where the RDD's partitions reside. These JVMs do not have access to sc, since it exists only in the driver's JVM.
- Thus, in the case of your RDD of RDDs, outerRDD.map { innerRDD => innerRDD.map { x => x * x } }, the worker nodes will not be able to execute the map on innerRDD, because the code running on the worker has no access to sc and cannot launch a Spark job.
Hope it helps. You need to consider List[RDD] or some other collection. -Kiran On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote: Hi, The problem I am looking at is as follows:
- I read in a log file of multiple users as an RDD
- I'd like to group the above RDD into *multiple RDDs* by userIds (the key)
- my processEachUser() function then takes in each RDD mapped into each individual user, and calls RDD.map or DataFrame operations on them. (I already had the function coded, so I am reluctant to work with the ResultIterable object coming out of rdd.groupByKey() ...)
I've searched the mailing list and googled "RDD of RDDs", and it seems like it isn't a thing at all.
A few choices left seem to be: 1) groupByKey() and then work with the ResultIterable object; 2) groupByKey() and then write each group into a file, and read them back as individual RDDs to process. Anyone got a better idea or had a similar problem before? Thanks! Ping -- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
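Kiran's suggestion of a collection of RDDs (here a Map from user ID to RDD, built on the driver) could look roughly like this. It assumes the log is already parsed into (userId, record) pairs and that the number of distinct users is small, since every per-user RDD is a separate filter pass over the cached parent; the object and method names are hypothetical. With many users, working on grouped values (option 1) scales better.

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._ // pair-RDD implicits; required on Spark 1.2, harmless later
import org.apache.spark.rdd.RDD

object PerUserRdds {
  // Builds one RDD per user on the driver: a Map of RDDs, not an RDD of RDDs.
  // Each entry is a lazy filter over the same parent, so the parent is cached
  // to avoid re-reading the input once per user.
  def splitByUser[V: ClassTag](logs: RDD[(String, V)]): Map[String, RDD[V]] = {
    logs.cache()
    val users = logs.keys.distinct().collect()
    users.map(user => user -> logs.filter { case (u, _) => u == user }.values).toMap
  }
}
```

Each value in the returned Map is an ordinary RDD, so an existing processEachUser(rdd) can be called on it from the driver, e.g. perUser.foreach { case (user, rdd) => processEachUser(rdd) }.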
Re: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
Hi, are you restarting your Spark streaming context through getOrCreate? On 9 Jun 2015 09:30, Haopu Wang hw...@qilinsoft.com wrote: When I ran a Spark streaming application for a longer time, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
    ... 25 more
15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
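Akhil's question refers to recovering the streaming context from a checkpoint with StreamingContext.getOrCreate. A minimal sketch of that pattern follows; the object name and checkpoint directory are hypothetical, and the DStream definitions are omitted. One caveat worth keeping in mind (later versions of the Spark Streaming programming guide state it explicitly): broadcast variables are not recovered from a checkpoint, so a restored context has to obtain its broadcasts again on the driver rather than reuse ones captured when the graph was first built.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedContext {
  // Builds a brand-new context. getOrCreate calls this only when checkpointDir
  // holds no checkpoint data.
  def create(conf: SparkConf, checkpointDir: String): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batches, as in the thread
    ssc.checkpoint(checkpointDir)
    // Define input DStreams, transformations and output operations here.
    // Obtain broadcast variables lazily on the driver (e.g. inside foreachRDD)
    // instead of capturing one here, because broadcasts are not checkpointed.
    ssc
  }

  // Restores the DStream graph from checkpointDir if a checkpoint exists,
  // otherwise falls back to create().
  def getOrCreate(conf: SparkConf, checkpointDir: String): StreamingContext =
    StreamingContext.getOrCreate(checkpointDir, () => create(conf, checkpointDir))
}
```

The creating function only runs on a fresh start, so anything set up inside it (including the checkpoint directory) is what a restart will read back.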
Re: Rdd of Rdds
Replicating my answer to another question asked today: Here is one of the reasons why I think RDD[RDD[T]] is not possible:
* An RDD is only a handle to the actual data partitions. It holds a reference to the SparkContext object (sc) and a list of partitions.
* The SparkContext is an object in the JVM of the Spark application's driver program. Similarly, the list of partitions also lives in the driver program's JVM. Each partition holds a kind of remote reference to the partition data on the worker JVMs.
* The functions passed to an RDD's transformations and actions execute in the worker JVMs on different nodes. For example, in rdd.map { x => x * x }, the function computing x * x runs in the JVMs of the worker nodes where the RDD's partitions reside. These JVMs do not have access to sc, since it exists only in the driver's JVM.
* Thus, in the case of your RDD of RDDs, outerRDD.map { innerRDD => innerRDD.map { x => x * x } }, the worker nodes will not be able to execute the map on innerRDD, because the code running on the worker has no access to sc and cannot launch a Spark job.
Hope it helps. You need to consider List[RDD] or some other collection. Possibly in the future, if and when the Spark architecture allows workers to launch Spark jobs (from the functions passed to the transformation or action APIs of an RDD), it will be possible to have an RDD of RDDs.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-tp17025p23217.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
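The other way around the missing RDD of RDDs is to let groupByKey deliver each user's records as an ordinary Scala Iterable inside a task and process it with plain collection methods, so no RDD is ever referenced from executor-side code. A minimal sketch; summarize is a hypothetical stand-in for a per-user function such as processEachUser, and the approach assumes each user's records fit in one executor's memory.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits; required on Spark 1.2, harmless later
import org.apache.spark.rdd.RDD

// Serializable because summarize is called inside an RDD closure.
object PerUserGroups extends Serializable {
  // Hypothetical per-user computation on a plain Scala collection:
  // (number of events, number of distinct pages visited).
  def summarize(pages: Iterable[String]): (Int, Int) =
    (pages.size, pages.toSet.size)

  // groupByKey gathers all of a user's records into one Iterable within a task.
  def summarizeAll(logs: RDD[(String, String)]): RDD[(String, (Int, Int))] =
    logs.groupByKey().mapValues(pages => summarize(pages))
}
```

When the per-user result is an aggregate like this, reduceByKey or aggregateByKey avoid materializing each group and are usually cheaper than groupByKey.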
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
From the stack trace, I think this problem may be due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after this timeout, and you will hit this exception when you try to use it again after that time limit. Basically, I think you don't need this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself cleans up this metadata using weak references. Also, this configuration will be deprecated in the coming release. Thanks Jerry -----Original Message----- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Tuesday, June 9, 2015 3:30 PM To: user Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl When I ran a Spark streaming application for a longer time, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
    ... 25 more
15/06/09 12:57:30 ERROR
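Following Jerry's advice to drop spark.cleaner.ttl, a broadcast lookup table that has to change over time can be replaced explicitly instead: broadcast the new value, then unpersist the old one yourself, rather than letting a time-based cleaner delete a broadcast that tasks may still read. RefreshableBroadcast below is a hypothetical helper, not a Spark API. unpersist (unlike destroy) only drops the executor copies, so a late task that still uses the old broadcast can re-fetch it from the driver.

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical helper: holds the current broadcast of a lookup table and
// replaces it on demand instead of relying on spark.cleaner.ttl.
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, initial: T) {
  @volatile private var current: Broadcast[T] = sc.broadcast(initial)

  // Call on the driver (e.g. inside foreachRDD or transform) and capture the
  // returned Broadcast in the task closure.
  def get: Broadcast[T] = current

  // Broadcast the new value first, then drop the executor copies of the old one.
  def refresh(newValue: T): Unit = {
    val old = current
    current = sc.broadcast(newValue)
    old.unpersist(blocking = false)
  }
}
```

Resolving get on the driver for every batch is what lets each batch pick up the latest value; capturing a single Broadcast once when the DStream graph is defined would pin the first value forever.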
Reply: Re: How to decrease the time of storing block in memory
Only 1 minor GC, 0.07s. Thanks&Best regards! San.Luo
----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02
Maybe you should check your driver UI and see if there's any GC time involved, etc. Thanks Best Regards On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote: Hi there, I am trying to decrease my app's running time on the worker node. I checked the log and found that the most time-consuming part is below:
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver
I don't know why it takes 19 s to store 2.1 KB of data in memory. Is there any tuning method? The attachment is the full log; here it is:
15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/08 16:14:10 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:14 INFO Remoting: Starting remoting
15/06/08 16:14:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@slave5:54684]
15/06/08 16:14:15 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 54684.
15/06/08 16:14:16 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/08 16:14:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:16 INFO Remoting: Starting remoting
15/06/08 16:14:17 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/08 16:14:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@slave5:49169]
15/06/08 16:14:17 INFO util.Utils: Successfully started service 'sparkExecutor' on port 49169.
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@slave5:58630/user/MapOutputTracker
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@slave5:58630/user/BlockManagerMaster
15/06/08 16:14:17 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-548b4618-4aba-4b63-9467-381fbfea8d5b/spark-83737dd6-46b0-47ee-82a5-5afee46bdbf5/spark-0fe9d8ba-2910-44a2-bf4f-80d179f5d58b/blockmgr-b4884e7a-2527-447a-9fc5-1823d923c2f1
15/06/08 16:14:17 INFO storage.MemoryStore: MemoryStore started with capacity 353.3 MB
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@slave5:58630/user/OutputCommitCoordinator
15/06/08 16:14:17 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@slave5:58630/user/CoarseGrainedScheduler
15/06/08 16:14:18 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:18 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
15/06/08 16:14:18 INFO executor.Executor: Starting executor ID 0 on host slave5
15/06/08 16:14:18 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:21 WARN internal.ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy?
15/06/08 16:14:21 INFO netty.NettyBlockTransferService: Server created on 53449
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Registered BlockManager
15/06/08 16:14:21 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@slave5:58630/user/HeartbeatReceiver
15/06/08 16:14:21 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
15/06/08 16:14:21 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/08 16:14:21 INFO executor.Executor: Fetching
Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
You can put a Thread.sleep(10) in the code to keep the UI available for quite some time (put it just before starting any of your transformations). Or you can enable the Spark history server https://spark.apache.org/docs/latest/monitoring.html too. I believe --jars https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management would download the dependency jars onto all your worker machines (they can be found in the Spark work dir of your application, along with the stderr and stdout files). Thanks Best Regards On Tue, Jun 9, 2015 at 1:29 PM, Dong Lei dong...@microsoft.com wrote: Thanks Akhil: The driver fails too fast to get a look at 4040. Is there any other way to see the download and ship process of the files? Is the driver supposed to download these jars from HDFS to some location, then ship them to executors? I can see from the log that the driver downloaded the application jar but not the other jars specified by “--jars”. Or do I misunderstand the usage of “--jars”, and the jars should already be on every worker, so the driver will not download them? Are there some useful docs? Thanks Dong Lei *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Tuesday, June 9, 2015 3:24 PM *To:* Dong Lei *Cc:* user@spark.apache.org *Subject:* Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS Once you submit the application, you can check the Environment tab in the driver UI (running on port 4040) to see whether the jars you added got shipped or not. If they are shipped and you are still getting NoClassDef exceptions, then it means you have a jar conflict, which you can resolve by putting the jar with the class in it at the top of your classpath.
Thanks Best Regards On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote: Hi, spark-users: I'm using spark-submit to submit multiple jars and files (all in HDFS) to run a job, with the following command:
spark-submit --class myClass --master spark://localhost:7077/ --deploy-mode cluster --jars hdfs://localhost/1.jar, hdfs://localhost/2.jar --files hdfs://localhost/1.txt, hdfs://localhost/2.txt hdfs://localhost/main.jar
The stderr in the driver showed java.lang.ClassNotDefException for a class in 1.jar. I checked the log, and Spark has added these jars: INFO SparkContext: Added JAR hdfs:// …1.jar INFO SparkContext: Added JAR hdfs:// …2.jar In the driver's folder, I only saw that main.jar was copied there, *but the other jars and files were not there*. Could someone explain *how I should pass the jars and files* needed by the main jar to Spark? If my class in main.jar refers to these files with a relative path, *will Spark copy these files into one folder*? BTW, my class works in client mode with all jars and files local. Thanks Dong Lei
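To see what was shipped without catching the driver UI in time, a small driver-side sketch like the one below could help. DriverDiagnostics and its methods are hypothetical names. It enables event logging so the history server can still show the Environment tab after the driver has exited (the log directory is a placeholder and must already exist), lists the spark.jars and spark.files properties through which spark-submit hands --jars and --files to the driver, and pauses for a given number of minutes. Note that Thread.sleep takes milliseconds, so Thread.sleep(10) pauses for only 10 ms.

```scala
import org.apache.spark.SparkConf

object DriverDiagnostics {
  // Write event logs so the history server can show the Environment tab
  // (jars, files, classpath) after a driver that failed quickly has exited.
  def withEventLog(conf: SparkConf, eventLogDir: String): SparkConf =
    conf.set("spark.eventLog.enabled", "true").set("spark.eventLog.dir", eventLogDir)

  // spark-submit passes --jars / --files to the driver as the comma-separated
  // properties spark.jars / spark.files; split them so they can be logged at startup.
  def listed(conf: SparkConf, key: String): Seq[String] =
    conf.getOption(key).toSeq.flatMap(_.split(",").toSeq.map(_.trim).filter(_.nonEmpty))

  // Thread.sleep takes milliseconds: keep the driver (and its UI on port 4040) up for N minutes.
  def pauseForUi(minutes: Long): Unit = Thread.sleep(minutes * 60L * 1000L)
}
```

In main, one might build the SparkContext from DriverDiagnostics.withEventLog(new SparkConf(), "hdfs://localhost/spark-events") and then print DriverDiagnostics.listed(sc.getConf, "spark.jars") to the driver's stderr before any transformation runs. When passing the lists on the command line, --jars and --files expect comma-separated values without spaces between entries.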
Re: Re: How to decrease the time of storing block in memory
Is it that task that is taking 19 s? It won't simply be taking 19 s to store 2 KB of data in memory; there could be other operations happening too (the transformations that you are doing). It would be good if you could paste the code snippet that you are running, so we can get a better understanding. Thanks Best Regards On Tue, Jun 9, 2015 at 2:09 PM, luohui20...@sina.com wrote: Only 1 minor GC, 0.07s. Thanks&Best regards! San.Luo
----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: How to decrease the time of storing block in memory
Date: 2015-06-09 15:02
Maybe you should check your driver UI and see if there's any GC time involved, etc. Thanks Best Regards On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote: Hi there, I am trying to decrease my app's running time on the worker node. I checked the log and found that the most time-consuming part is below:
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver
I don't know why it takes 19 s to store 2.1 KB of data in memory. Is there any tuning method? The attachment is the full log; here it is:
15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/08 16:14:10 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:14 INFO Remoting: Starting remoting
15/06/08 16:14:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@slave5:54684]
15/06/08 16:14:15 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 54684.
15/06/08 16:14:16 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/08 16:14:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:16 INFO Remoting: Starting remoting
15/06/08 16:14:17 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/08 16:14:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@slave5:49169]
15/06/08 16:14:17 INFO util.Utils: Successfully started service 'sparkExecutor' on port 49169.
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@slave5:58630/user/MapOutputTracker
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@slave5:58630/user/BlockManagerMaster
15/06/08 16:14:17 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-548b4618-4aba-4b63-9467-381fbfea8d5b/spark-83737dd6-46b0-47ee-82a5-5afee46bdbf5/spark-0fe9d8ba-2910-44a2-bf4f-80d179f5d58b/blockmgr-b4884e7a-2527-447a-9fc5-1823d923c2f1
15/06/08 16:14:17 INFO storage.MemoryStore: MemoryStore started with capacity 353.3 MB
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@slave5:58630/user/OutputCommitCoordinator
15/06/08 16:14:17 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@slave5:58630/user/CoarseGrainedScheduler
15/06/08 16:14:18 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:18 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
15/06/08 16:14:18 INFO executor.Executor: Starting executor ID 0 on host slave5
15/06/08 16:14:18 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:21 WARN internal.ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy?
15/06/08 16:14:21 INFO netty.NettyBlockTransferService: Server created on 53449
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/06/08 16:14:21 INFO
RE: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
Thanks Akhil: The driver fails too fast to get a look at 4040. Is there any other way to see the download and ship process of the files? Is the driver supposed to download these jars from HDFS to some location, then ship them to executors? I can see from the log that the driver downloaded the application jar but not the other jars specified by “--jars”. Or do I misunderstand the usage of “--jars”, and the jars should already be on every worker, so the driver will not download them? Are there some useful docs? Thanks Dong Lei From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, June 9, 2015 3:24 PM To: Dong Lei Cc: user@spark.apache.org Subject: Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS Once you submit the application, you can check the Environment tab in the driver UI (running on port 4040) to see whether the jars you added got shipped or not. If they are shipped and you are still getting NoClassDef exceptions, then it means you have a jar conflict, which you can resolve by putting the jar with the class in it at the top of your classpath. Thanks Best Regards On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote: Hi, spark-users: I'm using spark-submit to submit multiple jars and files (all in HDFS) to run a job, with the following command:
spark-submit --class myClass --master spark://localhost:7077/ --deploy-mode cluster --jars hdfs://localhost/1.jar, hdfs://localhost/2.jar --files hdfs://localhost/1.txt, hdfs://localhost/2.txt hdfs://localhost/main.jar
The stderr in the driver showed java.lang.ClassNotDefException for a class in 1.jar.
I checked the log, and Spark has added these jars:
INFO SparkContext: Added JAR hdfs:// …1.jar
INFO SparkContext: Added JAR hdfs:// …2.jar
In the driver's folder, I only saw that main.jar was copied there, but the other jars and files were not there. Could someone explain how I should pass the jars and files needed by the main jar to Spark? If my class in main.jar refers to these files with a relative path, will Spark copy these files into one folder? BTW, my class works in client mode with all jars and files local. Thanks Dong Lei
Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
Actually, the question was: will keyBy() accept multiple fields (e.g. x(0), x(1)) as the key?

On Tue, Jun 9, 2015 at 1:07 PM, amit tewari amittewar...@gmail.com wrote:
Thanks Akhil, as you suggested, I have to go the keyBy() route, as I need the columns intact. But will keyBy() accept multiple fields (e.g. x(0), x(1))? Thanks Amit

On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Try this way:
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => (x(0) + x(1), (x(2), x(3))))
scala> val input22 = input2.map(x => (x(0) + x(1), (x(2), x(3))))
scala> input11.join(input22).take(10)
PairRDDFunctions basically requires an RDD[(K, V)], and in your case it is RDD[((String, String), String, String)]. You can also look into keyBy if you don't want to concatenate your keys. Thanks Best Regards

On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com wrote:
Hi Dear Spark Users, I am very new to Spark/Scala. I am using DataStax (4.7/Spark 1.2.1) and struggling with the following error. I already tried options like import org.apache.spark.SparkContext._ or the explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions, but the error is not resolved. Help much appreciated. Thanks AT
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))
scala> input11.join(input22).take(10)
<console>:66: error: value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
input11.join(input22).take(10)
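On the keyBy question: `RDD.keyBy` takes any function `T => K`, so a composite key can simply be a tuple, e.g. `input1.keyBy(x => (x(0), x(1)))`. Join only needs every element to be a two-element (key, value) pair, and the whole row then stays intact as the value. The sketch below mimics this with plain Scala collections instead of RDDs, so it runs without a cluster. `CompositeKeyJoin`, `keyBy`, `join` and `parse` are hypothetical local helpers that mirror the RDD methods of the same names. They are not Spark's implementations.

```scala
object CompositeKeyJoin {
  // Local stand-in for RDD.keyBy: pairs each row with the key computed from it.
  def keyBy[T, K](rows: Seq[T])(f: T => K): Seq[(K, T)] =
    rows.map(r => (f(r), r))

  // Local stand-in for PairRDDFunctions.join: inner join on equal keys,
  // emitting one (key, (leftValue, rightValue)) per matching pair.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, v) <- left
      w <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  // Same parsing as the REPL session: split on commas and trim each field.
  def parse(lines: Seq[String]): Seq[Array[String]] =
    lines.map(_.split(",").map(_.trim))

  def main(args: Array[String]): Unit = {
    // Illustrative rows; the real inputs came from /test7 and /test8.
    val input1 = parse(Seq("a, b, 1, 2", "c, d, 3, 4"))
    val input2 = parse(Seq("a, b, x, y", "e, f, z, w"))

    // Composite key (x(0), x(1)); the whole row stays intact as the value.
    val keyed1 = keyBy(input1)(x => (x(0), x(1)))
    val keyed2 = keyBy(input2)(x => (x(0), x(1)))

    join(keyed1, keyed2).foreach { case (k, (l, r)) =>
      println(s"$k -> ${l.mkString("|")} / ${r.mkString("|")}")
    }
  }
}
```

With real RDDs, `input1.keyBy(x => (x(0), x(1))).join(input2.keyBy(x => (x(0), x(1))))` should have the same shape. The original `((x(0), x(1)), x(2), x(3))` fails because it is a three-element tuple, not a pair.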
Re: Cassandra Submit
I couldn't find any solution. I can write to Cassandra, but I can't read from it.

2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:
Thanks a lot Mohammed, Gerard and Yana. I can write to the table, but an exception is thrown. It says: Exception in thread "main" java.io.IOException: Failed to open thrift connection to Cassandra at 127.0.0.1:9160
In the yaml file: rpc_address: localhost rpc_port: 9160
And in the project:
.set("spark.cassandra.connection.host", "127.0.0.1") .set("spark.cassandra.connection.rpc.port", "9160");
or
.set("spark.cassandra.connection.host", "localhost") .set("spark.cassandra.connection.rpc.port", "9160");
Whatever setting I use, I get the same exception. Any help?

2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
Yes, whatever you put for listen_address in cassandra.yaml. Also, you should try to connect to your Cassandra cluster via bin/cqlsh to make sure you have connectivity before you try to make a connection via Spark.

On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com wrote:
Hi, I run my project locally. How can I find the IP address of my Cassandra host? From cassandra.yaml or somewhere else? yasemin

2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:
? = IP address of your Cassandra host

On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com wrote:
Hi, How can I find spark.cassandra.connection.host? And what should I change? Should I change cassandra.yaml? The error says: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042
What should I add?
SparkConf sparkConf = new SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts", "true").set("spark.cassandra.connection.host", ?);
Best yasemin

2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com:
Check your spark.cassandra.connection.host setting. It should be pointing to one of your Cassandra nodes.
Mohammed

From: Yasemin Kaya [mailto:godo...@gmail.com]
Sent: Friday, June 5, 2015 7:31 AM
To: user@spark.apache.org
Subject: Cassandra Submit
Hi, I am using Cassandra in my project. I got this error: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042. I think I have to modify the submit line. What should I add or remove when I submit my project? Best, yasemin
-- hiç ender hiç
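Yana's advice to confirm connectivity (for example with bin/cqlsh) before involving Spark can also be scripted. Below is a minimal sketch that assumes only the JDK. It tries a plain TCP connection to the host and port the connector would use: 9042 for the native protocol in the first error, and 9160 for Thrift in the later one. `PortCheck` and `canConnect` are hypothetical helpers. A successful TCP connect only shows that something is listening on that address, not that it is a healthy Cassandra node.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  // Refused connections, timeouts and unknown hosts are all IOExceptions
  // and are reported as false.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Native protocol port (from the 9042 error) and Thrift RPC port (rpc_port: 9160).
    for (port <- Seq(9042, 9160))
      println(s"127.0.0.1:$port reachable: ${canConnect("127.0.0.1", port)}")
  }
}
```

If neither port is reachable on the address given in spark.cassandra.connection.host, the problem is likely on the Cassandra side (for example listen_address or rpc_address) rather than in the Spark submit line.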
Re: FileOutputCommitter deadlock 1.3.1
On 8 Jun 2015, at 15:55, Richard Marscher rmarsc...@localytics.com wrote:
Hi, we've been seeing occasional issues in production with the FileOutputCommitter reaching a deadlock situation. We are writing our data to S3 and currently have speculation enabled. What we see is that Spark gets a file-not-found error when trying to access a temporary part file that it wrote (it seems to be the part-#2 file every time?), so the task fails. But the file actually exists in S3, so subsequent speculative attempts and task retries all fail because the committer tells them the file exists. This persists until human intervention kills the application. Usually rerunning the application succeeds on the next try, so it is not deterministic with the dataset or anything. It seems like there isn't a good story yet for file writing and speculation (https://issues.apache.org/jira/browse/SPARK-4879), although our error seems worse than the reports in that issue, since I believe ours deadlocks and those don't?

S3 isn't a real filesystem, which is why the Hadoop docs say not to turn speculation on if your destination is s3n/s3a or Swift: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html
1. Renames of directories aren't atomic, but rename is exactly what speculation uses to ensure an atomic commit of one task's output.
2. Eventual consistency means that things aren't immediately visible.
3. When you create an output file for writing, it's not there until the final close(), so other code that checks for the file's existence in order to implement any locking algorithm is in trouble.
These are fundamental issues that hurt all the bits of Hadoop code that expect basic POSIX-ish filesystem behaviours: files immediately visible on create, atomic and O(1) mv and rmdir, efficient seek(), and consistency of create, update and delete across the entire cluster.
What to do?
- Keep speculation on for output to HDFS; turn it off for S3.
- Lift the Amazon hadoop-aws JAR from EMR (Hadoop 2.6+; for earlier versions you need the whole of hadoop-common).
- Look at Netflix s3mper: http://techblog.netflix.com/2014/01/s3mper-consistency-in-cloud.html
- Don't use US East, as it has the worst consistency (it doesn't even support create consistency). That may help, but it still won't address non-atomic directory rename.
I don't know about SPARK-4879, so I can't comment on what's happening there, but if you are using S3 as your destination it's irrelevant: you will have problems if speculation is turned on. The same is true if you use classic Hadoop MR, Tez or others. It's also why S3 isn't supported as a filesystem for things like HBase. Hopefully the work on HADOOP-9565 will eventually add some operations to enable speculative commits for code written explicitly against it.
-Steve
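Steve's first recommendation is to keep speculation on for HDFS output and turn it off for S3 or Swift. That can be encoded as a small guard. Below is a minimal sketch in plain Scala that assumes the output location is known when the job is configured. `SpeculationGuard`, `speculationSafe` and the exact set of scheme names are illustrative choices. `spark.speculation` is the standard Spark property that enables speculative execution. The returned map would be applied to a SparkConf (for example with `conf.setAll(...)`) before the SparkContext is created.

```scala
import java.net.URI

object SpeculationGuard {
  // Object-store schemes mentioned in the thread (s3n, s3a, swift) plus plain s3.
  // These lack atomic directory rename, which speculative commits rely on.
  private val objectStoreSchemes = Set("s3", "s3n", "s3a", "swift")

  // True when the output path is not on an object store. Paths without a
  // scheme resolve against the default filesystem, assumed here to be HDFS.
  def speculationSafe(outputPath: String): Boolean = {
    val scheme = Option(new URI(outputPath).getScheme).map(_.toLowerCase)
    !scheme.exists(objectStoreSchemes.contains)
  }

  // Settings to copy into a SparkConf before the SparkContext is created.
  def speculationSettings(outputPath: String): Map[String, String] =
    Map("spark.speculation" -> speculationSafe(outputPath).toString)

  def main(args: Array[String]): Unit = {
    Seq("hdfs://namenode/out", "s3n://bucket/out", "/tmp/out").foreach { p =>
      println(s"$p -> ${speculationSettings(p)}")
    }
  }
}
```

This only addresses speculation. As Steve notes, eventual consistency and non-atomic rename on S3 remain even with speculation off.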
RE: Spark SQL with Thrift Server is very very slow and finally failing
Is it a large result set being returned from the Thrift Server? And can you paste the SQL and the physical plan?

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, June 9, 2015 12:01 PM
To: Sourav Mazumder
Cc: user
Subject: Re: Spark SQL with Thrift Server is very very slow and finally failing
Which Spark release are you using? Can you pastebin the stack trace w.r.t. ExecutorLostFailure? Thanks

On Mon, Jun 8, 2015 at 8:52 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote:
Hi, I am trying to run a SQL query from a JDBC driver using Spark's Thrift Server. I'm doing a join between a Hive table of around 100 GB and another Hive table of 10 KB, with a filter on a particular column. The query takes more than 45 minutes and then I get ExecutorLostFailure. It seems to be memory related: when I increase the memory, the failure still happens, but after a longer time. I have executor memory 20 GB, Spark driver memory 2 GB, 2 executor instances and 2 executor cores. I'm running the job on YARN with master 'yarn-client'. Any idea if I'm missing any other configuration? Regards, Sourav
Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
Try this way:
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => (x(0) + x(1), (x(2), x(3))))
scala> val input22 = input2.map(x => (x(0) + x(1), (x(2), x(3))))
scala> input11.join(input22).take(10)
PairRDDFunctions basically requires an RDD[(K, V)], and in your case it is RDD[((String, String), String, String)]. You can also look into keyBy if you don't want to concatenate your keys. Thanks Best Regards

On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com wrote:
Hi Dear Spark Users, I am very new to Spark/Scala. I am using DataStax (4.7/Spark 1.2.1) and struggling with the following error. I already tried options like import org.apache.spark.SparkContext._ or the explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions, but the error is not resolved. Help much appreciated. Thanks AT
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))
scala> input11.join(input22).take(10)
<console>:66: error: value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
input11.join(input22).take(10)
Re: Saving compressed textFiles from a DStream in Scala
Like this?
myDStream.foreachRDD(rdd => rdd.saveAsTextFile("/sigmoid/", codec))
Thanks Best Regards

On Mon, Jun 8, 2015 at 8:06 PM, Bob Corsaro rcors...@gmail.com wrote:
It looks like saveAsTextFiles doesn't support the compression parameter of RDD.saveAsTextFile. Is there a way to add the functionality in my client code without patching Spark? I tried making my own saveFunc function and calling DStream.foreachRDD, but ran into trouble with invoking rddToFileName and making the RDD type parameter work properly. It's probably just due to my lack of Scala knowledge. Can anyone give me a hand?
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  this.foreachRDD(saveFunc)
}
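The part that blocks client code here seems to be the per-batch file name. `rddToFileName` appears to be internal to Spark Streaming, which would explain the trouble calling it from outside. A fixed path such as "/sigmoid/" would also typically fail from the second batch onward, because the output directory already exists. Below is a sketch that assumes the documented `saveAsTextFiles` naming scheme, "prefix-TIME_IN_MS[.suffix]", is what's wanted. `BatchNaming` and `batchFileName` are hypothetical helpers. The commented usage assumes a DStream[String] and combines the helper with the `(RDD[T], Time)` overload of `foreachRDD` and the codec overload of `RDD.saveAsTextFile`. That usage is not executed here because it needs Spark.

```scala
object BatchNaming {
  // Mirrors the documented "prefix-TIME_IN_MS.suffix" naming of
  // DStream.saveAsTextFiles; the ".suffix" part is dropped when suffix is empty.
  def batchFileName(prefix: String, suffix: String, timeMs: Long): String =
    if (suffix == null || suffix.isEmpty) s"$prefix-$timeMs"
    else s"$prefix-$timeMs.$suffix"

  // Intended use inside a streaming job (needs Spark, not run here):
  //   import org.apache.hadoop.io.compress.GzipCodec
  //   import org.apache.spark.rdd.RDD
  //   import org.apache.spark.streaming.Time
  //   myDStream.foreachRDD { (rdd: RDD[String], time: Time) =>
  //     rdd.saveAsTextFile(batchFileName("/sigmoid/out", "", time.milliseconds),
  //                        classOf[GzipCodec])
  //   }

  def main(args: Array[String]): Unit = {
    // Illustrative batch time in milliseconds.
    println(batchFileName("/sigmoid/out", "txt", 1433800000000L))
    println(batchFileName("/sigmoid/out", "", 1433800000000L))
  }
}
```

Building the name in client code avoids touching Spark internals. The generic `RDD[T]` type parameter from Bob's attempt also disappears once the element type of the DStream is fixed.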
RE: SparkSQL: How to specify replication factor on the persisted parquet files?
Cheng, yes, it works. I set the property in SparkConf before initializing the SparkContext. The property name is spark.hadoop.dfs.replication. Thanks for the help!

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, June 08, 2015 6:41 PM
To: Haopu Wang; user
Subject: Re: SparkSQL: How to specify replication factor on the persisted parquet files?
Then one possible workaround is to set dfs.replication in sc.hadoopConfiguration. However, this configuration is shared by all Spark jobs issued within the same application. Since different Spark jobs can be issued from different threads, you need to pay attention to synchronization. Cheng

On 6/8/15 2:46 PM, Haopu Wang wrote:
Cheng, thanks for the response. Yes, I was using HiveContext.setConf() to set dfs.replication. However, I cannot change the value in Hadoop core-site.xml, because that would change every HDFS file. I only want to change the replication factor of some specific files.

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Sunday, June 07, 2015 10:17 PM
To: Haopu Wang; user
Subject: Re: SparkSQL: How to specify replication factor on the persisted parquet files?
Were you using HiveContext.setConf()? dfs.replication is a Hadoop configuration, but setConf() is only used to set Spark SQL-specific configurations. You may set it in your Hadoop core-site.xml instead. Cheng

On 6/2/15 2:28 PM, Haopu Wang wrote:
Hi, I'm trying to save a SparkSQL DataFrame to a persistent Hive table using the default Parquet data source. I don't know how to change the replication factor of the generated Parquet files on HDFS. I tried to set dfs.replication on HiveContext, but that didn't work. Any suggestions are much appreciated!
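Why the `spark.hadoop.` prefix works: Spark copies every SparkConf entry with that prefix, with the prefix stripped, into the Hadoop Configuration it builds. So `spark.hadoop.dfs.replication` is seen as `dfs.replication` by writes from that application. The sketch below models that copying on plain maps so it runs without Spark. `HadoopConfPrefix` and `hadoopOverrides` are hypothetical helpers for illustration, not Spark's code, and the value "2" is only an example. With a live context, Cheng's alternative is `sc.hadoopConfiguration.set("dfs.replication", "2")`, which affects every later job in the same application.

```scala
object HadoopConfPrefix {
  private val Prefix = "spark.hadoop."

  // Keeps only the spark.hadoop.* entries and strips the prefix, yielding the
  // keys a Hadoop Configuration would see (e.g. dfs.replication).
  def hadoopOverrides(sparkConf: Map[String, String]): Map[String, String] =
    sparkConf.collect {
      case (key, value) if key.startsWith(Prefix) => key.stripPrefix(Prefix) -> value
    }

  def main(args: Array[String]): Unit = {
    // Example settings; "2" is an illustrative replication factor.
    val sparkSettings = Map(
      "spark.app.name" -> "parquet-writer",
      "spark.hadoop.dfs.replication" -> "2"
    )
    println(hadoopOverrides(sparkSettings))
  }
}
```

Because the override is applied when the context is created, it holds for the whole application. That matches the working solution above, which sets the property before initializing the SparkContext.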
Re: Error in using saveAsParquetFile
Cheng, you were right. It works when I remove the field from either one. I should have checked the types beforehand. What confused me is that Spark attempted the join and threw the error midway. It isn't quite there yet. Thanks for the help.

On Mon, Jun 8, 2015 at 8:29 PM Cheng Lian lian.cs@gmail.com wrote:
I suspect that Bookings and Customerdetails both have a PolicyType field, one a string and the other an int. Cheng

On 6/8/15 9:15 PM, Bipin Nag wrote:
Hi Jeetendra, Cheng, I am using the following code for joining:
val Bookings = sqlContext.load(/home/administrator/stageddata/Bookings) val Customerdetails = sqlContext.load(/home/administrator/stageddata/Customerdetails) val CD = Customerdetails. where($CreatedOn 2015-04-01 00:00:00.0). where($CreatedOn 2015-05-01 00:00:00.0) //Bookings by CD val r1 = Bookings. withColumnRenamed(ID,ID2) val r2 = CD. join(r1,CD.col(CustomerID) === r1.col(ID2),left) r2.saveAsParquetFile(/home/administrator/stageddata/BOOKING_FULL);
@Cheng I am not appending the joined table to an existing Parquet file; it is a new file. @Jitender I have a rather large Parquet file, and it also contains some confidential data. Can you tell me what you need to check in it? Thanks

On 8 June 2015 at 16:47, Jeetendra Gangele gangele...@gmail.com wrote:
When are you loading these Parquet files? Can you please share the code where you pass the Parquet file to Spark?

On 8 June 2015 at 16:39, Cheng Lian lian.cs@gmail.com wrote:
Are you appending the joined DataFrame whose PolicyType is a string to an existing Parquet file whose PolicyType is an int? The exception indicates that Parquet found a column with conflicting data types. Cheng

On 6/8/15 5:29 PM, bipin wrote:
Hi, I get this error message when saving a table: parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema.
incompatible types: optional binary PolicyType (UTF8) != optional int32 PolicyType
  at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:105)
  at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:97)
  at parquet.schema.PrimitiveType.accept(PrimitiveType.java:386)
  at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:87)
  at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:61)
  at parquet.schema.MessageType.accept(MessageType.java:55)
  at parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:148)
  at parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:137)
  at parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:157)
  at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:107)
  at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
  at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
  at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
  at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
  at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667)
  at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
  at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
I joined two tables, both loaded from Parquet files, and the joined table throws this error when saved. I could not find anything about this error. Could this be a bug?
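The root cause in this thread was a PolicyType column that is a string on one side of the join and an int on the other. One cheap pre-flight check is to compare the column types of both inputs before joining. The sketch below works on plain column-name-to-type maps so it runs without Spark. `SchemaCheck` and `typeConflicts` are hypothetical helpers. With DataFrames, a comparable map could be built from `df.schema.fields.map(f => f.name -> f.dataType.toString).toMap`. The example schemas are illustrative. Only the PolicyType clash comes from the thread, and which side holds which type is arbitrary here.

```scala
object SchemaCheck {
  // For every column name present in both schemas, report the pair of types
  // when they differ. Schemas are modelled as column-name -> type-name maps.
  def typeConflicts(
      left: Map[String, String],
      right: Map[String, String]): Map[String, (String, String)] =
    left.collect {
      case (name, leftType) if right.get(name).exists(_ != leftType) =>
        (name, (leftType, right(name)))
    }

  def main(args: Array[String]): Unit = {
    // Illustrative schemas; only the PolicyType clash is taken from the thread.
    val bookings = Map("ID" -> "int", "PolicyType" -> "string")
    val customerDetails = Map("CustomerID" -> "int", "PolicyType" -> "int")
    typeConflicts(bookings, customerDetails).foreach { case (col, (l, r)) =>
      println(s"$col: $l vs $r")
    }
  }
}
```

Dropping, renaming or casting a reported column on one side before the join corresponds to the fix that worked above, which was removing the field from either table.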
Re: How to decrease the time of storing block in memory
Maybe you should check your driver UI and see if there's any GC time involved, etc. Thanks Best Regards

On Mon, Jun 8, 2015 at 5:45 PM, luohui20...@sina.com wrote:
Hi there, I am trying to decrease my app's running time on the worker node. I checked the log and found that the most time-consuming part is below:
15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 353.3 MB)
15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 693 bytes result sent to driver
I don't know why it needs 19 s to store 2.1 KB of data in memory. Is there any tuning method? The attachment is the full log; here it is:
15/06/08 16:14:02 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/08 16:14:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/08 16:14:10 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:10 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:14 INFO Remoting: Starting remoting
15/06/08 16:14:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@slave5:54684]
15/06/08 16:14:15 INFO util.Utils: Successfully started service 'driverPropsFetcher' on port 54684.
15/06/08 16:14:16 INFO spark.SecurityManager: Changing view acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: Changing modify acls to: root
15/06/08 16:14:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/08 16:14:16 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/08 16:14:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/08 16:14:16 INFO Remoting: Starting remoting
15/06/08 16:14:17 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/08 16:14:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@slave5:49169]
15/06/08 16:14:17 INFO util.Utils: Successfully started service 'sparkExecutor' on port 49169.
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@slave5:58630/user/MapOutputTracker
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@slave5:58630/user/BlockManagerMaster
15/06/08 16:14:17 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-548b4618-4aba-4b63-9467-381fbfea8d5b/spark-83737dd6-46b0-47ee-82a5-5afee46bdbf5/spark-0fe9d8ba-2910-44a2-bf4f-80d179f5d58b/blockmgr-b4884e7a-2527-447a-9fc5-1823d923c2f1
15/06/08 16:14:17 INFO storage.MemoryStore: MemoryStore started with capacity 353.3 MB
15/06/08 16:14:17 INFO util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@slave5:58630/user/OutputCommitCoordinator
15/06/08 16:14:17 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@slave5:58630/user/CoarseGrainedScheduler
15/06/08 16:14:18 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:18 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
15/06/08 16:14:18 INFO executor.Executor: Starting executor ID 0 on host slave5
15/06/08 16:14:18 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@slave5:48926/user/Worker
15/06/08 16:14:21 WARN internal.ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy?
15/06/08 16:14:21 INFO netty.NettyBlockTransferService: Server created on 53449
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/06/08 16:14:21 INFO storage.BlockManagerMaster: Registered BlockManager
15/06/08 16:14:21 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@slave5:58630/user/HeartbeatReceiver
15/06/08 16:14:21 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
15/06/08 16:14:21 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/08 16:14:21 INFO executor.Executor: Fetching http://192.168.100.11:50648/jars/ShellCompare.jar with timestamp 1433751228699
15/06/08 16:14:21 INFO util.Utils: Fetching http://192.168.100.11:50648/jars/ShellCompare.jar to
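The 19 s measured above is the gap between two consecutive executor log lines. When hunting for where time goes, it can help to compute such gaps across the whole log rather than by eye. Below is a minimal sketch that assumes each relevant line starts with the "yy/MM/dd HH:mm:ss" timestamp used in this log. `LogGaps`, `timestampOf` and `largestGap` are hypothetical helpers. A large gap only means nothing was logged in that interval. Here the gap ends with the task finishing, so the time may well be spent running the task itself rather than storing the 2.1 KB block, which is worth confirming in the driver UI as Akhil suggests.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

object LogGaps {
  private val TimestampFormat = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Parses the leading "yy/MM/dd HH:mm:ss" (17 characters) of a log line, if present.
  def timestampOf(line: String): Option[LocalDateTime] =
    if (line.length < 17) None
    else Try(LocalDateTime.parse(line.substring(0, 17), TimestampFormat)).toOption

  // Returns the pair of consecutive timestamped lines with the longest gap between them.
  def largestGap(lines: Seq[String]): Option[(Duration, String, String)] = {
    val stamped = lines.flatMap(l => timestampOf(l).map(t => (t, l)))
    stamped.zip(stamped.drop(1))
      .map { case ((t1, l1), (t2, l2)) => (Duration.between(t1, t2), l1, l2) }
      .sortBy(_._1.toMillis)
      .lastOption
  }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      "15/06/08 16:14:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory",
      "15/06/08 16:14:42 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0)"
    )
    largestGap(log).foreach { case (gap, from, to) =>
      println(s"${gap.getSeconds} s between:\n  $from\n  $to")
    }
  }
}
```

Lines without a parseable timestamp, such as wrapped continuation lines, are skipped rather than treated as errors.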
[SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
When I ran a Spark Streaming application for a longer time, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below occurred. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
  at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
  at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
  at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
  at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
  ... 25 more
15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0
Re: Spark error value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
Thanks Akhil, as you suggested, I have to go the keyBy() route, as I need the columns intact. But will keyBy() accept multiple fields (e.g. x(0), x(1))? Thanks Amit

On Tue, Jun 9, 2015 at 12:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Try this way:
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => (x(0) + x(1), (x(2), x(3))))
scala> val input22 = input2.map(x => (x(0) + x(1), (x(2), x(3))))
scala> input11.join(input22).take(10)
PairRDDFunctions basically requires an RDD[(K, V)], and in your case it is RDD[((String, String), String, String)]. You can also look into keyBy if you don't want to concatenate your keys. Thanks Best Regards

On Tue, Jun 9, 2015 at 10:14 AM, amit tewari amittewar...@gmail.com wrote:
Hi Dear Spark Users, I am very new to Spark/Scala. I am using DataStax (4.7/Spark 1.2.1) and struggling with the following error. I already tried options like import org.apache.spark.SparkContext._ or the explicit import org.apache.spark.SparkContext.rddToPairRDDFunctions, but the error is not resolved. Help much appreciated. Thanks AT
scala> val input1 = sc.textFile("/test7").map(line => line.split(",").map(_.trim));
scala> val input2 = sc.textFile("/test8").map(line => line.split(",").map(_.trim));
scala> val input11 = input1.map(x => ((x(0), x(1)), x(2), x(3)))
scala> val input22 = input2.map(x => ((x(0), x(1)), x(2), x(3)))
scala> input11.join(input22).take(10)
<console>:66: error: value join is not a member of org.apache.spark.rdd.RDD[((String, String), String, String)]
input11.join(input22).take(10)
Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
Once you submit the application, you can check the Environment tab in the driver UI (running on port 4040) to see whether the jars you added were shipped. If they were shipped and you are still getting NoClassDef exceptions, it means you have a jar conflict, which you can resolve by putting the jar containing the class at the top of your classpath. Thanks Best Regards

On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote:
Hi, spark-users: I’m using spark-submit to submit multiple jars and files (all in HDFS) to run a job, with the following command:
spark-submit --class myClass --master spark://localhost:7077/ --deploy-mode cluster --jars hdfs://localhost/1.jar, hdfs://localhost/2.jar --files hdfs://localhost/1.txt, hdfs://localhost/2.txt hdfs://localhost/main.jar
The stderr in the driver showed java.lang.ClassNotDefException for a class in 1.jar. I checked the log, and Spark has added these jars:
INFO SparkContext: Added JAR hdfs:// …1.jar
INFO SparkContext: Added JAR hdfs:// …2.jar
In the driver's folder, I only saw that main.jar was copied there, *but the other jars and files were not there*. Could someone explain *how I should pass the jars and files* needed by the main jar to Spark? If my class in main.jar refers to these files with a relative path, *will Spark copy these files into one folder*? BTW, my class works in client mode with all jars and files local. Thanks Dong Lei
Re: SparkSQL: How to specify replication factor on the persisted parquet files?
Hi I am little confused here. If I am writing to HDFS,shouldn't HDFS replication factor will automatically kick in? In other words, how spark writer is different than a hdfs -put commnd (from perspective of HDFS, of course)? Best Ayan On Tue, Jun 9, 2015 at 5:17 PM, Haopu Wang hw...@qilinsoft.com wrote: Cheng, yes, it works, I set the property in SparkConf before initiating SparkContext. The property name is spark.hadoop.dfs.replication Thanks fro the help! -Original Message- From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Monday, June 08, 2015 6:41 PM To: Haopu Wang; user Subject: Re: SparkSQL: How to specify replication factor on the persisted parquet files? Then one possible workaround is to set dfs.replication in sc.hadoopConfiguration. However, this configuration is shared by all Spark jobs issued within the same application. Since different Spark jobs can be issued from different threads, you need to pay attention to synchronization. Cheng On 6/8/15 2:46 PM, Haopu Wang wrote: Cheng, thanks for the response. Yes, I was using HiveContext.setConf() to set dfs.replication. However, I cannot change the value in Hadoop core-site.xml because that will change every HDFS file. I only want to change the replication factor of some specific files. -Original Message- From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Sunday, June 07, 2015 10:17 PM To: Haopu Wang; user Subject: Re: SparkSQL: How to specify replication factor on the persisted parquet files? Were you using HiveContext.setConf()? dfs.replication is a Hadoop configuration, but setConf() is only used to set Spark SQL specific configurations. You may either set it in your Hadoop core-site.xml. Cheng On 6/2/15 2:28 PM, Haopu Wang wrote: Hi, I'm trying to save SparkSQL DataFrame to a persistent Hive table using the default parquet data source. I don't know how to change the replication factor of the generated parquet files on HDFS. I tried to set dfs.replication on HiveContext but that didn't work. 
Any suggestions are appreciated very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
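Both ways of setting the replication factor discussed in this thread can be sketched in Scala. This is a minimal sketch, assuming Spark 1.3/1.4 and an application launched with spark-submit (which supplies the master URL); the object name and the replication values "2" and "1" are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReplicationSketch {
  def main(args: Array[String]): Unit = {
    // Option 1 (the setting Haopu confirmed works): keys prefixed with "spark.hadoop."
    // are copied into the Hadoop Configuration with the prefix removed, so this
    // becomes dfs.replication=2 for files written by this application.
    val conf = new SparkConf()
      .setAppName("ReplicationSketch")
      .set("spark.hadoop.dfs.replication", "2")
    val sc = new SparkContext(conf)

    // Option 2 (Cheng's workaround): change the shared Hadoop configuration at runtime.
    // It applies to every job the application submits afterwards, so coordinate it
    // if several threads issue jobs concurrently.
    sc.hadoopConfiguration.set("dfs.replication", "1")

    // ... build and save the DataFrame / Parquet table here ...

    sc.stop()
  }
}
```

The first setting is fixed when the SparkContext is created, so it covers everything the application writes. The second can be changed between writes, but because the configuration is shared, a value set for one write stays in effect for later ones unless you set it back.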
Re: Error in using saveAsParquetFile
Yeah, this does look confusing. We are trying to improve the error reporting by catching similar issues at the end of the analysis phase and giving more descriptive error messages. Part of the work can be found here: https://github.com/apache/spark/blob/0902a11940e550e85a53e110b490fe90e16ddaf4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala Cheng On 6/9/15 2:51 PM, Bipin Nag wrote: Cheng, you were right. It works when I remove the field from either one. I should have checked the types beforehand. What confused me is that Spark attempted to join it and threw the error midway. It isn't quite there yet. Thanks for the help. On Mon, Jun 8, 2015 at 8:29 PM Cheng Lian lian.cs@gmail.com wrote: I suspect that Bookings and Customerdetails both have a PolicyType field; one is a string and the other is an int. Cheng On 6/8/15 9:15 PM, Bipin Nag wrote: Hi Jeetendra, Cheng, I am using the following code for joining: val Bookings = sqlContext.load(/home/administrator/stageddata/Bookings) val Customerdetails = sqlContext.load(/home/administrator/stageddata/Customerdetails) val CD = Customerdetails. where($CreatedOn 2015-04-01 00:00:00.0). where($CreatedOn 2015-05-01 00:00:00.0) //Bookings by CD val r1 = Bookings. withColumnRenamed(ID,ID2) val r2 = CD. join(r1,CD.col(CustomerID) === r1.col(ID2),left) r2.saveAsParquetFile(/home/administrator/stageddata/BOOKING_FULL); @Cheng I am not appending the joined table to an existing parquet file; it is a new file. @Jitender I have a rather large parquet file and it also contains some confidential data. Can you tell me what you need to check in it? Thanks On 8 June 2015 at 16:47, Jeetendra Gangele gangele...@gmail.com wrote: Parquet file: when are you loading these files? Can you please share the code where you are passing the Parquet file to Spark?
On 8 June 2015 at 16:39, Cheng Lian lian.cs@gmail.com wrote: Are you appending the joined DataFrame whose PolicyType is string to an existing Parquet file whose PolicyType is int? The exception indicates that Parquet found a column with conflicting data types. Cheng On 6/8/15 5:29 PM, bipin wrote: Hi I get this error message when saving a table: parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: optional binary PolicyType (UTF8) != optional int32 PolicyType at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:105) at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:97) at parquet.schema.PrimitiveType.accept(PrimitiveType.java:386) at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:87) at parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:61) at parquet.schema.MessageType.accept(MessageType.java:55) at parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:148) at parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:137) at parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:157) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:107) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at
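The check Bipin mentions ("I should have checked the types beforehand") amounts to comparing the column types of the two inputs before joining them. The sketch below is plain Scala over `name -> type` maps so it runs without Spark; the object and method names are hypothetical. With a DataFrame, such a map could be built with something like `df.schema.fields.map(f => f.name -> f.dataType.simpleString).toMap` (an assumption about how you would wire it in, not code from this thread).

```scala
object SchemaConflictSketch {
  // Columns that appear in both schemas but with different types,
  // mapped to (typeInLeft, typeInRight). Schemas are plain name -> type-name maps.
  def conflictingColumns(
      left: Map[String, String],
      right: Map[String, String]): Map[String, (String, String)] =
    left.collect {
      case (name, leftType) if right.get(name).exists(_ != leftType) =>
        (name, (leftType, right(name)))
    }
}
```

For two inputs that both carry a PolicyType column, one as a string and one as an int, this reports exactly that column, which is the kind of mismatch behind the ParquetDecodingException above.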
Different Sorting RDD methods in Apache Spark
For a research project, I tried sorting the elements in an RDD. I did this using two different approaches. In the first method, I applied a mapPartitions() function on the RDD so that it would sort the contents of each partition and produce a result RDD in which each partition's sorted list is the only record. Then, I applied a reduce function which basically merges sorted lists. I ran these experiments on an EC2 cluster containing 30 nodes. I set it up using the spark-ec2 script. The data file was stored in HDFS. In the second approach I used the sortBy method in Spark. I performed these operations on the US census data (100 MB) found here. A single line looks like this: 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child 18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 5. I sorted based on the 25th value in the CSV; in this line that is 1758.14. I noticed that sortBy performs worse than the other method. Is this the expected scenario? If it is, why wouldn't mapPartitions() and reduce() be the default sorting approach?
Here is my implementation:
public static void sortBy(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>() {
        @Override
        public Double call(String v1) throws Exception {
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}
public static void sortList(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32); //parallelize(l, 8);
    long start = System.currentTimeMillis();
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(
        new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>() {
        @Override
        public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t) throws Exception {
            LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
            while (t.hasNext()) {
                String s = t.next();
                String arr1[] = s.split(",");
                Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                lines.add(t1);
            }
            Collections.sort(lines, new IncomeComparator());
            LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
            list.add(lines);
            return list;
        }
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
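The first approach (sort each partition locally, then merge the sorted lists pairwise in reduce()) can be illustrated in plain Scala without Spark, using ordinary collections as stand-ins for partitions. This is a sketch of the technique under that simplification, not the poster's code; the object and method names are hypothetical. It also makes the scaling limit visible: the result of the final merge is one list held in a single JVM, which in Spark would be the driver.

```scala
object MergeSortSketch {
  // Merge two lists that are already sorted by their Double key. On ties the element
  // from `a` comes first, so the merge is stable.
  def merge[V](a: List[(Double, V)], b: List[(Double, V)]): List[(Double, V)] = {
    val out = List.newBuilder[(Double, V)]
    var x = a
    var y = b
    while (x.nonEmpty && y.nonEmpty) {
      if (x.head._1 <= y.head._1) { out += x.head; x = x.tail }
      else { out += y.head; y = y.tail }
    }
    out ++= x
    out ++= y
    out.result()
  }

  // Local stand-in for mapPartitions(sort each partition) followed by reduce(merge).
  // Every partition is sorted independently; the pairwise merges produce one list.
  def sortPartitionsThenMerge[V](partitions: Seq[Seq[(Double, V)]]): List[(Double, V)] =
    partitions
      .map(p => p.sortBy(_._1).toList)
      .reduce((l, r) => merge(l, r))
}
```

sortBy() in Spark instead range-partitions the data and sorts each range on the executors, so no single machine ever has to hold the whole sorted dataset unless you then call collect().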
Re: Cassandra Submit
Yes, my Cassandra is listening on 9160, I think. I know this from the yaml file, which includes: rpc_address: localhost # port for Thrift to listen for clients on rpc_port: 9160 I checked the port with nc -z localhost 9160; echo $? and it returns 0. I think it is closed; should I open this port? 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Is your Cassandra installation actually listening on 9160? lsof -i :9160 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 29232 ykadiysk 69u IPv4 42152497 0t0 TCP localhost:9160 (LISTEN) I am running an out-of-the-box Cassandra conf where rpc_address: localhost # port for Thrift to listen for clients on rpc_port: 9160 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote: I couldn't find any solution. I can write to Cassandra but I can't read from it. 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com: Thanks a lot Mohammed, Gerard and Yana. I can write to the table, but I get an exception. It says: Exception in thread "main" java.io.IOException: Failed to open thrift connection to Cassandra at 127.0.0.1:9160 In the yaml file: rpc_address: localhost rpc_port: 9160 And in the project: .set("spark.cassandra.connection.host", "127.0.0.1") .set("spark.cassandra.connection.rpc.port", "9160"); or .set("spark.cassandra.connection.host", "localhost") .set("spark.cassandra.connection.rpc.port", "9160"); Whatever settings I use, I get the same exception. Any help? 2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Yes, whatever you put for listen_address in cassandra.yaml. Also, you should try to connect to your Cassandra cluster via bin/cqlsh to make sure you have connectivity before you try to make a connection via Spark. On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I run my project locally. How can I find the IP address of my Cassandra host? From cassandra.yaml, or elsewhere? yasemin 2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com: ? = IP address of your Cassandra host
On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, How can I find spark.cassandra.connection.host? And what should I change? Should I change cassandra.yaml? The error says: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042 What should I add? SparkConf sparkConf = new SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts", "true").set("spark.cassandra.connection.host", ?); Best, yasemin 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com: Check your spark.cassandra.connection.host setting. It should be pointing to one of your Cassandra nodes. Mohammed From: Yasemin Kaya [mailto:godo...@gmail.com] Sent: Friday, June 5, 2015 7:31 AM To: user@spark.apache.org Subject: Cassandra Submit Hi, I am using Cassandra DB in my project. I get this error: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042 I think I have to modify the submit line. What should I add or remove when I submit my project? Best, yasemin -- hiç ender hiç
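A note on the port test above: `nc -z localhost 9160; echo $?` printing 0 means the connection succeeded, i.e. something is listening on that port; a closed port gives a non-zero status. The two errors in this thread also mention two different ports: 9160 (the Thrift rpc_port from cassandra.yaml) and 9042 (Cassandra's default native-protocol port, used by the "native connection"). The same check can be run from the JVM that runs the Spark driver. The helper below is a hypothetical sketch that uses only java.net.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheckSketch {
  // Rough equivalent of `nc -z host port`: true if a TCP connection can be opened
  // within timeoutMs, i.e. something is listening on that host and port.
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, timed out, or unknown host
    } finally {
      socket.close()
    }
  }
}
```

For example, `PortCheckSketch.isListening("127.0.0.1", 9042)` checks the port from the native-connection error, and `PortCheckSketch.isListening("127.0.0.1", 9160)` checks the Thrift port.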
Re: BigDecimal problem in parquet file
Would you please provide a snippet that reproduces this issue? What version of Spark were you using? Cheng On 6/9/15 8:18 PM, bipin wrote: Hi, When I try to save my data frame as a parquet file I get the following error: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220) at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) How can I fix this problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
traverse a graph based on edge properties whilst counting matching vertex attributes
Hi All, I was hoping somebody might be able to help out. I currently have a network built using GraphX which looks like the following (only with a much larger number of vertices and edges):
Vertices
ID    Attribute1  Attribute2
1001  2           0
1002  1           0
1003  2           1
1004  3           2
1006  4           0
1007  5           1
Edges
Source  Destination  Attribute
1001    1002         7
1002    1003         7
1003    1004         7
1004    1005         3
1002    1006         5
1006    1007         5
For each vertex I need to send a message down a chain to each connected component based on the edge attribute, and count how many times the vertex attribute matches another vertex attribute along the chain. For example, for vertex 1004 the connecting edge attribute is 7, so I want to identify each component which is connected to 1004 by edge attribute 7 (in this case 1001-1002-1003-1004), then pattern match the second vertex attribute of 1004 (in this case 2) against the first vertex attribute of the vertices along the chain (here it matches 1003 and 1001, giving a total count of 2). The two bits of code I have at the moment are, firstly, a subgraph by vertex ID (although I have to specify the edge property):
val edgeProperty = 7
val fGraph = graph.subgraph(epred = e => e.attr == edgeProperty)
and, secondly, code which counts the nearest neighbours, but does not count down the chain of connected nodes:
val counts = {
  fGraph.collectNeighbors(EdgeDirection.In).join(graph.vertices).map { case (id, t) =>
    val neighbors: Array[(VertexId, Array[String])] = t._1
    val nodeAttr = t._2
    neighbors.map(_._2).filter(x => x(0) == nodeAttr(2)).size
  }
}
The problem I'm having is making this process automatic and pattern matching down all connected components, so for each vertex:
1. Subgraph by all edge properties which connect to it
2. Count all matching vertex properties along each of these subgraphs
3. Produce a count at the end for each vertex
I'm still pretty new to Spark and Scala, so I may be going about this in a very inefficient way. Any suggestions on how best to achieve this task would be most welcome; for example, would this be possible using Pregel? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/traverse-a-graph-based-on-edge-properties-whilst-counting-matching-vertex-attributes-tp23224.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
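The three steps can be checked on the small example before porting them to GraphX. The sketch below is plain Scala over in-memory collections, not a GraphX program; all names are hypothetical, and it assumes that edges are treated as undirected (as in the 1001-1002-1003-1004 chain) and that counts from different edge attributes are summed. In GraphX, the per-attribute component step would roughly correspond to `graph.subgraph(epred = e => e.attr == p).connectedComponents()`.

```scala
object ChainMatchSketch {
  final case class VAttr(a1: Int, a2: Int) // Attribute1, Attribute2
  final case class AttrEdge(src: Long, dst: Long, attr: Int)

  // All vertices reachable from `start` using only edges whose attribute is `p`,
  // ignoring edge direction (breadth-first search).
  def component(start: Long, p: Int, edges: Seq[AttrEdge]): Set[Long] = {
    val adj: Map[Long, Seq[Long]] = edges
      .filter(_.attr == p)
      .flatMap(e => Seq((e.src, e.dst), (e.dst, e.src)))
      .groupBy(_._1)
      .map { case (v, pairs) => (v, pairs.map(_._2)) }

    @annotation.tailrec
    def bfs(frontier: List[Long], seen: Set[Long]): Set[Long] = frontier match {
      case Nil => seen
      case v :: rest =>
        val next = adj.getOrElse(v, Seq.empty[Long]).filterNot(seen).distinct
        bfs(rest ++ next, seen ++ next)
    }
    bfs(List(start), Set(start))
  }

  // Step 1: every distinct attribute on the vertex's incident edges.
  // Step 2: in each such component, count other vertices whose Attribute1
  //         equals this vertex's Attribute2.
  // Step 3: sum those counts per vertex.
  def countMatches(vertices: Map[Long, VAttr], edges: Seq[AttrEdge]): Map[Long, Int] =
    vertices.map { case (id, attrs) =>
      val props = edges.filter(e => e.src == id || e.dst == id).map(_.attr).distinct
      val total = props.map { p =>
        component(id, p, edges).count(o => o != id && vertices.get(o).exists(_.a1 == attrs.a2))
      }.sum
      (id, total)
    }
}
```

With the vertices and edges from the post, vertex 1004 gets a count of 2 (from 1001 and 1003), matching the expected result. Vertex 1005 appears only in the edge list, so it is never counted as a match.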
Re: Cassandra Submit
Sorry, about my answer: I ran lsof -i:9160 in the terminal, and the result is: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 7597 inosens 101u IPv4 85754 0t0 TCP localhost:9160 (LISTEN) So is port 9160 available or not? -- hiç ender hiç
Issue running Spark 1.4 on Yarn
Hi all, I'm manually building Spark from source against the 1.4 branch and submitting the job to YARN. I am seeing very strange behavior. The first 2 or 3 times I submit the job, it runs fine, computes Pi, and exits. The next time I run it, it gets stuck in the ACCEPTED state. I'm kicking off a job using yarn-client mode like this: ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue examples/target/scala-2.10/spark-examples*.jar 10 Here's what the ResourceManager shows: [image: Yarn ResourceManager UI] In the YARN ResourceManager logs, all I'm seeing is this: 2015-06-08 14:49:57,166 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler: Added Application Attempt appattempt_1433789077942_0004_01 to scheduler from user: root 2015-06-08 14:49:57,166 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1433789077942_0004_01 State change from SUBMITTED to SCHEDULED There's nothing in the NodeManager logs (though it's up and running); the job isn't getting that far. It seems to me that there's an issue somewhere in the Spark 1.4 and YARN integration. Hadoop runs without any issues; I've run the command below multiple times: yarn jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.2.jar pi 16 100 For reference, I'm compiling the source against the 1.4 branch, and running it on a single-node cluster with CDH5.4 and Hadoop 2.6, distributed mode. I am using the following to compile: mvn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Pyarn -Phive -Phive-thriftserver -DskipTests clean package Any help appreciated. Thanks, -Matt
Join between DStream and Periodically-Changing-RDD
Hi, I'm trying to join a DStream with a batch interval of, let's say, 20s with an RDD loaded from an HDFS folder that changes periodically; let's say a new file arrives in the folder every 10 minutes. How should this be done, given that files in the HDFS folder are periodically changed or added? Does an RDD automatically detect changes in the HDFS folder it was created from and reload itself? Thanks! Rendy
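An RDD is an immutable snapshot: one built from an HDFS folder does not pick up files added later, so the lookup RDD has to be rebuilt. A common pattern is to do the rebuild inside `DStream.transform`, whose function runs on the driver once per batch. The refresh policy itself can be sketched in plain Scala; the class below is hypothetical (not a Spark API), and its clock is injectable so the logic can be tested without waiting.

```scala
object RefreshSketch {
  // Hypothetical helper: caches a value and rebuilds it once it is older than ttlMs.
  final class Refreshing[A](ttlMs: Long, clock: () => Long)(load: () => A) {
    private var cached: Option[A] = None
    private var loadedAt: Long = 0L

    def get(): A = synchronized {
      val now = clock()
      if (cached.isEmpty || now - loadedAt >= ttlMs) {
        cached = Some(load()) // rebuild on first use or when the TTL has expired
        loadedAt = now
      }
      cached.get
    }
  }
}
```

In a streaming job it might be wired up roughly as `val lookup = new RefreshSketch.Refreshing(10 * 60 * 1000L, () => System.currentTimeMillis())(() => sc.textFile(dir).map(parseLine).cache())` followed by `stream.transform(rdd => rdd.join(lookup.get()))`, where `dir` and `parseLine` are placeholders. Unpersisting the previous lookup RDD after a reload is left out of this sketch.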
Costs of transformations
Is it possible to bound the costs of operations such as flatMap() and collect() based on the size of the RDDs?
Re: Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
This may or may not be helpful for your classpath issues, but I wanted to verify that basic functionality worked, so I made a sample app here: https://github.com/jmahonin/spark-streaming-phoenix This consumes events off a Kafka topic using Spark Streaming, and writes out event counts to Phoenix using the new phoenix-spark functionality: http://phoenix.apache.org/phoenix_spark.html It's definitely overkill, and it would probably be more efficient to use the JDBC driver directly, but it serves as a proof of concept. I've only tested this in local mode. To convert it to a full job JAR, I suspect that keeping all of the Spark and Phoenix dependencies marked as 'provided', and including the Phoenix client JAR in the Spark classpath, would work as well. Good luck, Josh On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek j.v...@anchormen.nl wrote: Hi, I posted a question with regard to Phoenix and Spark Streaming on StackOverflow [1]. Please find a copy of the question in this email below the first stack trace. I also already contacted the Phoenix mailing list and tried the suggestion of setting spark.driver.userClassPathFirst. Unfortunately that only pushed me further into dependency hell, which I tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy. What I am trying to achieve: to save a stream from Kafka into Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the original exception happens both on a 3-node cluster and on the MapR Sandbox (a VM for experimentation), in YARN and stand-alone mode. Further experimentation (like the saveAsNewHadoopApiFile below) was done only on the sandbox in standalone mode. Phoenix only supports Spark from 4.4.0 onwards, but I thought I could use a naive implementation that creates a new connection for every RDD from the DStream in 4.3.1. This resulted in the ClassNotFoundException described in [1], so I switched to 4.4.0. Unfortunately the saveToPhoenix method is only available in Scala.
So I did find the suggestion to try it via the saveAsNewHadoopApiFile method [2] and an example implementation [3], which I adapted to my own needs. However, 4.4.0 + saveAsNewHadoopApiFile raises the same ClassNotFoundException, just with a slightly different stack trace: java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection. at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386) at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145) at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288) at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171) at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881) at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860) at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77) at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860) at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162) at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131) at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133) at java.sql.DriverManager.getConnection(DriverManager.java:571) at java.sql.DriverManager.getConnection(DriverManager.java:187) at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92) at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80) at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68) at org.apache.phoenix.mapreduce.PhoenixRecordWriter.init(PhoenixRecordWriter.java:49)
Re: Spark 1.3.1 SparkSQL metastore exceptions
It seems that you're using a DB2 Hive metastore? I'm not sure whether Hive 0.12.0 officially supports DB2, but probably not, since I didn't find DB2 scripts under the metastore/scripts/upgrade folder in the Hive source tree. Cheng On 6/9/15 8:28 PM, Needham, Guy wrote: Hi, I’m using Spark 1.3.1 to insert into a Hive 0.12 table from a SparkSQL query. The query is a very simple select from a dummy Hive table used for benchmarking. I’m using a create table as select statement to do the insert. Whether I do that or an insert overwrite, I get the same Hive exception, unable to alter table, with some Hive metastore issues. The data is inserted into the Hive table as expected; however, I get a very long stack trace. Does anyone know the meaning of the stack trace and how I can avoid generating it every time I insert into a table? scala> hiveContext.sql("create table benchmarking.spark_logins_benchmark as select * from benchmarking.logins_benchmark limit 10") org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table.
at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:387) at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1448) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:235) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:123) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:255) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099) at org.apache.spark.sql.hive.execution.CreateTableAsSelect.run(CreateTableAsSelect.scala:70) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:147) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:130) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:24) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:29) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:33) at $iwC$$iwC$$iwC$$iwC.init(console:35) at $iwC$$iwC$$iwC.init(console:37) at $iwC$$iwC.init(console:39) at $iwC.init(console:41) at init(console:43) at .init(console:47) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at
Re: Issue running Spark 1.4 on Yarn
If your application is stuck in that state, it generally means your cluster doesn't have enough resources to start it. In the RM logs you can see how many vcores and how much memory the application is asking for. You can then check your RM configuration to see whether that is currently available on any single NM.

On Tue, Jun 9, 2015 at 7:56 AM, Matt Kapilevich matve...@gmail.com wrote: Hi all, I'm manually building Spark from source against the 1.4 branch and submitting the job to YARN. I am seeing very strange behavior. The first 2 or 3 times I submit the job, it runs fine, computes Pi, and exits. The next time I run it, it gets stuck in the ACCEPTED state. I'm kicking off the job in yarn-client mode like this:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue examples/target/scala-2.10/spark-examples*.jar 10
Here's what ResourceManager shows: [image: Yarn ResourceManager UI]
In the YARN ResourceManager logs, all I'm seeing is this:
2015-06-08 14:49:57,166 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler: Added Application Attempt appattempt_1433789077942_0004_01 to scheduler from user: root
2015-06-08 14:49:57,166 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1433789077942_0004_01 State change from SUBMITTED to SCHEDULED
There's nothing in the NodeManager logs (though it's up and running); the job isn't getting that far. It seems to me that there's an issue somewhere in the integration between Spark 1.4 and YARN. Hadoop runs without any issues. I've run the following multiple times:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.2.jar pi 16 100
For reference, I'm compiling the source against the 1.4 branch and running it on a single-node cluster with CDH5.4 and Hadoop 2.6, in distributed mode. I am using the following to compile:
mvn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Pyarn -Phive -Phive-thriftserver -DskipTests clean package
Any help appreciated. Thanks, -Matt
-- Marcelo
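To make Marcelo's suggestion concrete, here is a rough Java sketch of the memory arithmetic for Matt's spark-submit line. It is an estimate under stated assumptions, not what YARN or Spark literally compute. It assumes a per-container overhead of max(384 MB, 10% of the heap). For a 2g executor, a 7% factor gives the same 384 MB minimum. It also assumes a yarn-client ApplicationMaster of 512 MB, which I believe is the spark.yarn.am.memory default, and that the scheduler rounds every request up to a multiple of 1024 MB. In yarn-client mode, --driver-memory sizes the local driver JVM rather than a YARN container, so it is left out. The class and method names are hypothetical.

```java
// Hypothetical helper: estimates the YARN containers a Spark-on-YARN submit asks for.
// All constants are assumptions; check the "Running on YARN" docs for your Spark version.
final class YarnRequestEstimate {
    static final int OVERHEAD_MIN_MB = 384;      // assumed minimum memory overhead per container
    static final double OVERHEAD_FACTOR = 0.10;  // assumed overhead as a fraction of the heap

    // Off-heap overhead added on top of the JVM heap for one container.
    static int overheadMb(int heapMb) {
        return Math.max(OVERHEAD_MIN_MB, (int) (heapMb * OVERHEAD_FACTOR));
    }

    // Round a request up to the next multiple of the scheduler's allocation increment.
    static int normalize(int requestMb, int incrementMb) {
        return ((requestMb + incrementMb - 1) / incrementMb) * incrementMb;
    }

    // Size of one container after adding overhead and normalizing.
    static int containerMb(int heapMb, int incrementMb) {
        return normalize(heapMb + overheadMb(heapMb), incrementMb);
    }

    // ApplicationMaster container plus all executor containers.
    static int totalMb(int amHeapMb, int executorHeapMb, int numExecutors, int incrementMb) {
        return containerMb(amHeapMb, incrementMb) + numExecutors * containerMb(executorHeapMb, incrementMb);
    }

    public static void main(String[] args) {
        int increment = 1024;  // assumed allocation increment in MB
        System.out.println("AM container:       " + containerMb(512, increment) + " MB");
        System.out.println("Executor container: " + containerMb(2048, increment) + " MB");
        System.out.println("Total (3 executors): " + totalMb(512, 2048, 3, increment) + " MB");
    }
}
```

Under these assumptions, the application asks for 1024 MB for the AM and 3072 MB per executor, 10240 MB in total. Those are the figures to compare with what the RM reports as free on the node. An application generally stays in ACCEPTED until its AM container has been allocated.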
Re: Different Sorting RDD methods in Apache Spark
Thank you for your responses! You mention that it only works as long as the data fits on a single machine. What I am trying to do is receive the sorted contents of my dataset. For this to be possible, the entire dataset has to fit on a single machine. Are you saying that sorting the entire data and collecting it on the driver node is not a typical use case? If I want to do this using sortBy(), I would first call sortBy() followed by collect(). collect() would also gather all the data on a single machine. Thanks, Raghav

On Tuesday, June 9, 2015, Mark Hamstra m...@clearstorydata.com wrote: Correct. Trading away scalability for increased performance is not an option for the standard Spark API.

On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: It would be even faster to load the data on the driver and sort it there without using Spark :). Using reduce() is cheating, because it only works as long as the data fits on one machine. That is not the targeted use case of a distributed computation system. You can repeat your test with more data (data that doesn't fit on one machine) to see what I mean.

On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote: For a research project, I tried sorting the elements in an RDD using two different approaches. In the first approach, I applied a mapPartitions() function to the RDD so that it would sort the contents of each partition and produce a result RDD in which the sorted list is the only record of each partition. Then I applied a reduce function that merges the sorted lists. I ran these experiments on an EC2 cluster of 30 nodes, set up with the spark-ec2 script. The data file was stored in HDFS. In the second approach, I used the sortBy method in Spark. I performed these operations on the US census data (100MB) found here. A single line looks like this: 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 5. I sorted on the 25th value in the CSV; in this line, that is 1758.14. I noticed that sortBy performs worse than the other method. Is this expected? If it is, why wouldn't mapPartitions() and reduce() be the default sorting approach? Here is my implementation:

public static void sortBy(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>() {
        @Override
        public Double call(String v1) throws Exception {
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);  // 25th column
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}

public static void sortList(JavaSparkContext sc) {
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32); //parallelize(l, 8);
    long start = System.currentTimeMillis();
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(
            new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>() {
        @Override
        public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t) throws Exception {
            LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
            while (t.hasNext()) {
                String s = t.next();
                String arr1[] = s.split(",");
                Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                lines.add(t1);
            }
            // Sort this partition's records by the income key.
            Collections.sort(lines, new IncomeComparator());
            LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
            list.add(lines);  // one sorted list per partition
            return list;
        }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
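The merge step in raggy's first approach, the function handed to reduce(), can be sketched in plain Java as below. mergeSorted is a hypothetical name, and plain Double keys stand in for the Tuple2<Double, String> records. The main method folds the per-partition lists one after another, which also illustrates Daniel's point: whatever order reduce() combines them in, the final result is a single list that must fit in one JVM.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "merge two sorted lists" function used as a reduce() step.
final class SortedMerge {
    // Merges two ascending lists into one ascending list in O(n + m).
    static List<Double> mergeSorted(List<Double> a, List<Double> b) {
        List<Double> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            if (a.get(i) <= b.get(j)) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        while (i < a.size()) out.add(a.get(i++));  // drain whatever is left in a
        while (j < b.size()) out.add(b.get(j++));  // drain whatever is left in b
        return out;
    }

    public static void main(String[] args) {
        // Each inner list stands in for the single sorted record of one partition.
        List<List<Double>> partitions = Arrays.asList(
                Arrays.asList(3.0, 1758.14, 9000.5),
                Arrays.asList(0.0, 250.0),
                Arrays.asList(42.0, 5000.0));
        List<Double> result = new ArrayList<>();
        for (List<Double> p : partitions) {
            result = mergeSorted(result, p);  // sequential fold; Spark's reduce() may pair them differently
        }
        System.out.println(result);
    }
}
```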
Implementing top() using treeReduce()
I am trying to implement top-k in Scala within Apache Spark. I am aware that Spark has a top action, but top() uses reduce(), and I would like to use treeReduce() instead, so that I can compare the performance of reduce() and treeReduce(). The main issue is that I cannot use these two lines of code from the top() action within my Spark application:
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
How can I go about implementing top() using treeReduce()?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
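One way to get top() semantics with treeReduce() without Spark's internal helpers (BoundedPriorityQueue and Utils.takeOrdered, which I believe are private[spark] and therefore not usable from application code) works in two steps. First, reduce each partition to its own top-k list with mapPartitions(). Then treeReduce() those lists with a combine function that keeps the k largest values. The Java sketch below shows only that logic, with Integer elements and hypothetical names. treeCombine() is a simple pairwise tree standing in for treeReduce(), not Spark's actual aggregation schedule. Because the combine function is associative and commutative, the tree shape does not change the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: top-k as per-partition top-k lists plus an associative combine step.
final class TreeTopK {
    // Keeps the k largest values, returned in descending order (what each partition would emit).
    static List<Integer> topK(Iterable<Integer> items, int k) {
        List<Integer> out = new ArrayList<>();
        if (k <= 0) return out;
        PriorityQueue<Integer> heap = new PriorityQueue<>();  // min-heap holding at most k values
        for (Integer x : items) {
            if (heap.size() < k) {
                heap.add(x);
            } else if (x > heap.peek()) {
                heap.poll();  // evict the current smallest of the kept values
                heap.add(x);
            }
        }
        out.addAll(heap);
        Collections.sort(out, Collections.<Integer>reverseOrder());
        return out;
    }

    // The function you would pass to treeReduce(): the k largest values of two top-k lists.
    static List<Integer> combine(List<Integer> a, List<Integer> b, int k) {
        List<Integer> both = new ArrayList<>(a);
        both.addAll(b);
        return topK(both, k);
    }

    // Local stand-in for treeReduce(): combines neighbouring lists level by level.
    static List<Integer> treeCombine(List<List<Integer>> partials, int k) {
        if (partials.isEmpty()) return new ArrayList<>();
        List<List<Integer>> level = partials;
        while (level.size() > 1) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i += 2) {
                next.add(i + 1 < level.size() ? combine(level.get(i), level.get(i + 1), k) : level.get(i));
            }
            level = next;
        }
        return level.get(0);
    }

    public static void main(String[] args) {
        int k = 3;
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1, 9), Arrays.asList(7, 3), Arrays.asList(8, 2, 6), Arrays.asList(4));
        List<List<Integer>> partials = new ArrayList<>();
        for (List<Integer> p : partitions) partials.add(topK(p, k));  // the mapPartitions() step
        System.out.println(treeCombine(partials, k));
    }
}
```

In Spark, the same two functions would plug into an RDD of per-partition lists produced by mapPartitions(), followed by treeReduce(f, depth) on that RDD.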
Re: Cassandra Submit
hm. Yeah, your port is good... have you seen this thread: http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector ? It seems that you might be running into version mismatch issues. What versions of Spark and the Cassandra connector are you trying to use?

On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote: Sorry, to answer your question, I ran lsof -i:9160 in the terminal and the result is:
lsof -i:9160
COMMAND PID  USER    FD   TYPE DEVICE SIZE/OFF NODE NAME
java    7597 inosens 101u IPv4 85754  0t0      TCP  localhost:9160 (LISTEN)
So is port 9160 available or not?

2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com: Yes, my Cassandra is listening on 9160, I think. Actually, I know it from the yaml file, which includes:
rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160
I checked the port with nc -z localhost 9160; echo $? and it returns 0. I think it is closed; should I open this port?

2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Is your cassandra installation actually listening on 9160?
lsof -i :9160
COMMAND PID   USER     FD  TYPE DEVICE   SIZE/OFF NODE NAME
java    29232 ykadiysk 69u IPv4 42152497 0t0      TCP  localhost:9160 (LISTEN)
I am running an out-of-the-box cassandra conf where
rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160

On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote: I couldn't find any solution. I can write to Cassandra but I can't read from it.

2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com: Thanks a lot Mohammed, Gerard and Yana. I can write to the table, but I get an exception. It says *Exception in thread "main" java.io.IOException: Failed to open thrift connection to Cassandra at 127.0.0.1:9160*
In the yaml file:
rpc_address: localhost
rpc_port: 9160
And in the project:
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.connection.rpc.port", "9160");
or
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.connection.rpc.port", "9160");
Whatever setting I use, I get the same exception. Any help?

2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: yes, whatever you put for listen_address in cassandra.yaml. Also, you should try to connect to your cassandra cluster via bin/cqlsh to make sure you have connectivity before you try to make a connection via spark.

On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I run my project locally. How can I find the IP address of my Cassandra host? From cassandra.yaml or ...? yasemin

2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com: ? = ip address of your cassandra host

On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, How can I find spark.cassandra.connection.host? What should I change? Should I change cassandra.yaml? The error says *Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042* What should I add here?
SparkConf sparkConf = new SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts", "true").set("spark.cassandra.connection.host", "?");
Best, yasemin

2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com: Check your spark.cassandra.connection.host setting. It should be pointing to one of your Cassandra nodes. Mohammed
*From:* Yasemin Kaya [mailto:godo...@gmail.com] *Sent:* Friday, June 5, 2015 7:31 AM *To:* user@spark.apache.org *Subject:* Cassandra Submit
Hi, I am using Cassandra in my project. I got this error: *Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042* I think I have to modify the submit line. What should I add or remove when I submit my project? Best, yasemin
-- hiç ender hiç
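A note on the port checks above: nc -z exits with status 0 when it manages to connect, so an exit status of 0 means the port is open. The errors in this thread also mention two different endpoints: the native (CQL) transport on 9042 and Thrift on 9160, both Cassandra defaults. The Java sketch below is one way to probe both ports on 127.0.0.1 and on 127.0.1.1, the address in the first error, from the JVM side where the connector runs. The class name and the 1-second timeout are assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether a TCP connection to host:port succeeds.
final class PortCheck {
    // Returns true if a TCP connection to host:port is accepted within timeoutMs.
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;  // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        String[] hosts = {"127.0.0.1", "127.0.1.1"};
        int[] ports = {9042, 9160};  // native (CQL) transport, Thrift RPC
        for (String h : hosts) {
            for (int p : ports) {
                System.out.println(h + ":" + p + " -> " + (isOpen(h, p, 1000) ? "open" : "closed"));
            }
        }
    }
}
```

If 127.0.0.1 shows open but 127.0.1.1 shows closed, the connector is probably being pointed at the wrong address rather than at a closed port.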
Re: RDD of RDDs
That would constitute a major change in Spark's architecture. It's not happening anytime soon.

On Tue, Jun 9, 2015 at 1:34 AM, kiran lonikar loni...@gmail.com wrote: Possibly in the future, if and when the Spark architecture allows workers to launch Spark jobs (the functions passed to the transformation or action APIs of RDD), it will be possible to have an RDD of RDDs.

On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote: A similar question was asked before: http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html
Here is one of the reasons why I think RDD[RDD[T]] is not possible:
- An RDD is only a handle to the actual data partitions. It has a reference/pointer to the *SparkContext* object (*sc*) and a list of partitions.
- The *SparkContext* is an object in the JVM of the Spark application/driver program. Similarly, the list of partitions is also in the JVM of the driver program. Each partition contains a kind of remote reference to the partition data on the worker JVMs.
- The functions passed to an RDD's transformations and actions execute in the workers' JVMs on different nodes. For example, in *rdd.map { x => x*x }*, the function performing *x*x* runs on the JVMs of the worker nodes where the partitions of the RDD reside. These JVMs do not have access to *sc*, since it exists only in the driver's JVM.
- Thus, in the case of your *RDD of RDD*: *outerRDD.map { innerRDD => innerRDD.filter { x => x*x } }*, the worker nodes will not be able to execute the *filter* on *innerRDD*, as the code in the worker does not have access to sc and cannot launch a Spark job.
Hope it helps. You need to consider List[RDD] or some other collection.
-Kiran

On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote: Hi, The problem I am looking at is as follows:
- I read in a log file of multiple users as an RDD.
- I'd like to group the above RDD into *multiple RDDs* by userId (the key).
- My processEachUser() function then takes in each per-user RDD and calls RDD.map or DataFrame operations on it. (I already have the function coded, so I am reluctant to work with the ResultIterable object coming out of rdd.groupByKey() ...)
I've searched the mailing list and googled "RDD of RDDs", and it seems like it isn't a thing at all. The few choices left seem to be: 1) groupByKey() and then work with the ResultIterable object; 2) groupByKey(), then write each group to a file and read them back as individual RDDs to process. Has anyone got a better idea or had a similar problem before? Thanks! Ping
-- Ping Yan Ph.D. in Management Dept. of Management Information Systems University of Arizona Tucson, AZ 85721
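Option 1 in Ping's list, combined with Kiran's point that code running on workers cannot use sc, suggests rewriting processEachUser() to take a plain Iterable of one user's records. That is what groupByKey() produces per key. The sketch below imitates this locally in Java. The log format "userId,event", the grouping helper, and the counting body of processEachUser() are all hypothetical. In Spark, the same function could be applied with something like groupByKey() followed by mapValues(), provided each user's records fit in one executor's memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-user logic that works on an Iterable instead of an RDD.
final class PerUserProcessing {
    // Stand-in for processEachUser(): here it simply counts one user's events.
    static int processEachUser(Iterable<String> userLines) {
        int n = 0;
        for (String ignored : userLines) n++;
        return n;
    }

    // Local stand-in for groupByKey(): groups "userId,event" lines by userId, keeping input order.
    static Map<String, List<String>> groupByUser(List<String> lines) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String user = line.split(",", 2)[0];
            List<String> g = groups.get(user);
            if (g == null) {
                g = new ArrayList<>();
                groups.put(user, g);
            }
            g.add(line);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("u1,login", "u2,login", "u1,click", "u1,logout");
        for (Map.Entry<String, List<String>> e : groupByUser(log).entrySet()) {
            System.out.println(e.getKey() + " -> " + processEachUser(e.getValue()));
        }
    }
}
```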
Re: Different Sorting RDD methods in Apache Spark
Are you saying that sorting the entire data and collecting it on the driver node is not a typical use case? It most definitely is not. Spark is designed and intended to be used with very large datasets. Far from being typical, collecting hundreds of gigabytes, terabytes or petabytes to the driver node is not feasible.
Re: Different Sorting RDD methods in Apache Spark
Correct. Trading away scalability for increased performance is not an option for the standard Spark API.
Re: Problem getting program to run on 15TB input
I found that the problem was due to garbage collection in filter(). Using Hive to do the filter solved the problem. A lot of other problems went away when I upgraded to Spark 1.2.0, which compresses various task overhead data (HighlyCompressedMapStatus etc.). It has been running very smoothly with these two changes. I'm fairly sure that I tried coalesce() and that it resulted in tasks that were too big, but the code has evolved too much to easily double-check it now.

On Sat, Jun 6, 2015 at 12:50 AM, Kapil Malik kma...@adobe.com wrote: Very interesting and relevant thread for production-level usage of Spark. @Arun, can you kindly confirm whether Daniel's suggestion helped your use case? Thanks, Kapil Malik | kma...@adobe.com | 33430 / 8800836581
*From:* Daniel Mahler [mailto:dmah...@gmail.com] *Sent:* 13 April 2015 15:42 *To:* Arun Luthra *Cc:* Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org *Subject:* Re: Problem getting program to run on 15TB input
Sometimes a large number of partitions leads to memory problems. Something like
val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...
may help.

On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote: Everything works smoothly if I do the 99%-removal filter in Hive first. So all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage-collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote: I tried a shorter, simpler version of the program with just 1 RDD. Essentially it is:
sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...)
Here is a typical GC log trace from one of the YARN container logs:
54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)] 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)] 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)] 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)] 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)] 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)] 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs]
So ~9GB is getting GC'ed every few seconds, which seems like a lot. Question: the filter() is removing 99% of the data. Does this 99% of the data get GC'ed? I was finally able to get to reduceByKey() by reducing the number of executor-cores (to 2), based on the suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. I ran this with more executor-memory and fewer executors (the most important thing was fewer executor-cores):
--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \
But then reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space

On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote: The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility) could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey), which is relatively inexpensive. For the purposes of this algorithm, I can simply log and remove keys with huge counts before doing groupByKey.

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world pause), quite possibly due to groupByKey. groupByKey is a very expensive operation, as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler, or do you base that assumption only on logs?

On Sat, Feb 28, 2015, 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote: A correction to my first post:
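Arun's plan can be sketched locally in Java: count records per key with countByKey() or reduceByKey(), then log and remove keys with huge counts before groupByKey(). The threshold and sample keys below are made up, and the class name is hypothetical. In Spark, countByKey() returns a Map to the driver, so it is only cheap when the number of distinct keys is modest. reduceByKey() followed by a filter keeps the counts distributed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of skew detection: count records per key, then flag the heavy keys.
final class SkewCheck {
    // Local equivalent of countByKey(): number of records per key.
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String k : keys) {
            Long c = counts.get(k);
            counts.put(k, c == null ? 1L : c + 1);
        }
        return counts;
    }

    // Keys whose record count exceeds maxPerKey: the candidates to log and drop before grouping.
    static Set<String> heavyKeys(Map<String, Long> counts, long maxPerKey) {
        Set<String> heavy = new TreeSet<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            if (e.getValue() > maxPerKey) heavy.add(e.getKey());
        }
        return heavy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "c", "a", "a", "b");
        Set<String> heavy = heavyKeys(countByKey(keys), 2);  // hypothetical threshold
        System.out.println("heavy keys: " + heavy);
    }
}
```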
Re: Issue running Spark 1.4 on Yarn
Hi Marcelo, Thanks. I think something more subtle is happening. I'm running a single-node cluster, so there's only 1 NM. When I executed the exact same job the 4th time, the cluster was idle, and nothing else was being executed. RM currently reports that I have 6.5GB of memory and 4 CPUs available. However, the job is still stuck in the ACCEPTED state a day later. As I mentioned earlier, I'm able to execute Hadoop jobs fine even now; this problem is specific to Spark. Thanks, -Matt
Re: Cassandra Submit
My jar files are:
cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
guava-18.jar
jsr166e-1.1.0.jar
spark-assembly-1.3.0.jar
spark-cassandra-connector_2.10-1.3.0-M1.jar
spark-cassandra-connector-java_2.10-1.3.0-M1.jar
spark-core_2.10-1.3.1.jar
spark-streaming_2.10-1.3.1.jar
My code is taken from the DataStax spark-cassandra-connector demo: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java . Thanks a lot. yasemin
RE: Cassandra Submit
It is strange that writes work but reads do not. If it were a Cassandra connectivity issue, then neither writes nor reads would work. Perhaps the problem is somewhere else. Can you send the complete exception trace? Also, just to make sure that there is no DNS issue, try this: ~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h 127.0.0.1 -p 9160 Mohammed From: Yasemin Kaya [mailto:godo...@gmail.com] Sent: Tuesday, June 9, 2015 11:32 AM To: Yana Kadiyska Cc: Gerard Maas; Mohammed Guller; user@spark.apache.org Subject: Re: Cassandra Submit I removed the core and streaming jars, and the exception is still the same. I tried what you said; here are the results: ~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160 Connected to: Test Cluster on localhost/9160 Welcome to Cassandra CLI version 2.1.5 The CLI is deprecated and will be removed in Cassandra 3.0. Consider migrating to cqlsh. CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3 Type 'help;' or '?' for help. Type 'quit;' or 'exit;' to quit. [default@unknown] and ~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3] Use HELP for help. cqlsh Thank you for your kind responses... 2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Hm, the jars look OK, although it's a bit of a mess -- you have spark-assembly 1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix versions. spark-assembly bundles all Spark packages, so either add them separately or use spark-assembly, but don't mix them like you've shown.
As to the port issue -- what about this: $ bin/cassandra-cli -h localhost -p 9160 Connected to: Test Cluster on localhost/9160 Welcome to Cassandra CLI version 2.1.5 On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote: My jar files are: cassandra-driver-core-2.1.5.jar cassandra-thrift-2.1.3.jar guava-18.jar jsr166e-1.1.0.jar spark-assembly-1.3.0.jar spark-cassandra-connector_2.10-1.3.0-M1.jar spark-cassandra-connector-java_2.10-1.3.0-M1.jar spark-core_2.10-1.3.1.jar spark-streaming_2.10-1.3.1.jar And my code is from the DataStax spark-cassandra-connector demo: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java. Thanks a lot. yasemin 2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: hm. Yeah, your port is good... have you seen this thread: http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector ? It seems that you might be running into version mismatch issues? What versions of Spark/Cassandra-connector are you trying to use? On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote: Sorry, my answer: I ran lsof -i:9160 in the terminal and the result is: lsof -i:9160 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 7597 inosens 101u IPv4 85754 0t0 TCP localhost:9160 (LISTEN) So is port 9160 available or not? 2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com: Yes, my Cassandra is listening on 9160, I think. Actually, I know this from the yaml file. The file includes: rpc_address: localhost # port for Thrift to listen for clients on rpc_port: 9160 I checked the port with nc -z localhost 9160; echo $? and it returns 0. I think it is closed; should I open this port? 2015-06-09 16:55 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Is your Cassandra installation actually listening on 9160?
lsof -i :9160 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 29232 ykadiysk 69u IPv4 42152497 0t0 TCP localhost:9160 (LISTEN) I am running an out-of-the-box Cassandra conf where rpc_address: localhost # port for Thrift to listen for clients on rpc_port: 9160 On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote: I couldn't find any solution. I can write but I can't read from Cassandra. 2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com: Thanks a lot, Mohammed, Gerard and Yana. I can write to the table, but an exception is thrown. It says: Exception in thread "main" java.io.IOException: Failed to open thrift connection to Cassandra at 127.0.0.1:9160. In the yaml file: rpc_address: localhost rpc_port: 9160. And in the project: .set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.rpc.port", "9160"); or .set("spark.cassandra.connection.host", "localhost").set("spark.cassandra.connection.rpc.port", "9160"); Whatever setting I use, I get the same exception. Any help? 2015-06-08 18:23 GMT+03:00 Yana Kadiyska
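Yana's point about mixing spark-assembly 1.3.0 with spark-core/spark-streaming 1.3.1 can be checked mechanically. A minimal sketch, not part of the thread: the class name and the file-name pattern are assumptions based on the jar names listed above (core Spark jars named `spark-<module>[_<scala version>]-<version>...jar`); third-party jars such as the Cassandra connector are deliberately not matched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SparkJarVersions {
    // Hypothetical pattern for core Spark artifacts, e.g. spark-core_2.10-1.3.1.jar or spark-assembly-1.3.0.jar.
    // spark-cassandra-connector_2.10-1.3.0-M1.jar does not match because "cassandra" is not in the module list.
    private static final Pattern SPARK_JAR = Pattern.compile(
        "spark-(assembly|core|streaming|sql|mllib|hive|graphx)(_\\d+\\.\\d+)?-(\\d+\\.\\d+\\.\\d+)(-.*)?\\.jar");

    // Groups Spark jars by the Spark version embedded in the file name; other jars are ignored.
    static Map<String, List<String>> byVersion(List<String> jarNames) {
        Map<String, List<String>> versions = new TreeMap<>();
        for (String jar : jarNames) {
            Matcher m = SPARK_JAR.matcher(jar);
            if (m.matches()) {
                versions.computeIfAbsent(m.group(3), v -> new ArrayList<>()).add(jar);
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        // The jar list from the thread.
        List<String> jars = Arrays.asList(
            "cassandra-driver-core-2.1.5.jar", "cassandra-thrift-2.1.3.jar", "guava-18.jar",
            "jsr166e-1.1.0.jar", "spark-assembly-1.3.0.jar",
            "spark-cassandra-connector_2.10-1.3.0-M1.jar", "spark-cassandra-connector-java_2.10-1.3.0-M1.jar",
            "spark-core_2.10-1.3.1.jar", "spark-streaming_2.10-1.3.1.jar");
        Map<String, List<String>> versions = byVersion(jars);
        if (versions.size() > 1) {
            System.out.println("Mixed Spark versions on the classpath: " + versions);
        }
    }
}
```

On the list above this reports two Spark versions, which is the mix Yana flagged; the fix she suggests is to keep either the assembly or the individual module jars, all at one version.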
Re: Running SparkSql against Hive tables
Thanks Ayan, I used beeline in Spark to connect to the HiveServer2 that I started from my Hive installation. So, as you said, it is really interacting with Hive as a typical third-party application, and it is NOT using the Spark execution engine. I was thinking that it gets the metastore info from Hive but uses Spark to execute the query. I have already created and loaded tables in Hive, and now I want to use Spark to run SQL queries against those tables. I just want to submit SQL queries in Spark against the data in Hive, without writing an application (similar to the way one would pass SQL scripts to Hive or Shark). Going through the Spark documentation, I realized Spark SQL is the component that I need to use. But do you mean I have to write a client Spark application to do that? Is there any way to pass SQL scripts directly through the command line and have Spark run them in distributed mode on the cluster, against the data already in Hive? On Mon, Jun 8, 2015 at 5:53 PM, ayan guha guha.a...@gmail.com wrote: I am afraid you are going the other way around :) If you want to use Hive in Spark, you'd need a HiveContext with the Hive config files on the Spark cluster (every node). This way Spark can talk to the Hive metastore. Then you can write queries on Hive tables using HiveContext's sql method, and Spark will run them (either by reading from Hive and creating an RDD, or by letting Hive run the query using MR). The final result will be a Spark DataFrame. What you are currently doing is using beeline to connect to Hive, which should work even without Spark. Best Ayan On Tue, Jun 9, 2015 at 10:42 AM, James Pirz james.p...@gmail.com wrote: Thanks for the help! I am actually trying Spark SQL to run queries against tables that I've defined in Hive.
I follow these steps: - I start hiveserver2, and in Spark I start Spark's Thrift server by: $SPARK_HOME/sbin/start-thriftserver.sh --master spark://spark-master-node-ip:7077 - and I start beeline: $SPARK_HOME/bin/beeline - In my beeline session, I connect to my running hiveserver2: !connect jdbc:hive2://hive-node-ip:1 and I can run queries successfully. But based on the hiveserver2 logs, it seems it actually uses Hadoop's MR to run queries, *not* Spark's workers. My goal is to access Hive's tables' data, but run queries through Spark SQL using Spark workers (not Hadoop). Is it possible to do that via Spark SQL (its CLI) or through its Thrift server? (I tried to find some basic examples in the documentation, but I was not able to.) Any suggestion or hint on how I can do that would be highly appreciated. Thnx On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote: On 6/6/15 9:06 AM, James Pirz wrote: I am pretty new to Spark, and using Spark 1.3.1, I am trying to use Spark SQL to run some SQL scripts on the cluster. I realized that for better performance, it is a good idea to use Parquet files. I have 2 questions regarding that: 1) If I want to use Spark SQL against *partitioned bucketed* tables with Parquet format in Hive, does the provided Spark binary on the Apache website support that, or do I need to build a new Spark binary with some additional flags? (I found a note https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables in the documentation about enabling Hive support, but I could not fully understand what the correct way of building is, if I need to build.) Yes, Hive support is enabled by default now for the binaries on the website. However, Spark SQL doesn't support buckets yet. 2) Does running Spark SQL against tables in Hive degrade performance, and is it better to load Parquet files directly into HDFS, or is having Hive in the picture harmless?
If you're using Parquet, then it should be fine since by default Spark SQL uses its own native Parquet support to read Parquet Hive tables. Thnx -- Best Regards, Ayan Guha
Re: Cassandra Submit
Hm, the jars look OK, although it's a bit of a mess -- you have spark-assembly 1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix versions. spark-assembly bundles all Spark packages, so either add them separately or use spark-assembly, but don't mix them like you've shown. As to the port issue -- what about this: $ bin/cassandra-cli -h localhost -p 9160 Connected to: Test Cluster on localhost/9160 Welcome to Cassandra CLI version 2.1.5 On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote: My jar files are: cassandra-driver-core-2.1.5.jar cassandra-thrift-2.1.3.jar guava-18.jar jsr166e-1.1.0.jar spark-assembly-1.3.0.jar spark-cassandra-connector_2.10-1.3.0-M1.jar spark-cassandra-connector-java_2.10-1.3.0-M1.jar spark-core_2.10-1.3.1.jar spark-streaming_2.10-1.3.1.jar And my code is from the DataStax spark-cassandra-connector demo: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java. Thanks a lot. yasemin 2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: hm. Yeah, your port is good... have you seen this thread: http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector ? It seems that you might be running into version mismatch issues? What versions of Spark/Cassandra-connector are you trying to use? On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya godo...@gmail.com wrote: Sorry, my answer: I ran lsof -i:9160 in the terminal and the result is: lsof -i:9160 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 7597 inosens 101u IPv4 85754 0t0 TCP localhost:9160 (LISTEN) So is port 9160 available or not? 2015-06-09 17:16 GMT+03:00 Yasemin Kaya godo...@gmail.com: Yes, my Cassandra is listening on 9160, I think. Actually, I know this from the yaml file. The file includes: rpc_address: localhost # port for Thrift to listen for clients on rpc_port: 9160 I checked the port with nc -z localhost 9160; echo $? and it returns 0.
Re: Running SparkSql against Hive tables
I am trying to use Spark 1.3 (standalone) against Hive 1.2 running on Hadoop 2.6. I looked at the ThriftServer2 logs and realized that the server was not starting properly because it failed to create a server socket. In fact, I had passed the URI of my HiveServer2 service, launched from Hive, so the beeline in Spark was talking directly to Hive's hiveserver2 and just using it as a Hive service. I was able to get the ThriftServer2 in Spark to start (by changing the port), but I guess the missing puzzle piece for me is: how does Spark SQL reuse the tables already created in Hive? I mean, do I have to write an application that uses HiveContext to do that and submit it to Spark for execution, or is there a way to run SQL scripts directly via the command line (in distributed mode and on the cluster), similar to the way one would use the Hive (or Shark) command line by passing a query file with the -f flag? Looking at the Spark SQL documentation, it seems that it is possible. Please correct me if I am wrong. On Mon, Jun 8, 2015 at 6:56 PM, Cheng Lian lian.cs@gmail.com wrote: On 6/9/15 8:42 AM, James Pirz wrote: Thanks for the help! I am actually trying Spark SQL to run queries against tables that I've defined in Hive. I follow these steps: - I start hiveserver2, and in Spark I start Spark's Thrift server by: $SPARK_HOME/sbin/start-thriftserver.sh --master spark://spark-master-node-ip:7077 - and I start beeline: $SPARK_HOME/bin/beeline - In my beeline session, I connect to my running hiveserver2: !connect jdbc:hive2://hive-node-ip:1 and I can run queries successfully. But based on the hiveserver2 logs, it seems it actually uses Hadoop's MR to run queries, *not* Spark's workers. My goal is to access Hive's tables' data, but run queries through Spark SQL using Spark workers (not Hadoop). Hm, interesting. HiveThriftServer2 should never issue MR jobs to perform queries.
I did receive two reports in the past which also said MR jobs instead of Spark jobs were issued to perform the SQL query. However, I only reproduced this issue in a rare corner case, which uses HTTP mode to connect to Hive 0.12.0. Apparently this isn't your case. Would you mind providing more details so that I can dig in? The following information would be very helpful: 1. Hive version 2. A copy of your hive-site.xml 3. Hadoop version 4. Full HiveThriftServer2 log (which can be found in $SPARK_HOME/logs) Thanks in advance! Is it possible to do that via Spark SQL (its CLI) or through its Thrift server? (I tried to find some basic examples in the documentation, but I was not able to.) Any suggestion or hint on how I can do that would be highly appreciated. Thnx On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote: On 6/6/15 9:06 AM, James Pirz wrote: I am pretty new to Spark, and using Spark 1.3.1, I am trying to use Spark SQL to run some SQL scripts on the cluster. I realized that for better performance, it is a good idea to use Parquet files. I have 2 questions regarding that: 1) If I want to use Spark SQL against *partitioned bucketed* tables with Parquet format in Hive, does the provided Spark binary on the Apache website support that, or do I need to build a new Spark binary with some additional flags? (I found a note https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables in the documentation about enabling Hive support, but I could not fully understand what the correct way of building is, if I need to build.) Yes, Hive support is enabled by default now for the binaries on the website. However, Spark SQL doesn't support buckets yet. 2) Does running Spark SQL against tables in Hive degrade performance, and is it better to load Parquet files directly into HDFS, or is having Hive in the picture harmless?
If you're using Parquet, then it should be fine since by default Spark SQL uses its own native Parquet support to read Parquet Hive tables. Thnx
Kafka Spark Streaming: ERROR EndpointWriter: dropping message
Hello, While trying to connect Kafka to Spark, I'm not able to get data from Kafka. This is the error that I'm getting in the Spark logs: ERROR EndpointWriter: dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://sparkMaster@localhost:7077/]] arriving at [akka.tcp://sparkMaster@localhost:7077] inbound addresses are [akka.tcp://sparkMaster@karma-HP-Pavilion-g6-Notebook-PC:7077] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-ERROR-EndpointWriter-dropping-message-tp23228.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
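The log line itself appears to contain the clue: the message was addressed to `sparkMaster@localhost:7077`, but the master's inbound address is `sparkMaster@karma-HP-Pavilion-g6-Notebook-PC:7077`. The Akka-based standalone master in Spark 1.x compares these addresses by host name literally, so it drops the message even though both names refer to the same machine. The helper below is a hypothetical diagnostic (the class and method names are made up, not a Spark API) that pulls both addresses out of such a line and suggests the master URL that matches the bound address.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class AkkaAddressCheck {
    // Captures host and port from addresses like akka.tcp://sparkMaster@localhost:7077
    private static final Pattern MASTER_ADDR =
        Pattern.compile("akka\\.tcp://sparkMaster@([^:/\\]]+):(\\d+)");

    // Returns a spark:// URL for the master's inbound address when it differs from the
    // address the message was sent to; returns null if the line cannot be parsed or they match.
    static String suggestedMasterUrl(String logLine) {
        int split = logLine.indexOf("inbound addresses are");
        if (split < 0) {
            return null;
        }
        Matcher recipient = MASTER_ADDR.matcher(logLine.substring(0, split));
        Matcher inbound = MASTER_ADDR.matcher(logLine.substring(split));
        if (!recipient.find() || !inbound.find()) {
            return null;
        }
        String target = recipient.group(1) + ":" + recipient.group(2);
        String bound = inbound.group(1) + ":" + inbound.group(2);
        return target.equals(bound) ? null : "spark://" + bound;
    }

    public static void main(String[] args) {
        String line = "ERROR EndpointWriter: dropping message [class akka.actor.ActorSelectionMessage] "
            + "for non-local recipient [Actor[akka.tcp://sparkMaster@localhost:7077/]] "
            + "arriving at [akka.tcp://sparkMaster@localhost:7077] "
            + "inbound addresses are [akka.tcp://sparkMaster@karma-HP-Pavilion-g6-Notebook-PC:7077]";
        System.out.println(suggestedMasterUrl(line));
    }
}
```

Using that URL as the master (for example with `--master` or `setMaster`), or instead binding the master to `localhost` (in Spark 1.x, via `SPARK_MASTER_IP` in `conf/spark-env.sh`), should make the two addresses agree.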
Re: spark eventLog and history server
Hi, I don't have a complete answer to your questions, but: "Removing the suffix does not solve the problem" - unfortunately this is true: the master web UI only tries to build a Spark UI from the event logs once, at the time the context is closed. If the event logs are still in progress at that time, then you have basically missed the opportunity. "Does it mean I don't need to start the history server if I only use Spark in standalone mode?" - Yes, you don't need to start the history server. On Mon, Jun 8, 2015 at 7:57 PM, Du Li l...@yahoo-inc.com.invalid wrote: Event logging is enabled in my Spark Streaming app. My code runs in standalone mode and the Spark version is 1.3.1. I periodically stop and restart the streaming context by calling ssc.stop(). However, in the web UI, when I click on a past job, it says the job is still in progress and does not show the event log. The event log files have the suffix .inprogress. Removing the suffix does not solve the problem. Do I need to do anything here in order to view the event logs of finished jobs? Or do I need to stop ssc differently? In addition, the documentation seems to suggest the history server is used for Mesos or YARN mode. Does it mean I don't need to start the history server if I only use Spark in standalone mode? Thanks, Du
Re: Cassandra Submit
I removed the core and streaming jars, and the exception is still the same. I tried what you said; here are the results: ~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160 Connected to: Test Cluster on localhost/9160 Welcome to Cassandra CLI version 2.1.5 The CLI is deprecated and will be removed in Cassandra 3.0. Consider migrating to cqlsh. CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3 Type 'help;' or '?' for help. Type 'quit;' or 'exit;' to quit. [default@unknown] and ~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3] Use HELP for help. cqlsh Thank you for your kind responses... 2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Hm, the jars look OK, although it's a bit of a mess -- you have spark-assembly 1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix versions. spark-assembly bundles all Spark packages, so either add them separately or use spark-assembly, but don't mix them like you've shown. As to the port issue -- what about this: $ bin/cassandra-cli -h localhost -p 9160 Connected to: Test Cluster on localhost/9160 Welcome to Cassandra CLI version 2.1.5 On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote: My jar files are: cassandra-driver-core-2.1.5.jar cassandra-thrift-2.1.3.jar guava-18.jar jsr166e-1.1.0.jar spark-assembly-1.3.0.jar spark-cassandra-connector_2.10-1.3.0-M1.jar spark-cassandra-connector-java_2.10-1.3.0-M1.jar spark-core_2.10-1.3.1.jar spark-streaming_2.10-1.3.1.jar And my code is from the DataStax spark-cassandra-connector demo: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java. Thanks a lot. yasemin 2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: hm.
Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out
Hi Dibyendu, Thank you for your reply. I am using the Kafka consumer https://github.com/dibbhatt/kafka-spark-consumer, which uses spark-core and spark-streaming *1.2.2*. The Spark cluster on which I am running the application is *1.3.1*. I will test it with the latest changes. Yes, the underlying BlockManager gives an error, retries 3 times, and then dies. The problem is that even when it dies, I can't restart it because the application status says running. It looks like it's related to the issue https://issues.apache.org/jira/browse/SPARK-5220 Thanks, Snehal On 8 June 2015 at 20:56, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Seems to be related to this JIRA: https://issues.apache.org/jira/browse/SPARK-3612 ? On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Snehal, Are you running the latest Kafka consumer from GitHub/spark-packages? If not, can you take the latest changes? This low-level receiver will keep retrying if the underlying BlockManager gives an error. Do you see those retry cycles in the log? If yes, then there is an issue writing blocks to the BlockManager and Spark is not able to recover from this failure, but the receiver keeps trying... Which version of Spark are you using? Dibyendu On Jun 9, 2015 5:14 AM, Snehal Nagmote nagmote.sne...@gmail.com wrote: All, I am using the Kafka Spark Consumer https://github.com/dibbhatt/kafka-spark-consumer in a Spark Streaming job. After the job runs for a few hours, all executors exit, and I still see the application status on the Spark UI as running. Does anyone know the cause of this exception and how to fix it?
WARN [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - Error reported by receiver for stream 7: Error While Store for Partition Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 1),10492,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637) at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136) at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152) at consumer.kafka.PartitionManager.next(PartitionManager.java:215) at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75) at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ... 14 more WARN [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, false, false, 1),0,0,0)] in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at
Re: Issue running Spark 1.4 on Yarn
Yes! If I either specify a different queue or don't specify a queue at all, it works. On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com wrote: Does it work if you don't specify a queue? On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com wrote: Hi Marcelo, Yes, restarting YARN fixes this behavior and it again works the first few times. The only thing that's consistent is that once Spark job submissions stop working, it's broken for good. On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com wrote: Apologies, I see you already posted everything from the RM logs that mentions your stuck app. Have you tried restarting the YARN cluster to see if that changes anything? Does it go back to the "first few tries work" behaviour? I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like this. On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com wrote: On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com wrote: Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now - this problem is specific to Spark. That doesn't necessarily mean anything. Spark apps have different resource requirements than Hadoop apps. Check your RM logs for any line that mentions your Spark app id. That may give you some insight into what's happening or not. -- Marcelo -- Marcelo -- Marcelo
Re: Issue running Spark 1.4 on Yarn
From the RM scheduler, I see 3 applications currently stuck in the root.thequeue queue. Used Resources: memory:0, vCores:0 Num Active Applications: 0 Num Pending Applications: 3 Min Resources: memory:0, vCores:0 Max Resources: memory:6655, vCores:4 Steady Fair Share: memory:1664, vCores:0 Instantaneous Fair Share: memory:6655, vCores:0 On Tue, Jun 9, 2015 at 4:30 PM, Matt Kapilevich matve...@gmail.com wrote: Yes! If I either specify a different queue or don't specify a queue at all, it works. On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin van...@cloudera.com wrote: Does it work if you don't specify a queue? On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich matve...@gmail.com wrote: Hi Marcelo, Yes, restarting YARN fixes this behavior and it again works the first few times. The only thing that's consistent is that once Spark job submissions stop working, it's broken for good. On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin van...@cloudera.com wrote: Apologies, I see you already posted everything from the RM logs that mention your stuck app. Have you tried restarting the YARN cluster to see if that changes anything? Does it go back to the first few tries work behaviour? I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like this. On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin van...@cloudera.com wrote: On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com wrote: Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now - this problem is specific to Spark. That doesn't necessarily mean anything. Spark apps have different resource requirements than Hadoop apps. Check your RM logs for any line that mentions your Spark app id. That may give you some insight into what's happening or not. -- Marcelo -- Marcelo -- Marcelo
[SPARK-6330] 1.4.0/1.5.0 Bug to access S3 -- AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
Hi All, I have some code to access S3 from Spark. The code is as simple as:

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration hadoopConf = ctx.hadoopConfiguration();
// aws.secretKey=---
// aws.accessKey=---
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", "---");
hadoopConf.set("fs.s3n.awsSecretAccessKey", "--");
SQLContext sql = new SQLContext(ctx);
DataFrame grid_lookup = sql.parquetFile("s3n://---");
grid_lookup.count();
ctx.stop();

The code works with 1.3.1, but with 1.4.0 and the latest 1.5.0 it always gives me the exception below: Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively). I don't know why. I remember this was a known issue in 1.3.0 (https://issues.apache.org/jira/browse/SPARK-6330) that was solved in 1.3.1, so why is it failing again in a newer version? When I first switched to 1.4.0 it worked for a while (I was working with the master branch of the latest source code), but after I refreshed to the latest code I got this error again. Does anyone have an idea? Regards, Shuai
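The exception names fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey and talks about "a s3 URL", while the code only sets the fs.s3n.* keys. If, as that wording suggests (an assumption, not something confirmed in this thread), the property names are built from the URI scheme, then whatever path reaches that check is being treated as s3:// rather than s3n://, and the s3n settings are never consulted. A local Python model of that lookup; the bucket path and helper names are hypothetical:

```python
from urllib.parse import urlparse


def s3_credential_properties(uri):
    """Property names for the access and secret key of the URI's scheme.

    Assumption: follows the fs.<scheme>.awsAccessKeyId pattern visible in the
    exception text; this models the lookup locally and is not Hadoop code.
    """
    scheme = urlparse(uri).scheme
    return (f"fs.{scheme}.awsAccessKeyId", f"fs.{scheme}.awsSecretAccessKey")


def missing_credentials(uri, hadoop_conf):
    """Scheme-specific credential properties that hadoop_conf does not define."""
    return [name for name in s3_credential_properties(uri) if name not in hadoop_conf]


# Only the s3n variants are set, as in the question (values redacted).
conf = {
    "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
    "fs.s3n.awsAccessKeyId": "---",
    "fs.s3n.awsSecretAccessKey": "--",
}

print(missing_credentials("s3n://bucket/path", conf))  # []
print(missing_credentials("s3://bucket/path", conf))
# ['fs.s3.awsAccessKeyId', 'fs.s3.awsSecretAccessKey']
```

Under that assumption, a cheap experiment is to set the fs.s3.* properties as well and see whether the error changes.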
Linear Regression with SGD
Hi User group, We are using Spark's Linear Regression with SGD as the optimization technique and we are getting very sub-optimal results. Can anyone shed some light on why this implementation seems to produce such poor results compared to our own implementation? We are using a very small dataset, but we have to use a very large number of iterations to achieve results similar to our implementation. We've tried normalizing the data, not normalizing the data, and tuning every parameter. Our implementation is a closed-form solution, so convergence is guaranteed; the Spark one is not, which is understandable, but why is it so far off? Has anyone experienced this? Steve Carman, M.S. Artificial Intelligence Engineer Coldlight-PTC scar...@coldlight.com This e-mail is intended solely for the above-mentioned recipient and it may contain confidential or privileged information. If you have received it in error, please notify us immediately and delete the e-mail. You must not copy, distribute, disclose or take any action in reliance on it. In addition, the contents of an attachment to this e-mail may contain software viruses which could damage your own computer system. While ColdLight Solutions, LLC has taken every reasonable precaution to minimize this risk, we cannot accept liability for any damage which you sustain as a result of software viruses. You should perform your own virus checks before opening the attachment.
Re: Issue running Spark 1.4 on Yarn
Apologies, I see you already posted everything from the RM logs that mentions your stuck app. Have you tried restarting the YARN cluster to see if that changes anything? Does it go back to the "first few tries work" behaviour? I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything like this. -- Marcelo
Re: Issue running Spark 1.4 on Yarn
Does it work if you don't specify a queue? -- Marcelo
Re: which database for gene alignment data ?
Hi Frank, Thanks for the reply. I downloaded ADAM and built it, but it does not seem to list this function among the command-line options. Are these exposed as a public API that I can call from code? Also, I need to save all my intermediate data. It seems like ADAM stores data in Parquet on HDFS. I want to save something in an external database, so that the saved data can be reused in multiple ways by multiple people. Any suggestions on the DB selection, or on keeping data centralized for use by multiple distinct groups? Thanks -Roni

On Mon, Jun 8, 2015 at 12:47 PM, Frank Austin Nothaft fnoth...@berkeley.edu wrote: Hi Roni, We have a full suite of genomic feature parsers that can read BED, narrowPeak, GATK interval lists, and GTF/GFF into Spark RDDs in ADAM (https://github.com/bigdatagenomics/adam). Additionally, we have support for efficient overlap joins (query 3 in your email below). You can load the genomic features with ADAMContext.loadFeatures https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala#L438. We have two tools for the overlap computation: you can use a BroadcastRegionJoin https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala if one of the datasets you want to overlap is small, or a ShuffleRegionJoin https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala if both datasets are large. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466

On Jun 8, 2015, at 9:39 PM, roni roni.epi...@gmail.com wrote: Sorry for the delay. The files (called .bed files) have a format like:

Chromosome  start   end     feature  score  strand
chr1        713776  714375  peak.1   599    +
chr1        752401  753000  peak.2   599    +

The mandatory fields are:
1. chrom - The name of the chromosome (e.g. chr3, chrY, chr2_random) or scaffold (e.g. scaffold10671).
2. chromStart - The starting position of the feature in the chromosome or scaffold. The first base in a chromosome is numbered 0.
3. chromEnd - The ending position of the feature in the chromosome or scaffold. The *chromEnd* base is not included in the display of the feature. For example, the first 100 bases of a chromosome are defined as *chromStart=0, chromEnd=100*, and span the bases numbered 0-99.

There can be more data, as described at https://genome.ucsc.edu/FAQ/FAQformat.html#format1

Typical use cases are:
1. Find the features between given start and end positions.
2. Find features whose start and end points overlap with another feature.
3. Read external (reference) data, which will have a similar format (chr10 48514785 49604641 MAPK8 49514785 +), and find all the data points that overlap with the other .bed files.

The data is huge: .bed files can range from 0.5 GB to 5 GB (or more). I was thinking of using Cassandra, but I'm not sure whether the overlap queries can be supported and will be fast enough. Thanks for the help -Roni

On Sat, Jun 6, 2015 at 7:03 AM, Ted Yu yuzhih...@gmail.com wrote: Can you describe your use case in a bit more detail, since not all people on this mailing list are familiar with gene sequencing alignment data? Thanks

On Fri, Jun 5, 2015 at 11:42 PM, roni roni.epi...@gmail.com wrote: I want to use Spark for reading compressed .bed files containing gene sequencing alignment data. I want to store the .bed file data in a DB and then use external gene expression data to find overlaps etc. Which database is best for this? Thanks -Roni
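Use cases 1-3 all reduce to an overlap test on the half-open coordinates described above (chromStart counted from 0, chromEnd excluded). A minimal single-machine Python sketch of that test plus a sort-and-scan overlap join; it is not ADAM's API (Frank points to BroadcastRegionJoin and ShuffleRegionJoin for doing this on Spark), and Feature, the helper names and the geneA/geneB reference records are hypothetical:

```python
from collections import defaultdict
from typing import NamedTuple


class Feature(NamedTuple):
    chrom: str
    start: int  # chromStart: 0-based, included
    end: int    # chromEnd: excluded
    name: str = ""


def parse_bed_line(line):
    """Parse the three mandatory BED columns plus the optional name column."""
    fields = line.split()
    return Feature(fields[0], int(fields[1]), int(fields[2]),
                   fields[3] if len(fields) > 3 else "")


def overlaps(a_start, a_end, b_start, b_end):
    """Half-open intervals overlap iff each one starts before the other ends."""
    return a_start < b_end and b_start < a_end


def features_in_range(features, chrom, start, end):
    """Use case 1: features on chrom that overlap the window [start, end)."""
    return [f for f in features
            if f.chrom == chrom and overlaps(f.start, f.end, start, end)]


def overlap_join(left, right):
    """Use cases 2 and 3: every (left, right) name pair whose intervals overlap.

    Right features are grouped per chromosome and sorted by start, so the scan
    for each left feature stops at the first right feature that starts at or
    after the left feature's end.
    """
    by_chrom = defaultdict(list)
    for right_f in right:
        by_chrom[right_f.chrom].append(right_f)
    for group in by_chrom.values():
        group.sort(key=lambda f: f.start)

    pairs = []
    for left_f in sorted(left, key=lambda f: (f.chrom, f.start)):
        for right_f in by_chrom.get(left_f.chrom, []):
            if right_f.start >= left_f.end:
                break  # later right features start even further along
            if overlaps(left_f.start, left_f.end, right_f.start, right_f.end):
                pairs.append((left_f.name, right_f.name))
    return pairs


peaks = [parse_bed_line(s) for s in ("chr1 713776 714375 peak.1 599 +",
                                     "chr1 752401 753000 peak.2 599 +")]
reference = [Feature("chr1", 714000, 752500, "geneA"), Feature("chr2", 0, 100, "geneB")]

print(features_in_range(peaks, "chr1", 714375, 752401))  # [] (ends are exclusive)
print(overlap_join(peaks, reference))  # [('peak.1', 'geneA'), ('peak.2', 'geneA')]
```

The first query returns nothing because peak.1 ends at 714375 exclusive, so it only touches the window without overlapping it.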
Re: Issue running Spark 1.4 on Yarn
Hi Marcelo, Yes, restarting YARN fixes this behavior and it again works the first few times. The only thing that's consistent is that once Spark job submissions stop working, it's broken for good.
spark on yarn
In my test data, I have a JavaRDD with a single String (the size of this RDD is 1). On a 3-node YARN cluster, the mapToPair function on this RDD sends the same input String to 2 different nodes; the container logs on these nodes show the same string as input. Overriding the default partition count with JavaRDD<String> input = sparkContext.textFile(hdfsPath, 0); didn't change anything, and the same input string is still processed twice. Is there a way to make sure that each string in an RDD is processed exactly once? Thanks, Neera -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-on-yarn-tp23230.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Issue running Spark 1.4 on Yarn
On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich matve...@gmail.com wrote: Like I mentioned earlier, I'm able to execute Hadoop jobs fine even now - this problem is specific to Spark. That doesn't necessarily mean anything. Spark apps have different resource requirements than Hadoop apps. Check your RM logs for any line that mentions your Spark app id. That may give you some insight into what's happening or not. -- Marcelo
Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message
1) Could you share your command? 2) Are the Kafka brokers on the same host? 3) Could you run --describe on the topic to check that the topic is set up correctly (just to be sure)?
Re: Determining number of executors within RDD
You could try issuing a get on the SparkConf object. I don't know the exact name of the matching key, but from reading the code in SparkSubmit.scala it should be something like: conf.get("spark.executor.instances")
Re: Can a Spark App run with spark-submit write pdf files to HDFS
By writing PDF files, do you mean something equivalent to hadoop fs -put /path? I'm not sure how PDFBox works, though; have you tried writing the files individually without Spark? If you have that working as a starting point, we can look at how Spark can be interfaced to write to HDFS. Moreover, is there a specific need to use Spark in this case?
Re: Issue running Spark 1.4 on Yarn
I see the other jobs SUCCEEDED without issues. Could you snapshot the FairScheduler activity as well? My guess is that, with the single core, it is reaching a NodeManager that is still busy with other jobs, and the job ends up in a waiting state. Does the job eventually complete? Could you add another node to the cluster to see if my guess is right? I only see one active NM.
Can a Spark App run with spark-submit write pdf files to HDFS
I would like to write pdf files using pdfbox to HDFS from my Spark application. Can this be done?
Re: Issue running Spark 1.4 on Yarn
I've tried running a Hadoop app pointing to the same queue. Same thing now: the job doesn't get accepted. I've cleared out the queue and killed all the pending jobs, but the queue is still unusable. It seems like an issue with YARN, but it's specifically Spark that leaves the queue in this state. I ran a Hadoop job in a for loop 10x, specifying the queue explicitly, just to double-check.
Re: spark-submit working differently than pyspark when trying to find external jars
I figured it out, in case anyone else has this problem in the future:

spark-submit --driver-class-path lib/postgresql-9.4-1201.jdbc4.jar --packages com.databricks:spark-csv_2.10:1.0.3 path/to/my/script.py

What I found is that you MUST put the path to your script at the end of the spark-submit command. Also, wildcards in --driver-class-path work when using pyspark but don't work when using spark-submit.
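spark-submit's documented usage is spark-submit [options] <app jar | python file> [app arguments], which fits the fix above: anything placed after the script path is handed to the script instead of being read by spark-submit. A small Python sketch that builds the argument list in that order and expands the class-path wildcard itself; build_spark_submit is a hypothetical helper, not part of Spark:

```python
import glob
import os


def build_spark_submit(script, driver_jar_glob=None, packages=(), app_args=()):
    """Assemble a spark-submit argument list (hypothetical helper).

    All options go before the application file; only the application's own
    arguments follow it. The class-path wildcard is expanded here because the
    poster found wildcards in --driver-class-path did not work with spark-submit.
    """
    cmd = ["spark-submit"]
    if driver_jar_glob:
        jars = sorted(glob.glob(driver_jar_glob))
        if not jars:
            raise FileNotFoundError(f"no jars match {driver_jar_glob!r}")
        cmd += ["--driver-class-path", os.pathsep.join(jars)]
    if packages:
        cmd += ["--packages", ",".join(packages)]
    cmd.append(script)     # the application file comes after every option
    cmd.extend(app_args)   # only the script's own arguments follow it
    return cmd


print(build_spark_submit("path/to/my/script.py",
                         packages=["com.databricks:spark-csv_2.10:1.0.3"]))
# ['spark-submit', '--packages', 'com.databricks:spark-csv_2.10:1.0.3', 'path/to/my/script.py']
```

The resulting list can be passed directly to subprocess.run(cmd, check=True), which avoids shell quoting issues.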
RE: Cassandra Submit
Looks like the real culprit is a library version mismatch: Caused by: java.lang.NoSuchMethodError: org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(Ljava/lang/String;I)Lorg/apache/thrift/transport/TTransport; at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createThriftClient(CassandraConnectionFactory.scala:41) at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:134) ... 28 more The Spark Cassandra Connector is trying to use a method that does not exist. That means your assembly jar has the wrong version of the library that SCC is trying to use. Welcome to jar hell! Mohammed

From: Yasemin Kaya [mailto:godo...@gmail.com] Sent: Tuesday, June 9, 2015 12:24 PM To: Mohammed Guller Cc: Yana Kadiyska; Gerard Maas; user@spark.apache.org Subject: Re: Cassandra Submit My code https://gist.github.com/yaseminn/d77dd9baa6c3c43c7594 and exception https://gist.github.com/yaseminn/fdd6e5a6efa26219b4d3. And:

~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> use test;
cqlsh:test> select * from people;

 id | name
----+---------
  5 | eslem
  1 | yasemin
  8 | ali
  2 | busra
  4 | ilham
  7 | kubra
  6 | tuba
  9 | aslı
  3 | Andrew

(9 rows)
cqlsh:test>

bin/cassandra-cli -h 127.0.0.1 -p 9160
Connected to: Test Cluster on 127.0.0.1/9160
Welcome to Cassandra CLI version 2.1.5
The CLI is deprecated and will be removed in Cassandra 3.0. Consider migrating to cqlsh. CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3
Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.
[default@unknown]

yasemin

2015-06-09 22:03 GMT+03:00 Mohammed Guller moham...@glassbeam.com: It is strange that writes work but reads do not.
If it were a Cassandra connectivity issue, neither writes nor reads would work. Perhaps the problem is somewhere else. Can you send the complete exception trace? Also, just to make sure that there is no DNS issue, try this: ~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h 127.0.0.1 -p 9160 Mohammed

From: Yasemin Kaya [mailto:godo...@gmail.com] Sent: Tuesday, June 9, 2015 11:32 AM To: Yana Kadiyska Cc: Gerard Maas; Mohammed Guller; user@spark.apache.org Subject: Re: Cassandra Submit I removed the core and streaming jars, and the exception is still the same. I tried what you said; here are the results:

~/cassandra/apache-cassandra-2.1.5$ bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5
The CLI is deprecated and will be removed in Cassandra 3.0. Consider migrating to cqlsh. CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3
Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.
[default@unknown]

and

~/cassandra/apache-cassandra-2.1.5$ bin/cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.5 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh>

Thank you for your kind responses ...

2015-06-09 20:59 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Hm, the jars look OK, although it's a bit of a mess: you have spark-assembly 1.3.0 but then core and streaming 1.3.1... It's generally a bad idea to mix versions. spark-assembly bundles all Spark packages, so either include them separately or use spark-assembly, but don't mix them like you've shown.
As to the port issue, what about this:

$bin/cassandra-cli -h localhost -p 9160
Connected to: Test Cluster on localhost/9160
Welcome to Cassandra CLI version 2.1.5

On Tue, Jun 9, 2015 at 1:29 PM, Yasemin Kaya godo...@gmail.com wrote: My jar files are:

cassandra-driver-core-2.1.5.jar
cassandra-thrift-2.1.3.jar
guava-18.jar
jsr166e-1.1.0.jar
spark-assembly-1.3.0.jar
spark-cassandra-connector_2.10-1.3.0-M1.jar
spark-cassandra-connector-java_2.10-1.3.0-M1.jar
spark-core_2.10-1.3.1.jar
spark-streaming_2.10-1.3.1.jar

And my code is from the DataStax spark-cassandra-connector demo: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-demos/simple-demos/src/main/java/com/datastax/spark/connector/demo/JavaApiDemo.java. Thanks a lot. yasemin

2015-06-09 18:58 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com: Hm. Yeah, your port is good... Have you seen this thread: http://stackoverflow.com/questions/27288380/fail-to-use-spark-cassandra-connector ? It seems that you might be running into version mismatch issues. What versions of Spark/Cassandra-connector are you trying to use? On Tue, Jun 9, 2015 at 10:18 AM, Yasemin Kaya
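Yana's point about mixing spark-assembly 1.3.0 with spark-core and spark-streaming 1.3.1 can be checked mechanically. A rough Python heuristic that only inspects file names; the helper names are hypothetical, and since it cannot see which class versions are bundled inside an assembly, it will not detect a mismatch like the Thrift NoSuchMethodError in the trace:

```python
import re
from collections import defaultdict

# Matches e.g. spark-core_2.10-1.3.1.jar and spark-assembly-1.3.0.jar.
SPARK_JAR = re.compile(r"^(spark-[a-z-]+?)(?:_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar$")


def spark_versions(jar_names):
    """Group Spark jars by the Spark release version in their file names."""
    versions = defaultdict(list)
    for name in jar_names:
        match = SPARK_JAR.match(name)
        # The Cassandra connector is versioned independently of Spark, so skip it.
        if match and not match.group(1).startswith("spark-cassandra"):
            versions[match.group(2)].append(name)
    return dict(versions)


def mixed_spark_versions(jar_names):
    """True when the Spark jars come from more than one Spark release."""
    return len(spark_versions(jar_names)) > 1


jars = [
    "cassandra-driver-core-2.1.5.jar", "cassandra-thrift-2.1.3.jar",
    "guava-18.jar", "jsr166e-1.1.0.jar", "spark-assembly-1.3.0.jar",
    "spark-cassandra-connector_2.10-1.3.0-M1.jar",
    "spark-cassandra-connector-java_2.10-1.3.0-M1.jar",
    "spark-core_2.10-1.3.1.jar", "spark-streaming_2.10-1.3.1.jar",
]
print(spark_versions(jars))
# {'1.3.0': ['spark-assembly-1.3.0.jar'], '1.3.1': ['spark-core_2.10-1.3.1.jar', 'spark-streaming_2.10-1.3.1.jar']}
print(mixed_spark_versions(jars))  # True
```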
Re: Linear Regression with SGD
Hi Stephen, How many is "a very large number of iterations"? SGD is notorious for requiring 100s or 1000s of iterations, and you may also need to spend some time tweaking the step size. In 1.4 there is an implementation of ElasticNet Linear Regression, which is supposed to compare favourably with an equivalent R implementation.
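To illustrate why iteration count and step size matter, here is a toy single-feature Python comparison of the exact least-squares answer with plain constant-step SGD. The data, step size and epoch counts are made up for illustration, and this is not MLlib's implementation:

```python
def closed_form(xs, ys):
    """Exact least-squares fit of y = w*x + b (the 'closed form' baseline)."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    w = sxy / sxx
    return w, mean_y - w * mean_x


def sgd(xs, ys, step, epochs):
    """Plain SGD on squared error with a constant step, visiting samples in order."""
    w = b = 0.0
    for _ in range(epochs):
        for x, y in zip(xs, ys):
            err = (w * x + b) - y  # residual for this single sample
            w -= step * err * x    # step against d(0.5*err^2)/dw
            b -= step * err        # step against d(0.5*err^2)/db
    return w, b


xs = [0.0, 1.0, 2.0, 3.0, 4.0]
ys = [2 * x + 1 for x in xs]  # noiseless toy data: w = 2, b = 1

print(closed_form(xs, ys))                  # (2.0, 1.0)
print(sgd(xs, ys, step=0.01, epochs=10))    # still noticeably off
print(sgd(xs, ys, step=0.01, epochs=2000))  # close to (2.0, 1.0)
```

Even on five noiseless points, a small constant step needs thousands of passes to match the closed form, because the error along the poorly conditioned direction shrinks only a little per pass; a larger step speeds this up until the updates become unstable.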
BigDecimal problem in parquet file
Hi, When I try to save my data frame as a parquet file I get the following error: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220) at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) How can I fix this problem?
Spark 1.3.1 SparkSQL metastore exceptions
Hi, I'm using Spark 1.3.1 to insert into a Hive 0.12 table from a SparkSQL query. The query is a very simple select from a dummy Hive table used for benchmarking. I'm using a CREATE TABLE AS SELECT statement to do the insert. Whether I do that or an INSERT OVERWRITE, I get the same Hive exception, "Unable to alter table", with some Hive metastore issues. The data is inserted into the Hive table as expected, but I get a very long stack trace. Does anyone know what the stack trace means and how I can avoid generating it every time I insert into a table? scala> hiveContext.sql("create table benchmarking.spark_logins_benchmark as select * from benchmarking.logins_benchmark limit 10") org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:387) at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1448) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:235) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:123) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:255) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099) at org.apache.spark.sql.hive.execution.CreateTableAsSelect.run(CreateTableAsSelect.scala:70) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147) at
org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35) at $iwC$$iwC$$iwC.<init>(<console>:37) at $iwC$$iwC.<init>(<console>:39) at $iwC.<init>(<console>:41) at <init>(<console>:43) at .<init>(<console>:47) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
Re: Implementing top() using treeReduce()
Having the following code in RDD.scala works for me. P.S. In the following code, I merge the smaller queue into the larger one; I wonder if this will help performance. Let me know when you do the benchmark.

def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.treeReduce { (queue1, queue2) =>
        // Merge the smaller queue into the larger one.
        if (queue1.size > queue2.size) {
          queue1 ++= queue2
          queue1
        } else {
          queue2 ++= queue1
          queue2
        }
      }.toArray.sorted(ord)
    }
  }
}

def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  treeTakeOrdered(num)(ord.reverse)
}

Sincerely, DB Tsai -- Blog: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D

On Tue, Jun 9, 2015 at 10:09 AM, raggy raghav0110...@gmail.com wrote: I am trying to implement top-k in Scala within Apache Spark. I am aware that Spark has a top action, but top() uses reduce(). Instead, I would like to use treeReduce(). I am trying to compare the performance of reduce() and treeReduce(). The main issue I have is that I cannot use these 2 lines of code, which are used in the top() action, within my Spark application:

val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)

How can I go about implementing top() using treeReduce()?
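DB's Scala version keeps a BoundedPriorityQueue per partition and merges them with treeReduce. The same shape can be modelled locally in Python with heapq, where a size-bounded min-heap plays the role of BoundedPriorityQueue. In this sketch partitions are plain lists and a pairwise, level-by-level merge stands in for Spark's treeReduce; it is not Spark code:

```python
import heapq


def partition_top(items, k):
    """Keep the k largest items of one partition in a min-heap of size <= k."""
    heap = []
    for x in items:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)  # evict the current smallest kept item
    return heap


def merge_heaps(a, b, k):
    """Merge two bounded heaps, pushing the smaller one into the larger one."""
    if len(a) < len(b):
        a, b = b, a
    a = list(a)  # copy so the inputs are not mutated
    for x in b:
        if len(a) < k:
            heapq.heappush(a, x)
        elif x > a[0]:
            heapq.heapreplace(a, x)
    return a


def tree_top(partitions, k):
    """Top-k via per-partition heaps combined pairwise, level by level."""
    if k == 0 or not partitions:
        return []
    level = [partition_top(p, k) for p in partitions]
    while len(level) > 1:
        merged = [merge_heaps(level[i], level[i + 1], k)
                  for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:
            merged.append(level[-1])  # odd heap out moves up unchanged
        level = merged
    return sorted(level[0], reverse=True)


print(tree_top([[5, 1, 9], [7, 3], [], [8, 2, 6]], 3))  # [9, 8, 7]
```

Merging the smaller heap into the larger one bounds each merge by the size of the smaller input, which is the same idea as the queue1.size > queue2.size check in the Scala code.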
how to clear state in Spark Streaming based on emitting
With Spark Streaming, I am maintaining state (updateStateByKey every 30s) and, every 5 minutes, emitting to a file the parts of that state that have been closed; I only care about the last state collected. In 5 minutes, there will be 10 updateStateByKey iterations. For example:

…
val ssc = new StreamingContext(sc, Seconds(30))
val expiredState = state
  .filter(_._2.expired == true)
  .window(windowDuration = Seconds(30), slideDuration)
…

When I go to emit, I want to update a Boolean flag in my collection of state saying that it has been collected, so that the next time the state is updated I can remove what has been emitted. Is there a way to do this, or maybe a better pattern or approach to solve this problem? Hopefully I have given enough information to explain the use case.

Thanks,
Robert
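One constraint worth keeping in mind: the state RDDs are immutable, so the emit step cannot flip a flag on the state in place; any "already emitted" bookkeeping has to be derived inside the update function itself. With updateStateByKey, returning None from the update function removes the key. Below is a minimal sketch under stated assumptions: KeyState is a hypothetical state type that records when it closed, and the (assumed) retention rule keeps a closed record for one full emit interval, so the 5-minute emitter sees it at least once, then drops it. The clock is passed in explicitly so the function is testable; supplying it from the wall clock or batch time in a real job is an assumption, not something from Robert's code.

```scala
object StateCleanupSketch {
  // Hypothetical per-key state: running total plus the time it closed, if it has.
  final case class KeyState(total: Long, closedAtMs: Option[Long])

  // Same shape as an updateStateByKey function, plus an explicit clock.
  // Returning None removes the key from the state.
  def update(emitIntervalMs: Long, isClosed: Long => Boolean)(
      newValues: Seq[Long], previous: Option[KeyState], nowMs: Long): Option[KeyState] =
    previous match {
      // Closed for longer than one emit interval: an emit pass has already
      // had the chance to write it out, so drop the key.
      case Some(KeyState(_, Some(closedAt))) if nowMs - closedAt > emitIntervalMs => None
      // Closed recently: keep it unchanged (late values are ignored) so the
      // next emit pass can still see it.
      case Some(s @ KeyState(_, Some(_))) => Some(s)
      // Still open: accumulate, and record the time if it has just closed.
      case _ =>
        val total = previous.map(_.total).getOrElse(0L) + newValues.sum
        Some(KeyState(total, if (isClosed(total)) Some(nowMs) else None))
    }
}
```

Wired in, this would look roughly like events.updateStateByKey((vs: Seq[Long], s: Option[KeyState]) => update(300000L, isClosed)(vs, s, System.currentTimeMillis())), with the 5-minute emit filtering on closedAtMs.isDefined; note that a wall-clock read on executors can differ if a batch is recomputed.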
Re: Can a Spark App run with spark-submit write pdf files to HDFS
I don't know anything about your use case, so take this with a grain of salt, but typically if you are operating at a scale that benefits from Spark, you likely will not want to write your output records as individual files into HDFS. Spark has built-in support for the Hadoop SequenceFile container format, which is a more scalable way to write out your results: you could write your Spark RDD transformations so that your final RDD is a PairRDD with a unique key (possibly what would normally have been the standalone file name) and a value (in this case, likely the byte array of the PDF you generated).

It looks like PDFBox's PDDocument class allows you to save the document to an OutputStream (https://pdfbox.apache.org/docs/1.8.9/javadocs/org/apache/pdfbox/pdmodel/PDDocument.html#save(java.io.OutputStream)), so you could probably get away with saving to a ByteArrayOutputStream and grabbing the bytes that make up the final document. You can see more about how to write SequenceFiles from Spark here: https://spark.apache.org/docs/latest/programming-guide.html#actions

As an aside, one hint that I have found helpful since I started working with Spark: if your transformation requires classes that are expensive to instantiate, you may want to look into mapPartitions, which lets you do the setup once per partition instead of once per record. I haven't used PDFBox, but it wouldn't surprise me to learn that there's some non-negligible overhead involved.

Hope that helps,
Will

On Tue, Jun 9, 2015 at 5:57 PM, Richard Catlin richard.m.cat...@gmail.com wrote:
I would like to write PDF files using PDFBox to HDFS from my Spark application. Can this be done?
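A minimal sketch of the mapPartitions plus ByteArrayOutputStream idea, with no Spark or PDFBox dependency. Renderer is a hypothetical stand-in for whatever expensive PDFBox setup is needed (with PDFBox, the render step would be where PDDocument.save(OutputStream) writes into the buffer); it counts its own constructions so you can see it is built once per partition rather than once per record.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets

object PdfBytesSketch {
  // Hypothetical stand-in for an expensive-to-construct document generator;
  // the companion counter exists only to show how often it is built.
  final class Renderer {
    Renderer.instances += 1
    def render(text: String, out: OutputStream): Unit =
      out.write(s"%PDF-stub $text".getBytes(StandardCharsets.UTF_8))
  }
  object Renderer { var instances = 0 }

  // The function you would hand to rdd.mapPartitions: one Renderer per
  // partition, one (fileName, documentBytes) pair per record.
  def renderPartition(records: Iterator[(String, String)]): Iterator[(String, Array[Byte])] = {
    val renderer = new Renderer
    records.map { case (name, text) =>
      val buffer = new ByteArrayOutputStream()
      renderer.render(text, buffer)
      (name, buffer.toByteArray)
    }
  }
}
```

In a job this would be used roughly as records.mapPartitions(PdfBytesSketch.renderPartition).saveAsSequenceFile(path); as far as I know, Spark converts String and Array[Byte] to Text and BytesWritable implicitly for saveAsSequenceFile.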
Re: flatMap output on disk / flatMap memory overhead
I agree with Richard. It looks like the issue here is shuffling, and shuffle data is always written to disk, so the issue is definitely not that all the output of flatMap has to be stored in memory. If at all possible, I'd first suggest upgrading to a newer version of Spark; even in 1.2, there were big improvements to shuffle, with sort-based shuffle as the default.

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com wrote:
Are you sure it's memory related? What is the disk utilization and IO performance on the workers? The error you posted looks to be related to shuffle trying to obtain block data from another worker node and failing to do so in a reasonable amount of time. It may still be memory related, but I'm not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:
I tried using reduceByKey, without success. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs spilled by flatMap might need to be stored on disk, and I don't understand why the persist option does not help and my job fails.

My code:
rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node for the master and 7 nodes for the workers.
My conf:
conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

My spark-env.sh:
ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

Spark version: 1.1.1

Thank you a lot for your help!
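One detail in the code above is unrelated to the shuffle error but could bite later: reduceByKey((a, b) => (a + b).toShort) silently wraps around once a pair's count passes Short.MaxValue (32767). Below is a plain-Scala sketch of the same flatMap, reduceByKey, filter pipeline on local collections, using Int counts. outputPairsOfWords here is a hypothetical version (all unordered pairs of distinct words in one document), since the original isn't shown in the thread.

```scala
object PairCountSketch {
  // Hypothetical stand-in for outputPairsOfWords: every unordered pair of
  // distinct words in one document, each paired with a count of 1.
  def outputPairsOfWords(doc: String): Seq[((String, String), Int)] = {
    val words = doc.split("\\s+").filter(_.nonEmpty).distinct.sorted
    for {
      i <- words.indices
      j <- (i + 1) until words.length
    } yield ((words(i), words(j)), 1)
  }

  // Local equivalent of flatMap -> reduceByKey(_ + _) -> filter(count >= minCount).
  // Int (not Short) counts avoid wrap-around past 32767.
  def frequentPairs(docs: Seq[String], minCount: Int): Map[(String, String), Int] =
    docs.flatMap(outputPairsOfWords)
      .groupBy(_._1)
      .map { case (pair, ones) => pair -> ones.map(_._2).sum }
      .filter { case (_, count) => count >= minCount }
}
```

On a cluster the same shape would be rdd.flatMap(outputPairsOfWords).reduceByKey(_ + _).filter(_._2 >= 5); reduceByKey already combines map-side, so the choice of counter type mainly affects memory and correctness rather than shuffle volume.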
spark-submit does not use hive-site.xml
I am using Spark (standalone) to run queries (from a remote client) against data in tables that are already defined/loaded in Hive. I have started the metastore service in Hive successfully, and I tried to share its config with Spark by putting hive-site.xml, with the proper metastore URI, in the $SPARK_HOME/conf directory. When I start spark-shell, it gives me a default sqlContext, and I can use it to access my Hive tables with no problem. But once I submit a similar query from a Spark application through spark-submit, it does not see the tables; it seems it does not pick up the hive-site.xml in the conf directory under Spark's home. I tried using the --files argument with spark-submit to pass hive-site.xml to the workers, but it did not change anything. Here is how I run the application:

$SPARK_HOME/bin/spark-submit --class SimpleClient --master spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml simple-sql-client-1.0.jar

Here is the simple example code that I try to run (in Java):

SparkConf conf = new SparkConf().setAppName("Simple SQL Client");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame res = sqlContext.sql("show tables");
res.show();

Here are the software versions: Spark 1.3, Hive 1.2, Hadoop 2.6.

Thanks in advance for any suggestion.
Re: Linear Regression with SGD
As Robin suggested, you may try the following new implementation: https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef

Thanks.

Sincerely,
DB Tsai

On Tue, Jun 9, 2015 at 3:22 PM, Robin East robin.e...@xense.co.uk wrote:
Hi Stephen, how many is "a very large number of iterations"? SGD is notorious for requiring hundreds or thousands of iterations, and you may also need to spend some time tweaking the step size. In 1.4 there is an implementation of ElasticNet linear regression which is supposed to compare favourably with an equivalent R implementation.

On 9 Jun 2015, at 22:05, Stephen Carman scar...@coldlight.com wrote:
Hi user group, we are using Spark linear regression with SGD as the optimization technique, and we are getting very sub-optimal results. Can anyone shed some light on why this implementation seems to produce such poor results vs. our own implementation? We are using a very small dataset, but we have to use a very large number of iterations to achieve results similar to our implementation. We've tried normalizing the data, not normalizing the data, and tuning every parameter. Our implementation is a closed-form solution, so we should be guaranteed convergence, but the Spark one is not, which is understandable; but why is it so far off? Has anyone experienced this?

Steve Carman, M.S.
Artificial Intelligence Engineer
Coldlight-PTC
scar...@coldlight.com
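To make the closed-form vs. SGD comparison concrete, here is a small plain-Scala sketch (no MLlib) that fits y = w*x + b on ten noise-free points both ways. For determinism it uses full-batch gradient descent, a simplification of SGD, with a fixed step size; the data, step size and iteration counts are made up for illustration. The closed-form answer is exact immediately, while gradient descent with a step small enough to be stable is still visibly off after 50 iterations and needs thousands to match, which is in line with Robin's point about iteration counts and step-size tuning (the numIterations and stepSize arguments of MLlib's LinearRegressionWithSGD.train).

```scala
object ClosedFormVsGradientDescent {
  // Ordinary least squares for y = w * x + b, solved in closed form.
  def closedForm(xs: Array[Double], ys: Array[Double]): (Double, Double) = {
    val n = xs.length
    val mx = xs.sum / n
    val my = ys.sum / n
    val cov = xs.indices.map(i => (xs(i) - mx) * (ys(i) - my)).sum
    val varX = xs.map(x => (x - mx) * (x - mx)).sum
    val w = cov / varX
    (w, my - w * mx)
  }

  // Full-batch gradient descent on half the mean squared error
  // (a deterministic simplification of SGD): fixed step, fixed iteration count.
  def gradientDescent(xs: Array[Double], ys: Array[Double],
                      stepSize: Double, iterations: Int): (Double, Double) = {
    val n = xs.length
    var w = 0.0
    var b = 0.0
    for (_ <- 1 to iterations) {
      var gw = 0.0
      var gb = 0.0
      for (i <- 0 until n) {
        val residual = w * xs(i) + b - ys(i)
        gw += residual * xs(i)
        gb += residual
      }
      w -= stepSize * gw / n
      b -= stepSize * gb / n
    }
    (w, b)
  }
}
```

The slow part is the poorly conditioned direction (mostly the intercept here, because x is not centred), which is one reason normalizing features usually helps gradient methods.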
Re: Cassandra Submit
Is your Cassandra installation actually listening on 9160?

lsof -i :9160
COMMAND   PID     USER  FD  TYPE   DEVICE SIZE/OFF NODE NAME
java    29232 ykadiysk  69u IPv4 42152497      0t0  TCP localhost:9160 (LISTEN)

I am running an out-of-the-box Cassandra conf where:

rpc_address: localhost
# port for Thrift to listen for clients on
rpc_port: 9160

On Tue, Jun 9, 2015 at 7:36 AM, Yasemin Kaya godo...@gmail.com wrote:
I couldn't find any solution. I can write to Cassandra, but I can't read from it.

2015-06-09 8:52 GMT+03:00 Yasemin Kaya godo...@gmail.com:
Thanks a lot Mohammed, Gerard and Yana. I can write to the table, but I get an exception. It says *Exception in thread "main" java.io.IOException: Failed to open thrift connection to Cassandra at 127.0.0.1:9160*

In the yaml file:
rpc_address: localhost
rpc_port: 9160

And in the project:
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.connection.rpc.port", "9160");
or
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.connection.rpc.port", "9160");

Whatever setting I use, I get the same exception. Any help?

2015-06-08 18:23 GMT+03:00 Yana Kadiyska yana.kadiy...@gmail.com:
Yes, whatever you put for listen_address in cassandra.yaml. Also, you should try to connect to your Cassandra cluster via bin/cqlsh to make sure you have connectivity before you try to make a connection via Spark.

On Mon, Jun 8, 2015 at 4:43 AM, Yasemin Kaya godo...@gmail.com wrote:
Hi, I run my project locally. How can I find the IP address of my Cassandra host? From cassandra.yaml, or somewhere else?
yasemin

2015-06-08 11:27 GMT+03:00 Gerard Maas gerard.m...@gmail.com:
? = IP address of your Cassandra host

On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com wrote:
Hi, how can I find spark.cassandra.connection.host? And what should I change? Should I change cassandra.yaml?
The error says *Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042*. What should I add here?

SparkConf sparkConf = new SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts", "true").set("spark.cassandra.connection.host", ?);

Best,
yasemin

2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com:
Check your spark.cassandra.connection.host setting. It should be pointing to one of your Cassandra nodes.
Mohammed

*From:* Yasemin Kaya [mailto:godo...@gmail.com]
*Sent:* Friday, June 5, 2015 7:31 AM
*To:* user@spark.apache.org
*Subject:* Cassandra Submit

Hi, I am using Cassandra in my project. I got this error: *Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042*. I think I have to modify the submit line. What should I add or remove when I submit my project?
Best,
yasemin
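Along the lines of Yana's lsof and cqlsh checks, a quick way to confirm, from the same machine as the Spark driver, that something is accepting connections on a given host and port is a plain TCP connect. A minimal sketch using only java.net; PortCheck.isListening is a hypothetical helper name, and a successful connect only proves that some listener exists, not that it is Cassandra's Thrift (9160) or native (9042) endpoint.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PortCheck {
  // True if something accepts a TCP connection on host:port within timeoutMs.
  def isListening(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    finally socket.close()
  }
}
```

For example, PortCheck.isListening("127.0.0.1", 9160) and PortCheck.isListening("127.0.0.1", 9042). The errors above mention both 127.0.0.1 and 127.0.1.1, so it may be worth checking each address Spark actually tries.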
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
At which point would I call cache()? I just want the runtime to spill to disk when necessary, without me having to know when that is.

On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger c...@koeninger.org wrote:
The direct stream isn't a receiver; it isn't required to cache data anywhere unless you want it to. If you want it, just call cache.

On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:
"set the storage policy for the DStream RDDs to MEMORY AND DISK": it appears the storage level can be specified in the createStream methods but not in createDirectStream...

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:
You can also try Dynamic Resource Allocation: https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation

Also, regarding the feedback loop for automatic message consumption rate adjustment, there is a "dumb" solution: simply set the storage policy for the DStream RDDs to MEMORY AND DISK. When memory gets exhausted, Spark Streaming will resort to keeping new RDDs on disk, which will prevent it from crashing and hence losing them. Then some memory will get freed, it will go back to RAM, and so on and so forth.

Original message
From: Evo Eftimov
Date: 2015/05/28 13:22 (GMT+00:00)
To: Dmitry Goldenberg
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

You can always spin up new boxes in the background and bring them into the cluster fold when fully operational, and time that with a job relaunch and parameter change. Kafka offsets are managed automatically for you by the Kafka clients, which keep them in ZooKeeper; don't worry about that as long as you shut down your job gracefully.
Besides, managing the offsets explicitly is not a big deal if necessary.

Original message
From: Dmitry Goldenberg
Date: 2015/05/28 13:16 (GMT+00:00)
To: Evo Eftimov
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky, since that means keeping track of the starting offsets in the Kafka topic(s) from which the jobs started working. Ideally, we'd want to avoid a relaunch. The 'suspension' and relaunching of jobs, coupled with the wait for new machines to come online, may turn out to be quite time-consuming, which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on that node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark...
On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote:
@DG: The key metrics should be:
- Scheduling delay: its ideal state is to remain constant over time, and ideally it should be less than the micro-batch window
- Average job processing time: it should remain less than the micro-batch window
- Number of lost jobs: even a single lost job means that you have lost all messages for the DStream RDD processed by that job, due to the Spark Streaming memory leak condition and subsequent crash described in my previous postings

You can even go one step further and periodically issue a "get/check free memory" to see whether it is decreasing relentlessly at a constant rate; if it touches a predetermined RAM threshold, that should be your third metric.

Regarding the "back pressure" mechanism: this is a feedback loop mechanism, and you can implement one on your own without waiting for JIRAs and new features, whenever they might be implemented by the Spark dev team. Moreover, you can avoid using slow mechanisms such as ZooKeeper, and even incorporate some machine learning in your feedback loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning. BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable; it is not
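The metrics Evo lists can be turned into a simple check. A minimal sketch, assuming hypothetical names (BatchStats, problems) and per-batch numbers you collect yourself, for example from the streaming UI or from a StreamingListener's onBatchCompleted callback; mapping those into this case class is an assumption, not an existing Spark API. It flags average processing time at or above the batch interval, a scheduling delay that has reached one interval or grows every batch, and any failed batch.

```scala
object StreamingHealthSketch {
  // Hypothetical per-batch numbers; all durations in milliseconds.
  final case class BatchStats(schedulingDelayMs: Long, processingTimeMs: Long, failed: Boolean)

  // Returns a description of each warning condition found in the recent batches.
  def problems(batchIntervalMs: Long, recent: Seq[BatchStats]): Seq[String] = {
    val out = Seq.newBuilder[String]
    if (recent.nonEmpty) {
      val avgProcessing = recent.map(_.processingTimeMs).sum.toDouble / recent.size
      if (avgProcessing >= batchIntervalMs) out += "average processing time >= batch interval"
      if (recent.last.schedulingDelayMs >= batchIntervalMs) out += "scheduling delay >= batch interval"
      // A delay that grows on every batch suggests a backlog building up.
      val delays = recent.map(_.schedulingDelayMs)
      if (delays.size > 1 && delays.zip(delays.tail).forall { case (a, b) => b > a })
        out += "scheduling delay increasing every batch"
    }
    if (recent.exists(_.failed)) out += "lost/failed batch"
    out.result()
  }
}
```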
Re: Spark SQL with Thrift Server is very very slow and finally failing
From the log file I noticed that the ExecutorLostFailure happens after the memory used by the executor exceeds the executor memory value. However, even if I increase the executor memory, the executor fails; it just takes longer. I'm wondering why, when joining 2 Hive tables, one with 100 MB of data (around 1M rows) and another with 20 KB of data (around 100 rows), an executor consumes so much memory. Even if I increase the memory to 20 GB, the same failure happens.

Regards,
Sourav

On Tue, Jun 9, 2015 at 12:58 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote:
Hi, I'm just doing a select statement which is supposed to return 10 MB of data at most. The driver memory is 2 GB and the executor memory is 20 GB. The query I'm trying to run is something like:

SELECT PROJECT_LIVE_DT, FLOORPLAN_NM, FLOORPLAN_DB_KEY FROM POG_PRE_EXT P, PROJECT_CALENDAR_EXT C WHERE PROJECT_TYPE = 'CR'

I'm not sure what exactly you mean by physical plan. Here is the stack trace from the machine where the Thrift process is running.

Regards,
Sourav

On Mon, Jun 8, 2015 at 11:18 PM, Cheng, Hao hao.ch...@intel.com wrote:
Is it the large result set returned from the Thrift Server? And can you paste the SQL and physical plan?

*From:* Ted Yu [mailto:yuzhih...@gmail.com]
*Sent:* Tuesday, June 9, 2015 12:01 PM
*To:* Sourav Mazumder
*Cc:* user
*Subject:* Re: Spark SQL with Thrift Server is very very slow and finally failing

Which Spark release are you using? Can you pastebin the stack trace w.r.t. ExecutorLostFailure? Thanks

On Mon, Jun 8, 2015 at 8:52 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote:
Hi, I am trying to run a SQL query from a JDBC driver using Spark's Thrift Server. I'm doing a join between a Hive table of around 100 GB and another Hive table of 10 KB, with a filter on a particular column. The query takes more than 45 minutes and then I get ExecutorLostFailure. It is because of memory: once I increase the memory, the failure still happens, but only after a long time.
I have executor memory of 20 GB, Spark driver memory of 2 GB, 2 executor instances and 2 executor cores. I'm running the job on YARN with master 'yarn-client'. Any idea if I'm missing any other configuration?

Regards,
Sourav
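One thing worth checking in the query quoted above, assuming it is complete as pasted: it lists two tables in FROM with no condition relating P and C (PROJECT_TYPE = 'CR' only filters one side), so the comma join is a Cartesian product. Every surviving row of one table is paired with every row of the other; with the roughly 1M and 100 rows mentioned in the thread, that is up to 100M output rows. The tiny plain-Scala sketch below (hypothetical names) just illustrates that row-count arithmetic.

```scala
object CommaJoinSketch {
  // What `FROM a, b WHERE <filter on one table only>` produces: with no
  // predicate linking a and b, every row of one side pairs with every row
  // of the other (a Cartesian product).
  def crossJoin[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for (l <- left; r <- right) yield (l, r)

  // Output size of such a join, before any further filtering.
  def crossJoinRowCount(leftRows: Long, rightRows: Long): Long = leftRows * rightRows
}
```

If the two tables are meant to be joined on a key, adding that equality to the WHERE clause (or an explicit JOIN ... ON) should shrink the output to the expected few MB.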
Spark's Scala shell killing itself
Hi, I have configured Spark to run on YARN. Whenever I start the Spark shell using the 'spark-shell' command, it automatically gets killed. The output looks like this:

ubuntu@dev-cluster-gateway:~$ ls shekhar/
edx-spark
ubuntu@dev-cluster-gateway:~$ spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0-SNAPSHOT
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/06/10 05:20:45 WARN Utils: Your hostname, dev-cluster-gateway resolves to a loopback address: 127.0.0.1; using 10.182.149.171 instead (on interface eth0)
15/06/10 05:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/10 05:21:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/usr/lib/spark/bin/spark-shell: line 48: 15573 Killed $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main ${SUBMISSION_OPTS[@]} spark-shell ${APPLICATION_OPTS[@]}

Any clue why the Spark shell gets killed? Please let me know if any other configuration/information is required.

Thanks,
Chandrash3khar Kotekar
append file on hdfs
Hi community, I want to append results to one file. If I run locally, my function works fine, but if I run it on a YARN cluster, I lose some rows. Here is my function to write:

points.foreach(
    new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
        private static final long serialVersionUID = 2459995649387229261L;

        public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
            try {
                FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
                if (fs.exists(pt)) {
                    FSDataInputStream in = fs.open(pt);
                    Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
                    backup(fs.getConf(), fs, in, pt_temp);
                    in.close();

                    FSDataOutputStream out = fs.create((pt), true);
                    FSDataInputStream backup = fs.open(pt_temp);

                    int offset = 0;
                    int bufferSize = 4096;
                    int result = 0;
                    byte[] buffer = new byte[bufferSize];
                    // pre-read part of the content from the input stream
                    result = backup.read(offset, buffer, 0, bufferSize);
                    // keep reading the input stream until a read does not fill the whole buffer
                    while (result == bufferSize) {
                        out.write(buffer);
                        // read the next segment from the input stream by moving the offset pointer
                        offset += bufferSize;
                        result = backup.read(offset, buffer, 0, bufferSize);
                    }
                    if (result > 0 && result < bufferSize) {
                        for (int i = 0; i < result; i++) {
                            out.write(buffer[i]);
                        }
                    }
                    out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    out.close();
                } else {
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
                    bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    bw.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
            FSDataOutputStream out = fs.create(pt_temp, true);
            IOUtils.copyBytes(sourceContent, out, 4096, false);
            out.close();
        }
    });

Where is my mistake?
Or is there a function to write (append) to HDFS?

Best regards,
Paul
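A likely explanation for the lost rows (a reasoned guess, not confirmed in this thread): on a YARN cluster, foreach runs on several executors at once, and each call of the function above reads the whole results file, appends one line, and rewrites the file. If two calls overlap, the later rewrite is based on contents that don't include the earlier call's line, so that line disappears. The plain-Scala sketch below replays that interleaving deterministically; FakeFile, interleavedAppends and writeOnce are hypothetical names, with a mutable string standing in for the HDFS file.

```scala
object LostAppendSketch {
  // A "file" modelled as a mutable string holder.
  final class FakeFile { var contents: String = "" }

  // Deterministic replay of two tasks doing read-whole-file, append, rewrite
  // with their steps interleaved, as concurrent foreach calls can be.
  def interleavedAppends(file: FakeFile, lineA: String, lineB: String): Unit = {
    val readByA = file.contents            // task A reads
    val readByB = file.contents            // task B reads the same old contents
    file.contents = readByA + lineA + "\n" // task A rewrites the file
    file.contents = readByB + lineB + "\n" // task B overwrites A's version: lineA is lost
  }

  // Single-writer alternative: gather all lines first, then write once.
  def writeOnce(file: FakeFile, lines: Seq[String]): Unit =
    file.contents = lines.map(_ + "\n").mkString
}
```

In Spark, a single-writer approach could be collect() of the formatted lines to the driver followed by one write there (only if the result is small), or saveAsTextFile, which writes one part file per partition.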
Re: RDD of RDDs
Yes, true. That's why I said "if and when". But hopefully I have given a correct explanation of why an RDD of RDDs is not possible.

On 09-Jun-2015 10:22 pm, Mark Hamstra m...@clearstorydata.com wrote:
That would constitute a major change in Spark's architecture. It's not happening anytime soon.

On Tue, Jun 9, 2015 at 1:34 AM, kiran lonikar loni...@gmail.com wrote:
Possibly in future, if and when the Spark architecture allows workers to launch Spark jobs (from the functions passed to the transformation or action APIs of RDD), it will be possible to have an RDD of RDDs.

On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote:
A similar question was asked before: http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

Here is one of the reasons why I think RDD[RDD[T]] is not possible:
- An RDD is only a handle to the actual data partitions. It has a reference/pointer to the *SparkContext* object (sc) and a list of partitions.
- The *SparkContext* is an object in the Spark application/driver program's JVM. Similarly, the list of partitions is also in the JVM of the driver program. Each partition contains a kind of remote reference to the partition data on the worker JVMs.
- The functions passed to an RDD's transformations and actions execute in the workers' JVMs on different nodes. For example, in rdd.map { x => x*x }, the function performing x*x runs on the JVMs of the worker nodes where the partitions of the RDD reside. These JVMs do not have access to sc, since it exists only in the driver's JVM.
- Thus, in the case of your RDD of RDDs, outerRDD.map { innerRDD => innerRDD.filter { x => x*x } }, the worker nodes will not be able to execute the filter on innerRDD, as the code in the worker does not have access to sc and cannot launch a Spark job.

Hope it helps. You need to consider List[RDD] or some other collection.
-Kiran

On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:
Hi, the problem I am looking at is as follows:
- I read in a log file of multiple users as an RDD.
- I'd like to group the above RDD into *multiple RDDs* by userId (the key).
- My processEachUser() function then takes each RDD, mapped to an individual user, and calls RDD.map or DataFrame operations on it. (I already have the function coded, so I am reluctant to work with the ResultIterable object coming out of rdd.groupByKey()...)

I've searched the mailing list and googled "RDD of RDDs", and it seems it isn't a thing at all. The few choices left seem to be:
1) groupByKey() and then work with the ResultIterable object;
2) groupByKey() and then write each group to a file, and read them back as individual RDDs to process.

Has anyone got a better idea or had a similar problem before? Thanks!
Ping
--
Ping Yan
Ph.D. in Management
Dept. of Management Information Systems
University of Arizona
Tucson, AZ 85721
RE: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
Thanks so much! I put a sleep in my code to keep the UI available. Now from the UI, I can see:
· In the "Spark Properties" section, spark.jars and spark.files are set as I want.
· In the "Classpath Entries" section, my jars' and files' paths are there (with HDFS paths).

And I checked the HTTP file server directory; the structure is like:
D:\data\temp
  \--spark-UUID
    \--httpd-UUID
      \jars [empty]
      \files [empty]

So I guess the files and jars are not properly downloaded from HDFS to these folders? I'm using standalone mode. Any ideas?

Thanks
Dong Lei

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, June 9, 2015 4:46 PM
To: Dong Lei
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

You can put a Thread.sleep(10) in the code to keep the UI available for quite some time. (Put it just before starting any of your transformations.) Or you can enable the Spark history server (https://spark.apache.org/docs/latest/monitoring.html) too. I believe --jars (https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) would download the dependency jars on all your worker machines (they can be found in the Spark work dir of your application, along with the stderr and stdout files).

Thanks
Best Regards

On Tue, Jun 9, 2015 at 1:29 PM, Dong Lei dong...@microsoft.com wrote:
Thanks Akhil: The driver fails too fast for me to get a look at 4040. Is there any other way to see the download and shipping process of the files? Is the driver supposed to download these jars from HDFS to some location, then ship them to the executors? I can see from the log that the driver downloaded the application jar but not the other jars specified by "--jars". Or do I misunderstand the usage of "--jars", and the jars should already be on every worker, so the driver will not download them? Are there some useful docs?
Thanks
Dong Lei

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, June 9, 2015 3:24 PM
To: Dong Lei
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

Once you submit the application, you can check the Environment tab in the driver UI (running on port 4040) to see whether the jars you added got shipped or not. If they are shipped and you are still getting NoClassDef exceptions, it means that you have a jar conflict, which you can resolve by putting the jar containing the class at the top of your classpath.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote:
Hi, spark-users: I'm using spark-submit to submit multiple jars and files (all in HDFS) to run a job, with the following command:

spark-submit --class myClass --master spark://localhost:7077/ --deploy-mode cluster --jars hdfs://localhost/1.jar, hdfs://localhost/2.jar --files hdfs://localhost/1.txt, hdfs://localhost/2.txt hdfs://localhost/main.jar

The stderr of the driver showed java.lang.ClassNotDefException for a class in 1.jar. I checked the log, and Spark has added these jars:
INFO SparkContext: Added JAR hdfs:// …1.jar
INFO SparkContext: Added JAR hdfs:// …2.jar

In the driver's folder, I only saw main.jar copied there; the other jars and files were not there. Could someone explain how I should pass the jars and files needed by the main jar to Spark? If my class in main.jar refers to these files with a relative path, will Spark copy these files into one folder? BTW, my class works in client mode with all jars and files local.

Thanks
Dong Lei
Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
I am not sure they work with HDFS paths. You may want to look at the source code. Alternatively, you can create a fat jar containing all the jars (let your build tool set META-INF correctly). This always works.
RE: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS
Hi Jörn: I started checking the code, and sadly it seems it does not work with HDFS paths. In HTTPFileServer.scala:

def addFileToDir: …. Files.copy ….

It looks like it only copies files from the local file system to the HTTP server directory (which then distributes the files to the workers). Even if I make addFileToDir recognize an hdfs prefix and download from HDFS, there would still be another problem: the classpath is set to the file path on HDFS, and I don’t think that works.

I will try a fat jar. BTW, do you know how I can request support for such a scenario? Should I send it to dev@spark? Does yarn-cluster mode support dependency jars and files on HDFS?

Thanks
Dong Lei

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Wednesday, June 10, 2015 12:48 PM
To: Dong Lei; Akhil Das
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

I am not sure they work with HDFS paths. You may want to look at the source code. Alternatively, you can create a fat jar containing all the jars (let your build tool set up META-INF correctly). This always works.

On Wed, Jun 10, 2015 at 6:22, Dong Lei dong...@microsoft.com wrote:

Thanks so much! I did put a sleep in my code to keep the UI available. Now from the UI, I can see:

• In the “Spark Properties” section, spark.jars and spark.files are set as I want.
• In the “Classpath Entries” section, my jar and file paths are there (with HDFS paths).

And I checked the HTTP file server directory; the structure is like:

D:\data\temp\
  spark-UUID\
    httpd-UUID\
      jars\   [empty]
      files\  [empty]

So I guess the files and jars are not properly downloaded from HDFS into these folders? I’m using standalone mode. Any ideas?

Thanks
Dong Lei

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, June 9, 2015 4:46 PM
To: Dong Lei
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

You can put a Thread.sleep(10) in the code to keep the UI available for quite some time. (Put it just before starting any of your transformations.) Or you can enable the Spark history server (https://spark.apache.org/docs/latest/monitoring.html) too. I believe --jars (https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) would download the dependency jars to all your worker machines (they can be found in the Spark work dir of your application, along with the stderr and stdout files).

Thanks
Best Regards

On Tue, Jun 9, 2015 at 1:29 PM, Dong Lei dong...@microsoft.com wrote:

Thanks Akhil: The driver fails too fast for me to get a look at port 4040. Is there any other way to see how the files are downloaded and shipped? Is the driver supposed to download these jars from HDFS to some location, then ship them to the executors? I can see from the log that the driver downloaded the application jar but not the other jars specified by “--jars”. Or do I misunderstand the usage of “--jars”, and the jars should already be on every worker, so the driver will not download them? Are there any useful docs?

Thanks
Dong Lei

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, June 9, 2015 3:24 PM
To: Dong Lei
Cc: user@spark.apache.org
Subject: Re: ClassNotDefException when using spark-submit with multiple jars and files located on HDFS

Once you submit the application, you can check the Environment tab in the driver UI (running on port 4040) to see whether the jars you added got shipped or not. If they are shipped and you are still getting NoClassDef exceptions, then you have a jar conflict, which you can resolve by putting the jar containing the class at the top of your classpath.

Thanks
Best Regards

On Tue, Jun 9, 2015 at 9:05 AM, Dong Lei dong...@microsoft.com wrote:

Hi, spark-users:

I’m using spark-submit to submit multiple jars and files (all in HDFS) to run a job, with the following command:

spark-submit --class myClass --master spark://localhost:7077/ --deploy-mode cluster --jars hdfs://localhost/1.jar,hdfs://localhost/2.jar --files hdfs://localhost/1.txt,hdfs://localhost/2.txt hdfs://localhost/main.jar

The stderr in the driver showed java.lang.ClassNotDefException for a class in 1.jar. I checked the log, and Spark has added these jars:

INFO SparkContext: Added JAR hdfs:// …1.jar
INFO SparkContext: Added JAR hdfs:// …2.jar

In the folder of the driver, I only saw that main.jar was copied there, but the other jars and files were not there. Could someone