Re: SparkStreaming batch processing time question
A longer processing time adds scheduling delay to the new batch: its data is processed only after the previous batch has finished. When the delay grows too large, you will sometimes see fetch failures, because the batch data can be removed from memory before it is processed.
Thanks
Best Regards
On Wed, Apr 1, 2015 at 11:35 AM, luohui20...@sina.com wrote:
Hi guys, I have a question from reading http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval . What happens to the streaming data if the batch processing time is longer than the batch interval? Will processing of the next batch be delayed, or will the unfinished processing job be discarded? Thanks for any ideas.
Thanks & best regards!
罗辉 San.Luo
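The delay described in this reply can be sketched with a small simulation. This is only a toy model in plain Scala, not Spark code: the names BatchDelaySketch and schedulingDelays are made up for illustration, and it assumes batches run strictly one after another with a constant processing time.

```scala
object BatchDelaySketch {
  /** Scheduling delay in ms for each of the first `batches` batches,
    * assuming batches are processed strictly one after another. */
  def schedulingDelays(batchIntervalMs: Long, processingTimeMs: Long, batches: Int): Seq[Long] = {
    var previousFinish = 0L
    (0 until batches).map { i =>
      // Batch i is complete and ready to run at the end of its interval.
      val ready = (i + 1) * batchIntervalMs
      // It cannot start before the previous batch has finished.
      val start = math.max(ready, previousFinish)
      previousFinish = start + processingTimeMs
      start - ready
    }
  }
}
```

With a 1000 ms interval and 1500 ms of processing per batch, the delay grows by 500 ms with every batch; with 800 ms of processing it stays at zero.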
Strategy regarding maximum number of executor failures for long-running jobs / Spark Streaming jobs
Hi, In Spark on YARN, the property spark.yarn.max.executor.failures controls the maximum number of executor failures an application will survive. If the number of executor failures (for any reason, such as OOM or machine failure) exceeds this value, the application quits. For short Spark jobs this looks fine, but because the count does not take duration into account, long-running jobs get the same treatment in two very different scenarios:
1. Executors failing within 5 minutes.
2. Executors failing sparsely, where at some point even a single executor failure (which the application could have survived) makes the application quit.
I am sending this to the community to hear what kind of behaviour or strategy would be suitable for long-running Spark jobs or Spark Streaming jobs.
Thanks and Regards, Twinkle
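One possible strategy for telling the two scenarios apart is to count only failures inside a sliding time window. The sketch below is purely hypothetical: FailureWindow and recordFailure are illustrative names in plain Scala and are not part of Spark or YARN.

```scala
import scala.collection.mutable

/** Hypothetical sliding-window failure counter; not a Spark or YARN API. */
final class FailureWindow(maxFailures: Int, windowMs: Long) {
  private val failureTimes = mutable.Queue.empty[Long]

  /** Records a failure at `nowMs`; returns true if the application should give up. */
  def recordFailure(nowMs: Long): Boolean = {
    failureTimes += nowMs
    // Forget failures that fell out of the window.
    while (failureTimes.nonEmpty && failureTimes.head <= nowMs - windowMs) failureTimes.dequeue()
    failureTimes.size > maxFailures
  }
}
```

With a limit of 2 failures per 5-minute window, three failures within a few seconds trip the limit, while one failure per hour never does, however long the job runs.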
RE: Using 'fair' scheduler mode with thrift server
The expensive query can take all executor slots, but no task occupies an executor permanently; that is, the second job could take some resources to execute in between tasks of the expensive query. Can the fair scheduler mode help in this case? Or is it possible to set up the thrift server so that no query takes all resources?
From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 1, 2015 12:28 AM To: Asad Khan Cc: user@spark.apache.org Subject: Re: Using 'fair' scheduler mode
Does the expensive query take all executor slots? Then there is nothing for any other job to use regardless of scheduling policy.
On Mar 31, 2015 9:20 PM, asadrao as...@microsoft.com wrote:
Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the first query is a very expensive query (e.g. ‘select *’ on a really big data set), then any subsequent query seems to get blocked. I would have expected the second query to run in parallel, since I am using the ‘fair’ scheduler mode and not ‘fifo’. I am submitting the queries through the thrift server.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkStreaming batch processing time question
Hi guys, I have a question from reading http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval. What happens to the streaming data if the batch processing time is longer than the batch interval? Will processing of the next batch be delayed, or will the unfinished processing job be discarded? Thanks for any ideas.
Thanks & best regards!
罗辉 San.Luo
Re: Query REST web service with Spark?
Hello Minnow, It is possible. You can, for example, use the Jersey REST client to query a web service and use its results in a Spark job. In fact, that is what we did in a recent project (in a Spark Streaming application).
Kind regards, Emre Sevinç http://www.bigindustries.be/
On Tue, Mar 31, 2015 at 10:46 PM, Minnow Noir minnown...@gmail.com wrote:
We have some data on Hadoop that needs to be augmented with data only available to us via a REST service. We're using Spark to search for, and correct, missing data. Even though there are a lot of records to scour for missing data, the total number of calls to the service is expected to be low, so it would be ideal to do the whole job in Spark as we scour the data. I don't see anything obvious in the API or on Google relating to making REST calls from a Spark job. Is it possible?
Thanks, Alec
-- Emre Sevinc
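The shape of such a job can be sketched in plain Scala. Everything here is an assumption for illustration: Record, Augment.fillMissing and the lookup function are hypothetical names, and lookup stands in for whatever REST client (Jersey or otherwise) actually calls the service.

```scala
/** Hypothetical record type: `value` is None when the data is missing. */
final case class Record(id: Int, value: Option[String])

object Augment {
  /** Fills in missing values, calling `lookup` only for records that lack one.
    * `lookup` is a stand-in for a call to the REST service. */
  def fillMissing(records: Iterator[Record], lookup: Int => Option[String]): Iterator[Record] =
    records.map {
      case r @ Record(id, None) => r.copy(value = lookup(id))
      case r                    => r
    }
}
```

Because the function works on an Iterator, it could be passed to RDD.mapPartitions in a Spark job, so that a client object is created once per partition rather than once per record.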
Re: rdd.cache() not working ?
Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; the RDD is actually cached after the first action (person.count here).
- Original Message - From: fightf...@163.com To: user user@spark.apache.org Sent: Wednesday, April 1, 2015, 1:21:25 PM Subject: rdd.cache() not working ?
Hi all, Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that the cache is not working? If we issue person.count again, we do not see any improvement in run time. I hope someone can explain this.
Best, Sun.
case class Person(id: Int, col1: String)
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
person.cache
person.count
fightf...@163.com
-- Thanks & best regards
李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer
Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120
Phone: 021-60216502 Mobile: +86-18202171279
Re: Using 'fair' scheduler mode
I am facing the same issue. FAIR and FIFO behave in the same way.
On Wed, Apr 1, 2015 at 1:49 AM, asadrao as...@microsoft.com wrote:
Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the first query is a very expensive query (e.g. ‘select *’ on a really big data set), then any subsequent query seems to get blocked. I would have expected the second query to run in parallel, since I am using the ‘fair’ scheduler mode and not ‘fifo’. I am submitting the queries through the thrift server.
Re: When do map how to get the line number?
You can use zipWithIndex() to get an index for each record, and then add 1 to each index to get the line number.
val tf = sc.textFile("test").zipWithIndex()
tf.map { case (line, idx) => (idx + 1, line) }
The above should serve your purpose.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/When-do-map-how-to-get-the-line-number-tp22318p22334.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
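The same transformation can be tried without a cluster on an ordinary Scala collection. This is only a sketch: LineNumbers.numbered is a made-up helper, and a plain Seq stands in for the RDD (RDD.zipWithIndex likewise pairs each element with a zero-based index, typed as Long).

```scala
object LineNumbers {
  /** Pairs every line with its 1-based line number. */
  def numbered(lines: Seq[String]): Seq[(Long, String)] =
    lines.zipWithIndex.map { case (line, idx) => (idx + 1L, line) }
}
```

Putting the line number first makes it easy to use as a key in later steps.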
Re: --driver-memory parameter doesn't work for spark-submit on yarn?
Once you submit the job, run ps aux | grep spark-submit and check how much heap space is allocated to the process (the -Xmx parameter). If you see a lower value than expected, you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g
Thanks Best Regards
On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote:
Hi All, Below is my shell script:
/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties
My driver loads some resources and then broadcasts them to all executors. The resources are only 600MB in serialized format, but I always get an out-of-memory exception; it looks like the right amount of memory is not being allocated to my driver.
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)
Am I doing anything wrong here? No matter what value I set for --driver-memory (from 512M to 20G), it always gives me the error on the same line (the line that tries to load a 600MB Java serialization file). So it looks like the script doesn't allocate the right memory to the driver in my case?
Regards, Shuai
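Besides inspecting -Xmx with ps, the driver can report its own maximum heap from inside the JVM. A minimal sketch, assuming it is called early in the driver's main method; HeapCheck is a made-up name, while Runtime.getRuntime.maxMemory is the standard JVM call.

```scala
object HeapCheck {
  /** Maximum heap the current JVM will try to use, in megabytes. */
  def maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)
}
```

Logging HeapCheck.maxHeapMb before loading the 600MB file would show whether the value passed via --driver-memory actually reached the driver JVM.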
Re: Re: SparkStreaming batch processing time question
Hmm, got it. Thank you, Akhil.
Thanks & best regards!
罗辉 San.Luo
- Original Message - From: Akhil Das ak...@sigmoidanalytics.com To: 罗辉 luohui20...@sina.com Cc: user user@spark.apache.org Subject: Re: SparkStreaming batch processing time question Date: April 1, 2015, 14:31
A longer processing time adds scheduling delay to the new batch: its data is processed only after the previous batch has finished. When the delay grows too large, you will sometimes see fetch failures, because the batch data can be removed from memory before it is processed.
Thanks Best Regards
On Wed, Apr 1, 2015 at 11:35 AM, luohui20...@sina.com wrote:
Hi guys, I have a question from reading http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval. What happens to the streaming data if the batch processing time is longer than the batch interval? Will processing of the next batch be delayed, or will the unfinished processing job be discarded? Thanks for any ideas.
Thanks & best regards!
罗辉 San.Luo
Re: Re: rdd.cache() not working ?
No, cache() changes the bookkeeping of the existing RDD. Although it returns a reference, simply calling person.cache works. I can't reproduce this. When I cache an RDD and then count it, it is persisted in memory and I see it in the web UI. Something else must be different about what's being executed.
On Wed, Apr 1, 2015 at 8:26 AM, Yuri Makhno ymak...@gmail.com wrote:
The cache() method returns a new RDD, so you have to use something like this:
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
val cached = person.cache
cached.count
When you rerun count on cached, you will see that the cache works.
On Wed, Apr 1, 2015 at 9:35 AM, fightf...@163.com fightf...@163.com wrote:
Hi, That is just the issue. After running person.cache we then run person.count; however, the Storage page of the web UI still does not show any cached data.
Thanks, Sun. fightf...@163.com
From: Taotao.Li Date: 2015-04-01 14:02 To: fightfate CC: user Subject: Re: rdd.cache() not working ?
Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; the RDD is actually cached after the first action (person.count here).
From: fightf...@163.com To: user user@spark.apache.org Sent: Wednesday, April 1, 2015, 1:21:25 PM Subject: rdd.cache() not working ?
Hi all, Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that the cache is not working? If we issue person.count again, we do not see any improvement in run time. I hope someone can explain this.
Best, Sun.
case class Person(id: Int, col1: String)
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
person.cache
person.count
fightf...@163.com
-- Thanks & best regards
李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer
Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120
Phone: 021-60216502 Mobile: +86-18202171279
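The two points made in this thread (cache() only marks the RDD and returns the same reference, and the data is stored when the first action runs) can be illustrated with a toy model. This is not Spark code: LazyCached is a made-up class in plain Scala that only imitates those two behaviours.

```scala
/** Toy stand-in for an RDD: cache() only sets a flag, count() is the "action". */
final class LazyCached[A](compute: () => Seq[A]) {
  private var persistRequested = false
  private var stored: Option[Seq[A]] = None
  var computeCalls = 0

  /** Marks this object for caching and returns the same reference. */
  def cache(): this.type = { persistRequested = true; this }

  def isMaterialized: Boolean = stored.isDefined

  /** Computes the data, or reuses the stored copy if one exists. */
  def count(): Int = {
    val data = stored.getOrElse {
      computeCalls += 1
      val fresh = compute()
      if (persistRequested) stored = Some(fresh)
      fresh
    }
    data.size
  }
}
```

In this model nothing is stored right after cache(); the first count() computes and stores the data, and a second count() reuses it without recomputing.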
RE: Creating Partitioned Parquet Tables via SparkSQL
This is tracked by these JIRAs:
https://issues.apache.org/jira/browse/SPARK-5947
https://issues.apache.org/jira/browse/SPARK-5948
From: denny.g@gmail.com Date: Wed, 1 Apr 2015 04:35:08 + Subject: Creating Partitioned Parquet Tables via SparkSQL To: user@spark.apache.org
Creating Parquet tables via .saveAsTable is great, but I was wondering if there is an equivalent way to create partitioned Parquet tables. Thanks!
Spark + Kafka
I have a simple setup/runtime of Kafka and Spark. I have a command-line consumer displaying arrivals on the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic in Spark, I get no messages; some logs are below. I'm thinking there aren't enough threads. How do I check that? Thank you.
2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job 142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job 142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for time 142787141 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event ClearMetadata(142787141 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 142787141 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: []
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older than 1427871405000 ms:
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old RDDs: [1427871405000 ms - 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
2015-04-01 08:56:50 INFO BlockManager:59 - Removing RDD 8
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, response is 0
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 INFO KafkaInputDStream:59 - Removing blocks of RDD BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 142787141 ms
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older than 1427871405000 ms: 1427871405000 ms
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 142787141 ms
2015-04-01 08:56:50 INFO ReceivedBlockTracker:59 - Deleting batches ArrayBuffer(142787140 ms)
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.499181 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 WARN TaskSchedulerImpl:71 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.886121 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Executor app-20150401065621-0007/0 removed: Command exited with code 1
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to remove non-existent executor 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message (6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG
Re: Unable to save dataframe with UDT created with sqlContext.createDataFrame
Hmm, I got the same error with master. Here is another test example that fails. Here, I explicitly create a Row RDD, which corresponds to the use case I am in:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{Row, SQLContext}
object TestDataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val data = Seq(LabeledPoint(1, Vectors.zeros(10)))
    val dataDF = sc.parallelize(data).toDF
    dataDF.printSchema()
    dataDF.save("test1.parquet") // OK
    // Build the rows by hand instead of going through toDF
    val dataRow = data.map { case LabeledPoint(l: Double, f: mllib.linalg.Vector) => Row(l, f) }
    val dataRowRDD = sc.parallelize(dataRow)
    val dataDF2 = sqlContext.createDataFrame(dataRowRDD, dataDF.schema)
    dataDF2.printSchema()
    dataDF2.saveAsParquetFile("test3.parquet") // FAIL !!!
  }
}
On Tue, Mar 31, 2015 at 11:18 PM, Xiangrui Meng men...@gmail.com wrote:
I cannot reproduce this error on master, but I'm not aware of any recent bug fixes that are related. Could you build and try the current master? -Xiangrui
On Tue, Mar 31, 2015 at 4:10 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
Hi all, A DataFrame with a user-defined type (here mllib.Vector) created with sqlContext.createDataFrame can't be saved to a Parquet file and raises a "ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row" error. Here is example code to reproduce this error (same imports as above):
object TestDataFrame {
  def main(args: Array[String]): Unit = {
    //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    val conf = new SparkConf().setAppName("RankingEval").setMaster("local[8]")
      .set("spark.executor.memory", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
    val dataDF = data.toDF
    dataDF.save("test1.parquet")
    val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
    dataDF2.save("test2.parquet")
  }
}
Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how can it be solved?
Cheers, Jao
Re: Using 'fair' scheduler mode
"I am using the Spark ‘fair’ scheduler mode."
What do you mean by this? Fair scheduling mode is not one thing in Spark; it allows for multiple configurations and usages. Presumably, at a minimum, you are using SparkConf to set spark.scheduler.mode to FAIR, but then how are you setting up scheduling pools, how are you allocating jobs to pools, and what scheduling mode are you using within pools? Setting spark.scheduler.mode is a necessary but probably not sufficient condition to effect your desired scheduling policy.
On Tue, Mar 31, 2015 at 1:19 PM, asadrao as...@microsoft.com wrote:
Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the first query is a very expensive query (e.g. ‘select *’ on a really big data set), then any subsequent query seems to get blocked. I would have expected the second query to run in parallel, since I am using the ‘fair’ scheduler mode and not ‘fifo’. I am submitting the queries through the thrift server.
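The pieces this reply asks about (scheduler mode, pool definitions, and assigning jobs to pools) might be wired together roughly as below. Treat it as a configuration sketch rather than a recommended setup: the property is named spark.scheduler.mode in Spark's job scheduling documentation, while the application name, the allocation file path and the pool name "adhoc" are placeholders chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FairPoolsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("FairPoolsSketch")  // placeholder name
      .setMaster("local[4]")          // placeholder master
      .set("spark.scheduler.mode", "FAIR")
      // Placeholder path to an XML file that defines pools, their weights and minShare.
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go to the (placeholder) pool "adhoc".
    sc.setLocalProperty("spark.scheduler.pool", "adhoc")
    sc.parallelize(1 to 1000).count()

    // Clear the property so later jobs from this thread use the default pool again.
    sc.setLocalProperty("spark.scheduler.pool", null)
    sc.stop()
  }
}
```

Even with pools in place, a second job can only start tasks when executor slots free up, which matches the point made elsewhere in this thread about one query occupying every slot.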
Re: Re: rdd.cache() not working ?
The cache() method returns a new RDD, so you have to use something like this:
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
val cached = person.cache
cached.count
When you rerun count on cached, you will see that the cache works.
On Wed, Apr 1, 2015 at 9:35 AM, fightf...@163.com fightf...@163.com wrote:
Hi, That is just the issue. After running person.cache we then run person.count; however, the Storage page of the web UI still does not show any cached data.
Thanks, Sun. -- fightf...@163.com
From: Taotao.Li taotao...@datayes.com Date: 2015-04-01 14:02 To: fightfate fightf...@163.com CC: user user@spark.apache.org Subject: Re: rdd.cache() not working ?
Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; the RDD is actually cached after the first action (person.count here).
-- From: fightf...@163.com To: user user@spark.apache.org Sent: Wednesday, April 1, 2015, 1:21:25 PM Subject: rdd.cache() not working ?
Hi all, Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that the cache is not working? If we issue person.count again, we do not see any improvement in run time. I hope someone can explain this.
Best, Sun.
case class Person(id: Int, col1: String)
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
person.cache
person.count
-- fightf...@163.com
-- Thanks & best regards
李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer
Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120
Phone: 021-60216502 Mobile: +86-18202171279
Re: Re: rdd.cache() not working ?
Hi, Still no luck following your suggestion.
Best, Sun. fightf...@163.com
From: Yuri Makhno Date: 2015-04-01 15:26 To: fightf...@163.com CC: Taotao.Li; user Subject: Re: Re: rdd.cache() not working ?
The cache() method returns a new RDD, so you have to use something like this:
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
val cached = person.cache
cached.count
When you rerun count on cached, you will see that the cache works.
On Wed, Apr 1, 2015 at 9:35 AM, fightf...@163.com fightf...@163.com wrote:
Hi, That is just the issue. After running person.cache we then run person.count; however, the Storage page of the web UI still does not show any cached data.
Thanks, Sun. fightf...@163.com
From: Taotao.Li Date: 2015-04-01 14:02 To: fightfate CC: user Subject: Re: rdd.cache() not working ?
Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; the RDD is actually cached after the first action (person.count here).
From: fightf...@163.com To: user user@spark.apache.org Sent: Wednesday, April 1, 2015, 1:21:25 PM Subject: rdd.cache() not working ?
Hi all, Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that the cache is not working? If we issue person.count again, we do not see any improvement in run time. I hope someone can explain this.
Best, Sun.
case class Person(id: Int, col1: String)
val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
person.cache
person.count
fightf...@163.com
-- Thanks & best regards
李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer
Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120
Phone: 021-60216502 Mobile: +86-18202171279
Re: Spark + Kafka
Please make sure that you have given the application more cores than the number of receivers.
From: James King Date: 2015-04-01 15:21 To: user Subject: Spark + Kafka
I have a simple setup/runtime of Kafka and Spark. I have a command-line consumer displaying arrivals on the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic in Spark, I get no messages; some logs are below. I'm thinking there aren't enough threads. How do I check that? Thank you.
Re: Spark + Kafka
Thanks Saisai, Sure, will do. But just a quick note: when I set the master to local[*], Spark started showing Kafka messages as expected, so the problem, in my view, was not having enough threads to process the incoming data. Thanks.
On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao sai.sai.s...@gmail.com wrote:
Would you please share your code snippet, so we can see whether anything is wrong in your code? Also, please grep your driver's debug log for messages like "Stream xxx received block xxx"; these mean that Spark Streaming keeps receiving data from sources like Kafka.
2015-04-01 16:18 GMT+08:00 James King jakwebin...@gmail.com:
Thank you bit1129, From looking at the web UI I can see 2 cores. I have also looked at http://spark.apache.org/docs/1.2.1/configuration.html but can't see an obvious configuration for the number of receivers. Can you help, please?
On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com bit1...@163.com wrote:
Please make sure that you have given the application more cores than the number of receivers.
From: James King jakwebin...@gmail.com Date: 2015-04-01 15:21 To: user user@spark.apache.org Subject: Spark + Kafka
I have a simple setup/runtime of Kafka and Spark. I have a command-line consumer displaying arrivals on the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic in Spark, I get no messages; some logs are below. I'm thinking there aren't enough threads. How do I check that? Thank you.
Re: Disable stage logging to stdout
You can disable it with spark.ui.showConsoleProgress=false. At first I also wasn't sure why this writes straight to System.err. I assume it's because it writes carriage returns to achieve the animation, which won't work via a logging framework. stderr is where log-like output goes, because stdout is for program output.
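A small sketch of the carriage-return technique Sean describes may help show why this output bypasses the logging framework. The class and method names below are invented for illustration, and this is not Spark's own progress-bar code; it only assumes a console that honours '\r' by moving the cursor back to the start of the line.

```java
import java.io.PrintStream;

// Illustrative sketch only: NOT Spark's progress-bar implementation.
final class StageProgressDemo {

    // Formats one stage in the same shape as the example in this thread:
    // [Stage <id>: (<completed> + <running>) / <total>]
    static String render(int stageId, int completed, int running, int total) {
        return "[Stage " + stageId + ": (" + completed + " + " + running + ") / " + total + "]";
    }

    // Redraws the current console line: '\r' returns the cursor to column 0
    // without starting a new line, so the next write overwrites the old text.
    // A logging framework would prepend a timestamp/level and end each
    // message with a newline, which defeats this kind of in-place update.
    static void redraw(PrintStream err, String line) {
        err.print("\r" + line);
        err.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        int total = 48;
        for (int done = 0; done <= total; done += 8) {
            int running = Math.min(8, total - done);
            redraw(System.err, render(2154, done, running, total));
            Thread.sleep(200);
        }
        System.err.println(); // finish the line so later output starts cleanly
    }
}
```

Because the line goes to stderr, anything the program prints to stdout can still be redirected or piped separately. The property named in the reply can also be passed at submit time, for example with spark-submit's --conf option.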
Re: Spark + Kafka
Would you please share your code snippet so we can see whether anything is wrong in your code? Also, would you please grep your driver's debug log for messages like "Stream xxx received block xxx"? Such messages mean that Spark Streaming keeps receiving data from sources like Kafka.
Disable stage logging to stdout
Since switching to Spark 1.2.1 I'm seeing logging for the stage progress (ex.):

    [error] [Stage 2154: (14 + 8) / 48][Stage 2210: (0 + 0) / 48]

Any reason why these are error level logs? Shouldn't they be info level? In any case, is there a way to disable them other than disabling logging completely? I would like to see my info level logs on stdout (i.e. my printouts) but not these stage progress logs.

View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Disable-stage-logging-to-stdout-tp22336.html
RE: Spark + Kafka
OK, it seems there is nothing wrong with your code. To narrow down the cause, would you please run the KafkaWordCount example in Spark to see whether it works? If it does, we should focus on your implementation; otherwise Kafka may have a problem.

Thanks,
Jerry
Spark 1.3.0 DataFrame and Postgres
Question: Is there a way to have JDBC DataFrames use quoted/escaped column names? Right now, it looks like Spark sees the names correctly in the schema it creates but does not escape them in the SQL it generates when they are not compliant:

    org.apache.spark.sql.jdbc.JDBCRDD

    private val columnList: String = {
      val sb = new StringBuilder()
      columns.foreach(x => sb.append(",").append(x))
      if (sb.length == 0) "1" else sb.substring(1)
    }

If you see value in this, I would take a shot at adding the quoting (escaping) of column names here. Without it, some drivers, like PostgreSQL's, will simply lower-case all names when parsing the query. As you can see in the TL;DR below, that means they won't match the schema I am given. Thanks.

TL;DR: I am able to connect to a Postgres database in the shell (with the driver referenced):

    val jdbcDf = sqlContext.jdbc("jdbc:postgresql://localhost/sparkdemo?user=dbuser", "sp500")

In fact, when I run:

    jdbcDf.registerTempTable("sp500")
    val avgEPSNamed = sqlContext.sql("SELECT AVG(`Earnings/Share`) as AvgCPI FROM sp500")

and

    val avgEPSProg = jsonDf.agg(avg(jsonDf.col("Earnings/Share")))

the values come back as expected. However, if I try:

    jdbcDf.show

or

    val all = sqlContext.sql("SELECT * FROM sp500")
    all.show

I get errors about column names not being found. In fact, the error mentions the column names all lower-cased.

For now I will change my schema to be more restrictive. Right now it is, per a Stack Overflow poster, not ANSI compliant, because it does things that double quotes allow in pgsql, MySQL and SQL Server. BTW, our users are giving us tables like this because various tools they already use support non-compliant names. In fact, this is mild compared to what we've had to support.

Currently the schema in question uses mixed-case, quoted names with special characters and spaces:

    CREATE TABLE sp500 (
      "Symbol" text,
      "Name" text,
      "Sector" text,
      "Price" double precision,
      "Dividend Yield" double precision,
      "Price/Earnings" double precision,
      "Earnings/Share" double precision,
      "Book Value" double precision,
      "52 week low" double precision,
      "52 week high" double precision,
      "Market Cap" double precision,
      "EBITDA" double precision,
      "Price/Sales" double precision,
      "Price/Book" double precision,
      "SEC Filings" text
    )

View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-DataFrame-and-Postgres-tp22338.html
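The quoting proposed in this message could look roughly like the following. This is only a sketch in plain Java, not a patch to JDBCRDD: the ColumnQuoting class and its method names are hypothetical, and it assumes ANSI-style double-quote identifiers, which is what PostgreSQL accepts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper, not part of Spark: ANSI-style identifier quoting.
final class ColumnQuoting {

    // Wraps a name in double quotes and doubles any embedded double quote,
    // which is how ANSI SQL (and PostgreSQL) escape quoted identifiers.
    static String quoteIdentifier(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    // Same contract as the columnList snippet quoted in the message:
    // "1" when no columns are requested, otherwise a comma-separated list,
    // except that every name is quoted so its case and spaces survive.
    static String columnList(List<String> columns) {
        if (columns.isEmpty()) {
            return "1";
        }
        StringJoiner joiner = new StringJoiner(",");
        for (String column : columns) {
            joiner.add(quoteIdentifier(column));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("Symbol", "Earnings/Share", "52 week low");
        // prints: SELECT "Symbol","Earnings/Share","52 week low" FROM sp500
        System.out.println("SELECT " + columnList(columns) + " FROM sp500");
    }
}
```

The quote character is dialect-specific (MySQL, for instance, uses backticks by default), so a real fix would probably not hard-code it. JDBC exposes the driver's choice through java.sql.DatabaseMetaData.getIdentifierQuoteString(), which is one possible source for it.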
Re: Re: rdd.cache() not working ?
Hi,

That is exactly the issue. After running person.cache we then run person.count; however, the Storage tab of the web UI still does not show any cached data.

Thanks,
Sun.
fightf...@163.com

From: Taotao.Li
Date: 2015-04-01 14:02
To: fightfate
CC: user
Subject: Re: rdd.cache() not working ?

Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; the RDD is actually cached after the first action (person.count here).

From: fightf...@163.com
To: user user@spark.apache.org
Sent: Wednesday, April 1, 2015, 1:21:25 PM
Subject: rdd.cache() not working ?

Hi all,

I am running the following code snippet through spark-shell, but I cannot see any cached storage partitions in the web UI. Does this mean that cache is not working? When we issue person.count again, we do not see any improvement in run time. I hope someone can explain this a little.

Best,
Sun.

    case class Person(id: Int, col1: String)
    val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
    person.cache
    person.count

fightf...@163.com

--
Thanks & Best regards
李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer
Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120
Phone: 021-60216502
Mobile: +86-18202171279
Re: Using 'fair' scheduler mode
Does the expensive query take all executor slots? Then there is nothing for any other job to use, regardless of scheduling policy.

On Mar 31, 2015 9:20 PM, asadrao as...@microsoft.com wrote:

Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the first query is a very expensive query (e.g. ‘select *’ on a really big data set), then any subsequent query seems to get blocked. I would have expected the second query to run in parallel since I am using the ‘fair’ scheduler mode, not ‘fifo’. I am submitting the query through the thrift server.

View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html
Re: Re: rdd.cache() not working ?
Hi all,

Thanks a lot for capturing this. We are now using the 1.3.0 release. We tested with both the prebuilt Spark and a build compiled from source targeting our CDH components, and the cache result did not show up as expected. However, if we create a DataFrame from the person RDD and use the sqlContext.cacheTable operation, we can see the cached results. Not sure what's happening here. If anyone can reproduce this issue, please let me know.

Thanks,
Sun
fightf...@163.com

From: Sean Owen
Date: 2015-04-01 15:54
To: Yuri Makhno
CC: fightf...@163.com; Taotao.Li; user
Subject: Re: Re: rdd.cache() not working ?

No, cache() changes the bookkeeping of the existing RDD. Although it returns a reference, it works to just call person.cache. I can't reproduce this. When I cache an RDD and then count it, it is persisted in memory and I see it in the web UI. Something else must be different about what's being executed.

On Wed, Apr 1, 2015 at 8:26 AM, Yuri Makhno ymak...@gmail.com wrote:

The cache() method returns a new RDD, so you have to use something like this:

    val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
    val cached = person.cache
    cached.count

When you rerun count on cached you will see that the cache works.
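The two behaviours discussed in this thread (cache() only marks the data set and hands back the same reference, and nothing is stored until the first action runs) can be pictured with a plain-Java analogy. This is not Spark code: LazyDataset and all of its members are hypothetical names, and the model ignores partitions, storage levels and eviction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java analogy, NOT Spark: a data set that is stored only once an action runs.
final class LazyDataset<T> {
    private final Supplier<List<T>> compute; // stands in for the RDD lineage
    private boolean markedForCache = false;
    private List<T> stored = null;           // stands in for the block store
    private int computations = 0;

    LazyDataset(Supplier<List<T>> compute) {
        this.compute = compute;
    }

    // Only flips a flag and returns the same object, so calling it
    // without keeping the return value is enough.
    LazyDataset<T> cache() {
        markedForCache = true;
        return this;
    }

    boolean isStored() {
        return stored != null;
    }

    int computations() {
        return computations;
    }

    // An "action": computes the data, and keeps it if the set was marked.
    long count() {
        if (stored != null) {
            return stored.size(); // served from the cache, no recomputation
        }
        computations++;
        List<T> data = compute.get();
        if (markedForCache) {
            stored = new ArrayList<>(data);
        }
        return data.size();
    }

    public static void main(String[] args) {
        LazyDataset<String> person = new LazyDataset<>(() -> Arrays.asList("1,a", "2,b", "3,c"));
        person.cache();
        System.out.println("stored after cache(): " + person.isStored());
        System.out.println("count: " + person.count());
        System.out.println("stored after first count(): " + person.isStored());
        person.count();
        System.out.println("computations after second count(): " + person.computations());
    }
}
```

In this model the second count() is served from the stored copy, which is the effect the original poster expected to see in the web UI after the first action.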
Re: Spark + Kafka
Thank you bit1129. From looking at the web UI I can see 2 cores. I also looked at http://spark.apache.org/docs/1.2.1/configuration.html but can't see an obvious configuration for the number of receivers. Can you help, please?

On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com bit1...@163.com wrote:

Please make sure that you have given the application more cores than the number of receivers.

From: James King jakwebin...@gmail.com
Date: 2015-04-01 15:21
To: user user@spark.apache.org
Subject: Spark + Kafka

I have a simple setup/runtime of Kafka and Spark. I have a command line consumer displaying arrivals to the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic I get no messages; here are some logs below. I'm thinking there aren't enough threads. How do I check that? Thank you.

    2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job 142787141 ms.0 from job set of time 142787141 ms
    2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job 142787141 ms.0 from job set of time 142787141 ms
    2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for time 142787141 ms (execution: 0.000 s)
    2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event ClearMetadata(142787141 ms)
    2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 142787141 ms
    2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: []
    2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
    2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older than 1427871405000 ms:
    2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old RDDs: [1427871405000 ms - 8]
    2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
    2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence list
    2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
    2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
    2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
    2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
    2015-04-01 08:56:50 INFO BlockManager:59 - Removing RDD 8
    2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, response is 0
    2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
    2015-04-01 08:56:50 INFO KafkaInputDStream:59 - Removing blocks of RDD BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 142787141 ms
    2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older than 1427871405000 ms: 1427871405000 ms
    2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 142787141 ms
    2015-04-01 08:56:50 INFO ReceivedBlockTracker:59 - Deleting batches ArrayBuffer(142787140 ms)
    2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to Actor[akka://sparkDriver/temp/$o]
    2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
    2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
    2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.499181 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
    2015-04-01 08:56:50 WARN TaskSchedulerImpl:71 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
    2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
    2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
    2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.886121 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
    2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
    2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
    2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Executor app-20150401065621-0007/0 removed: Command exited with code 1
    2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message RemoveExecutor(0,Unknown executor exit code (1)) from
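The rule bit1129 states (more cores than receivers) can be sketched as a small pre-flight check. This is a hypothetical helper in plain Java, not a Spark API: the class and method names are invented, and it relies only on the documented meaning of the local master URLs ("local" is one thread, "local[N]" is N threads, "local[*]" is one thread per logical core). For cluster masters such as spark://... the core count comes from the cluster manager, so the sketch reports it as unknown rather than guessing.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-flight check, NOT a Spark API.
final class ReceiverCoreCheck {

    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+|\\*)\\]");

    // Threads implied by a local master URL; empty for any other form
    // (spark://..., yarn, ...), where the cluster manager decides.
    static OptionalInt localThreads(String master) {
        if (master.equals("local")) {
            return OptionalInt.of(1);
        }
        Matcher m = LOCAL_N.matcher(master);
        if (!m.matches()) {
            return OptionalInt.empty();
        }
        String n = m.group(1);
        return OptionalInt.of(n.equals("*")
                ? Runtime.getRuntime().availableProcessors()
                : Integer.parseInt(n));
    }

    // Each receiver occupies one core for as long as the stream runs,
    // so batch processing needs at least one core beyond the receivers.
    static boolean enoughCores(int cores, int receivers) {
        return cores > receivers;
    }

    public static void main(String[] args) {
        int receivers = 1; // assumption: one receiver-based input stream
        String[] masters = {"local", "local[2]", "local[*]", "spark://somesparkhost:7077"};
        for (String master : masters) {
            OptionalInt threads = localThreads(master);
            if (threads.isPresent()) {
                System.out.println(master + ": " + threads.getAsInt() + " thread(s), enough for "
                        + receivers + " receiver(s)? " + enoughCores(threads.getAsInt(), receivers));
            } else {
                System.out.println(master + ": core count is decided by the cluster manager");
            }
        }
    }
}
```

For a cluster deployment, the same comparison would be made against the cores the application is actually granted, which the web UI mentioned in the message shows.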
Re: Creating Partitioned Parquet Tables via SparkSQL
Thanks Felix :)

On Wed, Apr 1, 2015 at 00:08 Felix Cheung felixcheun...@hotmail.com wrote:

This is tracked by these JIRAs:
https://issues.apache.org/jira/browse/SPARK-5947
https://issues.apache.org/jira/browse/SPARK-5948

From: denny.g@gmail.com
Date: Wed, 1 Apr 2015 04:35:08 +
Subject: Creating Partitioned Parquet Tables via SparkSQL
To: user@spark.apache.org

Creating Parquet tables via .saveAsTable is great, but I was wondering if there is an equivalent way to create partitioned Parquet tables. Thanks!
Re: Spark Streaming and JMS
Hi Tathagata,

Do you know whether a JMS receiver was introduced during the last year as a standard Spark component, or whether somebody is developing one?

Regards,
Danila

View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-JMS-tp5371p22337.html
Re: Spark + Kafka
This is the code. I couldn't find anything like the log message you suggested.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class KafkaLogConsumer {

        public KafkaLogConsumer(int duration, String master) {
            JavaStreamingContext spark = createSparkContext(duration, master);

            // topic name -> number of consumer threads for that topic
            Map<String, Integer> topics = new HashMap<String, Integer>();
            topics.put("test", 1);

            // receiver-based stream: ZooKeeper quorum, consumer group id, topics
            JavaPairDStream<String, String> input =
                KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
            input.print();

            spark.start();
            spark.awaitTermination();
        }

        private JavaStreamingContext createSparkContext(int duration, String master) {
            SparkConf sparkConf = new SparkConf()
                .setAppName(this.getClass().getSimpleName())
                .setMaster(master);
            return new JavaStreamingContext(sparkConf, Durations.seconds(duration));
        }
    }
Re: Spark 1.3.0 DataFrame and Postgres
+1 on escaping column names. On Apr 1, 2015, at 5:50 AM, fergjo00 johngfergu...@gmail.com wrote: Question: --- Is there a way to have JDBC DataFrames use quoted/escaped column names? Right now, it looks like it sees the names correctly in the schema created but does not escape them in the SQL it creates when they are not compliant: org.apache.spark.sql.jdbc.JDBCRDD private val columnList: String = { val sb = new StringBuilder() columns.foreach(x = sb.append(,).append(x)) if (sb.length == 0) 1 else sb.substring(1) } If you see value in this, I would take a shot at adding the quoting (escaping) of column names here. If you don't do it, some drivers... like postgresql's will simply drop case all names when parsing the query. As you can see in the TL;DR below that means they won't match the schema I am given. Thanks. TL;DR: I am able to connect to a Postgres database in the shell (with driver referenced): val jdbcDf = sqlContext.jdbc(jdbc:postgresql://localhost/sparkdemo?user=dbuser, sp500) In fact when I run: jdbcDf.registerTempTable(sp500) val avgEPSNamed = sqlContext.sql(SELECT AVG(`Earnings/Share`) as AvgCPI FROM sp500) and val avgEPSProg = jsonDf.agg(avg(jsonDf.col(Earnings/Share))) The values come back as expected. However, if I try: jdbcDf.show Or if I try val all = sqlContext.sql(SELECT * FROM sp500) all.show I get errors about column names not being found. In fact the error includes a mention of column names all lower cased. For now I will change my schema to be more restrictive. Right now it is, per a Stack Overflow poster, not ANSI compliant by doing things that are allowed by 's in pgsql, MySQL and SQLServer. BTW, our users are giving us tables like this... because various tools they already use support non-compliant names. In fact, this is mild compared to what we've had to support. 
Currently the schema in question uses mixed case, quoted names with special characters and spaces: CREATE TABLE sp500 ( "Symbol" text, "Name" text, "Sector" text, "Price" double precision, "Dividend Yield" double precision, "Price/Earnings" double precision, "Earnings/Share" double precision, "Book Value" double precision, "52 week low" double precision, "52 week high" double precision, "Market Cap" double precision, "EBITDA" double precision, "Price/Sales" double precision, "Price/Book" double precision, "SEC Filings" text ) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-DataFrame-and-Postgres-tp22338.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
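A minimal sketch of the quoting proposed in this thread, written in Python purely for illustration; it is not Spark's implementation. It mirrors the columnList logic quoted from JDBCRDD, but wraps each name in double quotes and doubles any embedded double quote, which is the ANSI SQL convention that PostgreSQL follows. The function names quote_identifier and column_list are hypothetical.

```python
def quote_identifier(name):
    """Wrap a column name in double quotes, doubling any embedded quote (ANSI SQL style)."""
    return '"' + name.replace('"', '""') + '"'


def column_list(columns):
    """Build the SELECT column list; fall back to the constant 1 when no columns are requested."""
    if not columns:
        return "1"
    return ",".join(quote_identifier(c) for c in columns)


if __name__ == "__main__":
    # Column names taken from the sp500 schema in this thread.
    print(column_list(["Symbol", "Earnings/Share", "52 week low"]))
```

With quoting applied, a mixed-case name such as Earnings/Share reaches the database unchanged instead of being folded to lower case. Other databases use different quote characters (MySQL defaults to backticks), so a real fix would probably need to be dialect-aware.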
Re: Disable stage logging to stdout
Thank you Sean that does the trick. On Wed, Apr 1, 2015 at 12:05 PM, Sean Owen so...@cloudera.com wrote: You can disable with spark.ui.showConsoleProgress=false but I also wasn't sure why this writes straight to System.err, at first. I assume it's because it's writing carriage returns to achieve the animation and this won't work via a logging framework. stderr is where log-like output goes, because stdout is for program output. On Wed, Apr 1, 2015 at 10:56 AM, Theodore Vasiloudis theodoros.vasilou...@gmail.com wrote: Since switching to Spark 1.2.1 I'm seeing logging for the stage progress (ex.): [error] [Stage 2154: (14 + 8) / 48][Stage 2210: (0 + 0) / 48] Any reason why these are error level logs? Shouldn't they be info level? In any case is there a way to disable them other than disabling logging completely? I would like to see my info level logs on stdout (i.e. my printouts) but not these stage progress logs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Disable-stage-logging-to-stdout-tp22336.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark throws rsync: change_dir errors on startup
Hi, I am trying to set up a minimal 2-node Spark cluster for testing purposes. When I start the cluster with start-all.sh I get an rsync error message: rsync: change_dir /usr/local/spark130/sbin//right failed: No such file or directory (2) rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1183) [sender=3.1.0] (For clarification, my 2 nodes are called 'right' and 'left', referring to the physical machines standing in front of me.) It seems that a file named after my master node 'right' is expected to exist, and the synchronisation with it fails because it does not exist. I don't understand what Spark is trying to do here. Why does it expect this file to exist and what content should it have? I assume I did something wrong in my configuration setup. Can someone interpret this error message and tell me where this error is coming from? Regards, Tobias
RE: --driver-memory parameter doesn't work for spark-submmit on yarn?
Hi Akhil, Thanks a lot! After set export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this make me confused, so the driver-memory options doesn’t work for spark-submit to YARN (I haven’t check other clusters), is it a bug? Regards, Shuai From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn? Once you submit the job do a ps aux | grep spark-submit and see how much is the heap space allocated to the process (the -Xmx params), if you are seeing a lower value you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is the my shell script: /home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties My driver will load some resources and then broadcast to all executors. That resources is only 600MB in ser format, but I always has out of memory exception, it looks like the driver doesn’t allocate right memory to my driver. 
Exception in thread main java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newArray(Native Method) at java.lang.reflect.Array.newInstance(Array.java:70) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68) Do I do anything wrong here? And no matter how much I set for --driver-memory value (from 512M to 20G), it always give me error on the same line (that line try to load a 600MB java serialization file). So looks like the script doesn’t allocate right memory to driver in my case? Regards, Shuai
Re: Spark 1.3 build with hive support fails on JLine
Please invoke dev/change-version-to-2.11.sh before running mvn. Cheers On Mon, Mar 30, 2015 at 1:02 AM, Night Wolf nightwolf...@gmail.com wrote: Hey, Trying to build Spark 1.3 with Scala 2.11 supporting yarn hive (with thrift server). Running; *mvn -e -DskipTests -Pscala-2.11 -Dscala-2.11 -Pyarn -Pmapr4 -Phive -Phive-thriftserver clean install* The build fails with; INFO] Compiling 9 Scala sources to /var/lib/jenkins/workspace/cse-Apache-Spark-YARN-2.11-1.3-build/sql/hive-thriftserver/target/scala-2.11/classes...[ERROR] /var/lib/jenkins/workspace/cse-Apache-Spark-YARN-2.11-1.3-build/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:25: object ConsoleReader is not a member of package jline[ERROR] import jline.{ConsoleReader, History}[ERROR]^[WARNING] Class jline.Completor not found - continuing with a stub.[WARNING] Class jline.ConsoleReader not found - continuing with a stub.[ERROR] /var/lib/jenkins/workspace/cse-Apache-Spark-YARN-2.11-1.3-build/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala:165: not found: type ConsoleReader[ERROR] val reader = new ConsoleReader()[ERROR] ^[ERROR] Class jline.Completor not found - continuing with a stub.[WARNING] Class com.google.protobuf.Parser not found - continuing with a stub.[WARNING] Class com.google.protobuf.Parser not found - continuing with a stub.[WARNING] Class com.google.protobuf.Parser not found - continuing with a stub.[WARNING] Class com.google.protobuf.Parser not found - continuing with a stub.[WARNING] 6 warnings found[ERROR] three errors found[INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 15.731 s] [INFO] Spark Project Networking ... SUCCESS [ 46.667 s] [INFO] Spark Project Shuffle Streaming Service SUCCESS [ 28.508 s] [INFO] Spark Project Core . SUCCESS [07:45 min] [INFO] Spark Project Bagel SUCCESS [01:10 min] [INFO] Spark Project GraphX ... 
SUCCESS [02:42 min] [INFO] Spark Project Streaming SUCCESS [03:22 min] [INFO] Spark Project Catalyst . SUCCESS [04:42 min] [INFO] Spark Project SQL .. SUCCESS [05:17 min] [INFO] Spark Project ML Library ... SUCCESS [05:36 min] [INFO] Spark Project Tools SUCCESS [ 46.976 s] [INFO] Spark Project Hive . SUCCESS [04:08 min] [INFO] Spark Project REPL . SUCCESS [01:58 min] [INFO] Spark Project YARN . SUCCESS [01:47 min] [INFO] Spark Project Hive Thrift Server ... FAILURE [ 20.731 s] [INFO] Spark Project Assembly . SKIPPED [INFO] Spark Project External Twitter . SKIPPED [INFO] Spark Project External Flume Sink .. SKIPPED [INFO] Spark Project External Flume ... SKIPPED [INFO] Spark Project External MQTT SKIPPED [INFO] Spark Project External ZeroMQ .. SKIPPED [INFO] Spark Project Examples . SKIPPED [INFO] Spark Project YARN Shuffle Service . SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 41:59 min [INFO] Finished at: 2015-03-30T07:06:49+00:00 [INFO] Final Memory: 94M/868M [INFO] [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-hive-thriftserver_2.11: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile failed. CompileFailed - [Help 1]org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-hive-thriftserver_2.11: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile failed. Any ideas? Cheers, N
Size of arbitrary state managed via DStream updateStateByKey
Hi all, As I understand from docs and talks, the streaming state is held in memory as an RDD (optionally checkpointed to disk). SPARK-2629 hints that this in-memory structure is not indexed efficiently? I am wondering what my performance would be if the streaming state does not fit in memory (say 100GB of state over 10GB total RAM), and I did random updates to different keys via updateStateByKey. (Would throwing in SSDs help?) I am picturing some kind of performance degradation akin to Linux/InnoDB buffer cache thrashing. But if someone can demystify this, that would be awesome. Thanks Vinoth
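A small sketch of why state size matters here, in plain Python with no Spark involved. It assumes, as is my understanding of updateStateByKey in these Spark versions, that the update function is invoked for every key already in the state on every batch, not only for keys that received new data. The helper update_state_by_key below is a hypothetical simulation, not Spark code; only the shape of the update function (new values, previous state) follows the PySpark API.

```python
def update_state_by_key(state, batch, update_func):
    """Simulate one batch: update_func runs for every key present in the state or the batch."""
    new_state = {}
    calls = 0
    for key in set(state) | set(batch):
        calls += 1
        result = update_func(batch.get(key, []), state.get(key))
        if result is not None:  # returning None drops the key from the state
            new_state[key] = result
    return new_state, calls


def running_sum(new_values, previous):
    """Update function: add this batch's values to the previous total."""
    return (previous or 0) + sum(new_values)


if __name__ == "__main__":
    state = {"key%d" % i: 1 for i in range(1000)}
    # Only one key receives new data, yet every key in the state is visited.
    state, calls = update_state_by_key(state, {"key0": [5]}, running_sum)
    print(calls, state["key0"])
```

Under that assumption the per-batch cost grows with the total number of keys rather than with the number of updated keys, so a state much larger than memory would be re-read on every batch interval.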
Re: deployment of spark on mesos and data locality in tachyon/hdfs
Response inline. On Tue, Mar 31, 2015 at 10:41 PM, Sean Bigdatafun sean.bigdata...@gmail.com wrote: (resending...) I was thinking of the same setup… But the more I think about this problem, the more interesting it becomes. If we allocate 50% of total memory to Tachyon statically, then the Mesos benefits of dynamically scheduling resources go away altogether. People can still benefit from Mesos' dynamic scheduling of the remaining memory as well as compute resources. Can Tachyon be resource managed by Mesos (dynamically)? Any thought or comment? This requires some integration work. Best, Haoyuan Sean Hi Haoyuan, So on each mesos slave node I should allocate/section off some amount of memory for tachyon (let's say 50% of the total memory) and the rest for regular mesos tasks? This means, on each slave node I would have a tachyon worker (+ hdfs configuration to talk to s3 or the hdfs datanode) and the mesos slave process. Is this correct? -- --Sean -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
Issue on Spark SQL insert or create table with Spark running on AWS EMR -- s3n.S3NativeFileSystem: rename never finished
Hi, we always run into issues when inserting into or creating a table with the Amazon EMR Spark version: when inserting a result set of about 1GB, the Spark SQL query never finishes. Inserting a small result set (like 500MB) works fine. Neither the default spark.sql.shuffle.partitions of 200 nor set spark.sql.shuffle.partitions=1 helps. The log stopped at: 15/04/01 15:48:13 INFO s3n.S3NativeFileSystem: rename s3://hive-db/tmp/hive-hadoop/hive_2015-04-01_15-47-43_036_1196347178448825102-15/-ext-1 s3://hive-db/db_xxx/some_huge_table and after that there are only metrics.MetricsSaver logs. We set <property><name>hive.metastore.warehouse.dir</name><value>s3://hive-db</value></property> but hive.exec.scratchdir is not set, so I have no idea why the tmp files were created in s3://hive-db/tmp/hive-hadoop/. We just tried the newest Spark 1.3.0 on AMI 3.5.x and AMI 3.6 (https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/VersionInformation.md), and it still does not work. Has anyone hit the same issue? Any idea how to fix it? I believe Amazon EMR's Spark version uses com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem to access s3, not the original hadoop s3n implementation, right? /home/hadoop/spark/classpath/emr/* and /home/hadoop/spark/classpath/emrfs/* are in the classpath. Btw, is there any plan to use the new hadoop s3a implementation instead of s3n? Thanks for any help. Teng -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-on-Spark-SQL-insert-or-create-table-with-Spark-running-on-AWS-EMR-s3n-S3NativeFileSystem-renamd-tp22340.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: --driver-memory parameter doesn't work for spark-submmit on yarn?
I feel like I recognize that problem, and it's almost the inverse of https://issues.apache.org/jira/browse/SPARK-3884 which I was looking at today. The spark-class script didn't seem to handle all the ways that driver memory can be set. I think this is also something fixed by the new launcher library in 1.4.0. _JAVA_OPTIONS is not a good solution since it's global. On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Akhil, Thanks a lot! After set export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this make me confused, so the driver-memory options doesn’t work for spark-submit to YARN (I haven’t check other clusters), is it a bug? Regards, Shuai From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn? Once you submit the job do a ps aux | grep spark-submit and see how much is the heap space allocated to the process (the -Xmx params), if you are seeing a lower value you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is the my shell script: /home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties My driver will load some resources and then broadcast to all executors. That resources is only 600MB in ser format, but I always has out of memory exception, it looks like the driver doesn’t allocate right memory to my driver. 
Exception in thread main java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newArray(Native Method) at java.lang.reflect.Array.newInstance(Array.java:70) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68) Do I do anything wrong here? And no matter how much I set for --driver-memory value (from 512M to 20G), it always give me error on the same line (that line try to load a 600MB java serialization file). So looks like the script doesn’t allocate right memory to driver in my case? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Error reading smallint in hive table with parquet format
Hi. In Spark SQL 1.2.0, with HiveContext, I'm executing the following statement: CREATE TABLE testTable STORED AS PARQUET AS SELECT field1 FROM table1 *field1 is SMALLINT. If table1 is in text format all it's ok, but if table1 is in parquet format, spark returns the following error*: 15/04/01 16:48:24 ERROR TaskSetManager: Task 26 in stage 1.0 failed 1 times; aborting job Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 1.0 failed 1 times, most recent failure: Lost task 26.0 in stage 1.0 (TID 28, localhost): java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Short at org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector.get(JavaShortObjectInspector.java:41) at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.createPrimitive(ParquetHiveSerDe.java:251) at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.createObject(ParquetHiveSerDe.java:301) at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.createStruct(ParquetHiveSerDe.java:178) at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.serialize(ParquetHiveSerDe.java:164) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:123) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:114) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org $apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:114) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:93) at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:93) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks! -- Regards. Miguel Ángel
Spark 1.3.0 missing dependency?
Upon executing these two lines of code: conf = new SparkConf().setAppName(appName).setMaster(master); sc = new JavaSparkContext(conf); I get the following error message: ERROR Configuration: Failed to set setXIncludeAware(true) for parser org.apache.xerces.jaxp.DocumentBuilderFactoryImpl@32b260fa:java.lang.UnsupportedOperationException: setXIncludeAware is not supported on this JAXP implementation or earlier: class org.apache.xerces.jaxp.DocumentBuilderFactoryImpl java.lang.UnsupportedOperationException: setXIncludeAware is not supported on this JAXP implementation or earlier: class org.apache.xerces.jaxp.DocumentBuilderFactoryImpl at javax.xml.parsers.DocumentBuilderFactory.setXIncludeAware(DocumentBuilderFactory.java:584) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2032) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2001) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:1918) at org.apache.hadoop.conf.Configuration.get(Configuration.java:893) at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:673) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:224) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:1996) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:1996) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:1996) at org.apache.spark.SecurityManager.init(SecurityManager.scala:207) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163) at 
org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267) at org.apache.spark.SparkContext.init(SparkContext.scala:270) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) Is Spark missing a dependency? What's going on here? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-missing-dependency-tp22339.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 1.3.0 missing dependency?
No, it means you have a conflicting version of Xerces somewhere in your classpath. Maybe your app is bundling an old version? On Wed, Apr 1, 2015 at 4:46 PM, ARose ashley.r...@telarix.com wrote: Upon executing these two lines of code: conf = new SparkConf().setAppName(appName).setMaster(master); sc = new JavaSparkContext(conf); I get the following error message: ERROR Configuration: Failed to set setXIncludeAware(true) for parser org.apache.xerces.jaxp.DocumentBuilderFactoryImpl@32b260fa:java.lang.UnsupportedOperationException: setXIncludeAware is not supported on this JAXP implementation or earlier: class org.apache.xerces.jaxp.DocumentBuilderFactoryImpl java.lang.UnsupportedOperationException: setXIncludeAware is not supported on this JAXP implementation or earlier: class org.apache.xerces.jaxp.DocumentBuilderFactoryImpl at javax.xml.parsers.DocumentBuilderFactory.setXIncludeAware(DocumentBuilderFactory.java:584) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2032) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2001) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:1918) at org.apache.hadoop.conf.Configuration.get(Configuration.java:893) at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:673) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:224) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:1996) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:1996) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:1996) at 
org.apache.spark.SecurityManager.init(SecurityManager.scala:207) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163) at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267) at org.apache.spark.SparkContext.init(SparkContext.scala:270) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) Is Spark missing a dependency? What's going on here? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-missing-dependency-tp22339.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming 1.3 Kafka Direct Streams
With receivers, it was pretty obvious which code ran where: each receiver occupied a core and ran on the workers. However, with the new Kafka direct input streams, it's hard for me to understand where the code that reads from the Kafka brokers runs. Does it run on the driver (I hope not), or does it run on the workers? Any help appreciated, thanks! -neelesh
Use with Data justifying Spark
Good Morning All, I would like to try Spark on a specific, self-contained case that justifies the use of Spark. So I am looking for such a case based on data (it can be big) and, if possible, the associated Java and/or Python code. It would be fantastic if you could refer me to a link where I can download such a case! Didier
pyspark hbase range scan
I am attempting to read an hbase table in pyspark with a range scan. conf = { "hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table, "hbase.mapreduce.scan": scan } hbase_rdd = sc.newAPIHadoopRDD( "org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv, valueConverter=valueConv, conf=conf) If I jump over to scala or java, generate a base64 encoded protobuf scan object and convert it to a string, I can use that value for hbase.mapreduce.scan and everything works: the rdd correctly performs the range scan and I am happy. The problem is that I cannot find any reasonable way to generate that range scan string in python. The scala code required is: import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.client.{Delete, HBaseAdmin, HTable, Put, Result => HBaseResult, Scan} val scan = new Scan() scan.setStartRow("test_domain\0email".getBytes) scan.setStopRow("test_domain\0email~".getBytes) def scanToString(scan:Scan): String = { Base64.encodeBytes( ProtobufUtil.toScan(scan).toByteArray()) } scanToString(scan) Is there another way to perform an hbase range scan from pyspark, or is that functionality something that might be supported in the future? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-hbase-range-scan-tp22348.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
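One possible alternative, sketched below under assumptions: HBase's TableInputFormat also defines the configuration keys hbase.mapreduce.scan.row.start and hbase.mapreduce.scan.row.stop, which, if the HBase version in use honors them, let the range be expressed as plain strings without building a protobuf Scan. The helper hbase_range_scan_conf is hypothetical, and whether a row key containing a \0 byte survives the trip through the Hadoop configuration is untested here.

```python
def hbase_range_scan_conf(quorum, table, start_row, stop_row):
    """Build a conf dict for sc.newAPIHadoopRDD that asks TableInputFormat for a row range.

    Assumption: the HBase version on the cluster reads the scan.row.start and
    scan.row.stop keys when no serialized scan (hbase.mapreduce.scan) is supplied.
    """
    return {
        "hbase.zookeeper.quorum": quorum,
        "hbase.mapreduce.inputtable": table,
        "hbase.mapreduce.scan.row.start": start_row,
        "hbase.mapreduce.scan.row.stop": stop_row,
    }


if __name__ == "__main__":
    # Placeholder host and table names; the row keys are the ones from the message.
    conf = hbase_range_scan_conf("zk-host", "my_table",
                                 "test_domain\0email", "test_domain\0email~")
    for key in sorted(conf):
        print(key, repr(conf[key]))
```

The resulting dict would be passed as conf=conf to the same sc.newAPIHadoopRDD call shown in the message, with the key and value converters unchanged.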
SparkR newHadoopAPIRDD
How hard would it be to expose this in some way? I ask because the current textFile and objectFile functions are obviously at some point calling out to a FileInputFormat and configuring it. Could we get a way to configure any arbitrary inputformat / outputformat?
Data locality across jobs
Hi, We are running an hourly job using Spark 1.2 on Yarn. It saves an RDD of Tuple2. At the end of day, a daily job is launched, which works on the outputs of the hourly jobs. For data locality and speed, we wish that when the daily job launches, it finds all instances of a given key at a single executor rather than fetching it from others during shuffle. Is it possible to maintain key partitioning across jobs? We can control partitioning in one job. But how do we send keys to the executors of same node manager across jobs? And while saving data to HDFS, are the blocks allocated to the same data node machine as the executor for a partition? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-across-jobs-tp22351.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark, snappy and HDFS
I'm actually running this in a separate environment to our HDFS cluster. I think I've been able to sort out the issue by copying /opt/cloudera/parcels/CDH/lib to the machine I'm running this on (I'm just using a one-worker setup at present) and adding the following to spark-env.sh: export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/home/nickt/lib/hadoop/lib/native export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/nickt/lib/hadoop/lib/native export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/home/nickt/lib/hadoop/lib/native export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/nickt/lib/hadoop/lib/snappy-java-1.0.4.1.jar With that I can get past the previous error. The issue now seems to be with what is being returned. import org.apache.hadoop.io._ val hdfsPath = "hdfs://nost.name/path/to/folder" val file = sc.sequenceFile[BytesWritable,String](hdfsPath) file.count() returns the following error: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text On Wed, Apr 1, 2015 at 7:34 PM, Xianjin YE advance...@gmail.com wrote: Do you have the same hadoop config for all nodes in your cluster (you run it in a cluster, right?)? Check the node (usually the executor) which gives the java.lang.UnsatisfiedLinkError to see whether libsnappy.so is in the hadoop native lib path. On Thursday, April 2, 2015 at 10:22 AM, Nick Travers wrote: Thanks for the super quick response! I can read the file just fine in hadoop, it's just when I point Spark at this file it can't seem to read it due to the missing snappy jars / so's. I'm playing around with adding some things to the spark-env.sh file, but still nothing. On Wed, Apr 1, 2015 at 7:19 PM, Xianjin YE advance...@gmail.com wrote: Can you read the snappy compressed file in hdfs? Looks like libsnappy.so is not in the hadoop native lib path. On Thursday, April 2, 2015 at 10:13 AM, Nick Travers wrote: Has anyone else encountered the following error when trying to read a snappy compressed sequence file from HDFS?
*java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z* The following works for me when the file is uncompressed: import org.apache.hadoop.io._ val hdfsPath = "hdfs://nost.name/path/to/folder" val file = sc.sequenceFile[BytesWritable,String](hdfsPath) file.count() but fails when the encoding is Snappy. I've seen some stuff floating around on the web about having to explicitly enable support for Snappy in spark, but it doesn't seem to work for me: http://www.ericlin.me/enabling-snappy-support-for-sharkspark -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-snappy-and-HDFS-tp22349.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Streaming anomaly detection using ARIMA
I'm curious - I'm not sure if I understand you correctly. With SparkR, the work is distributed in Spark and computed in R; isn't that what you are looking for? SparkR used rJava for the R-JVM bridge but has moved away from it. rJava has a component called JRI which allows the JVM to call R. You could call R with JRI, or through rdd.foreachPartition(pass_data_to_R) or rdd.pipe.
From: cjno...@gmail.com Date: Wed, 1 Apr 2015 19:31:48 -0400 Subject: Re: Streaming anomaly detection using ARIMA To: user@spark.apache.org
Surprised I haven't gotten any responses about this. Has anyone tried using rJava or FastR w/ Spark? I've seen the SparkR project but that goes the other way; what I'd like to do is use R for model calculation and Spark to distribute the load across the cluster. Also, has anyone used Scalation for ARIMA models?
On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote: Taking out the complexity of the ARIMA models to simplify things, I can't seem to find a good way to represent even standard moving averages in Spark Streaming. Perhaps it's my ignorance of the micro-batched style of the DStreams API.
On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote: I want to use ARIMA for a predictive model so that I can take time series data (metrics) and perform light anomaly detection. The time series data is going to be bucketed to different time units (several minutes within several hours, several hours within several days, several days within several years). I want to do the algorithm in Spark Streaming. I'm used to tuple-at-a-time streaming and I'm having a bit of trouble gaining insight into how exactly the windows are managed inside of DStreams. Let's say I have a simple dataset that is marked by a key/value tuple where the key is the name of the component whose metrics I want to run the algorithm against and the value is a metric (a value representing a sum for the time bucket). I want to create histograms of the time series data for each key in the windows in which they reside so I can use that histogram vector to generate my ARIMA prediction (actually, it seems like this doesn't just apply to ARIMA but could apply to any sliding average). I *think* my prediction code may look something like this:
val predictionAverages = dstream
  .groupByKeyAndWindow(Seconds(60*60*24), Seconds(60*60*24))
  .mapValues(applyARIMAFunction)
That is, keep 24 hours' worth of metrics in each window and use that for the ARIMA prediction. The part I'm struggling with is how to join together the actual values so that I can do my comparison against the prediction model. Let's say dstream contains the actual values. For any time window, I should be able to take a previous set of windows and use the model to compare against the current values.
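On the simpler moving-average question raised in the quoted thread, here is a minimal sketch in plain Scala (no Spark), assuming the per-bucket sums for one key have already been collected in time order. MovingAverage and simple are illustrative names, not part of any Spark API; whether the values that arrive from a windowed DStream are already time-ordered is something the caller would have to ensure.

```scala
// Hypothetical helper, not a Spark API: simple moving average over
// time-ordered metric buckets for a single key.
object MovingAverage {
  // Returns one average per full window of `size` consecutive values.
  // Inputs shorter than `size` yield an empty result.
  def simple(values: Seq[Double], size: Int): Seq[Double] = {
    require(size > 0, "window size must be positive")
    values
      .sliding(size)            // consecutive overlapping groups
      .filter(_.size == size)   // drop a trailing partial group
      .map(w => w.sum / size)
      .toVector
  }
}
```

A function like this could in principle be passed to mapValues in place of applyARIMAFunction once each key's values have been converted to an ordered sequence.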
Re: Spark SQL saveAsParquet failed after a few waves
I have 7 workers for Spark and set SPARK_WORKER_CORES=12, so 84 tasks in one job can run simultaneously. I call the set of tasks in a job that start almost simultaneously a wave. While inserting, there is only one job on Spark; I am not inserting from multiple programs concurrently. Best Regards! Yijie Shen
On April 2, 2015 at 2:05:31 AM, Michael Armbrust (mich...@databricks.com) wrote:
> When few waves (1 or 2) are used in a job, LoadApp could finish after a few failures and retries. But when more waves (3) are involved in a job, the job would terminate abnormally.
Can you clarify what you mean by waves? Are you inserting from multiple programs concurrently?
Re: Strategy regarding maximum number of executor's failure for log running jobs/ spark streaming jobs
Hi, Thanks Sandy. Another way to look at this: would we like our long-running application to die? Let's say we create a window of around 10 batches and use incremental operations inside our application, so a restart is relatively costly. Should the criterion for failing the application be the maximum number of executor failures, or should we have some parameters around the minimum number of executors available for some time x? So, if the application is not able to have a minimum of n executors within a period of time x, then we should fail the application. Adding a time factor here will allow some window for Spark to get more executors allocated if some of them fail. Thoughts please. Thanks, Twinkle
On Wed, Apr 1, 2015 at 10:19 PM, Sandy Ryza sandy.r...@cloudera.com wrote: That's a good question, Twinkle. One solution could be to allow a maximum number of failures within any given time span, e.g. a max failures per hour property. -Sandy
On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, In Spark over YARN, there is a property spark.yarn.max.executor.failures which controls the maximum number of executor failures an application will survive. If the number of executor failures (due to any reason, like OOM or machine failure) exceeds this value, then the application quits. For short Spark jobs this looks fine, but for long-running jobs, since this does not take the duration into account, it can lead to the same treatment for two different scenarios: 1. executors failing within 5 mins. 2. executors failing sparsely, but at some point even a single executor failure (which the application could have survived) can make the application quit. Sending it to the community to hear what kind of behaviour / strategy they think will be suitable for long-running Spark jobs or Spark Streaming jobs. Thanks and Regards, Twinkle
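Sandy's suggestion of a maximum number of failures within a time span can be sketched in plain Scala. This only illustrates the policy; it is not how Spark or YARN implement anything, and FailureWindow, maxFailures, and windowMs are hypothetical names rather than existing properties.

```scala
import scala.collection.mutable

// Hypothetical policy sketch: the application should stop only when more than
// `maxFailures` executor failures fall inside the last `windowMs` milliseconds.
class FailureWindow(maxFailures: Int, windowMs: Long) {
  private val failures = mutable.Queue.empty[Long]

  // Record a failure at time `nowMs`; returns true if the limit is exceeded.
  def recordFailure(nowMs: Long): Boolean = {
    failures.enqueue(nowMs)
    // Forget failures that have aged out of the window.
    while (failures.nonEmpty && failures.head <= nowMs - windowMs) failures.dequeue()
    failures.size > maxFailures
  }
}
```

With this rule, a burst of failures inside the window trips the limit, while the same number of failures spread sparsely over a long run never does, which is the distinction between the two scenarios in the original question.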
Re: Spark, snappy and HDFS
Can you read the snappy compressed file in hdfs? Looks like libsnappy.so is not in the hadoop native lib path.
On Thursday, April 2, 2015 at 10:13 AM, Nick Travers wrote: Has anyone else encountered the following error when trying to read a snappy compressed sequence file from HDFS?
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
The following works for me when the file is uncompressed:
import org.apache.hadoop.io._
val hdfsPath = "hdfs://nost.name/path/to/folder"
val file = sc.sequenceFile[BytesWritable,String](hdfsPath)
file.count()
but fails when the encoding is Snappy. I've seen some stuff floating around on the web about having to explicitly enable support for Snappy in spark, but it doesn't seem to work for me: http://www.ericlin.me/enabling-snappy-support-for-sharkspark
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-snappy-and-HDFS-tp22349.html
RE: Can I call aggregate UDF in DataFrame?
Great! Thank you!
From: Reynold Xin [mailto:r...@databricks.com] Sent: Thursday, April 02, 2015 8:11 AM To: Haopu Wang Cc: user; d...@spark.apache.org Subject: Re: Can I call aggregate UDF in DataFrame?
You totally can. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L792 There is also an attempt at adding stddev here already: https://github.com/apache/spark/pull/5228
On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang hw...@qilinsoft.com wrote: Specifically there are only 5 aggregate functions in class org.apache.spark.sql.GroupedData: sum/max/min/mean/count. Can I plugin a function to calculate stddev? Thank you!
Re: Can I call aggregate UDF in DataFrame?
You totally can. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L792 There is also an attempt at adding stddev here already: https://github.com/apache/spark/pull/5228
On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang hw...@qilinsoft.com wrote: Specifically there are only 5 aggregate functions in class org.apache.spark.sql.GroupedData: sum/max/min/mean/count. Can I plugin a function to calculate stddev? Thank you!
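For the stddev question, one route that needs only sum-style aggregates is the identity Var(x) = E[x^2] - E[x]^2. The sketch below is plain Scala rather than DataFrame code; Stats and stddevFromSums are hypothetical names, and it assumes the count, the sum, and the sum of squares for a group have already been obtained by whatever aggregation is available. It computes the population standard deviation, and it can lose precision when the mean is large relative to the spread.

```scala
// Hypothetical helper: population standard deviation from three
// pre-aggregated quantities (count, sum, sum of squares).
object Stats {
  def stddevFromSums(n: Long, sum: Double, sumSq: Double): Double = {
    require(n > 0, "need at least one value")
    val mean = sum / n
    // Clamp at zero to guard against tiny negative values from rounding.
    math.sqrt(math.max(0.0, sumSq / n - mean * mean))
  }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the inputs would be n = 8, sum = 40, and sumSq = 232.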
Re: Quick GraphX gutcheck
Hi, Yes, you're right. The original VertexIds are used to join them in VertexRDD#xxxJoin. Takeshi Yamamuro
On Thu, Apr 2, 2015 at 7:31 AM, hokiegeek2 soozandjohny...@gmail.com wrote: Hi Everyone, Quick (hopefully) and silly (likely) question: the VertexId can be used to join the VertexRDD generated from Graph.vertices with a transformed RDD where the keys are vertexIds from the original graph, correct? --John
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Quick-GraphX-gutcheck-tp22345.html
Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0
Ravi, we just merged https://issues.apache.org/jira/browse/SPARK-6642 and used the same lambda scaling as in 1.2. The change will be included in Spark 1.3.1, which will be released soon. Thanks for reporting this issue! -Xiangrui
On Tue, Mar 31, 2015 at 8:53 PM, Xiangrui Meng men...@gmail.com wrote: I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have a clear answer about how the scaling should be handled, maybe the best solution for now is to switch back to the 1.2 scaling. -Xiangrui
On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen so...@cloudera.com wrote: Ah yeah I take your point. The squared error term is over the whole user-item matrix, technically, in the implicit case. I suppose I am used to assuming that the 0 terms in this matrix are weighted so much less (because alpha is usually large-ish) that they're almost not there, but they are. So I had just used the explicit formulation. I suppose the result is kind of scale invariant, but not exactly. I had not prioritized this property since I had generally built models on the full data set and not a sample, and had assumed that lambda would need to be retuned over time as the input grew anyway. So, basically I don't know anything more than you do, sorry!
On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng men...@gmail.com wrote: Hey Sean, That is true for the explicit model, but not for the implicit one. The ALS-WR paper doesn't cover the implicit model. In the implicit formulation, a sub-problem (for v_j) is:
min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2
This is a sum over all i, not just the users who rate item j. In this case, if we set X=m_j, the number of observed ratings for item j, it is not really scale invariant. We have #users user vectors in the least squares problem but only penalize lambda * #ratings. I was suggesting using lambda * m directly for the implicit model to match the number of vectors in the least squares problem. Well, this is my theory.
I haven't found any public work about it. Best, Xiangrui
On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen so...@cloudera.com wrote: I had always understood the formulation to be the first option you describe. Lambda is scaled by the number of items the user has rated / interacted with. I think the goal is to avoid fitting the tastes of prolific users disproportionately just because they have many ratings to fit. This is what's described in the ALS-WR paper we link to on the Spark web site, in equation 5 (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf). I think this also gets you the scale-invariance? For every additional rating from user i to product j, you add one new term to the squared-error sum, (r_ij - u_i . m_j)^2, but you'd also increase the regularization term by lambda * (|u_i|^2 + |m_j|^2). They are at least both increasing about linearly as ratings increase. If the regularization term is multiplied by the total number of users and products in the model, then it's fixed. I might misunderstand you and/or be speaking about something slightly different when it comes to invariance. But FWIW I had always understood the regularization to be multiplied by the number of explicit ratings.
On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote: Okay, I didn't realize that I changed the behavior of lambda in 1.3 to make it scale-invariant, but it is worth discussing whether this is a good change. In 1.2, we multiply lambda by the number of ratings in each sub-problem. This makes it scale-invariant for explicit feedback. However, in the implicit feedback model, a user's sub-problem contains all item factors. Then the question is whether we should multiply lambda by the number of explicit ratings from this user or by the total number of items. We used the former in 1.2 but changed to the latter in 1.3. So you should try a smaller lambda to get a similar result in 1.3. Sean and Shuo, which approach do you prefer?
Do you know any existing work discussing this? Best, Xiangrui On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote: This sounds like a bug ... Did you try a different lambda? It would be great if you can share your dataset or re-produce this issue on the public dataset. Thanks! -Xiangrui On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote: After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly smaller factors (and hence scores). For example, the first few product's factor values in 1.2.0 are (0.04821, -0.00674, -0.0325). In 1.3.0, the first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This difference of several orders of magnitude is consistent throughout both user and product. The recommendations from 1.2.0 are subjectively much better than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less memory. My first
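To make the two lambda scalings discussed in this thread concrete, the item sub-problem quoted above can be written out as follows. The notation follows Xiangrui's message; reading m as the total number of users is an interpretation of his "#users user vectors" remark, not something the thread states as a definition.

```latex
% Implicit-feedback sub-problem for item factor v_j, as quoted in the thread:
\min_{v_j} \; \sum_{i} c_{ij}\,\bigl(p_{ij} - u_i^{\top} v_j\bigr)^2
  \;+\; \lambda \, X \, \lVert v_j \rVert_2^2

% Scaling used in 1.2 (and restored by SPARK-6642 for 1.3.1):
X = m_j \quad \text{(number of observed ratings for item } j\text{)}

% Scaling used in 1.3.0 (assumed reading: m = total number of users):
X = m
```

Since m is typically much larger than m_j, the same lambda penalizes the factors far more heavily under the 1.3.0 scaling, which is consistent with the much smaller factor values reported and with the advice to try a smaller lambda there.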
Re: Spark SQL does not read from cached table if table is renamed
This is fixed in Spark 1.3. https://issues.apache.org/jira/browse/SPARK-5195
On Wed, Apr 1, 2015 at 4:05 PM, Judy Nash judyn...@exchange.microsoft.com wrote: Hi all, I noticed a bug in my current version of Spark, 1.2.1. After a table is cached with the “cache table table” command, a query will not read from memory if the SQL query renames the table. This query reads from the in-memory table:
select hivesampletable.country from default.hivesampletable group by hivesampletable.country
This query, with the table renamed, reads from Hive:
select table.country from default.hivesampletable table group by table.country
Is this a known bug? Most BI tools rename tables to avoid table name collisions. Thanks, Judy
Re: Streaming anomaly detection using ARIMA
Surprised I haven't gotten any responses about this. Has anyone tried using rJava or FastR w/ Spark? I've seen the SparkR project but that goes the other way; what I'd like to do is use R for model calculation and Spark to distribute the load across the cluster. Also, has anyone used Scalation for ARIMA models?
On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote: Taking out the complexity of the ARIMA models to simplify things, I can't seem to find a good way to represent even standard moving averages in Spark Streaming. Perhaps it's my ignorance of the micro-batched style of the DStreams API.
On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote: I want to use ARIMA for a predictive model so that I can take time series data (metrics) and perform light anomaly detection. The time series data is going to be bucketed to different time units (several minutes within several hours, several hours within several days, several days within several years). I want to do the algorithm in Spark Streaming. I'm used to tuple-at-a-time streaming and I'm having a bit of trouble gaining insight into how exactly the windows are managed inside of DStreams. Let's say I have a simple dataset that is marked by a key/value tuple where the key is the name of the component whose metrics I want to run the algorithm against and the value is a metric (a value representing a sum for the time bucket). I want to create histograms of the time series data for each key in the windows in which they reside so I can use that histogram vector to generate my ARIMA prediction (actually, it seems like this doesn't just apply to ARIMA but could apply to any sliding average). I *think* my prediction code may look something like this:
val predictionAverages = dstream
  .groupByKeyAndWindow(Seconds(60*60*24), Seconds(60*60*24))
  .mapValues(applyARIMAFunction)
That is, keep 24 hours' worth of metrics in each window and use that for the ARIMA prediction. The part I'm struggling with is how to join together the actual values so that I can do my comparison against the prediction model. Let's say dstream contains the actual values. For any time window, I should be able to take a previous set of windows and use the model to compare against the current values.
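The comparison step described at the end of this message could be sketched as follows, in plain Scala with no Spark dependency. It assumes a forecast and a tolerance per key have already been produced from the previous window by some model; Anomaly, flag, and the fixed-tolerance rule are hypothetical placeholders and not an ARIMA implementation. In a streaming job the two maps would presumably come from pairing a stream of predictions with the stream of actual values by key.

```scala
// Hypothetical sketch: compare current values against forecasts made
// from the previous window and report the keys that deviate too much.
object Anomaly {
  // predicted: key -> (forecast, tolerance); actual: key -> observed value.
  // Keys without a forecast are skipped rather than flagged.
  def flag(predicted: Map[String, (Double, Double)],
           actual: Map[String, Double]): Set[String] =
    actual.collect {
      case (key, value)
          if predicted.get(key).exists { case (forecast, tol) =>
            math.abs(value - forecast) > tol
          } =>
        key
    }.toSet
}
```

For example, with a forecast of 10.0 and tolerance 2.0 for one component, an observed value of 15.0 would be flagged and an observed value of 11.0 would not.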
Re: pyspark hbase range scan
Have you looked at http://happybase.readthedocs.org/en/latest/ ? Cheers
On Apr 1, 2015, at 4:50 PM, Eric Kimbrel eric.kimb...@soteradefense.com wrote: I am attempting to read an HBase table in pyspark with a range scan.
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table, "hbase.mapreduce.scan": scan}
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=conf)
If I jump over to Scala or Java, generate a base64-encoded protobuf scan object, and convert it to a string, I can use that value for hbase.mapreduce.scan and everything works: the RDD will correctly perform the range scan and I am happy. The problem is that I cannot find any reasonable way to generate that range scan string in Python. The Scala code required is:
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.{Delete, HBaseAdmin, HTable, Put, Result => HBaseResult, Scan}
val scan = new Scan()
scan.setStartRow("test_domain\0email".getBytes)
scan.setStopRow("test_domain\0email~".getBytes)
def scanToString(scan: Scan): String = { Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()) }
scanToString(scan)
Is there another way to perform an HBase range scan from pyspark, or is that functionality something that might be supported in the future?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-hbase-range-scan-tp22348.html
Spark, snappy and HDFS
Has anyone else encountered the following error when trying to read a snappy compressed sequence file from HDFS?
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
The following works for me when the file is uncompressed:
import org.apache.hadoop.io._
val hdfsPath = "hdfs://nost.name/path/to/folder"
val file = sc.sequenceFile[BytesWritable,String](hdfsPath)
file.count()
but fails when the encoding is Snappy. I've seen some stuff floating around on the web about having to explicitly enable support for Snappy in spark, but it doesn't seem to work for me: http://www.ericlin.me/enabling-snappy-support-for-sharkspark
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-snappy-and-HDFS-tp22349.html
Re: Spark, snappy and HDFS
Thanks for the super quick response! I can read the file just fine in hadoop; it's just when I point Spark at this file that it can't seem to read it, due to the missing snappy jars / so's. I'm playing around with adding some things to the spark-env.sh file, but still nothing.
On Wed, Apr 1, 2015 at 7:19 PM, Xianjin YE advance...@gmail.com wrote: Can you read the snappy compressed file in hdfs? Looks like libsnappy.so is not in the hadoop native lib path.
On Thursday, April 2, 2015 at 10:13 AM, Nick Travers wrote: Has anyone else encountered the following error when trying to read a snappy compressed sequence file from HDFS?
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
The following works for me when the file is uncompressed:
import org.apache.hadoop.io._
val hdfsPath = "hdfs://nost.name/path/to/folder"
val file = sc.sequenceFile[BytesWritable,String](hdfsPath)
file.count()
but fails when the encoding is Snappy. I've seen some stuff floating around on the web about having to explicitly enable support for Snappy in spark, but it doesn't seem to work for me: http://www.ericlin.me/enabling-snappy-support-for-sharkspark
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-snappy-and-HDFS-tp22349.html
Re: Spark throws rsync: change_dir errors on startup
Error 23 is defined as a partial transfer and might be caused by filesystem incompatibilities, such as different character sets or access control lists. In this case it could be caused by the double slashes (// at the end of sbin). You could try editing your sbin/spark-daemon.sh file: look for rsync inside the file and add -v to that command to see what exactly is going wrong. Thanks Best Regards
On Wed, Apr 1, 2015 at 7:25 PM, Horsmann, Tobias tobias.horsm...@uni-due.de wrote: Hi, I am trying to set up a minimal 2-node Spark cluster for testing purposes. When I start the cluster with start-all.sh I get an rsync error message:
rsync: change_dir /usr/local/spark130/sbin//right failed: No such file or directory (2)
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1183) [sender=3.1.0]
(For clarification, my 2 nodes are called 'right' and 'left', referring to the physical machines standing in front of me.) It seems that a file named after my master node 'right' is expected to exist, and the synchronisation fails because it does not. I don't understand what Spark is trying to do here. Why does it expect this file to exist and what content should it have? I assume I did something wrong in my configuration setup. Can someone interpret this error message, and does anyone have an idea where this error is coming from? Regards, Tobias
Latest enhancement in Low Level Receiver based Kafka Consumer
Hi, Just to let you know, I have made some enhancements to the Low Level Reliable Receiver based Kafka Consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). The earlier version used one Receiver task per partition of your Kafka topic. Now you can configure the desired number of Receiver tasks, and every Receiver can handle a subset of the topic partitions. There were some use cases where the consumer needed to handle gigantic topics (having 100+ partitions); my receiver created that many Receiver tasks, so that many CPU cores were needed just for the Receivers. That was an issue. In the latest code, I have changed that behavior. The maximum number of Receivers is still the number of partitions, but if you specify fewer Receiver tasks, every Receiver will handle a subset of partitions and consume using the Kafka Low Level consumer API. Every Receiver manages its partition offsets in ZK in the usual way. You can see the latest consumer here: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu
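The receiver-to-partition mapping described above might look like the following plain-Scala sketch. The round-robin rule and the names ReceiverAssignment and assign are assumptions for illustration only; the actual strategy used by kafka-spark-consumer is not stated in this message.

```scala
// Hypothetical sketch of spreading topic partitions over a configured
// number of receivers. Not taken from the kafka-spark-consumer source.
object ReceiverAssignment {
  // Caps the receiver count at the partition count, then deals partitions
  // 0 until numPartitions out round-robin. Result: receiver id -> partitions.
  def assign(numPartitions: Int, requestedReceivers: Int): Map[Int, Seq[Int]] = {
    require(numPartitions > 0 && requestedReceivers > 0, "counts must be positive")
    val receivers = math.min(numPartitions, requestedReceivers)
    (0 until numPartitions)
      .groupBy(_ % receivers)
      .map { case (receiver, partitions) => receiver -> partitions.toVector }
  }
}
```

Under this rule a 100-partition topic with 4 configured receivers would need 4 receiver cores instead of 100, with each receiver consuming 25 partitions.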
Re: Unable to run Spark application
Try sbt assembly instead. -- Marcelo
On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu wrote: Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I ran sbt package inside my application, it worked fine. But when I try to run my application, I get the above-mentioned error.
Re: HiveContext setConf seems not stable
Can you open a JIRA please?
On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren inv...@gmail.com wrote: Hi, I find that HiveContext.setConf does not work correctly. Here are some code snippets showing the problem:
snippet 1:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
object Main extends App {
  val conf = new SparkConf().setAppName("context-test").setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter(_._1.contains("shuffle.partitions")) foreach println
}
Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)
snippet 2:
...
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter(_._1.contains("shuffle.partitions")) foreach println
...
Results:
(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)
You can see that I just swapped the two setConf calls, and that leads to two different Hive configurations. It seems that HiveContext cannot set a new value for the hive.metastore.warehouse.dir key when that is the first setConf call. You need another setConf call before changing hive.metastore.warehouse.dir, for example by setting hive.metastore.warehouse.dir twice (snippet 3) or by using the order in snippet 1.
snippet 3:
...
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
...
Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
You can reproduce this if you move to the latest branch-1.3 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33). I have also tested the released 1.3.0 (htag = 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). It has the same problem. Please tell me if I am missing something. Any help is highly appreciated. Hao -- Hao Ren {Data, Software} Engineer @ ClaraVista Paris, France
Re: SparkSQL - Caching RDDs
What do you mean by permanently? If you start up the JDBC server and say CACHE TABLE, it will stay cached as long as the server is running. CACHE TABLE is idempotent, so you could even just have that command in your BI tool's setup queries.
On Wed, Apr 1, 2015 at 11:02 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote: I am trying to integrate SparkSQL with a BI tool. My requirement is to query a Hive table very frequently from the BI tool. Is there a way to cache the Hive table permanently in SparkSQL? I don't want to read the Hive table and cache it every time a query is submitted from the BI tool. Thanks! Regards, Venkat Ankam
Re: Spark Streaming 1.3 Kafka Direct Streams
Thanks Cody, that was really helpful. I have a much better understanding now. One last question: Kafka topics are initialized once in the driver; is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing the partitions. Thanks again!
On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The Kafka consumers run in the executors.
On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where: each receiver occupied a core and ran on the workers. However, with the new Kafka direct input streams, it's hard for me to understand where the code that's reading from the Kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated, thanks! -neelesh
Re: How to specify the port for AM Actor ...
Filed https://issues.apache.org/jira/browse/SPARK-6653
On Sun, Mar 29, 2015 at 8:18 PM, Shixiong Zhu zsxw...@gmail.com wrote: LGTM. Could you open a JIRA and send a PR? Thanks. Best Regards, Shixiong Zhu
2015-03-28 7:14 GMT+08:00 Manoj Samel manojsamelt...@gmail.com: I looked at the 1.3.0 code and figured out where this can be added. In org.apache.spark.deploy.yarn, ApplicationMaster.scala:282 is
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, conf = sparkConf, securityManager = securityMgr)._1
If I change it to the code below, then I can start it on the port I want.
val port = sparkConf.getInt("spark.am.actor.port", 0) // New property
...
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, port, conf = sparkConf, securityManager = securityMgr)._1
Thoughts? Is there any other place where a change is needed?
On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote: There is no configuration for it now. Best Regards, Shixiong Zhu
2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com: There may be firewall rules limiting the ports between the host running Spark and the Hadoop cluster. In that case, not all ports are allowed. Can a range of ports be specified?
On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote: It's a random port to avoid port conflicts, since multiple AMs can run on the same machine. Why do you need a fixed port? Best Regards, Shixiong Zhu
2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com: Spark 1.3, Hadoop 2.5, Kerberos. When running spark-shell in yarn client mode, it shows the following message with a random port every time (44071 in the example below). Is there a way to set that port to a specific value? It does not seem to be among the ports specified in http://spark.apache.org/docs/latest/configuration.html spark.xxx.port ... Thanks,
15/03/25 22:27:10 INFO Client: Application report for application_1427316153428_0014 (state: ACCEPTED)
15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@xyz:44071/user/YarnAM#-1989273896]
Re: Spark Streaming 1.3 Kafka Direct Streams
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The Kafka consumers run in the executors.
On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where: each receiver occupied a core and ran on the workers. However, with the new Kafka direct input streams, it's hard for me to understand where the code that's reading from the Kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated, thanks! -neelesh
Re: Error reading smallint in hive table with parquet format
Can you try with Spark 1.3? Much of this code path has been rewritten / improved in this version. On Wed, Apr 1, 2015 at 7:53 AM, Masf masfwo...@gmail.com wrote: Hi. In Spark SQL 1.2.0, with HiveContext, I'm executing the following statement:

    CREATE TABLE testTable STORED AS PARQUET AS SELECT field1 FROM table1

field1 is SMALLINT. If table1 is in text format everything is OK, but if table1 is in parquet format, Spark returns the following error:

15/04/01 16:48:24 ERROR TaskSetManager: Task 26 in stage 1.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 1.0 failed 1 times, most recent failure: Lost task 26.0 in stage 1.0 (TID 28, localhost): java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Short
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector.get(JavaShortObjectInspector.java:41)
    at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.createPrimitive(ParquetHiveSerDe.java:251)
    at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.createObject(ParquetHiveSerDe.java:301)
    at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.createStruct(ParquetHiveSerDe.java:178)
    at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.serialize(ParquetHiveSerDe.java:164)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:123)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:114)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:114)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:93)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:93)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks! -- Regards. Miguel Ángel
Re: Use with Data justifying Spark
Maybe check the examples? http://spark.apache.org/examples.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Wed, Apr 1, 2015 at 8:31 PM, Vila, Didier didier.v...@teradata.com wrote: Good Morning All, I would like to use Spark in a special “synthetic” case that justifies the use of spark. So I am looking for a case based on data (which can be big) and, if possible, the associated java and/or python code. It would be fantastic if you could point me to a link where I can download this case! Didier
Re: Spark Streaming 1.3 Kafka Direct Streams
If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private so you can't subclass it without building your own spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question - Kafka topics are initialized once in the driver; is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing them. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where - each receiver occupied a core and ran on the workers. However, with the new kafka direct input streams, it's hard for me to understand where the code that's reading from kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated, thanks! -neelesh
Re: Strategy regarding maximum number of executor's failure for log running jobs/ spark streaming jobs
That's a good question, Twinkle. One solution could be to allow a maximum number of failures within any given time span. E.g. a max failures per hour property. -Sandy On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, In spark over YARN, there is a property spark.yarn.max.executor.failures which controls the maximum number of executor's failure an application will survive. If number of executor's failures ( due to any reason like OOM or machine failure etc ), increases this value then applications quits. For small duration spark job, this looks fine, but for the long running jobs as this does not take into account the duration, this can lead to same treatment for two different scenarios ( mentioned below) : 1. executors failing with in 5 mins. 2. executors failing sparsely, but at some point even a single executor failure ( which application could have survived ) can make the application quit. Sending it to the community to listen what kind of behaviour / strategy they think will be suitable for long running spark jobs or spark streaming jobs. Thanks and Regards, Twinkle
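The "max failures per hour" idea above could be sketched roughly as follows. This is plain Scala for illustration only, not Spark code: the class name WindowedFailureTracker, its parameters, and the one-hour window are all assumptions made for this example.

```scala
import scala.collection.mutable

// Hypothetical helper: counts executor failures inside a sliding time window.
final class WindowedFailureTracker(maxFailures: Int, windowMs: Long) {
  private val failureTimes = mutable.Queue.empty[Long]

  /** Record a failure at `nowMs`; returns true if the limit for the window is exceeded. */
  def recordFailure(nowMs: Long): Boolean = {
    failureTimes.enqueue(nowMs)
    // Forget failures that are older than the window.
    while (failureTimes.nonEmpty && nowMs - failureTimes.head > windowMs) {
      failureTimes.dequeue()
    }
    failureTimes.size > maxFailures
  }
}

object WindowedFailureDemo {
  def main(args: Array[String]): Unit = {
    val oneHourMs = 60L * 60L * 1000L

    // Scenario 2 from the question: sparse failures, one every two hours.
    val sparseTracker = new WindowedFailureTracker(maxFailures = 2, windowMs = oneHourMs)
    val sparse = (0 until 5).map(i => sparseTracker.recordFailure(i * 2 * oneHourMs))
    println(s"sparse failures exceed limit: ${sparse.contains(true)}")

    // Scenario 1 from the question: three failures within a few minutes.
    val burstTracker = new WindowedFailureTracker(maxFailures = 2, windowMs = oneHourMs)
    val burst = Seq(0L, 60000L, 120000L).map(t => burstTracker.recordFailure(t))
    println(s"burst failures exceed limit: ${burst.last}")
  }
}
```

With a window like this, the two scenarios in the question are treated differently: the sparse failures never accumulate, while the burst trips the limit.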
Re: Latest enhancement in Low Level Receiver based Kafka Consumer
Hi Dibyendu, Thanks for your work on this project. Spark 1.3 now has direct kafka streams, but still does not provide enough control over partitions and topics. For example, the streams are fairly statically configured - RDD.getPartitions() is computed only once, thus making it difficult to use in a SaaS environment where topics are created and deactivated on the fly (one topic per customer, for example). But it's easy to build a wrapper around your receivers. Maybe there is a play where one can club direct streams with your receivers, but I don't quite fully understand how the 1.3 direct streams work yet. Another thread - Kafka 0.8.2 supports non-ZK offset management, which I think is more scalable than bombarding ZK. I'm working on supporting the new offset management strategy for Kafka with kafka-spark-consumer. Thanks! -neelesh On Wed, Apr 1, 2015 at 9:49 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, Just to let you know, I have made some enhancements to the Low Level Reliable Receiver based Kafka Consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). The earlier version used as many Receiver tasks as there are partitions in your kafka topic. Now you can configure the desired number of Receiver tasks, and every Receiver can handle a subset of the topic partitions. There were some use cases where the consumer needs to handle gigantic topics (having 100+ partitions); using my receiver created that many Receiver tasks, and hence that many CPU cores were needed just for the Receivers. That was an issue. In the latest code, I have changed that behavior. The max limit for the number of Receivers is still your number of partitions, but if you specify a smaller number of Receiver tasks, every receiver will handle a subset of partitions and consume using the Kafka Low Level consumer API. Every receiver will manage its partition offsets in ZK in the usual way. You can see the latest consumer here: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu
Re: Unable to run Spark application
That is failing too, with sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.2.1 On Wed, Apr 1, 2015 at 1:24 PM, Marcelo Vanzin van...@cloudera.com wrote: Try "sbt assembly" instead. On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu wrote: Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I ran "sbt package" inside my application, it worked fine. But when I try to run my application, I get the above-mentioned error. -- Marcelo
HiveContext setConf seems not stable
Hi, I find HiveContext.setConf does not work correctly. Here are some code snippets showing the problem:

snippet 1:

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    object Main extends App {
      val conf = new SparkConf()
        .setAppName("context-test")
        .setMaster("local[8]")
      val sc = new SparkContext(conf)
      val hc = new HiveContext(sc)

      hc.setConf("spark.sql.shuffle.partitions", "10")
      hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
      hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
      hc.getAllConfs filter(_._1.contains("shuffle.partitions")) foreach println
    }

Results:

    (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
    (spark.sql.shuffle.partitions,10)

snippet 2:

    ...
    hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
    hc.setConf("spark.sql.shuffle.partitions", "10")
    hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
    hc.getAllConfs filter(_._1.contains("shuffle.partitions")) foreach println
    ...

Results:

    (hive.metastore.warehouse.dir,/user/hive/warehouse)
    (spark.sql.shuffle.partitions,10)

You can see that I just swapped the two setConf calls, and that leads to two different Hive configurations. It seems that HiveContext cannot set a new value for the hive.metastore.warehouse.dir key in the first setConf call. You need another setConf call before changing hive.metastore.warehouse.dir. For example, set hive.metastore.warehouse.dir twice, or use the order in snippet 1.

snippet 3:

    ...
    hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
    hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
    hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
    ...

Results:

    (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)

You can reproduce this if you move to the latest branch-1.3 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33). I have also tested the released 1.3.0 (htag = 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). It has the same problem. Please tell me if I am missing something. Any help is highly appreciated. Hao -- Hao Ren {Data, Software} Engineer @ ClaraVista Paris, France
Re: Spark 1.3.0 DataFrame and Postgres
Can you open a JIRA for this please? On Wed, Apr 1, 2015 at 6:14 AM, Ted Yu yuzhih...@gmail.com wrote: +1 on escaping column names. On Apr 1, 2015, at 5:50 AM, fergjo00 johngfergu...@gmail.com wrote: Question: Is there a way to have JDBC DataFrames use quoted/escaped column names? Right now, it looks like it sees the names correctly in the schema it creates, but does not escape them in the SQL it generates when they are not compliant. From org.apache.spark.sql.jdbc.JDBCRDD:

    private val columnList: String = {
      val sb = new StringBuilder()
      columns.foreach(x => sb.append(",").append(x))
      if (sb.length == 0) "1" else sb.substring(1)
    }

If you see value in this, I would take a shot at adding the quoting (escaping) of column names here. Without it, some drivers, like postgresql's, will simply lower-case all names when parsing the query. As you can see in the TL;DR below, that means they won't match the schema I am given. Thanks.

TL;DR: I am able to connect to a Postgres database in the shell (with the driver referenced):

    val jdbcDf = sqlContext.jdbc("jdbc:postgresql://localhost/sparkdemo?user=dbuser", "sp500")

In fact when I run:

    jdbcDf.registerTempTable("sp500")
    val avgEPSNamed = sqlContext.sql("SELECT AVG(`Earnings/Share`) as AvgCPI FROM sp500")

and

    val avgEPSProg = jsonDf.agg(avg(jsonDf.col("Earnings/Share")))

the values come back as expected. However, if I try:

    jdbcDf.show

or if I try

    val all = sqlContext.sql("SELECT * FROM sp500")
    all.show

I get errors about column names not being found. In fact the error includes a mention of the column names, all lower-cased. For now I will change my schema to be more restrictive. Right now it is, per a Stack Overflow poster, not ANSI compliant, because it does things that are allowed by "'s in pgsql, MySQL and SQLServer. BTW, our users are giving us tables like this... because various tools they already use support non-compliant names. In fact, this is mild compared to what we've had to support. Currently the schema in question uses mixed case, quoted names with special characters and spaces:

    CREATE TABLE sp500 (
      "Symbol" text,
      "Name" text,
      "Sector" text,
      "Price" double precision,
      "Dividend Yield" double precision,
      "Price/Earnings" double precision,
      "Earnings/Share" double precision,
      "Book Value" double precision,
      "52 week low" double precision,
      "52 week high" double precision,
      "Market Cap" double precision,
      "EBITDA" double precision,
      "Price/Sales" double precision,
      "Price/Book" double precision,
      "SEC Filings" text
    )
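A minimal sketch of the quoting proposed in this thread, written as standalone Scala so it can be run without Spark. The helper name quoteIdentifier is hypothetical, and ANSI-style double quotes are an assumption: they suit PostgreSQL, but other databases may need a different quote character, so a real fix would probably have to be dialect-aware.

```scala
object QuoteColumns {
  // Hypothetical helper: wrap an identifier in double quotes and double any
  // embedded quote character (ANSI style; assumed here, not taken from Spark).
  def quoteIdentifier(name: String): String =
    "\"" + name.replace("\"", "\"\"") + "\""

  // Same shape as the columnList logic quoted in the thread, but with each
  // column name escaped before it is joined into the SELECT list.
  def columnList(columns: Seq[String]): String =
    if (columns.isEmpty) "1" else columns.map(quoteIdentifier).mkString(",")

  def main(args: Array[String]): Unit =
    println(columnList(Seq("Symbol", "Earnings/Share", "52 week low")))
}
```

Quoted this way, PostgreSQL would keep the mixed-case names as written instead of folding them to lower case.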
RE: --driver-memory parameter doesn't work for spark-submmit on yarn?
Nice. But in my case I have the same issue even when I use yarn-client. I verified it several times. I am running 1.3.0 on EMR (using the version dispatched by the installSpark script from AWS). I agree _JAVA_OPTIONS is not the right solution, but I will use it until 1.4.0 is out :) Regards, Shuai -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 01, 2015 10:51 AM To: Shuai Zheng Cc: Akhil Das; user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn? I feel like I recognize that problem, and it's almost the inverse of https://issues.apache.org/jira/browse/SPARK-3884 which I was looking at today. The spark-class script didn't seem to handle all the ways that driver memory can be set. I think this is also something fixed by the new launcher library in 1.4.0. _JAVA_OPTIONS is not a good solution since it's global. On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Akhil, Thanks a lot! After setting export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this confuses me: does the --driver-memory option not work for spark-submit to YARN (I haven't checked other cluster managers)? Is it a bug? Regards, Shuai From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn? Once you submit the job, do a ps aux | grep spark-submit and see how much heap space is allocated to the process (the -Xmx params). If you are seeing a lower value, you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is my shell script:

    /home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties

My driver will load some resources and then broadcast them to all executors. The resources are only 600MB in serialized form, but I always get an out of memory exception; it looks like the script doesn't allocate the right amount of memory to my driver.

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.lang.reflect.Array.newArray(Native Method)
    at java.lang.reflect.Array.newInstance(Array.java:70)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

Am I doing anything wrong here? No matter how much I set for the --driver-memory value (from 512M to 20G), it always gives me the error on the same line (the line that tries to load a 600MB java serialization file). So it looks like the script doesn't allocate the right amount of memory to the driver in my case? Regards, Shuai
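Besides checking -Xmx with ps, one way to see what heap limit the driver JVM actually received is to print it from inside the driver's main method. The following is a small standalone sketch; the object name HeapCheck is made up for this example, and the reported value may differ somewhat from the -Xmx setting.

```scala
object HeapCheck {
  // Maximum heap the current JVM will attempt to use, in megabytes.
  def maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)

  def main(args: Array[String]): Unit =
    println(s"max heap available to this JVM: $maxHeapMb MB")
}
```

If the number printed by the driver stays the same regardless of the --driver-memory value, that would suggest the option is not reaching the driver JVM.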
SparkSQL - Caching RDDs
I am trying to integrate SparkSQL with a BI tool. My requirement is to query a Hive table very frequently from the BI tool. Is there a way to cache the Hive table permanently in SparkSQL? I don't want to read the Hive table and cache it every time a query is submitted from the BI tool. Thanks! Regards, Venkat Ankam
Re: Broadcasting a parquet file using spark and python
You will need to create a hive parquet table that points to the data and run "ANALYZE TABLE tableName noscan" so that we have statistics on the size. On Tue, Mar 31, 2015 at 9:36 PM, Jitesh chandra Mishra jitesh...@gmail.com wrote: Hi Michael, Thanks for your response. I am running 1.2.1. Is there any workaround to achieve the same with 1.2.1? Thanks, Jitesh On Wed, Apr 1, 2015 at 12:25 AM, Michael Armbrust mich...@databricks.com wrote: In Spark 1.3 I would expect this to happen automatically when the parquet table is small (< 10mb, configurable with spark.sql.autoBroadcastJoinThreshold). If you are running 1.3 and not seeing this, can you show the code you are using to create the table? On Tue, Mar 31, 2015 at 3:25 AM, jitesh129 jitesh...@gmail.com wrote: How can we implement a BroadcastHashJoin for spark with python? My SparkSQL inner joins are taking a lot of time since they are performed as ShuffledHashJoin. The tables on which the join is performed are stored as parquet files. Please help. Thanks and regards, Jitesh
persist(MEMORY_ONLY) takes lot of time
Hi Experts, I have a parquet dataset of 550 MB (9 blocks) in HDFS. I want to run SQL queries repetitively. A few questions:

1. When I do the below (persist to memory after reading from disk), it takes a lot of time to persist to memory. Any suggestions on how to tune this?

    val inputP = sqlContext.parquetFile("some HDFS path")
    inputP.registerTempTable("sample_table")
    inputP.persist(MEMORY_ONLY)
    val result = sqlContext.sql("some sql query")
    result.count

Note: Once the data is persisted to memory, it takes a fraction of a second to return the query result from the second query onwards. So my concern is how to reduce the time when the data is first loaded into the cache.

2. I have observed that if I omit the line inputP.persist(MEMORY_ONLY), the first query execution is comparatively quick (say it takes 1 min), as the load-to-memory time is saved. But to my surprise, the second time I run the same query it takes 30 sec, as inputP is not constructed from disk (checked from the UI). So my question is: does spark use some kind of internal caching for inputP in this scenario?

Thanks in advance. Regards, Sam
Re: Spark Streaming 1.3 Kafka Direct Streams
As I said in the original ticket, I think the implementation classes should be exposed so that people can subclass and override compute() to suit their needs. Just adding a function from Time => Set[TopicAndPartition] wouldn't be sufficient for some of my current production use cases. compute() isn't really a function from Time => Option[KafkaRDD], it's a function from (Time, current offsets, kafka metadata, etc) => Option[KafkaRDD]. I think it's more straightforward to give access to that additional state via subclassing than it is to add in more callbacks for every possible use case. On Wed, Apr 1, 2015 at 2:01 PM, Tathagata Das t...@databricks.com wrote: We should be able to support that use case in the direct API. It may be as simple as allowing the users to pass in a function that returns the set of topic+partitions to read from. That is, a function (Time) => Set[TopicAndPartition]. This gets called every batch interval before the offsets are decided. This would allow users to add topics, delete topics, and modify partitions on the fly. What do you think Cody? On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote: Thanks Cody! On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private so you can't subclass it without building your own spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question - Kafka topics are initialized once in the driver; is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing them. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where - each receiver occupied a core and ran on the workers. However, with the new kafka direct input streams, it's hard for me to understand where the code that's reading from kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated, thanks! -neelesh
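To make the difference between the two shapes discussed in this thread concrete, here is a rough, self-contained sketch. None of it is the actual Spark implementation: Time, TopicAndPartition and Batch are simplified stand-ins defined locally, and the offset logic is invented for the example.

```scala
object TopicSelectionSketch {
  // Simplified stand-ins for the real Spark/Kafka classes; all hypothetical.
  final case class Time(millis: Long)
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class Batch(ranges: Map[TopicAndPartition, (Long, Long)])

  // Callback style: the partitions to read depend on the batch time only.
  type PartitionsForTime = Time => Set[TopicAndPartition]

  // Subclassing style: the decision also sees the current offsets and the
  // latest offsets reported by Kafka (a stand-in for "kafka metadata, etc").
  type ComputeBatch =
    (Time, Map[TopicAndPartition, Long], Map[TopicAndPartition, Long]) => Option[Batch]

  val compute: ComputeBatch = (time, current, latest) => {
    // Read each partition from its current offset (0 for new ones) up to the latest offset.
    val ranges = for {
      (tp, until) <- latest
      from = current.getOrElse(tp, 0L)
      if until > from
    } yield (tp, (from, until))
    if (ranges.isEmpty) None else Some(Batch(ranges))
  }

  def main(args: Array[String]): Unit = {
    val known = TopicAndPartition("customer-a", 0)
    val added = TopicAndPartition("customer-b", 0) // topic that appeared after start-up
    println(compute(Time(1000L), Map(known -> 5L), Map(known -> 5L, added -> 3L)))
  }
}
```

The second type has access to state that a plain Time => Set[TopicAndPartition] callback never sees, which is the point made above about subclassing versus adding callbacks.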
RE: Use with Data justifying Spark
Sonal, Thanks for the link (I have already worked with them). Do you have any other, more “exotic” example where I can play with specific data and algorithms too? Cheers, Didier From: Sonal Goyal [mailto:sonalgoy...@gmail.com] Sent: Wednesday, April 01, 2015 7:09 PM To: Vila, Didier Cc: user@spark.apache.org Subject: Re: Use with Data justifying Spark Maybe check the examples? http://spark.apache.org/examples.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co On Wed, Apr 1, 2015 at 8:31 PM, Vila, Didier didier.v...@teradata.com wrote: Good Morning All, I would like to use Spark in a special “synthetic” case that justifies the use of spark. So I am looking for a case based on data (which can be big) and, if possible, the associated java and/or python code. It would be fantastic if you could point me to a link where I can download this case! Didier
Re: Size of arbitrary state managed via DStream updateStateByKey
Thanks for confirming! On Wed, Apr 1, 2015 at 12:33 PM, Tathagata Das t...@databricks.com wrote: In the current state yes there will be performance issues. It can be done much more efficiently and we are working on doing that. TD On Wed, Apr 1, 2015 at 7:49 AM, Vinoth Chandar vin...@uber.com wrote: Hi all, As I understand from docs and talks, the streaming state is in memory as RDD (optionally checkpointable to disk). SPARK-2629 hints that this in memory structure is not indexed efficiently? I am wondering how my performance would be if the streaming state does not fit in memory (say 100GB state over 10GB total RAM), and I did random updates to different keys via updateStateByKey? (Would throwing in SSDs help out). I am picturing some kind of performance degeneration would happen akin to Linux/innoDB Buffer cache thrashing. But if someone can demystify this, that would be awesome. Thanks Vinoth
Spark on EC2
Hi all, I just tried launching a Spark cluster on EC2 as described in http://spark.apache.org/docs/1.3.0/ec2-scripts.html I got the following response:

    Code: PendingVerification
    Message: Your account is currently being verified. Verification normally takes less than 2 hours. Until your account is verified, you may not be able to launch additional instances or create additional volumes. If you are still receiving this message after more than 2 hours, please let us know by writing to aws-verificat...@amazon.com. We appreciate your patience...

However, I can see the EC2 instances in the AWS console as running. Any thoughts on what's going on? Thanks, Vadim
Re: Spark Streaming 1.3 Kafka Direct Streams
Thanks Cody! On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private so you can't subclass it without building your own spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question - Kafka topics are initialized once in the driver; is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing them. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where - each receiver occupied a core and ran on the workers. However, with the new kafka direct input streams, it's hard for me to understand where the code that's reading from kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated, thanks! -neelesh
Re: Spark Streaming 1.3 Kafka Direct Streams
We should be able to support that use case in the direct API. It may be as simple as allowing the users to pass in a function that returns the set of topic+partitions to read from. That is, a function (Time) => Set[TopicAndPartition]. This gets called every batch interval before the offsets are decided. This would allow users to add topics, delete topics, and modify partitions on the fly. What do you think Cody? On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote: Thanks Cody! On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private so you can't subclass it without building your own spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question - Kafka topics are initialized once in the driver; is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing them. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where - each receiver occupied a core and ran on the workers. However, with the new kafka direct input streams, it's hard for me to understand where the code that's reading from kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated, thanks! -neelesh
Re: Spark Streaming and JMS
It's not a built-in component of Spark. However, there is a spark-package for an Apache Camel receiver which can integrate with JMS. http://spark-packages.org/package/synsys/spark I have not tried it, but do check it out. TD On Wed, Apr 1, 2015 at 4:38 AM, danila danila.erma...@gmail.com wrote: Hi Tathagata, do you know if a JMS Receiver was introduced during the last year as a standard Spark component, or if somebody is developing it? Regards Danila
Re: Size of arbitrary state managed via DStream updateStateByKey
In the current state yes there will be performance issues. It can be done much more efficiently and we are working on doing that. TD On Wed, Apr 1, 2015 at 7:49 AM, Vinoth Chandar vin...@uber.com wrote: Hi all, As I understand from docs and talks, the streaming state is in memory as RDD (optionally checkpointable to disk). SPARK-2629 hints that this in memory structure is not indexed efficiently? I am wondering how my performance would be if the streaming state does not fit in memory (say 100GB state over 10GB total RAM), and I did random updates to different keys via updateStateByKey? (Would throwing in SSDs help out). I am picturing some kind of performance degeneration would happen akin to Linux/innoDB Buffer cache thrashing. But if someone can demystify this, that would be awesome. Thanks Vinoth
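A rough way to picture the concern raised in this thread, as I understand the behaviour discussed here: in each batch the update function is applied across the whole state, not only to the keys that received new data. The sketch below is a plain-Scala model of that behaviour, not Spark code; updateAll and its signature are invented for the illustration.

```scala
object StateUpdateSketch {
  // Simplified model of one batch: the update function runs for every key that
  // has either new values or existing state, so the work per batch grows with
  // the total state size rather than with the batch size.
  def updateAll(
      state: Map[String, Long],
      batch: Map[String, Seq[Long]],
      update: (Seq[Long], Option[Long]) => Option[Long]): (Map[String, Long], Int) = {
    val keys = state.keySet ++ batch.keySet
    val next = keys.flatMap { k =>
      update(batch.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
    }.toMap
    (next, keys.size)
  }

  def main(args: Array[String]): Unit = {
    val state = (1 to 100000).map(i => s"key$i" -> 1L).toMap
    val batch = Map("key42" -> Seq(1L)) // only one key receives new data
    val sum = (values: Seq[Long], old: Option[Long]) => Option(old.getOrElse(0L) + values.sum)
    val (next, touched) = updateAll(state, batch, sum)
    println(s"keys touched in this batch: $touched, key42 = ${next("key42")}")
  }
}
```

In this model a single-key batch still visits all 100000 keys, which is why state much larger than memory would be expected to hurt.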
Unable to run Spark application
Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I ran sbt package inside my application, it worked fine. But when I try to run my application, I get the above-mentioned error.
Re: Spark Streaming S3 Performance Implications
Hey Chris, Apologies for the delayed reply. Your responses are always insightful and appreciated :-) However, I have a few more questions. Regarding "also, it looks like you're writing to S3 per RDD. you'll want to broaden that out to write DStream batches": I assume you mean dstream.saveAsTextFiles() vs rdd.saveAsTextFile(). However, looking at the source code in DStream.scala, saveAsTextFiles simply wraps rdd.saveAsTextFile:
    def saveAsTextFiles(prefix: String, suffix: String = "") {
      val saveFunc = (rdd: RDD[T], time: Time) => {
        val file = rddToFileName(prefix, suffix, time)
        rdd.saveAsTextFile(file)
      }
      this.foreachRDD(saveFunc)
    }
So it's not clear to me how it would improve the throughput. Also, regarding your comment "expand even further and write window batches (where the window interval is a multiple of the batch interval)": I still don't quite understand the mechanics underneath. For example, what would be the difference between extending the batch interval and adding windowed batches? I presume it has something to do with the processor thread(s) within a receiver? By the way, in the near future I'll be putting together some performance numbers on a proper deployment, and will be sure to share my findings. Thanks, Mike!
On Sat, Mar 21, 2015 at 8:09 AM, Chris Fregly ch...@fregly.com wrote: hey mike! you'll definitely want to increase your parallelism by adding more shards to the stream - as well as spinning up 1 receiver per shard and unioning all the shards per the KinesisWordCount example that is included with the kinesis streaming package. you'll need more cores (cluster) or threads (local) to support this - equalling at least the number of shards/receivers + 1. also, it looks like you're writing to S3 per RDD. you'll want to broaden that out to write DStream batches - or expand even further and write window batches (where the window interval is a multiple of the batch interval). this goes for any spark streaming implementation - not just Kinesis. lemme know if that works for you. thanks! -Chris
_ From: Mike Trienis mike.trie...@orcsol.com Sent: Wednesday, March 18, 2015 2:45 PM Subject: Spark Streaming S3 Performance Implications To: user@spark.apache.org
Hi All, I am pushing data from a Kinesis stream to S3 using Spark Streaming and noticed that during testing (i.e. master=local[2]) the batches (1 second intervals) were falling behind the incoming data stream at about 5-10 events / second. It seems that rdd.saveAsTextFile("s3n://...") is taking a few seconds to complete.
    val saveFunc = (rdd: RDD[String], time: Time) => {
      val count = rdd.count()
      if (count > 0) {
        val s3BucketInterval = time.milliseconds.toString
        rdd.saveAsTextFile("s3n://...")
      }
    }
    dataStream.foreachRDD(saveFunc)
Should I expect the same behaviour in a deployed cluster? Or does rdd.saveAsTextFile("s3n://...") distribute the push work to each worker node? "Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file." Thanks, Mike.
Re: Spark SQL saveAsParquet failed after a few waves
When few waves (1 or 2) are used in a job, LoadApp could finish after a few failures and retries. But when more waves (3) are involved in a job, the job would terminate abnormally. Can you clarify what you mean by waves? Are you inserting from multiple programs concurrently?
Spark 1.3.0 DataFrame count() method throwing java.io.EOFException
Note: I am running Spark on Windows 7 in standalone mode. In my app, I run the following:
    DataFrame df = sqlContext.sql("SELECT * FROM tbBER");
    System.out.println("Count: " + df.count());
tbBER is registered as a temp table in my SQLContext. When I try to print the number of rows in the DataFrame, the job fails and I get the following error message:
    java.io.EOFException
        at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
        at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
        at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
        at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
        at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
        at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
        at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
        at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
        at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
This only happens when I try to call df.count(). The rest runs fine. Is the count() function not supported in standalone mode? The stack trace makes it appear to be Hadoop functionality... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-DataFrame-count-method-throwing-java-io-EOFException-tp22344.html
Re: Spark permission denied error when invoking saveAsTextFile
Ignore the question. There was a Hadoop setting that needed to be set to get it working. -- Kannan
On Wed, Apr 1, 2015 at 1:37 PM, Kannan Rajah kra...@maprtech.com wrote: Running a simple word count job in standalone mode as a non-root user from spark-shell. The Spark master and worker services are running as the root user. The problem is that the _temporary dir under /user/krajah/output2/_temporary/0 is being created with root permission even when running the job as a non-root user (krajah in this case). The higher-level directories are getting created with the right permission though. There was a similar question posted a long time back, but there is no answer: http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E
*Wrong permission for child directory*
    drwxr-xr-x - root root 0 2015-04-01 11:20 /user/krajah/output2/_temporary/0/_temporary
*Right permission for parent directories*
    hadoop fs -ls -R /user/krajah/my_output
    drwxr-xr-x - krajah krajah 1 2015-04-01 11:46 /user/krajah/my_output/_temporary
    drwxr-xr-x - krajah krajah 3 2015-04-01 11:46 /user/krajah/my_output/_temporary/0
*Job and Stacktrace*
    scala> val file = sc.textFile("/user/krajah/junk.txt")
    scala> val counts = file.flatMap(line => line.split(" "))
    scala> .map(word => (word, 1))
    scala> .reduceByKey(_ + _)
    scala> counts.saveAsTextFile("/user/krajah/count2")
    java.io.IOException: Error: Permission denied
        at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
        at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853)
        at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:17)
        at $iwC$$iwC$$iwC.<init>(<console>:22)
        at $iwC$$iwC.<init>(<console>:24)
        at $iwC.<init>(<console>:26)
        at <init>(<console>:28)
        at .<init>(<console>:32)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-- Kannan
Re: Spark 1.3.0 DataFrame count() method throwing java.io.EOFException
Is it possible tbBER is empty? If so, it shouldn't fail like this, of course. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: Spark-EC2 Security Group Error
Appears to be a problem with boto. Make sure you have boto 2.34 on your system.
On Wed, Apr 1, 2015 at 11:19 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi all, I'm trying to bring up a Spark EC2 cluster with the script below and see the following error. Can anyone please advise as to what's going on? Is this indicative of me being unable to connect in the first place? The keys are known to work (they're used elsewhere).
    ./spark-ec2 -k $AWS_KEYPAIR_NAME -i $AWS_PRIVATE_KEY -s 2 --region=us-west1 --zone=us-west-1a --instance-type=r3.2xlarge launch FuntimePartyLand
    Setting up security groups...
    Traceback (most recent call last):
      File "./spark_ec2.py", line 1383, in <module>
        main()
      File "./spark_ec2.py", line 1375, in main
        real_main()
      File "./spark_ec2.py", line 1210, in real_main
        (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
      File "./spark_ec2.py", line 431, in launch_cluster
        master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
      File "./spark_ec2.py", line 310, in get_or_make_group
        groups = conn.get_all_security_groups()
    AttributeError: 'NoneType' object has no attribute 'get_all_security_groups'
Thank you, Ilya Ganelin
-- Dan Osipov, Shazam
Re: How to start master and workers on Windows
I'm in the same boat. What are the equivalent commands to stop the master and workers? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-start-master-and-workers-on-Windows-tp12669p22341.html
Re: Spark Streaming 1.3 Kafka Direct Streams
The challenge of opening up these internal classes to the public (even with the Developer API tag) is that it prevents us from making non-trivial changes without breaking API compatibility for all those who have subclassed them. It's a tradeoff that is hard to optimize. That's why we favor exposing more optional parameters in the stable API (KafkaUtils), so that we can maintain binary compatibility with user code while still being able to make non-trivial changes internally. That said, it may be worthwhile to actually take an optional compute function as a parameter through KafkaUtils, as Cody suggested ((Time, current offsets, kafka metadata, etc) => Option[KafkaRDD]). It is worth thinking about its implications in the context of driver restarts, etc. (as that function will get called again on restart, and a different return value from before can break the semantics). TD
On Wed, Apr 1, 2015 at 12:28 PM, Neelesh neele...@gmail.com wrote: +1 for subclassing. It's more flexible if we can subclass the implementation classes.
On Apr 1, 2015 12:19 PM, Cody Koeninger c...@koeninger.org wrote: As I said in the original ticket, I think the implementation classes should be exposed so that people can subclass and override compute() to suit their needs. Just adding a function from Time => Set[TopicAndPartition] wouldn't be sufficient for some of my current production use cases. compute() isn't really a function from Time => Option[KafkaRDD], it's a function from (Time, current offsets, kafka metadata, etc) => Option[KafkaRDD]. I think it's more straightforward to give access to that additional state via subclassing than it is to add in more callbacks for every possible use case.
On Wed, Apr 1, 2015 at 2:01 PM, Tathagata Das t...@databricks.com wrote: We should be able to support that use case in the direct API. It may be as simple as allowing users to pass in a function that returns the set of topic+partitions to read from, that is, a function (Time) => Set[TopicAndPartition]. This gets called every batch interval before the offsets are decided. This would allow users to add topics, delete topics, and modify partitions on the fly. What do you think, Cody?
On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote: Thanks Cody!
On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private, so you can't subclass it without building your own Spark.
On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question: Kafka topics are initialized once in the driver; is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, with no way of refreshing them. Thanks again!
On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The Kafka consumers run in the executors.
On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where: each receiver occupied a core and ran on the workers. However, with the new Kafka direct input streams, it's hard for me to understand where the code that reads from the Kafka brokers runs. Does it run on the driver (I hope not), or does it run on the workers? Any help appreciated, thanks! -neelesh
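For readers following the proposal in this thread, the callback shape under discussion (a function from batch time to the set of topic+partitions to read) might look roughly like the sketch below. Everything here is an assumption for illustration: Time and TopicAndPartition are local stand-ins rather than the Spark and Kafka classes, the TopicSelection object and its cutover policy are invented, and no such parameter is claimed to exist on KafkaUtils.

```scala
// Hypothetical stand-ins for illustration only; the real Time and
// TopicAndPartition classes live in Spark Streaming and Kafka respectively.
final case class Time(milliseconds: Long)
final case class TopicAndPartition(topic: String, partition: Int)

object TopicSelection {
  // The callback shape proposed in the thread, invoked once per batch interval.
  type PartitionSelector = Time => Set[TopicAndPartition]

  // Invented example policy: always read two "events" partitions, and start
  // reading an "audit" partition once the batch time reaches cutoverMs.
  def selectorWithCutover(cutoverMs: Long): PartitionSelector = { time =>
    val base = Set(TopicAndPartition("events", 0), TopicAndPartition("events", 1))
    if (time.milliseconds >= cutoverMs) base + TopicAndPartition("audit", 0) else base
  }
}
```

The selector in this sketch depends only on the batch time, so calling it again for the same batch returns the same set. That property seems relevant to the restart concern raised above, where a different return value on re-invocation could break the semantics.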
Re: Unable to run Spark application
Managed to make sbt assembly work. I've run into another issue now. When I run ./sbin/start-all.sh, the script fails saying JAVA_HOME is not set, although I have explicitly set that variable to point to the correct Java location. The same happens with the ./sbin/start-master.sh script. Any idea what I might be missing?
On Wed, Apr 1, 2015 at 1:32 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: That is failing too, with sbt.resolveexception: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.2.1
On Wed, Apr 1, 2015 at 1:24 PM, Marcelo Vanzin van...@cloudera.com wrote: Try sbt assembly instead.
On Wed, Apr 1, 2015 at 10:09 AM, Vijayasarathy Kannan kvi...@vt.edu wrote: Why do I get "Failed to find Spark assembly JAR. You need to build Spark before running this program."? I downloaded spark-1.2.1.tgz from the downloads page and extracted it. When I ran sbt package inside my application, it worked fine. But when I try to run my application, I get the above-mentioned error. -- Marcelo