Re: How to insert complex types like map<string,map<string,int>> in spark sql
https://github.com/apache/spark/blob/84d79ee9ec47465269f7b0a7971176da93c96f3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Doesn't look like Spark SQL supports nested complex types right now.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-complex-types-like-map-string-map-string-int-in-spark-sql-tp19603p19730.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL Join returns fewer rows than expected
Hi, I have 2 files which come from a CSV import of 2 Oracle tables. F1 has 46730613 rows, F2 has 3386740 rows. I build 2 tables with Spark and join table F1 with table F2 on c1 = d1. All keys F2.d1 exist in F1.c1, so I expect to retrieve 46730613 rows. But it returns only 3437 rows.

// --- begin code ---
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f => F1(f(44), f(3), f(10).toDouble, "", f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")

val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2: String, d3: String, d4: String)
val productrdd = prdfile.map(x => x.split("|")).map(f => F2(f(0), f(2), f(101), f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")

val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c1 = F2.d1").count()
// --- end of code ---

Does anybody know what I missed? Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Join-returns-less-rows-that-expected-tp19731.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Understanding stages in WebUI
Hi, I have the classic word count example:

file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect()

From the Job UI, I can only see 2 stages: 0-collect and 1-map. What happened to the ShuffledRDD in reduceByKey? And why are both the flatMap and map operations collapsed into a single stage?

14/11/25 16:02:35 INFO SparkContext: Starting job: collect at console:15 14/11/25 16:02:35 INFO DAGScheduler: Registering RDD 6 (map at console:15) 14/11/25 16:02:35 INFO DAGScheduler: Got job 0 (collect at console:15) with 2 output partitions (allowLocal=false) 14/11/25 16:02:35 INFO DAGScheduler: Final stage: Stage 0(collect at console:15) 14/11/25 16:02:35 INFO DAGScheduler: Parents of final stage: List(Stage 1) 14/11/25 16:02:35 INFO DAGScheduler: Missing parents: List(Stage 1) 14/11/25 16:02:35 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[6] at map at console:15), which has no missing parents 14/11/25 16:02:35 INFO MemoryStore: ensureFreeSpace(3464) called with curMem=163705, maxMem=278302556 14/11/25 16:02:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.4 KB, free 265.3 MB) 14/11/25 16:02:35 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[6] at map at console:15) 14/11/25 16:02:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks 14/11/25 16:02:35 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, PROCESS_LOCAL, 1208 bytes) 14/11/25 16:02:35 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1208 bytes) 14/11/25 16:02:35 INFO Executor: Running task 0.0 in stage 1.0 (TID 0) 14/11/25 16:02:35 INFO Executor: Running task 1.0 in stage 1.0 (TID 1) 14/11/25 16:02:35 INFO HadoopRDD: Input split: file:/Users/ltsai/Downloads/spark-1.1.0-bin-hadoop2.4/README.md:0+2405 14/11/25 16:02:35 INFO HadoopRDD: Input split: file:/Users/ltsai/Downloads/spark-1.1.0-bin-hadoop2.4/README.md:2405+2406 14/11/25 16:02:35 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 14/11/25 16:02:35 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 14/11/25 16:02:35 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 14/11/25 16:02:35 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 14/11/25 16:02:35 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 14/11/25 16:02:36 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0). 1869 bytes result sent to driver 14/11/25 16:02:36 INFO Executor: Finished task 1.0 in stage 1.0 (TID 1). 
1869 bytes result sent to driver 14/11/25 16:02:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 536 ms on localhost (1/2) 14/11/25 16:02:36 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 529 ms on localhost (2/2) 14/11/25 16:02:36 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/11/25 16:02:36 INFO DAGScheduler: Stage 1 (map at console:15) finished in 0.562 s 14/11/25 16:02:36 INFO DAGScheduler: looking for newly runnable stages 14/11/25 16:02:36 INFO DAGScheduler: running: Set() 14/11/25 16:02:36 INFO DAGScheduler: waiting: Set(Stage 0) 14/11/25 16:02:36 INFO DAGScheduler: failed: Set() 14/11/25 16:02:36 INFO DAGScheduler: Missing parents for Stage 0: List() 14/11/25 16:02:36 INFO DAGScheduler: Submitting Stage 0 (ShuffledRDD[7] at reduceByKey at console:15), which is now runnable 14/11/25 16:02:36 INFO MemoryStore: ensureFreeSpace(2112) called with curMem=167169, maxMem=278302556 14/11/25 16:02:36 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.1 KB, free 265.2 MB) 14/11/25 16:02:36 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (ShuffledRDD[7] at reduceByKey at console:15) 14/11/25 16:02:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 14/11/25 16:02:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 2, localhost, PROCESS_LOCAL, 948 bytes) 14/11/25 16:02:36 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 3, localhost, PROCESS_LOCAL, 948 bytes) 14/11/25 16:02:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 2) 14/11/25 16:02:36 INFO Executor: Running task 1.0 in stage 0.0 (TID 3) 14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 5 ms 14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0
RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
Made progress but still blocked. After recompiling the code on cmd instead of PowerShell, now I can see all 5 classes as you mentioned. However I am still seeing the same error as before. Anything else I can check for? From: Judy Nash [mailto:judyn...@exchange.microsoft.com] Sent: Monday, November 24, 2014 11:50 PM To: Cheng Lian; u...@spark.incubator.apache.org Subject: RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava This is what I got from jar tf: org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class I seem to have the line that reported missing, but I am missing this file: com/google/inject/internal/util/$Preconditions.class Any suggestion on how to fix this? Very much appreciate the help as I am very new to Spark and open source technologies. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Monday, November 24, 2014 8:24 PM To: Judy Nash; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hm, I tried exactly the same commit and the build command locally, but couldn’t reproduce this. Usually this kind of errors are caused by classpath misconfiguration. Could you please try this to ensure corresponding Guava classes are included in the assembly jar you built? jar tf assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.4.0.jar | grep Preconditions On my machine I got these lines (the first line is the one reported as missing in your case): org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class com/google/inject/internal/util/$Preconditions.class On 11/25/14 6:25 AM, Judy Nash wrote: Thank you Cheng for responding. Here is the commit SHA1 on the 1.2 branch I saw this failure in: commit 6f70e0295572e3037660004797040e026e440dbd Author: zsxwing zsxw...@gmail.commailto:zsxw...@gmail.com Date: Fri Nov 21 00:42:43 2014 -0800 [SPARK-4472][Shell] Print Spark context available as sc. only when SparkContext is created... ... successfully It's weird that printing Spark context available as sc when creating SparkContext unsuccessfully. Let me know if you need anything else. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Friday, November 21, 2014 8:02 PM To: Judy Nash; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks! On 11/22/14 11:05 AM, Judy Nash wrote: Hi, Thrift server is failing to start for me on latest spark 1.2 branch. I got the error below when I start thrift server. Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314)…. Here is my setup: 1) Latest spark 1.2 branch build 2) Used build command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package 3) Added hive-site.xml to \conf 4) Version on the box: Hive 0.13, Hadoop 2.4 Is this a real bug or am I doing something wrong? 
--- Full Stacktrace:
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
  at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)
  at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:327)
  at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:409)
  at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:82)
  at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:42)
  at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:202)
  at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
  at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
  at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
  at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
  at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:230)
  at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:38)
  at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
  at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThr
Re: How to insert complex types like map<string,map<string,int>> in spark sql
Exactly, that seems to be the problem; will have to wait for the next release.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-complex-types-like-map-string-map-string-int-in-spark-sql-tp19603p19734.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to assign consecutive numeric id to each row based on its content?
Thanks a lot, both solutions work. best, /Shahab

On Tue, Nov 18, 2014 at 5:28 PM, Daniel Siegmann daniel.siegm...@velos.io wrote:
I think zipWithIndex is zero-based, so if you want 1 to N, you'll need to increment them like so:

val r2 = r1.keys.distinct().zipWithIndex().mapValues(_ + 1)

If the number of distinct keys is relatively small, you might consider collecting them into a map and broadcasting them rather than using a join, like so:

val keyIndices = sc.broadcast(r2.collect.toMap)
val r3 = r1.map { case (k, v) => (keyIndices.value(k), v) }

On Tue, Nov 18, 2014 at 8:16 AM, Cheng Lian lian.cs@gmail.com wrote:
A not so efficient way can be this:

val r0: RDD[OriginalRow] = ...
val r1 = r0.keyBy(row => extractKeyFromOriginalRow(row))
val r2 = r1.keys.distinct().zipWithIndex()
val r3 = r2.join(r1).values

On 11/18/14 8:54 PM, shahab wrote:
Hi, In my Spark application, I am loading some rows from a database into Spark RDDs. Each row has several fields and a string key. Due to my requirements I need to work with consecutive numeric ids (starting from 1 to N, where N is the number of unique keys) instead of string keys. Also, several rows can have the same string key. In the Spark context, how can I map each row into (Numeric_Key, OriginalRow) as map/reduce tasks such that rows with the same original string key get the same consecutive numeric key? Any hints? best, /Shahab

-- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
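For readers finding this thread later, here is a minimal, self-contained Scala sketch of the two approaches described above (zipWithIndex + join, and a broadcast lookup map); the sample data, object name, and master setting are illustrative only, not from the original posts:

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object ConsecutiveIds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("consecutive-ids").setMaster("local[2]"))

    // Rows keyed by a string; several rows may share the same key.
    val rows = sc.parallelize(Seq(("apple", 10), ("banana", 20), ("apple", 30)))

    // Give each distinct key a consecutive numeric id starting at 1.
    val keyIds = rows.keys.distinct().zipWithIndex().mapValues(_ + 1)

    // Approach 1: join the ids back onto the rows.
    val byJoin = keyIds.join(rows).values          // RDD[(Long, Int)]

    // Approach 2: if the key set is small, broadcast it as a lookup map.
    val idMap = sc.broadcast(keyIds.collect().toMap)
    val byBroadcast = rows.map { case (k, v) => (idMap.value(k), v) }

    byJoin.collect().foreach(println)
    byBroadcast.collect().foreach(println)
    sc.stop()
  }
}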
Re: Ideas on how to use Spark for anomaly detection on a stream of data
Yes, and I prepared a basic talk on this exact topic. Slides here: http://www.slideshare.net/srowen/anomaly-detection-with-apache-spark-41975155

This is elaborated in a chapter of an upcoming book that's available in early release; you can look at the accompanying source code to get some ideas too: https://github.com/sryza/aas/tree/master/kmeans

On Mon, Nov 24, 2014 at 10:17 PM, Natu Lauchande nlaucha...@gmail.com wrote:
Hi all, I am getting started with Spark. I would like to use it for a spike on anomaly detection in a massive stream of metrics. Can Spark easily handle this use case? Thanks, Natu

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL - Any time line to move beyond Alpha version ?
The main reason for the alpha tag is actually that APIs might still be evolving, but we'd like to freeze the API as soon as possible. Hopefully it will happen in one of 1.3 or 1.4. In Spark 1.2, we're adding an external data source API that we'd like to get experience with before freezing it. Matei On Nov 24, 2014, at 2:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Is there any timeline where Spark SQL goes beyond alpha version? Thanks, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: streaming linear regression is not building the model
Computation is triggered by new files added to the directory. If you place new files in the directory, it will start training the model.

2014-11-11 5:03 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid:
Hi, The model weight is not updating for streaming linear regression. The code and data below is what I am running.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setMaster("local[1]").setAppName("1feature")
val ssc = new StreamingContext(conf, Seconds(25))
val trainingData = ssc.textFileStream("file:///data/TrainStreamDir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("file:///data/TestStreamDir").map(LabeledPoint.parse)
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()

Sample data in the TrainStreamDir:
(10240,[1,21,0])
(9936,[2,21,15])
(10118,[3,21,30])
(10174,[4,21,45])
(10460,[5,22,0])
(9961,[6,22,15])
(10372,[7,22,30])
(10666,[8,22,45])
(10300,[9,23,0])

Sample of output results:
14/11/10 15:52:55 INFO scheduler.JobScheduler: Added jobs for time 1415652775000 ms
14/11/10 15:52:55 INFO scheduler.JobScheduler: Starting job streaming job 1415652775000 ms.0 from job set of time 1415652775000 ms
14/11/10 15:52:55 INFO spark.SparkContext: Starting job: count at GradientDescent.scala:162
14/11/10 15:52:55 INFO spark.SparkContext: Job finished: count at GradientDescent.scala:162, took 3.1689E-5 s
14/11/10 15:52:55 INFO optimization.GradientDescent: GradientDescent.runMiniBatchSGD returning initial weights, no data found
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 1415652775000 ms
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.0,0.0,0.0]

Thanks Tri
Re: How to insert complex types like map<string,map<string,int>> in spark sql
Spark SQL supports complex types, but casting doesn't work for complex types right now.

On 11/25/14 4:04 PM, critikaled wrote:
https://github.com/apache/spark/blob/84d79ee9ec47465269f7b0a7971176da93c96f3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
Doesn't look like Spark SQL supports nested complex types right now.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-complex-types-like-map-string-map-string-int-in-spark-sql-tp19603p19730.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL Join returns fewer rows than expected
Which version are you using? Or if you are using the most recent master or branch-1.2, which commit are you using?

On 11/25/14 4:08 PM, david wrote:
Hi, I have 2 files which come from a CSV import of 2 Oracle tables. F1 has 46730613 rows, F2 has 3386740 rows. I build 2 tables with Spark and join table F1 with table F2 on c1 = d1. All keys F2.d1 exist in F1.c1, so I expect to retrieve 46730613 rows. But it returns only 3437 rows.

// --- begin code ---
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f => F1(f(44), f(3), f(10).toDouble, "", f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")
val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2: String, d3: String, d4: String)
val productrdd = prdfile.map(x => x.split("|")).map(f => F2(f(0), f(2), f(101), f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")
val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c1 = F2.d1").count()
// --- end of code ---

Does anybody know what I missed? Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Join-returns-less-rows-that-expected-tp19731.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD
The case runs correctly in my environment.

14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 141690890 ms
14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.8588]

Can you provide more detailed information if convenient? The intercept can be turned on as follows:

val model = new StreamingLinearRegressionWithSGD() .algorithm.setIntercept(true)

2014-11-25 3:31 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid:
Hi, I am getting an incorrect weights model from StreamingLinearRegressionWithSGD. One-feature input data is:
(1,[1])
(2,[2])
…
(20,[20])
The result from the Current model: weights is [-4.432]… which is not correct. Also, how do I turn on the intercept value for StreamingLinearRegression? Thanks Tri
Re: Unable to use Kryo
The problem was that I didn't use the correct class name; it should be org.apache.spark.serializer.KryoSerializer.

On Mon, Nov 24, 2014 at 11:12 PM, Daniel Haviv danielru...@gmail.com wrote:
Hi, I want to test Kryo serialization, but when starting spark-shell I'm hitting the following error:
java.lang.ClassNotFoundException: org.apache.spark.KryoSerializer
The kryo-2.21.jar is on the classpath so I'm not sure why it's not picking it up. Thanks for your help, Daniel
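For anyone hitting the same ClassNotFoundException, here is a minimal sketch (not from the thread) of enabling Kryo with the fully qualified class name; the app name and the registered case class are made-up placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical class used only to illustrate optional Kryo registration.
case class Point(x: Double, y: Double)

object KryoExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-example")
      // Fully qualified name, including the "serializer" package.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Optional (Spark 1.2+): register frequently serialized classes.
      .registerKryoClasses(Array(classOf[Point]))
    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}

With spark-shell, the same property can usually be passed on the command line instead, e.g. --conf spark.serializer=org.apache.spark.serializer.KryoSerializer.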
K-means clustering
I have generated a sparse matrix by python, which has the size of 4000*174000 (.pkl), the following is a small part of this matrix : (0, 45) 1 (0, 413) 1 (0, 445) 1 (0, 107) 4 (0, 80) 2 (0, 352) 1 (0, 157) 1 (0, 191) 1 (0, 315) 1 (0, 395) 4 (0, 282) 3 (0, 184) 1 (0, 403) 1 (0, 169) 1 (0, 267) 1 (0, 148) 1 (0, 449) 1 (0, 241) 1 (0, 303) 1 (0, 364) 1 (0, 257) 1 (0, 372) 1 (0, 73) 1 (0, 64) 1 (0, 427) 1 : : (2, 399) 1 (2, 277) 1 (2, 229) 1 (2, 255) 1 (2, 409) 1 (2, 355) 1 (2, 391) 1 (2, 28) 1 (2, 384) 1 (2, 86) 1 (2, 285) 2 (2, 166) 1 (2, 165) 1 (2, 419) 1 (2, 367) 2 (2, 133) 1 (2, 61) 1 (2, 434) 1 (2, 51) 1 (2, 423) 1 (2, 398) 1 (2, 438) 1 (2, 389) 1 (2, 26) 1 (2, 455) 1 I am new in Spark and would like to cluster this matrix by k-means algorithm. Can anyone explain to me what kind of problems I might be faced. Please note that I do not want to use Mllib and would like to write my own k-means. Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at university of Malaysia Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com
restructure key-value pair with lambda in java
Hello, I have a key-value pair whose value is an ArrayList, and I would like to move one value of the ArrayList to the key position and the key back into the ArrayList. Is it possible to do this with a Java lambda expression? This works in Python:

newMap = sourceMap.map(lambda (key,((value1, value2), value3)) : (value1, (key, value2,value3)))

I resolved this in Java with the abstract call method, but is there a way to do this with a lambda expression, to reduce the number of lines needed?

JavaPairRDD<String, ArrayList<String>> newMap = sourceMap.flatMapToPair((String a, ArrayList<String> b) -> ???

Thank you for any suggestions Rob

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/restructure-key-value-pair-with-lambda-in-java-tp19748.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Lifecycle of RDD in spark-streaming
Hey Experts, I wanted to understand in detail the lifecycle of RDDs in a streaming app. From my current understanding:
- An RDD gets created out of the realtime input stream.
- Transform functions are applied in a lazy fashion on the RDD to transform it into other RDDs.
- Actions are taken on the final transformed RDDs to get the data out of the system.
Also, RDDs are stored in the cluster's RAM (disk if configured so) and are cleaned in LRU fashion.

So I have the following questions on the same:
- How does Spark (Streaming) guarantee that all the actions are taken on each input RDD/batch?
- How does Spark determine that the life-cycle of an RDD is complete? Is there any chance that an RDD will be cleaned out of RAM before all actions are taken on it?

Thanks in advance for all your help. Also, I'm relatively new to Scala and Spark, so pardon me in case these are naive questions/assumptions.

-- Thanks & Regards, Mukesh Jha me.mukesh@gmail.com
RE: Control number of parquet generated from JavaSchemaRDD
Hi, While submitting your Spark job, specify --executor-cores 2 --num-executors 24; it will divide the dataset into 24*2 parquet files. Or set the spark.default.parallelism value, e.g. to 50, on the SparkConf object; it will divide the dataset into 50 files in your HDFS. -Naveen

-Original Message-
From: tridib [mailto:tridib.sama...@live.com]
Sent: Tuesday, November 25, 2014 9:54 AM
To: u...@spark.incubator.apache.org
Subject: Control number of parquet generated from JavaSchemaRDD

Hello, I am reading around 1000 input files from disk in an RDD and generating parquet. It always produces the same number of parquet files as input files. I tried to merge them using rdd.coalesce(n) and/or rdd.repartition(n). Also tried using:
int MB_128 = 128*1024*1024;
sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);
No luck. Is there a way to control the size/number of parquet files generated? Thanks Tridib

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
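A rough Scala sketch of the two knobs mentioned above (the thread's code is Java, but the idea carries over); the Claim fields, paths, and input format are invented for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type standing in for the thread's Claim class.
case class Claim(id: String, amount: Double)

object ParquetOutputFiles {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("parquet-output-files")
      .set("spark.default.parallelism", "50")  // default partition count for shuffles

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Placeholder input path and format.
    val claims = sc.textFile("hdfs:///input/claims/part-*")
      .map(_.split(","))
      .map(a => Claim(a(0), a(1).toDouble))

    // Parquet output writes one file per partition, so repartition (or
    // coalesce) to the desired file count just before saving.
    claims.repartition(2).saveAsParquetFile("hdfs:///output/claims_parquet")
    sc.stop()
  }
}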
Re: advantages of SparkSQL?
Thank you for answering, this is all very helpful! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/advantages-of-SparkSQL-tp19661p19753.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark cluster with Java 8 using ./spark-ec2
I'm trying to use the spark-ec2 command to launch a Spark cluster that runs Java 8, but so far I haven't been able to get the Spark processes to use the right JVM at start up. Here's the command I use for launching the cluster. Note I'm using the user-data feature to install Java 8:

./spark-ec2 -k spark -i ~/.ssh/spark.pem \
  -t m3.large -s 1 \
  --user-data=java8.sh launch spark

After the cluster is running, I can SSH in and see that the default Java version is indeed 8:

ssh root@...
$ echo $JAVA_HOME
/usr/java/default
$ java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build 1.8.0-b132)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b70, mixed mode)

It seems that the Spark processes are still using Java 7. I've tried running sbin/stop-all.sh and start-all.sh from master, but that doesn't seem to help. What magic incantation am I missing?

java8.sh user data script:

#!/bin/bash
# Check java version
JAVA_VER=$(java -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
if [ $JAVA_VER -lt 18 ]
then
    # Download jdk 8
    echo "Downloading and installing jdk 8"
    wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/8-b132/jdk-8-linux-x64.rpm"
    # Silent install
    yum -y install jdk-8-linux-x64.rpm
    # Figure out how many versions of Java we currently have
    NR_OF_OPTIONS=$(echo 0 | alternatives --config java 2>/dev/null | grep 'There ' | awk '{print $3}' | tail -1)
    echo "Found $NR_OF_OPTIONS existing versions of java. Adding new version."
    # Make the new java version available via /etc/alternatives
    alternatives --install /usr/bin/java java /usr/java/default/bin/java 1
    # Make java 8 the default
    echo $(($NR_OF_OPTIONS + 1)) | alternatives --config java
    # Set some variables
    export JAVA_HOME=/usr/java/default/bin/java
    export JRE_HOME=/usr/java/default/jre
    export PATH=$PATH:/usr/java/default/bin
fi
# Check java version again
JAVA_VER=$(java -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
echo "export JAVA_HOME=/usr/java/default" >> /root/.bash_profile
. ~/.bash_profile
echo "Java version is $JAVA_VER"
echo "JAVA_HOME: $JAVA_HOME"
echo "JRE_HOME: $JRE_HOME"
echo "PATH: $PATH"

Here's the stacktrace from stdout from the spark-submit command:

14/11/25 14:01:11 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 1.0 (TID 7) on executor ip-xx-xx-xxx-xx.eu-west-1.compute.internal: java.lang.UnsupportedClassVersionError (foo/spark/Main : Unsupported major.minor version 52.0) [duplicate 3]
14/11/25 14:01:11 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
14/11/25 14:01:11 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
14/11/25 14:01:11 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
14/11/25 14:01:11 INFO scheduler.DAGScheduler: Failed to run saveAsHadoopFile at Main.java:146
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-xx-xx-xxx-xx.eu-west-1.compute.internal): java.lang.UnsupportedClassVersionError: foo/spark/Main : Unsupported major.minor version 52.0
java.lang.ClassLoader.defineClass1(Native Method)
java.lang.ClassLoader.defineClass(ClassLoader.java:800)
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
java.net.URLClassLoader.access$100(URLClassLoader.java:71)
java.net.URLClassLoader$1.run(URLClassLoader.java:361)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:425)
java.lang.ClassLoader.loadClass(ClassLoader.java:358)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:274)
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
ALS train error
Hi, I am getting the following error val model = ALS.train(ratings, rank, numIterations, 0.01) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 103.0 failed 1 times, most recent failure: Lost task 1.0 in stage 103.0 (TID 3, localhost): scala.MatchError: [Ljava.lang.String;@4837e797 (of class [Ljava.lang.String;) $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:16) $iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:16) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks!! Regards, Saurabh Agrawal This e-mail, including accompanying communications and attachments, is strictly confidential and only for the intended recipient. Any retention, use or disclosure not expressly authorised by Markit is prohibited. This email is subject to all waivers and other terms at the following link: http://www.markit.com/en/about/legal/email-disclaimer.page Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information on our offices worldwide. 
MarkitSERV Limited has its registered office located at Level 4, Ropemaker Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated by the Financial Conduct Authority with registration number 207294
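Not a diagnosis of the MatchError above, but for comparison, a minimal Scala sketch of how ratings are usually prepared for ALS.train, following the standard MLlib collaborative filtering example; the input path and the comma-separated "user,product,rating" format are assumptions:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object AlsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("als-sketch"))

    // Assumed format: one "user,product,rating" triple per line; a line with
    // a different number of fields would make this pattern match throw a
    // scala.MatchError much like the one in the stack trace above.
    val ratings = sc.textFile("hdfs:///input/ratings.csv").map(_.split(',') match {
      case Array(user, product, rate) => Rating(user.toInt, product.toInt, rate.toDouble)
    })

    val rank = 10
    val numIterations = 20
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    println("Trained " + model.userFeatures.count() + " user feature vectors")
    sc.stop()
  }
}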
Remapping columns from a schemaRDD
Hi, I'm selecting columns from a JSON file, transforming some of them, and would like to store the result as a parquet file, but I'm failing. This is what I'm doing:

val jsonFiles = sqlContext.jsonFile("/requests.loading")
jsonFiles.registerTempTable("jRequests")
val clean_jRequests = sqlContext.sql("select c1, c2, c3 ... c55 from jRequests")

and then I run a map:

val jRequests_flat = clean_jRequests.map(line => {(line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8).asInstanceOf[Iterable[String]].mkString(","), line(9), line(10), line(11), line(12), line(13), line(14), line(15), line(16), line(17), line(18), line(19), line(20), line(21), line(22), line(23), line(24), line(25), line(26), line(27), line(28), line(29), line(30), line(31), line(32), line(33), line(34), line(35), line(36), line(37), line(38), line(39), line(40), line(41), line(42), line(43), line(44), line(45), line(46), line(47), line(48), line(49), line(50))})

1. Is there a smarter way to achieve that (only modify a certain column without relating to the others, but keeping all of them)?
2. The last statement fails because the tuple has too many members:
console:19: error: object Tuple50 is not a member of package scala

Thanks for your help, Daniel
RE: Spark Streaming with Python
Any idea how to resolve this? Regards, Venkat

From: Venkat, Ankam
Sent: Sunday, November 23, 2014 12:05 PM
To: 'user@spark.apache.org'
Subject: Spark Streaming with Python

I am trying to run the network_wordcount.py example mentioned at https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py on the CDH5.2 Quickstart VM. I am getting the error below.

Traceback (most recent call last):
  File "/usr/lib/spark/examples/lib/network_wordcount.py", line 4, in <module>
    from pyspark.streaming import StreamingContext
ImportError: No module named streaming

How to resolve this? Regards, Venkat

This communication is the property of CenturyLink and may contain confidential or privileged information. Unauthorized use of this communication is strictly prohibited and may be unlawful. If you have received this communication in error, please immediately notify the sender by reply e-mail and destroy all copies of the communication and any attachments.
RE: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD
Thanks Liang! It was my bad; I fat-fingered one of the data points. I corrected it and the result matches yours.

I am still not able to get the intercept. I am getting:

[error] /data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:47: value setIntercept is not a member of org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

I tried the code below:

val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
model.setIntercept(addIntercept = true).trainOn(trainingData)

and:

val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
  .setIntercept(true)

But I still get a compilation error. Thanks Tri

From: Yanbo Liang [mailto:yanboha...@gmail.com]
Sent: Tuesday, November 25, 2014 4:08 AM
To: Bui, Tri
Cc: user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD

The case runs correctly in my environment.
14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 141690890 ms
14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.8588]
Can you provide more detailed information if convenient? The intercept can be turned on as follows:
val model = new StreamingLinearRegressionWithSGD() .algorithm.setIntercept(true)

2014-11-25 3:31 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid:
Hi, I am getting an incorrect weights model from StreamingLinearRegressionWithSGD. One-feature input data is:
(1,[1])
(2,[2])
…
(20,[20])
The result from the Current model: weights is [-4.432]… which is not correct. Also, how do I turn on the intercept value for StreamingLinearRegression? Thanks Tri
Re: Spark and Stanford CoreNLP
I’m not (yet!) an active Spark user, but saw this thread on twitter … and am involved with Stanford CoreNLP. Could someone explain how things need to be to work better with Spark — since that would be a useful goal. That is, while Stanford CoreNLP is not quite uniform (being developed by various people for over a decade), the general approach has always been that models should be serializable but that processors should not be. This make sense to me intuitively. It doesn’t really make sense to serialize a processor, which often has large mutable data structures used for processing. But does that not work well with Spark? Do processors need to be serializable, and then one needs to go through and make all the elements of the processor transient? Or what? Thanks! Chris On Nov 25, 2014, at 7:54 AM, Evan Sparks evan.spa...@gmail.com wrote: If you only mark it as transient, then the object won't be serialized, and on the worker the field will be null. When the worker goes to use it, you get an NPE. Marking it lazy defers initialization to first use. If that use happens to be after serialization time (e.g. on the worker), then the worker will first check to see if it's initialized, and then initialize it if not. I think if you *do* reference the lazy val before serializing you will likely get an NPE. On Nov 25, 2014, at 1:05 AM, Theodore Vasiloudis theodoros.vasilou...@gmail.com wrote: Great, Ian's approach seems to work fine. Can anyone provide an explanation as to why this works, but passing the CoreNLP object itself as transient does not? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654p19739.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark yarn cluster Application Master not running yarn container
I am running a 3-node (32 cores, 60 GB) Yarn cluster for Spark jobs.

1) Below are my Yarn memory settings:
yarn.nodemanager.resource.memory-mb = 52224
yarn.scheduler.minimum-allocation-mb = 40960
yarn.scheduler.maximum-allocation-mb = 52224
Apache Spark memory settings:
export SPARK_EXECUTOR_MEMORY=40G
export SPARK_EXECUTOR_CORES=27
export SPARK_EXECUTOR_INSTANCES=3
With the above settings I am hoping to see my job run on two nodes; however, the job is not running on the node where the Application Master is running.

2) Yarn memory settings:
yarn.nodemanager.resource.memory-mb = 52224
yarn.scheduler.minimum-allocation-mb = 20480
yarn.scheduler.maximum-allocation-mb = 52224
Apache Spark memory settings:
export SPARK_EXECUTOR_MEMORY=18G
export SPARK_EXECUTOR_CORES=13
export SPARK_EXECUTOR_INSTANCES=4

I would like to know how I can run the job on both nodes with the first memory settings? Thanks for the help.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-yarn-cluster-Application-Master-not-running-yarn-container-tp19761.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
why MatrixFactorizationModel private?
Hi all, seems that all the mllib models are declared accessible in the package, except MatrixFactorizationModel, which is declared private to mllib. Any reason why? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-MatrixFactorizationModel-private-tp19763.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Stanford CoreNLP
Chris, Thanks for stopping by! Here's a simple example. Imagine I've got a corpus of data, which is an RDD[String], and I want to do some POS tagging on it. In naive spark, that might look like this: val props = new Properties.setAnnotators(pos) val proc = new StanfordCoreNLP(props) val data = sc.textFile(hdfs://some/distributed/corpus) def processData(s: String): Annotation = { val a = new Annotation(s) proc.annotate(a) } val processedData = data.map(processData) //Note that this is actually executed lazily. Under the covers, spark takes the closure (processData), serializes it and all objects/methods that it references (including the proc), and ships the serialized closure off to workers so that they can run it on their local partitions of the corpus. The issue at hand is that since the StanfordCoreNLP object isn't serializable, *this will fail at runtime.* Hence the solutions to this problem suggested in this thread, which all come down to initializing the processor on the worker side (preferably once). Your intuition about not wanting to serialize huge objects is fine. This issue is not unique to CoreNLP - any Java library which has non-serializable objects will face this issue. HTH, Evan On Tue, Nov 25, 2014 at 8:05 AM, Christopher Manning mann...@stanford.edu wrote: I’m not (yet!) an active Spark user, but saw this thread on twitter … and am involved with Stanford CoreNLP. Could someone explain how things need to be to work better with Spark — since that would be a useful goal. That is, while Stanford CoreNLP is not quite uniform (being developed by various people for over a decade), the general approach has always been that models should be serializable but that processors should not be. This make sense to me intuitively. It doesn’t really make sense to serialize a processor, which often has large mutable data structures used for processing. But does that not work well with Spark? Do processors need to be serializable, and then one needs to go through and make all the elements of the processor transient? Or what? Thanks! Chris On Nov 25, 2014, at 7:54 AM, Evan Sparks evan.spa...@gmail.com wrote: If you only mark it as transient, then the object won't be serialized, and on the worker the field will be null. When the worker goes to use it, you get an NPE. Marking it lazy defers initialization to first use. If that use happens to be after serialization time (e.g. on the worker), then the worker will first check to see if it's initialized, and then initialize it if not. I think if you *do* reference the lazy val before serializing you will likely get an NPE. On Nov 25, 2014, at 1:05 AM, Theodore Vasiloudis theodoros.vasilou...@gmail.com wrote: Great, Ian's approach seems to work fine. Can anyone provide an explanation as to why this works, but passing the CoreNLP object itself as transient does not? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654p19739.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
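To make the pattern discussed in this thread concrete, here is a hedged Scala sketch of the worker-side (transient lazy) initialization approach; it assumes the CoreNLP 3.x API (Properties with an "annotators" key, StanfordCoreNLP, Annotation), and the corpus path and app name are placeholders:

import java.util.Properties
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import org.apache.spark.{SparkConf, SparkContext}

// Holding the pipeline behind a @transient lazy val means the field is not
// serialized with the closure; each worker builds its own StanfordCoreNLP
// instance on first use instead of receiving one from the driver.
class NlpPipeline extends Serializable {
  @transient lazy val proc: StanfordCoreNLP = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos")
    new StanfordCoreNLP(props)
  }
}

object CoreNlpOnSpark {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("corenlp-pos"))
    val pipeline = new NlpPipeline()

    val corpus = sc.textFile("hdfs://some/distributed/corpus")
    val annotated = corpus.map { text =>
      val doc = new Annotation(text)
      pipeline.proc.annotate(doc)
      doc.toString  // keep something cheaply serializable for the driver
    }
    annotated.take(5).foreach(println)
    sc.stop()
  }
}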
Re: How to keep a local variable in each cluster?
Any comments? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-a-local-variable-in-each-cluster-tp19604p19766.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark shell running on mesos
Hi! I started playing with Spark some days ago and now I'm configuring a little cluster to play with during my development. For this task, I'm using Apache Mesos running in Linux containers managed by Docker. The Mesos master and slave are running; I can see the web UI and everything looks fine. I am trying to run the Spark shell on my Mesos cluster but it is not working. As you can see in the output (link below), the Mesos master is found but the shell stays on the line "No credentials provided. Attempting to register without authentication". And in the Mesos master log I can see a lot of the same messages (see link below). What is wrong with my setup? Am I missing some configuration?

Spark-shell output: http://pastebin.com/rUzuMbaT
Mesos master log: http://pastebin.com/tz1013jZ
Spark version: 1.1.0
Mesos version: 0.20.1

Thank you
-- Att. José Guilherme Vanz br.linkedin.com/pub/josé-guilherme-vanz/51/b27/58b/ http://br.linkedin.com/pub/jos%C3%A9-guilherme-vanz/51/b27/58b/ "Suffering is temporary, giving up is forever" - Bernardo Fonseca, record holder of the Antarctic Ice Marathon.
Re: Remapping columns from a schemaRDD
Probably the easiest/closest way to do this would be with a UDF, something like:

registerFunction("makeString", (s: Seq[String]) => s.mkString(","))
sql("SELECT *, makeString(c8) AS newC8 FROM jRequests")

Although this does not modify a column, but instead appends a new column. Another, more complicated, way to do something like this would be by using the applySchema function http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema . I'll note that, as part of the ML pipeline work, we have been considering adding something like:

def modifyColumn(columnName, function)

Any comments anyone has on this interface would be appreciated! Michael

On Tue, Nov 25, 2014 at 7:02 AM, Daniel Haviv danielru...@gmail.com wrote:
Hi, I'm selecting columns from a JSON file, transforming some of them, and would like to store the result as a parquet file, but I'm failing. This is what I'm doing:
val jsonFiles = sqlContext.jsonFile("/requests.loading")
jsonFiles.registerTempTable("jRequests")
val clean_jRequests = sqlContext.sql("select c1, c2, c3 ... c55 from jRequests")
and then I run a map:
val jRequests_flat = clean_jRequests.map(line => {(line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8).asInstanceOf[Iterable[String]].mkString(","), line(9), line(10), line(11), line(12), line(13), line(14), line(15), line(16), line(17), line(18), line(19), line(20), line(21), line(22), line(23), line(24), line(25), line(26), line(27), line(28), line(29), line(30), line(31), line(32), line(33), line(34), line(35), line(36), line(37), line(38), line(39), line(40), line(41), line(42), line(43), line(44), line(45), line(46), line(47), line(48), line(49), line(50))})
1. Is there a smarter way to achieve that (only modify a certain column without relating to the others, but keeping all of them)?
2. The last statement fails because the tuple has too many members:
console:19: error: object Tuple50 is not a member of package scala
Thanks for your help, Daniel
Re: Control number of parquet generated from JavaSchemaRDD
repartition and coalesce should both allow you to achieve what you describe. Can you maybe share the code that is not working? On Mon, Nov 24, 2014 at 8:24 PM, tridib tridib.sama...@live.com wrote: Hello, I am reading around 1000 input files from disk in an RDD and generating parquet. It always produces same number of parquet files as number of input files. I tried to merge them using rdd.coalesce(n) and/or rdd.repatition(n). also tried using: int MB_128 = 128*1024*1024; sc.hadoopConfiguration().setInt(dfs.blocksize, MB_128); sc.hadoopConfiguration().setInt(parquet.block.size, MB_128); No luck. Is there a way to control the size/number of parquet files generated? Thanks Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Merging Parquet Files
You'll need to be running a very recent version of Spark SQL as this feature was just added. On Tue, Nov 25, 2014 at 1:01 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, Thanks for your reply.. I'm trying to do what you suggested but I get: scala sqlContext.sql(CREATE TEMPORARY TABLE data USING org.apache.spark.sql.parquet OPTIONS (path '/requests_parquet.toomany')) *java.lang.RuntimeException: Failed to load class for data source: org.apache.spark.sql.parquet* *at scala.sys.package$.error(package.scala:27)* any idea why ? Thanks, Daniel On Mon, Nov 24, 2014 at 11:30 PM, Michael Armbrust mich...@databricks.com wrote: Parquet does a lot of serial metadata operations on the driver which makes it really slow when you have a very large number of files (especially if you are reading from something like S3). This is something we are aware of and that I'd really like to improve in 1.3. You might try the (brand new and very experimental) new parquet support that I added into 1.2 at the last minute in an attempt to make our metadata handling more efficient. Basically you load the parquet files using the new data source API instead of using parquetFile: CREATE TEMPORARY TABLE data USING org.apache.spark.sql.parquet OPTIONS ( path 'path/to/parquet' ) This will at least parallelize the retrieval of file status object, but there is a lot more optimization that I hope to do. On Sat, Nov 22, 2014 at 1:53 PM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm ingesting a lot of small JSON files and convert them to unified parquet files, but even the unified files are fairly small (~10MB). I want to run a merge operation every hour on the existing files, but it takes a lot of time for such a small amount of data: about 3 GB spread of 3000 parquet files. Basically what I'm doing is load files in the existing directory, coalesce them and save to the new dir: val parquetFiles=sqlContext.parquetFile(/requests_merged/inproc) parquetFiles.coalesce(2).saveAsParquetFile(/requests_merged/$currday) Doing this takes over an hour on my 3 node cluster... Is there a better way to achieve this ? Any ideas what can cause such a simple operation take so long? Thanks, Daniel
RDD Cache Cleanup
Hi I am noticing that the RDDs that are persisted get cleaned up very quickly. This usually happens in a matter of a few minutes. I tried setting a value of 20 hours for the /spark.cleaner.ttl/ property and still get the same behavior. In my use-case, I have to persist about 20 RDDs each of size 10 GB. There is enough memory available (around 1 TB). The /spark.storage.memoryFraction/ property is set at 0.7. How does the cleanup work? Any help is appreciated. - Ranga -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Cache-Cleanup-tp19771.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Control number of parquet generated from JavaSchemaRDD
I am experimenting with two files and trying to generate 1 parquet file.

public class CompactParquetGenerator implements Serializable {
    public void generateParquet(JavaSparkContext sc, String jsonFilePath, String parquetPath) {
        //int MB_128 = 128*1024*1024;
        //sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
        //sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaRDD<Claim> claimRdd = sc.textFile(jsonFilePath).map(new StringToClaimMapper()).filter(new NullFilter());
        JavaSchemaRDD claimSchemaRdd = sqlCtx.applySchema(claimRdd, Claim.class);
        claimSchemaRdd.coalesce(1);
        claimSchemaRdd.saveAsParquetFile(parquetPath);
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717p19773.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is spark streaming +MlLib for online learning?
In 1.2, we added streaming k-means: https://github.com/apache/spark/pull/2942 . -Xiangrui On Mon, Nov 24, 2014 at 5:25 PM, Joanne Contact joannenetw...@gmail.com wrote: Thank you Tobias! On Mon, Nov 24, 2014 at 5:13 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Tue, Nov 25, 2014 at 9:40 AM, Joanne Contact joannenetw...@gmail.com wrote: I seemed to read somewhere that spark is still batch learning, but spark streaming could allow online learning. Spark doesn't do Machine Learning itself, but MLlib does. MLlib currently can do online learning only for linear regression https://spark.apache.org/docs/1.1.0/mllib-linear-methods.html#streaming-linear-regression, as far as I know. Tobias - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why is this operation so expensive
I have a JavaPairRDD<KeyType, Tuple2<Type1, Type2>> originalPairs. There are on the order of 100 million elements. I call a function to rearrange the tuples:

JavaPairRDD<String, Tuple2<Type1, Type2>> newPairs = originalPairs.values().mapToPair(
    new PairFunction<Tuple2<Type1, Type2>, String, Tuple2<Type1, Type2>>() {
        @Override
        public Tuple2<String, Tuple2<Type1, Type2>> doCall(final Tuple2<Type1, Type2> t) {
            return new Tuple2<String, Tuple2<Type1, Type2>>(t._1().getId(), t);
        }
    }

where Type1.getId() returns a String. The data are spread across 120 partitions on 15 machines. The operation is dead simple, and yet it takes 5 minutes to generate the data and over 30 minutes to perform this simple operation. I am at a loss to understand what is taking so long or how to make it faster. At this stage there is no reason to move data to different partitions. Anyone have bright ideas? Oh yes, Type1 and Type2 are moderately complex objects weighing in at about 10kb.
Re: Control number of parquet generated from JavaSchemaRDD
public void generateParquet(JavaSparkContext sc, String jsonFilePath, String parquetPath) { //int MB_128 = 128*1024*1024; //sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128); //sc.hadoopConfiguration().setInt("parquet.block.size", MB_128); JavaSQLContext sqlCtx = new JavaSQLContext(sc); JavaRDD<Claim> claimRdd = sc.textFile(jsonFilePath).map(new StringToClaimMapper()).filter(new NullFilter()); JavaSchemaRDD claimSchemaRdd = sqlCtx.applySchema(claimRdd, Claim.class); claimSchemaRdd.coalesce(1, true); //tried with false also. Tried repartition(1) too. claimSchemaRdd.saveAsParquetFile(parquetPath); } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717p19776.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: K-means clustering
There is a simple example here: https://github.com/apache/spark/blob/master/examples/src/main/python/kmeans.py . You can take advantage of sparsity by computing the distance via inner products: http://spark-summit.org/2014/talk/sparse-data-support-in-mllib-2 -Xiangrui On Tue, Nov 25, 2014 at 2:39 AM, amin mohebbi aminn_...@yahoo.com.invalid wrote: I have generated a sparse matrix by python, which has the size of 4000*174000 (.pkl), the following is a small part of this matrix : (0, 45) 1 (0, 413) 1 (0, 445) 1 (0, 107) 4 (0, 80) 2 (0, 352) 1 (0, 157) 1 (0, 191) 1 (0, 315) 1 (0, 395) 4 (0, 282) 3 (0, 184) 1 (0, 403) 1 (0, 169) 1 (0, 267) 1 (0, 148) 1 (0, 449) 1 (0, 241) 1 (0, 303) 1 (0, 364) 1 (0, 257) 1 (0, 372) 1 (0, 73) 1 (0, 64) 1 (0, 427) 1 : : (2, 399) 1 (2, 277) 1 (2, 229) 1 (2, 255) 1 (2, 409) 1 (2, 355) 1 (2, 391) 1 (2, 28) 1 (2, 384) 1 (2, 86) 1 (2, 285) 2 (2, 166) 1 (2, 165) 1 (2, 419) 1 (2, 367) 2 (2, 133) 1 (2, 61) 1 (2, 434) 1 (2, 51) 1 (2, 423) 1 (2, 398) 1 (2, 438) 1 (2, 389) 1 (2, 26) 1 (2, 455) 1 I am new in Spark and would like to cluster this matrix by k-means algorithm. Can anyone explain to me what kind of problems I might be faced. Please note that I do not want to use Mllib and would like to write my own k-means. Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at university of Malaysia Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark sql UDF for array aggregation
Hi, I am looking for some resources/tutorials that will help me achieve this: My JavaSchemaRDD is from JSON objects like below. How do I go about writing a UDF aggregate function, let's say 'vectorAgg', which I can call from sql that returns one result array that is a positional aggregate across all the arrays of matching json objects that are in the select. { "vectorId": "7d27f49e-6388-11e4-9fe2-001f29ebd7e2", "vectorData": [ -1.727238, -0.2929525, -0.3537626, 0.240797, -0.1168013, 0.1258268, -0.1070271, 0.4877119, -0.06820393, -0.01934624, 0.1777821, 0.7428637, 0.0328331, 0.05247593, 0.01435089, 0.03089523, -0.1077004, 0.08026028, 0.006148338, -0.197648, 0.1349506, 0.5219278, 0.07526779, -0.01389027, 0.3850908, 0.06177521, 0.2421045, 0.01061058] } -Regards Seemanto Barua
Re: MLlib Collaborative filtering factors
It is data-dependent, and hence needs hyper-parameter tuning, e.g., grid search. The first batch is certainly expensive. But after you figure out a small range for each parameter that fits your data, following batches should be not that expensive. There is an example from AMPCamp: http://ampcamp.berkeley.edu/5/exercises/movie-recommendation-with-mllib.html -Xiangrui On Tue, Nov 25, 2014 at 4:28 AM, Saurabh Agrawal saurabh.agra...@markit.com wrote: HI, I am trying to execute Collaborative filtering using MlLib. Can somebody please suggest how to calculate the following 1. Rank 2. Iterations 3. Lambda I understand these are adjustment factors and they help reduce the MSE in turn defining accuracy of algorithm but then is it all hit and trial or is there a definitive way to calculate them? Thanks!! Regards, Saurabh Agrawal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
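A minimal sketch of the grid search Xiangrui describes, along the lines of the AMPCamp exercise. The names training and validation (RDD[Rating] splits of the ratings data) and the parameter grids are illustrative assumptions, not a definitive recipe:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ranks = Seq(8, 12)
val lambdas = Seq(0.01, 0.1, 1.0)
val numIters = Seq(10, 20)

val best = (for (rank <- ranks; lambda <- lambdas; iter <- numIters) yield {
  val model = ALS.train(training, rank, iter, lambda)
  // Score the model on the held-out validation set by mean squared error.
  val predictions = model.predict(validation.map(r => (r.user, r.product)))
                         .map(p => ((p.user, p.product), p.rating))
  val actuals = validation.map(r => ((r.user, r.product), r.rating))
  val mse = predictions.join(actuals).values
                       .map { case (p, a) => (p - a) * (p - a) }.mean()
  ((rank, lambda, iter), mse)
}).minBy(_._2)
// best._1 is the (rank, lambda, iterations) combination with the lowest validation MSE.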
Re: Remapping columns from a schemaRDD
Thank you. How can I address more complex columns like maps and structs? Thanks again! Daniel On 25 בנוב׳ 2014, at 19:43, Michael Armbrust mich...@databricks.com wrote: Probably the easiest/closest way to do this would be with a UDF, something like: registerFunction(makeString, (s: Seq[String]) = s.mkString(,)) sql(SELECT *, makeString(c8) AS newC8 FROM jRequests) Although this does not modify a column, but instead appends a new column. Another more complicated way to do something like this would be by using the applySchema function. I'll note that, as part of the ML pipeline work, we have been considering adding something like: def modifyColumn(columnName, function) Any comments anyone has on this interface would be appreciated! Michael On Tue, Nov 25, 2014 at 7:02 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm selecting columns from a json file, transform some of them and would like to store the result as a parquet file but I'm failing. This is what I'm doing: val jsonFiles=sqlContext.jsonFile(/requests.loading) jsonFiles.registerTempTable(jRequests) val clean_jRequests=sqlContext.sql(select c1, c2, c3 ... c55 from jRequests) and then I run a map: val jRequests_flat=clean_jRequests.map(line={((line(1),line(2),line(3),line(4),line(5),line(6),line(7),line(8).asInstanceOf[Iterable[String]].mkString(,),line(9) ,line(10) ,line(11) ,line(12) ,line(13) ,line(14) ,line(15) ,line(16) ,line(17) ,line(18) ,line(19) ,line(20) ,line(21) ,line(22) ,line(23) ,line(24) ,line(25) ,line(26) ,line(27) ,line(28) ,line(29) ,line(30) ,line(31) ,line(32) ,line(33) ,line(34) ,line(35) ,line(36) ,line(37) ,line(38) ,line(39) ,line(40) ,line(41) ,line(42) ,line(43) ,line(44) ,line(45) ,line(46) ,line(47) ,line(48) ,line(49) ,line(50)))}) 1. Is there a smarter way to achieve that (only modify a certain column without relating to the others, but keeping all of them)? 2. The last statement fails because the tuple has too much members: console:19: error: object Tuple50 is not a member of package scala Thanks for your help, Daniel
Re: why MatrixFactorizationModel private?
Besides API stability concerns, models constructed directly from users rather than returned by ALS may not work well. The userFeatures and productFeatures are both with partitioners so we can perform quick lookup for prediction. If you save userFeatures and productFeatures and load them back, it is very likely the partitioning info is missing. That being said, we will try to address model export/import in v1.3: https://issues.apache.org/jira/browse/SPARK-4587 . -Xiangrui On Tue, Nov 25, 2014 at 8:26 AM, jamborta jambo...@gmail.com wrote: Hi all, seems that all the mllib models are declared accessible in the package, except MatrixFactorizationModel, which is declared private to mllib. Any reason why? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-MatrixFactorizationModel-private-tp19763.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: why MatrixFactorizationModel private?
hi Xiangrui, thanks. that is a very useful feature. any suggestion on saving/loading the model in the meantime? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-MatrixFactorizationModel-private-tp19763p19783.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Remapping columns from a schemaRDD
Maps should just be scala maps, structs are rows inside of rows. If you wan to return a struct from a UDF you can do that with a case class. On Tue, Nov 25, 2014 at 10:25 AM, Daniel Haviv danielru...@gmail.com wrote: Thank you. How can I address more complex columns like maps and structs? Thanks again! Daniel On 25 בנוב׳ 2014, at 19:43, Michael Armbrust mich...@databricks.com wrote: Probably the easiest/closest way to do this would be with a UDF, something like: registerFunction(makeString, (s: Seq[String]) = s.mkString(,)) sql(SELECT *, makeString(c8) AS newC8 FROM jRequests) Although this does not modify a column, but instead appends a new column. Another more complicated way to do something like this would be by using the applySchema function http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema . I'll note that, as part of the ML pipeline work, we have been considering adding something like: def modifyColumn(columnName, function) Any comments anyone has on this interface would be appreciated! Michael On Tue, Nov 25, 2014 at 7:02 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm selecting columns from a json file, transform some of them and would like to store the result as a parquet file but I'm failing. This is what I'm doing: val jsonFiles=sqlContext.jsonFile(/requests.loading) jsonFiles.registerTempTable(jRequests) val clean_jRequests=sqlContext.sql(select c1, c2, c3 ... c55 from jRequests) and then I run a map: val jRequests_flat=clean_jRequests.map(line={((line(1),line(2),line(3),line(4),line(5),line(6),line(7), *line(8).asInstanceOf[Iterable[String]].mkString(,)*,line(9) ,line(10) ,line(11) ,line(12) ,line(13) ,line(14) ,line(15) ,line(16) ,line(17) ,line(18) ,line(19) ,line(20) ,line(21) ,line(22) ,line(23) ,line(24) ,line(25) ,line(26) ,line(27) ,line(28) ,line(29) ,line(30) ,line(31) ,line(32) ,line(33) ,line(34) ,line(35) ,line(36) ,line(37) ,line(38) ,line(39) ,line(40) ,line(41) ,line(42) ,line(43) ,line(44) ,line(45) ,line(46) ,line(47) ,line(48) ,line(49) ,line(50)))}) 1. Is there a smarter way to achieve that (only modify a certain column without relating to the others, but keeping all of them)? 2. The last statement fails because the tuple has too much members: console:19: error: object Tuple50 is not a member of package scala Thanks for your help, Daniel
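A small sketch of the two points above, using the registerFunction API from Michael's example; the case class and the column names are illustrative, not from the original thread:

// Returning a case class from a UDF yields a struct column in the result schema.
case class NameAndLength(name: String, length: Int)

sqlContext.registerFunction("makeStruct", (s: Seq[String]) => NameAndLength(s.mkString(","), s.size))
sqlContext.sql("SELECT makeStruct(c8) AS c8Struct FROM jRequests")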
Re: Spark sql UDF for array aggregation
We don't support native UDAs at the moment in Spark SQL. You can write a UDA using Hive's API and use that within Spark SQL On Tue, Nov 25, 2014 at 10:10 AM, Barua, Seemanto seemanto.ba...@jpmchase.com.invalid wrote: Hi, I am looking for some resources/tutorials that will help me achieve this: My JavaSchemaRDD is from JSON objects like below. How do I go about writing a UDF aggregate function, let's say 'vectorAgg', which I can call from sql that returns one result array that is a positional aggregate across all the arrays of matching json objects that are in the select. { "vectorId": "7d27f49e-6388-11e4-9fe2-001f29ebd7e2", "vectorData": [ -1.727238, -0.2929525, -0.3537626, 0.240797, -0.1168013, 0.1258268, -0.1070271, 0.4877119, -0.06820393, -0.01934624, 0.1777821, 0.7428637, 0.0328331, 0.05247593, 0.01435089, 0.03089523, -0.1077004, 0.08026028, 0.006148338, -0.197648, 0.1349506, 0.5219278, 0.07526779, -0.01389027, 0.3850908, 0.06177521, 0.2421045, 0.01061058] } -Regards *Seemanto Barua*
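A rough sketch of the Hive route Michael describes, assuming the UDAF has been written against Hive's AbstractGenericUDAFResolver API and packaged in a jar on the classpath. The class name, function name and table name here are hypothetical placeholders:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Register the Hive UDAF under the name used in SQL (the class name is an assumption).
hiveContext.sql("CREATE TEMPORARY FUNCTION vectorAgg AS 'mypackage.VectorAggUDAF'")
// Then call it like any other aggregate.
hiveContext.sql("SELECT vectorAgg(vectorData) FROM vectors").collect()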
Re: Control number of parquet generated from JavaSchemaRDD
RDDs are immutable, so calling coalesce doesn't actually change the RDD but instead returns a new RDD that has fewer partitions. You need to save that to a variable and call saveAsParquetFile on the new RDD. On Tue, Nov 25, 2014 at 10:07 AM, tridib tridib.sama...@live.com wrote: public void generateParquet(JavaSparkContext sc, String jsonFilePath, String parquetPath) { //int MB_128 = 128*1024*1024; //sc.hadoopConfiguration().setInt(dfs.blocksize, MB_128); //sc.hadoopConfiguration().setInt(parquet.block.size, MB_128); JavaSQLContext sqlCtx = new JavaSQLContext(sc); JavaRDDClaim claimRdd = sc.textFile(jsonFilePath).map(new StringToClaimMapper()).filter(new NullFilter()); JavaSchemaRDD claimSchemaRdd = sqlCtx.applySchema(claimRdd, Claim.class); claimSchemaRdd.coalesce(1, true); //tried with false also. Tried repartition(1) too. claimSchemaRdd.saveAsParquetFile(parquetPath); } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717p19776.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
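The same point as a two-liner, shown in Scala for brevity (the Java API behaves the same way): coalesce returns a new RDD, so save the returned RDD rather than the original one:

val compacted = claimSchemaRdd.coalesce(1)
compacted.saveAsParquetFile(parquetPath)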
Re: Control number of parquet generated from JavaSchemaRDD
Ohh...how can I miss that. :(. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717p19788.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
using MultipleOutputFormat to ensure one output file per key
Hi, How can I implement a custom MultipleOutputFormat and specify it as the output of my Spark job so that I can ensure that there is a unique output file per key (instead of a a unique output file per reducer)? Thanks Arpan
Re: Ideas on how to use Spark for anomaly detection on a stream of data
Fantastic!!! Exactly what i was looking for. Thanks, Natu On Tue, Nov 25, 2014 at 10:46 AM, Sean Owen so...@cloudera.com wrote: Yes, and I prepared a basic talk on this exact topic. Slides here: http://www.slideshare.net/srowen/anomaly-detection-with-apache-spark-41975155 This is elaborated in a chapter of an upcoming book that's available in early release; you can look at the accompanying source code to get some ideas too: https://github.com/sryza/aas/tree/master/kmeans On Mon, Nov 24, 2014 at 10:17 PM, Natu Lauchande nlaucha...@gmail.com wrote: Hi all, I am getting started with Spark. I would like to use for a spike on anomaly detection in a massive stream of metrics. Can Spark easily handle this use case ? Thanks, Natu
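For anyone skimming the thread, a bare-bones sketch of the distance-to-centroid scoring idea from the slides. The input path, parsing, k, iteration count and how many points to inspect are all illustrative assumptions:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Hypothetical numeric metrics, one comma-separated vector per line.
val data = sc.textFile("hdfs:///metrics.csv")
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .cache()

val model = KMeans.train(data, 10, 20) // k and maxIterations need tuning

// Squared Euclidean distance between two vectors.
def sqDist(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

// Score every point by its distance to the nearest centroid; the largest scores
// are the candidate anomalies.
val scores = data.map { v =>
  val center = model.clusterCenters(model.predict(v))
  sqDist(v.toArray, center.toArray)
}
scores.top(20).foreach(println)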
Re: Control number of parquet generated from JavaSchemaRDD
Thanks Michael, It worked like a charm! I have few more queries: 1. Is there a way to control the size of parquet file? 2. Which method do you recommend coalesce(n, true), coalesce(n, false) or repartition(n)? Thanks Regards Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717p19789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: rack-topology.sh no such file or directory
Problem was solved by having the admins put this file on the edge nodes. Thanks, Arun On Wed, Nov 19, 2014 at 12:27 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Your Hadoop configuration is set to look for this file to determine racks. Is the file present on cluster nodes? If not, look at your hdfs-site.xml and remove the setting for a rack topology script there (or it might be in core-site.xml). Matei On Nov 19, 2014, at 12:13 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm trying to run Spark on Yarn on a hortonworks 2.1.5 cluster. I'm getting this error: 14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#- 2027837001] with ID 42 14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running /etc/hadoop/conf/rack-topology.sh 10.0.28.130 java.io.IOException: Cannot run program /etc/hadoop/conf/rack-topology.sh (in directory ###): error=2, No such file or directory The rack-topology script is not on system (find / 2/dev/null -name rack-topology). Any possibly solution? Arun Luthra
RE: Spark SQL parser bug?
Hello I just stumbled on exactly the same issue as you are discussing in this thread. Here are my dependencies: <dependencies> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector_2.10</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector-java_2.10</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.1.2-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.1.2-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.1.2-SNAPSHOT</version> <scope>provided</scope> </dependency> </dependencies> As you can see I am using the latest of Spark and Spark Cassandra Connector and I still get the same error message: Exception in thread "main" java.util.NoSuchElementException: head of empty list So, I don't believe this bug was really fixed in the Spark 1.1.1 release as reported above. Did your problem get fixed with the latest Spark update? Thanks, Leon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-parser-bug-tp15999p19793.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: using MultipleOutputFormat to ensure one output file per key
Hi, Arpan Ghosh wrote: Hi, How can I implement a custom MultipleOutputFormat and specify it as the output of my Spark job so that I can ensure that there is a unique output file per key (instead of a unique output file per reducer)? I use something like this: class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] { override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString() + "/" + leaf } override protected def generateActualKey(key: T, value: V) = { null } // this could be dangerous and overwrite files @throws(classOf[FileAlreadyExistsException]) @throws(classOf[InvalidJobConfException]) @throws(classOf[IOException]) override def checkOutputSpecs(ignored: FileSystem, job: JobConf) = { } } and then just set a jobconf: val jobConf = new JobConf(self.context.hadoopConfiguration) jobConf.setOutputKeyClass(classOf[String]) jobConf.setOutputValueClass(classOf[String]) jobConf.setOutputFormat(classOf[KeyBasedOutput[String, String]]) rdd.saveAsHadoopDataset(jobConf) /Rafal Thanks Arpan -- Regards Rafał Kwasny mailto:/jabberid: m...@entropy.be - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to execute a custom python library on spark
Hi, I have written a few data structures as classes, like the following. Here is my code structure: project/foo/foo.py , __init__.py /bar/bar.py, __init__.py bar.py imports foo as from foo.foo import * /execute/execute.py imports bar as from bar.bar import * Ultimately I am executing execute.py as pyspark execute.py And this works fine locally.. but as soon as I submit it on the cluster... I see module-missing errors. I tried to send each and every file using the --py-files flag (foo.py bar.py) and other helper files.. But even then it complains that the module is not found. So, the question is: when one is building a library which is supposed to execute on top of Spark, how should the imports and library be structured so that it works fine on Spark? When to use pyspark and when to use spark-submit to execute python scripts/modules? Bonus points if one can point to an example library and how to run it :) Thanks
Re: Spark SQL Join returns less rows that expected
I guess you want to use split("\\|") instead of split("|"). On Tue, Nov 25, 2014 at 4:51 AM, Cheng Lian lian.cs@gmail.com wrote: Which version are you using? Or if you are using the most recent master or branch-1.2, which commit are you using? On 11/25/14 4:08 PM, david wrote: Hi, I have 2 files which come from csv import of 2 Oracle tables. F1 has 46730613 rows F2 has 3386740 rows I build 2 tables with spark. Table F1 join with table F2 on c1=d1. All keys F2.d1 exist in F1.c1, so i expect to retrieve 46730613 rows. But it returns only 3437 rows // --- begin code --- val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSchemaRDD val rddFile = sc.textFile("hdfs://referential/F1/part-*") case class F1(c1:String, c2:String, c3:Double, c4:String, c5: String) val stkrdd = rddFile.map(x => x.split("|")).map(f => F1(f(44),f(3),f(10).toDouble, "", f(2))) stkrdd.registerAsTable("F1") sqlContext.cacheTable("F1") val prdfile = sc.textFile("hdfs://referential/F2/part-*") case class F2(d1: String, d2:String, d3:String, d4:String) val productrdd = prdfile.map(x => x.split("|")).map(f => F2(f(0),f(2),f(101),f(3))) productrdd.registerAsTable("F2") sqlContext.cacheTable("F2") val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c1 = F2.d1").count() // --- end of code --- Does anybody know what i missed ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Join-returns-less-rows-that-expected-tp19731.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
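To make the difference concrete (a small illustration, not from the original thread): String.split takes a regular expression, and a bare | is the regex alternation operator, so it does not split on the pipe character at all:

val line = "a|b|c"
line.split("|")   // the regex "|" matches the empty string, so this splits between every character
line.split("\\|") // Array(a, b, c) -- escaping the pipe splits on the literal "|" as intended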
Kryo NPE with Array
I am running into the following NullPointerException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: underlying (scala.collection.convert.Wrappers$JListWrapper) myArrayField (MyCaseClass) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) I have been running into similar issues when using avro classes, that I was able to resolve by registering them with a Kryo serializer that uses chill-avro. However, in this case the field is in a case class and it seems that registering the class does not help. I found this stack overflow that seems to be relevant: http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist I have this line of code translated to Scala, that supposedly solves the issue: val kryo = new Kryo() kryo.getInstantiatorStrategy().asInstanceOf[Kryo.DefaultInstantiatorStrategy].setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()) However, I am not sure where this line should be placed to take effect. I already have the following, should it go somewhere in here? class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(...) } } Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
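One way to wire that line in is inside the registrator itself, since Spark calls registerClasses on the Kryo instance it creates. This is only a sketch based on the Stack Overflow snippet above (it assumes the Kryo version on the classpath exposes Kryo.DefaultInstantiatorStrategy, as that snippet does):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.objenesis.strategy.StdInstantiatorStrategy

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Fall back to Objenesis for classes without a usable no-arg constructor,
    // e.g. the wrapped Java list inside the case class.
    kryo.getInstantiatorStrategy()
      .asInstanceOf[Kryo.DefaultInstantiatorStrategy]
      .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy())
    kryo.register(classOf[MyCaseClass])
  }
}

spark.kryo.registrator would then point at this class, as it presumably already does.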
Re: How to execute a custom python library on spark
a quick thought on this: I think this is distro dependent also, right? We ran into a similar issue in https://issues.apache.org/jira/browse/BIGTOP-1546 where it looked like the python libraries might be overwritten on launch. On Tue, Nov 25, 2014 at 3:09 PM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I have written few datastructures as classes like following.. So, here is my code structure: project/foo/foo.py , __init__.py /bar/bar.py, __init__.py bar.py imports foo as from foo.foo import * /execute/execute.py imports bar as from bar.bar import * Ultimately I am executing execute.py as pyspark execute.py And this works fine locally.. but as soon I submit it on cluster... I see modules missing error.. I tried to send each and every file using --py-files flag (foo.py bar.py ) and other helper files.. But even then it complaints that module is not found So, the question is.. When one is building a library which is suppose to execute on top of spark, how should the imports and library be structured so that it works fine on spark. When to use pyspark and when to use spark submit to execute python scripts/module Bonus points if one can point an example library and how to run it :) Thanks -- jay vyas
Re: Why is this operation so expensive
Hi Steve, You changed the first value in a Tuple2, which is the one that Spark uses to hash and determine where in the cluster to place the value. By changing the first part of the PairRDD, you've implicitly asked Spark to reshuffle the data according to the new keys. I'd guess that you would observe large amounts of shuffle in the webui as a result of this code. If you don't actually need your data shuffled by the first part of the pair RDD, then consider making the KeyType not in the first half of the PairRDD. An alternative is to make the .equals() and .hashcode() of KeyType delegate to the .getId() method you use in the anonymous function. Cheers, Andrew On Tue, Nov 25, 2014 at 10:06 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an JavaPairRDDKeyType,Tuple2Type1,Type2 originalPairs. There are on the order of 100 million elements I call a function to rearrange the tuples JavaPairRDDString,Tuple2Type1,Type2 newPairs = originalPairs.values().mapToPair(new PairFunctionTuple2Type1,Type2, String, Tuple2IType1,Type2 { @Override public Tuple2String, Tuple2Type1,Type2 doCall(final Tuple2Type1,Type2 t) { return new Tuple2String, Tuple2Type1,Type2(t._1().getId(), t); } } where Type1.getId() returns a String The data are spread across 120 partitions on 15 machines. The operation is dead simple and yet it takes 5 minutes to generate the data and over 30 minutes to perform this simple operation. I am at a loss to understand what is taking so long or how to make it faster. It this stage there is no reason to move data to different partitions Anyone have bright ideas - Oh yes Type1 and Type2 are moderately complex objects weighing in at about 10kb
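A small Scala sketch of the first suggestion (the Java API has the same behaviour; Type1 and getId are the hypothetical types from the original post). If the id only needs to travel with each record rather than drive its placement, keep the existing key and the shuffle goes away:

// mapValues preserves the partitioner, so no data moves across the network.
val withId = originalPairs.mapValues(pair => (pair._1.getId, pair))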
Re: Configuring custom input format
Hi, I'm trying to build a custom input format for a CSV file. Could you share a bit more about what you read as input and what you have implemented? I'll try to replicate the same thing, and if I find something interesting on my end I'll let you know. Thanks, Harihar - --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-custom-input-format-tp18220p19800.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Configuring custom input format
How are you creating the object in your Scala shell? Maybe you can write a function that directly returns the RDD, without assigning the object to a temporary variable. Matei On Nov 5, 2014, at 2:54 PM, Corey Nolet cjno...@gmail.com wrote: The closer I look @ the stack trace in the Scala shell, it appears to be the call to toString() that is causing the construction of the Job object to fail. Is there a ways to suppress this output since it appears to be hindering my ability to new up this object? On Wed, Nov 5, 2014 at 5:49 PM, Corey Nolet cjno...@gmail.com mailto:cjno...@gmail.com wrote: I'm trying to use a custom input format with SparkContext.newAPIHadoopRDD. Creating the new RDD works fine but setting up the configuration file via the static methods on input formats that require a Hadoop Job object is proving to be difficult. Trying to new up my own Job object with the SparkContext.hadoopConfiguration is throwing the exception on line 283 of this grepcode: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job Looking in the SparkContext code, I'm seeing that it's newing up Job objects just fine using nothing but the configuraiton. Using SparkContext.textFile() appears to be working for me. Any ideas? Has anyone else run into this as well? Is it possible to have a method like SparkContext.getJob() or something similar? Thanks.
Data Source for Spark SQL
I am using Spark SQL on a Hive table with the Parquet SerDe. Most queries are executed from Spark's JDBC Thrift server. Is there a more efficient way to access/query the data? For example, using saveAsParquetFile() and parquetFile() to save/load Parquet data and run queries directly? Thanks, Ken -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-Source-for-Spark-SQL-tp19802.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Configuring custom input format
I was wiring up my job in the shell while i was learning Spark/Scala. I'm getting more comfortable with them both now so I've been mostly testing through Intellij with mock data as inputs. I think the problem lies more on Hadoop than Spark as the Job object seems to check it's state and throw an exception when the toString() method is called before the Job has physically been submitted. On Tue, Nov 25, 2014 at 5:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: How are you creating the object in your Scala shell? Maybe you can write a function that directly returns the RDD, without assigning the object to a temporary variable. Matei On Nov 5, 2014, at 2:54 PM, Corey Nolet cjno...@gmail.com wrote: The closer I look @ the stack trace in the Scala shell, it appears to be the call to toString() that is causing the construction of the Job object to fail. Is there a ways to suppress this output since it appears to be hindering my ability to new up this object? On Wed, Nov 5, 2014 at 5:49 PM, Corey Nolet cjno...@gmail.com wrote: I'm trying to use a custom input format with SparkContext.newAPIHadoopRDD. Creating the new RDD works fine but setting up the configuration file via the static methods on input formats that require a Hadoop Job object is proving to be difficult. Trying to new up my own Job object with the SparkContext.hadoopConfiguration is throwing the exception on line 283 of this grepcode: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job Looking in the SparkContext code, I'm seeing that it's newing up Job objects just fine using nothing but the configuraiton. Using SparkContext.textFile() appears to be working for me. Any ideas? Has anyone else run into this as well? Is it possible to have a method like SparkContext.getJob() or something similar? Thanks.
RE: Spark SQL parser bug?
Leon, I solved the problem by creating a work around for it, so didn't have a need to upgrade to 1.1.2-SNAPSHOT. Mohammed -Original Message- From: Leon [mailto:pachku...@gmail.com] Sent: Tuesday, November 25, 2014 11:36 AM To: u...@spark.incubator.apache.org Subject: RE: Spark SQL parser bug? Hello I just stumbled on exactly the same issue as you are discussing in this thread. Here are my dependencies: dependencies dependency groupIdcom.datastax.spark/groupId artifactIdspark-cassandra-connector_2.10/artifactId version1.1.0/version /dependency dependency groupIdcom.datastax.spark/groupId artifactIdspark-cassandra-connector-java_2.10/artifactId version1.1.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.1.2-SNAPSHOT/version scopeprovided/scope /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.1.2-SNAPSHOT/version scopeprovided/scope /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-sql_2.10/artifactId version1.1.2-SNAPSHOT/version scopeprovided/scope /dependency /dependencies As you can see I am using the latest of Spark and Spark Cassandra Connector and I still get the same error message: Exception in thread main java.util.NoSuchElementException: head of empty list So, I don't believe this bug was really fixed in Spark 1.1.1 release as reported above. Did you problem get fixed with the latest Spark update? Thanks, Leon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-parser-bug-tp15999p19793.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Submitting job from local to EC2 cluster
Hi All, I have Spark deployed to an EC2 cluster and was able to run jobs successfully when the driver resides within the cluster. However, the job was killed when I tried to submit it from my local machine. My guess is that the Spark cluster can't open a connection back to the driver since it is on my machine. I'm wondering if Spark actually supports submitting jobs from a local machine? If so, would you please advise? Many thanks in advance! YK - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Classpath issue: Custom authentication with sparkSQL/Spark 1.2
Hi, I am trying to launch a spark 1.2 cluster with SparkSQL and custom authentication. After launching the cluster using the ec2 scripts, I copied the following hive-site.xml file into the spark/conf dir: <configuration> <property> <name>hive.server2.authentication</name> <value>CUSTOM</value> </property> <property> <name>HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS</name> <value>mypackage.MySharkAuthenticator</value> </property> <property> <name>hive.server2.enable.doAs</name> <value>false</value> </property> </configuration> And I also copied my custom authentication jar file into the spark/lib dir. I am getting the following error from the sparkSql thrift server: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class mypackage.SharkAuthenticator not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1587) at org.apache.hive.service.auth.CustomAuthenticationProviderImpl.init(CustomAuthenticationProviderImpl.java:34) at org.apache.hive.service.auth.AuthenticationProviderFactory.getAuthenticationProvider(AuthenticationProviderFactory.java:57) at org.apache.hive.service.auth.PlainSaslHelper$PlainServerCallbackHandler.handle(PlainSaslHelper.java:61) at org.apache.hive.service.auth.PlainSaslServer.evaluateResponse(PlainSaslServer.java:127) at org.apache.thrift.transport.TSaslTransport$SaslParticipant.evaluateChallengeOrResponse(TSaslTransport.java:509) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:264) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: Class mypackage.SharkAuthenticator not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1493) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1585) ... 12 more In another attempt, I also copied the custom authentication jar file to the spark/conf directory (instead of spark/lib), but this did not work either. I didn't have this issue with Spark 1.1. Any thoughts? Are there any changes in Spark 1.2 regarding the classpath? Thanks, -Arin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-issue-Custom-authentication-with-sparkSQL-Spark-1-2-tp19806.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Creating a front-end for output from Spark/PySpark
Two options that I can think of: 1) Use the Spark SQL Thrift/JDBC server. 2) Develop a web app using some framework such as Play and expose a set of REST APIs for sending queries. Inside your web app backend, you initialize the Spark SQL context only once when your app initializes. Then you use that context for executing queries sent using your REST API. Mohammed From: Alaa Ali [mailto:contact.a...@gmail.com] Sent: Sunday, November 23, 2014 12:37 PM To: user@spark.apache.org Subject: Creating a front-end for output from Spark/PySpark Hello. Okay, so I'm working on a project to run analytic processing using Spark or PySpark. Right now, I connect to the shell and execute my commands. The very first part of my commands is: create an SQL JDBC connection and cursor to pull from Apache Phoenix, do some processing on the returned data, and spit out some output. I want to create a web gui tool kind of a thing where I play around with what SQL query is executed for my analysis. I know that I can write my whole Spark program and use spark-submit and have it accept and argument to be the SQL query I want to execute, but this means that every time I submit: an SQL connection will be created, query ran, processing done, output printed, program closes and SQL connection closes, and then the whole thing repeats if I want to do another query right away. That will probably cause it to be very slow. Is there a way where I can somehow have the SQL connection working in the backend for example, and then all I have to do is supply a query from my GUI tool where it then takes it, runs it, displays the output? I just want to know the big picture and a broad overview of how would I go about doing this and what additional technology to use and I'll dig up the rest. Regards, Alaa Ali
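For option 1, once the Thrift/JDBC server is running, the backend (or any JVM client) can hold a single long-lived connection and reuse it for every query. A rough sketch with the standard Hive JDBC driver; the host, port (10000 is the usual default) and table name are illustrative assumptions:

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
// Open once at application start-up and reuse for every query from the GUI.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT count(*) FROM my_table")
while (rs.next()) println(rs.getLong(1))
// conn.close() only when the web app shuts down.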
RE: querying data from Cassandra through the Spark SQL Thrift JDBC server
Thanks, Cheng. As an FYI for others trying to integrate Spark SQL JDBC server with Cassandra - I ended up using CalliopeServer2, which extends the Thrift Server and it was really straightforward. Mohammed From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Saturday, November 22, 2014 3:54 AM To: Mohammed Guller; u...@spark.incubator.apache.org Subject: Re: querying data from Cassandra through the Spark SQL Thrift JDBC server This thread might be helpful http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282.html On 11/20/14 4:11 AM, Mohammed Guller wrote: Hi - I was curious if anyone is using the Spark SQL Thrift JDBC server with Cassandra. It would be great be if you could share how you got it working? For example, what config changes have to be done in hive-site.xml, what additional jars are required, etc.? I have a Spark app that can programmatically query data from Cassandra using Spark SQL and Spark-Cassandra-Connector. No problem there, but I couldn't find any documentation for using the Thrift JDBC server for querying data from Cassandra. Thanks, Mohammed
RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
I traced the code and used the following to call: Spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal --hiveconf hive.server2.thrift.port=1 The issue ended up to be much more fundamental however. Spark doesn’t work at all in configuration below. When open spark-shell, it fails with the same ClassNotFound error. Now I wonder if this is a windows-only issue or the hive/Hadoop configuration that is having this problem. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Tuesday, November 25, 2014 1:50 AM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Oh so you're using Windows. What command are you using to start the Thrift server then? On 11/25/14 4:25 PM, Judy Nash wrote: Made progress but still blocked. After recompiling the code on cmd instead of PowerShell, now I can see all 5 classes as you mentioned. However I am still seeing the same error as before. Anything else I can check for? From: Judy Nash [mailto:judyn...@exchange.microsoft.com] Sent: Monday, November 24, 2014 11:50 PM To: Cheng Lian; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava This is what I got from jar tf: org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class I seem to have the line that reported missing, but I am missing this file: com/google/inject/internal/util/$Preconditions.class Any suggestion on how to fix this? Very much appreciate the help as I am very new to Spark and open source technologies. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Monday, November 24, 2014 8:24 PM To: Judy Nash; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hm, I tried exactly the same commit and the build command locally, but couldn’t reproduce this. Usually this kind of errors are caused by classpath misconfiguration. Could you please try this to ensure corresponding Guava classes are included in the assembly jar you built? jar tf assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.4.0.jar | grep Preconditions On my machine I got these lines (the first line is the one reported as missing in your case): org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class com/google/inject/internal/util/$Preconditions.class On 11/25/14 6:25 AM, Judy Nash wrote: Thank you Cheng for responding. Here is the commit SHA1 on the 1.2 branch I saw this failure in: commit 6f70e0295572e3037660004797040e026e440dbd Author: zsxwing zsxw...@gmail.commailto:zsxw...@gmail.com Date: Fri Nov 21 00:42:43 2014 -0800 [SPARK-4472][Shell] Print Spark context available as sc. only when SparkContext is created... ... successfully It's weird that printing Spark context available as sc when creating SparkContext unsuccessfully. Let me know if you need anything else. 
From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Friday, November 21, 2014 8:02 PM To: Judy Nash; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks! On 11/22/14 11:05 AM, Judy Nash wrote: Hi, Thrift server is failing to start for me on latest spark 1.2 branch. I got the error below when I start thrift server. Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314)…. Here is my setup: 1) Latest spark 1.2 branch build 2) Used build command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package 3) Added hive-site.xml to \conf 4) Version on the box: Hive 0.13, Hadoop 2.4 Is this a real bug or am I doing something wrong? --- Full Stacktrace: Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314) at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:327) at org.apache.hadoop.conf.Configuration.clinit(Configuration.java:409) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopU til.scala:82) at
Re: Why is this operation so expensive
If I combineByKey in the next step I suppose I am paying for a shuffle I need any way - right? Also if I supply a custom partitioner rather than hash can I control where and how data is shuffled - overriding equals and hashcode could be a bad thing but a custom partitioner is less dangerous On Tue, Nov 25, 2014 at 1:55 PM, Andrew Ash and...@andrewash.com wrote: Hi Steve, You changed the first value in a Tuple2, which is the one that Spark uses to hash and determine where in the cluster to place the value. By changing the first part of the PairRDD, you've implicitly asked Spark to reshuffle the data according to the new keys. I'd guess that you would observe large amounts of shuffle in the webui as a result of this code. If you don't actually need your data shuffled by the first part of the pair RDD, then consider making the KeyType not in the first half of the PairRDD. An alternative is to make the .equals() and .hashcode() of KeyType delegate to the .getId() method you use in the anonymous function. Cheers, Andrew On Tue, Nov 25, 2014 at 10:06 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an JavaPairRDDKeyType,Tuple2Type1,Type2 originalPairs. There are on the order of 100 million elements I call a function to rearrange the tuples JavaPairRDDString,Tuple2Type1,Type2 newPairs = originalPairs.values().mapToPair(new PairFunctionTuple2Type1,Type2, String, Tuple2IType1,Type2 { @Override public Tuple2String, Tuple2Type1,Type2 doCall(final Tuple2Type1,Type2 t) { return new Tuple2String, Tuple2Type1,Type2(t._1().getId(), t); } } where Type1.getId() returns a String The data are spread across 120 partitions on 15 machines. The operation is dead simple and yet it takes 5 minutes to generate the data and over 30 minutes to perform this simple operation. I am at a loss to understand what is taking so long or how to make it faster. It this stage there is no reason to move data to different partitions Anyone have bright ideas - Oh yes Type1 and Type2 are moderately complex objects weighing in at about 10kb -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
To determine if this is a Windows vs. other configuration, can you just try to call the Spark-class.cmd SparkSubmit without actually referencing the Hadoop or Thrift server classes? On Tue Nov 25 2014 at 5:42:09 PM Judy Nash judyn...@exchange.microsoft.com wrote: I traced the code and used the following to call: Spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal --hiveconf hive.server2.thrift.port=1 The issue ended up to be much more fundamental however. Spark doesn’t work at all in configuration below. When open spark-shell, it fails with the same ClassNotFound error. Now I wonder if this is a windows-only issue or the hive/Hadoop configuration that is having this problem. *From:* Cheng Lian [mailto:lian.cs@gmail.com] *Sent:* Tuesday, November 25, 2014 1:50 AM *To:* Judy Nash; u...@spark.incubator.apache.org *Subject:* Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Oh so you're using Windows. What command are you using to start the Thrift server then? On 11/25/14 4:25 PM, Judy Nash wrote: Made progress but still blocked. After recompiling the code on cmd instead of PowerShell, now I can see all 5 classes as you mentioned. However I am still seeing the same error as before. Anything else I can check for? *From:* Judy Nash [mailto:judyn...@exchange.microsoft.com judyn...@exchange.microsoft.com] *Sent:* Monday, November 24, 2014 11:50 PM *To:* Cheng Lian; u...@spark.incubator.apache.org *Subject:* RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava This is what I got from jar tf: org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class I seem to have the line that reported missing, but I am missing this file: com/google/inject/internal/util/$Preconditions.class Any suggestion on how to fix this? Very much appreciate the help as I am very new to Spark and open source technologies. *From:* Cheng Lian [mailto:lian.cs@gmail.com lian.cs@gmail.com] *Sent:* Monday, November 24, 2014 8:24 PM *To:* Judy Nash; u...@spark.incubator.apache.org *Subject:* Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hm, I tried exactly the same commit and the build command locally, but couldn’t reproduce this. Usually this kind of errors are caused by classpath misconfiguration. Could you please try this to ensure corresponding Guava classes are included in the assembly jar you built? jar tf assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.4.0.jar | grep Preconditions On my machine I got these lines (the first line is the one reported as missing in your case): org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class com/google/inject/internal/util/$Preconditions.class On 11/25/14 6:25 AM, Judy Nash wrote: Thank you Cheng for responding. Here is the commit SHA1 on the 1.2 branch I saw this failure in: commit 6f70e0295572e3037660004797040e026e440dbd Author: zsxwing zsxw...@gmail.com zsxw...@gmail.com Date: Fri Nov 21 00:42:43 2014 -0800 [SPARK-4472][Shell] Print Spark context available as sc. only when SparkContext is created... ... successfully It's weird that printing Spark context available as sc when creating SparkContext unsuccessfully. 
Let me know if you need anything else. *From:* Cheng Lian [mailto:lian.cs@gmail.com lian.cs@gmail.com] *Sent:* Friday, November 21, 2014 8:02 PM *To:* Judy Nash; u...@spark.incubator.apache.org *Subject:* Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks! On 11/22/14 11:05 AM, Judy Nash wrote: Hi, Thrift server is failing to start for me on latest spark 1.2 branch. I got the error below when I start thrift server. Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314)…. Here is my setup: 1) Latest spark 1.2 branch build 2) Used build command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package 3) Added hive-site.xml to \conf 4) Version on the box: Hive 0.13, Hadoop 2.4 Is this a real bug or am I doing something wrong? --- Full Stacktrace: Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at
Re: Control number of parquet generated from JavaSchemaRDD
I believe coalesce(..., true) and repartition are the same. If the input files are of similar sizes, then coalesce will be cheaper as it introduces a narrow dependency https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf, meaning there won't be a shuffle. However, if there is a lot of skew in the input file size, then a repartition will ensure that data is shuffled evenly. There is currently no way to control the file size other than pick a 'good' number of partitions. On Tue, Nov 25, 2014 at 11:30 AM, tridib tridib.sama...@live.com wrote: Thanks Michael, It worked like a charm! I have few more queries: 1. Is there a way to control the size of parquet file? 2. Which method do you recommend coalesce(n, true), coalesce(n, false) or repartition(n)? Thanks Regards Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717p19789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Configuring custom input format
Yeah, unfortunately that will be up to them to fix, though it wouldn't hurt to send them a JIRA mentioning this. Matei On Nov 25, 2014, at 2:58 PM, Corey Nolet cjno...@gmail.com wrote: I was wiring up my job in the shell while i was learning Spark/Scala. I'm getting more comfortable with them both now so I've been mostly testing through Intellij with mock data as inputs. I think the problem lies more on Hadoop than Spark as the Job object seems to check it's state and throw an exception when the toString() method is called before the Job has physically been submitted. On Tue, Nov 25, 2014 at 5:31 PM, Matei Zaharia matei.zaha...@gmail.com mailto:matei.zaha...@gmail.com wrote: How are you creating the object in your Scala shell? Maybe you can write a function that directly returns the RDD, without assigning the object to a temporary variable. Matei On Nov 5, 2014, at 2:54 PM, Corey Nolet cjno...@gmail.com mailto:cjno...@gmail.com wrote: The closer I look @ the stack trace in the Scala shell, it appears to be the call to toString() that is causing the construction of the Job object to fail. Is there a ways to suppress this output since it appears to be hindering my ability to new up this object? On Wed, Nov 5, 2014 at 5:49 PM, Corey Nolet cjno...@gmail.com mailto:cjno...@gmail.com wrote: I'm trying to use a custom input format with SparkContext.newAPIHadoopRDD. Creating the new RDD works fine but setting up the configuration file via the static methods on input formats that require a Hadoop Job object is proving to be difficult. Trying to new up my own Job object with the SparkContext.hadoopConfiguration is throwing the exception on line 283 of this grepcode: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job Looking in the SparkContext code, I'm seeing that it's newing up Job objects just fine using nothing but the configuraiton. Using SparkContext.textFile() appears to be working for me. Any ideas? Has anyone else run into this as well? Is it possible to have a method like SparkContext.getJob() or something similar? Thanks.
IDF model error
Hello Spark fans, I am trying to use the IDF model available in the spark mllib to create a tf-idf representation of an RDD[Vector]. Below I have attached my MWE. I get the following error java.lang.IndexOutOfBoundsException: 7 not in [-4,4) at breeze.linalg.DenseVector.apply$mcI$sp(DenseVector.scala:70) at breeze.linalg.DenseVector.apply(DenseVector.scala:69) at org.apache.spark.mllib.feature.IDF$DocumentFrequencyAggregator.add(IDF.scala:81) Any ideas? Regards, Shivani import org.apache.spark.mllib.feature.VectorTransformer import com.box.analytics.ml.dms.vector.{SparkSparseVector,SparkDenseVector} import org.apache.spark.mllib.linalg.{DenseVector => SDV, SparseVector => SSV} import org.apache.spark.mllib.linalg.{Vector => SparkVector} import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix} import org.apache.spark.mllib.feature._ val doc1s = new IndexedRow(1L, new SSV(4, Array(1, 3, 5, 7),Array(1.0, 1.0, 0.0, 5.0))) val doc2s = new IndexedRow(2L, new SSV(4, Array(1, 2, 4, 13), Array(0.0, 1.0, 2.0, 0.0))) val doc3s = new IndexedRow(3L, new SSV(4, Array(10, 14, 20, 21),Array(2.0, 0.0, 2.0, 1.0))) val doc4s = new IndexedRow(4L, new SSV(4, Array(3, 7, 13, 20),Array(2.0, 0.0, 2.0, 1.0))) val indata = sc.parallelize(List(doc1s,doc2s,doc3s,doc4s)).map(e => e.vector) (new IDF()).fit(indata).idf -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA
Issue with Spark latest 1.2.0 build - ClassCastException from [B to SerializableWritable
Hello forum, We are using spark distro built from the source of latest 1.2.0 tag. And we are facing the below issue, while trying to act upon the JavaRDD instance, the stacktrace is given below. Can anyone please let me know, what can be wrong here? java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:138) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:419) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:32) at com.dataken.common.chores.InformationDataLoadChore.run(InformationDataLoadChore.java:69) at com.dataken.common.pipeline.DatakenTask.start(DatakenTask.java:110) at com.dataken.tasks.objectcentricprocessor.ObjectCentricProcessTask.execute(ObjectCentricProcessTask.java:99) at org.quartz.core.JobRunShell.run(JobRunShell.java:202) at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 2014-11-26 08:07:38,454 ERROR [DefaultQuartzScheduler_Worker-2] org.quartz.core.ErrorLogger Job (report_report.report_report threw an exception. org.quartz.SchedulerException: Job threw an unhandled exception. 
[See nested exception: java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable] at org.quartz.core.JobRunShell.run(JobRunShell.java:213) at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:138) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:419) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:32) at com.dataken.common.chores.InformationDataLoadChore.run(InformationDataLoadChore.java:69) at com.dataken.common.pipeline.DatakenTask.start(DatakenTask.java:110) at com.dataken.tasks.objectcentricprocessor.ObjectCentricProcessTask.execute(ObjectCentricProcessTask.java:99) at org.quartz.core.JobRunShell.run(JobRunShell.java:202) ... 1 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-Spark-latest-1-2-0-build-ClassCastException-from-B-to-SerializableWritable-tp19815.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark on YARN - master role
Hi, In Spark on YARN, the AM (driver) will ask the RM for resources. Once the resources are allocated by the RM, the AM will start the executors through the NM. This is my understanding. But, according to the Spark documentation (1), the `spark.yarn.applicationMaster.waitTries` property specifies `Set the number of times the ApplicationMaster waits for the the Spark master and then also the number of tries it waits for the SparkContext to be initialized.`. What is the significance of the "Spark master" in the Spark on YARN configuration? (1) - http://spark.apache.org/docs/latest/running-on-yarn.html Thanks, Praveen
Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD
Hi Tri, setIntercept() is not a member function of StreamingLinearRegressionWithSGD; it's a member function of LinearRegressionWithSGD (GeneralizedLinearAlgorithm), which is a member variable (named algorithm) of StreamingLinearRegressionWithSGD. So you need to change your code to: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) .algorithm.setIntercept(true) Thanks Yanbo 2014-11-25 23:51 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid: Thanks Liang! It was my bad, I fat-fingered one of the data points; I corrected it and the result matches yours. I am still not able to get the intercept. I am getting [error] /data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:47: value setIntercept is not a member of org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD I tried the code below: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) model.setIntercept(addIntercept = true).trainOn(trainingData) and: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) .setIntercept(true) But I still get a compilation error. Thanks Tri *From:* Yanbo Liang [mailto:yanboha...@gmail.com] *Sent:* Tuesday, November 25, 2014 4:08 AM *To:* Bui, Tri *Cc:* user@spark.apache.org *Subject:* Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD The case runs correctly in my environment. 14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 141690890 ms 14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.8588] Can you provide more detailed information if it is convenient? The intercept value can be turned on as follows: val model = new StreamingLinearRegressionWithSGD() .algorithm.setIntercept(true) 2014-11-25 3:31 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid: Hi, I am getting an incorrect weights model from StreamingLinearRegressionWithSGD. The one-feature input data is: (1,[1]) (2,[2]) … . (20,[20]) The result from the Current model: weights is [-4.432]…which is not correct. Also, how do I turn on the intercept value for the StreamingLinearRegression? Thanks Tri
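A short sketch of the same fix, split into two statements so that model stays the streaming wrapper (chaining .algorithm.setIntercept(true) onto the constructor expression would leave you holding the inner LinearRegressionWithSGD instead); numFeatures and trainingData (a DStream[LabeledPoint]) are assumed to come from your own setup:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))
model.algorithm.setIntercept(true)   // flip the flag on the wrapped algorithm
model.trainOn(trainingData)          // trainingData: DStream[LabeledPoint]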
Re: IDF model error
Hi Shivani, You misunderstand the parameter of SparseVector. class SparseVector( override val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vector { } The first parameter is the total length of the Vector rather than the length of non-zero elements. So it need greater than the maximum non-zero element index which is 21 in your case. The following code can work: val doc1s = new IndexedRow(1L, new SSV(22, Array(1, 3, 5, 7),Array(1.0, 1.0, 0.0, 5.0))) val doc2s = new IndexedRow(2L, new SSV(22, Array(1, 2, 4, 13), Array(0.0, 1.0, 2.0, 0.0))) val doc3s = new IndexedRow(3L, new SSV(22, Array(10, 14, 20, 21),Array(2.0, 0.0, 2.0, 1.0))) val doc4s = new IndexedRow(4L, new SSV(22, Array(3, 7, 13, 20),Array(2.0, 0.0, 2.0, 1.0))) 2014-11-26 10:09 GMT+08:00 Shivani Rao raoshiv...@gmail.com: Hello Spark fans, I am trying to use the IDF model available in the spark mllib to create an tf-idf representation of a n RDD[Vectors]. Below i have attached my MWE I get the following error java.lang.IndexOutOfBoundsException: 7 not in [-4,4) at breeze.linalg.DenseVector.apply$mcI$sp(DenseVector.scala:70) at breeze.linalg.DenseVector.apply(DenseVector.scala:69) at org.apache.spark.mllib.feature.IDF$DocumentFrequencyAggregator.add(IDF.scala:81) Any ideas? Regards, Shivani import org.apache.spark.mllib.feature.VectorTransformer import com.box.analytics.ml.dms.vector.{SparkSparseVector,SparkDenseVector} import org.apache.spark.mllib.linalg.{DenseVector = SDV, SparseVector = SSV} import org.apache.spark.mllib.linalg.{Vector = SparkVector} import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix} import org.apache.spark.mllib.feature._ val doc1s = new IndexedRow(1L, new SSV(4, Array(1, 3, 5, 7),Array(1.0, 1.0, 0.0, 5.0))) val doc2s = new IndexedRow(2L, new SSV(4, Array(1, 2, 4, 13), Array(0.0, 1.0, 2.0, 0.0))) val doc3s = new IndexedRow(3L, new SSV(4, Array(10, 14, 20, 21),Array(2.0, 0.0, 2.0, 1.0))) val doc4s = new IndexedRow(4L, new SSV(4, Array(3, 7, 13, 20),Array(2.0, 0.0, 2.0, 1.0))) val indata = sc.parallelize(List(doc1s,doc2s,doc3s,doc4s)).map(e=e.vector) (new IDF()).fit(indata).idf -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA
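With the corrected vector size, a short continuation showing how the fitted model is typically applied, assuming indata is the RDD[Vector] built above and a Spark version where IDFModel.transform is available:

import org.apache.spark.mllib.feature.IDF

val idfModel = new IDF().fit(indata)       // document-frequency pass
val tfidf = idfModel.transform(indata)     // RDD of TF-IDF weighted vectors
tfidf.take(4).foreach(println)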
do not assemble the spark example jar
Hi, The Spark assembly is time-costly. I only need spark-assembly-1.1.0-hadoop2.3.0.jar and do not need spark-examples-1.1.0-hadoop2.3.0.jar. How can I configure Spark to avoid assembling the examples jar? I know the *export SPARK_PREPEND_CLASSES=true* method can reduce the assembly time, but I do not develop locally. Any advice? -- *Best Wishes!*
Re: do not assemble the spark example jar
You can do sbt/sbt assembly/assembly to assemble only the main package. Matei On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com wrote: Hi, The spark assembly is time costly. If I only need the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need the spark-examples-1.1.0-hadoop2.3.0.jar. How to configure the spark to avoid assemble the example jar. I know export SPARK_PREPEND_CLASSES=true method can reduce the assembly, but I do not develop locally. Any advice? -- Best Wishes!
Re: do not assemble the spark example jar
BTW as another tip, it helps to keep the SBT console open as you make source changes (by just running sbt/sbt with no args). It's a lot faster the second time it builds something. Matei On Nov 25, 2014, at 8:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can do sbt/sbt assembly/assembly to assemble only the main package. Matei On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com mailto:lihu...@gmail.com wrote: Hi, The spark assembly is time costly. If I only need the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need the spark-examples-1.1.0-hadoop2.3.0.jar. How to configure the spark to avoid assemble the example jar. I know export SPARK_PREPEND_CLASSES=true method can reduce the assembly, but I do not develop locally. Any advice? -- Best Wishes!
Issue with Spark latest 1.2.0 build - ClassCastException from [B to SerializableWritable
Hello forum, We are using spark distro built from the source of latest 1.2.0 tag. And we are facing the below issue, while trying to act upon the JavaRDD instance, the stacktrace is given below. Can anyone please let me know, what can be wrong here? java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:138) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:419) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:32) at org.quartz.core.JobRunShell.run(JobRunShell.java:202) at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 2014-11-26 08:07:38,454 ERROR [DefaultQuartzScheduler_Worker-2] org.quartz.core.ErrorLogger Job (report_report.report_report threw an exception. org.quartz.SchedulerException: Job threw an unhandled exception. [See nested exception: java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable] at org.quartz.core.JobRunShell.run(JobRunShell.java:213) at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:138) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:419) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:32) at org.quartz.core.JobRunShell.run(JobRunShell.java:202) ... 
1 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-Spark-latest-1-2-0-build-ClassCastException-from-B-to-SerializableWritable-tp19824.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Determine number of running executors
Hi, Thanks for your help! Sandy, I had a bit of trouble finding the spark.executor.cores property. (It wasn't there although its value should have been 2.) I ended up throwing regular expressions on scala.util.Properties.propOrElse("sun.java.command", ""), which worked surprisingly well ;-) Thanks Tobias
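A rough sketch of the regex approach described above; the flag names assume the values were passed on the spark-submit command line, so adjust the patterns to your own setup:

val cmd = scala.util.Properties.propOrElse("sun.java.command", "")
val numExecutors  = "--num-executors\\s+(\\d+)".r.findFirstMatchIn(cmd).map(_.group(1).toInt)
val executorCores = "--executor-cores\\s+(\\d+)".r.findFirstMatchIn(cmd).map(_.group(1).toInt)
println(s"executors=$numExecutors coresPerExecutor=$executorCores")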
RE: beeline via spark thrift doesn't retain cache
Thanks Yanbo. My issue was 1). I had the spark thrift server set up, but it was running against hive instead of Spark SQL due to a local change. After I fixed this, beeline automatically caches rerun queries and accepts cache table. From: Yanbo Liang [mailto:yanboha...@gmail.com] Sent: Friday, November 21, 2014 12:42 AM To: Judy Nash Cc: u...@spark.incubator.apache.org Subject: Re: beeline via spark thrift doesn't retain cache 1) make sure your beeline client is connected to the Hiveserver2 of Spark SQL. You can find execution logs of Hiveserver2 in the environment of start-thriftserver.sh. 2) what about the scale of your data? If you cache small data, it will take more time to schedule the workload between different executors. Look at the configuration of the spark execution environment. Check whether there is enough memory for RDD storage; if not, it will take some time to serialize/deserialize data between memory and disk. 2014-11-21 11:06 GMT+08:00 Judy Nash judyn...@exchange.microsoft.com: Hi friends, I have successfully set up the thrift server and executed beeline on top. Beeline can handle select queries just fine, but it cannot seem to do any kind of caching/RDD operations. i.e. 1) Command “cache table” doesn’t work. See error: Error: Error while processing statement: FAILED: ParseException line 1:0 cannot recognize input near 'cache' 'table' 'hivesampletable' (state=42000,code=4) 2) Re-run SQL commands do not have any performance improvements. By comparison, the Spark-SQL shell can execute the “cache table” command and rerunning a SQL command has a huge performance boost. Am I missing something or is this expected when executing through the Spark thrift server? Thanks! Judy
Spark 1.1.0 and HBase: Snappy UnsatisfiedLinkError
Hi everyone, I deployed Spark 1.1.0 and I'm trying to use it with spark-job-server 0.4.0 (https://github.com/ooyala/spark-jobserver). I previously used Spark 1.0.2 and had no problems with it. I want to use the newer version of Spark (and Spark SQL) to create the SchemaRDD programmatically. The CLASSPATH variable was properly set because the following code works perfectly (from https://spark.apache.org/docs/1.1.0/sql-programming-guide.html but with input from an HBase table). But when I try to put this in the override def runJob(sc:SparkContext, jobConfig: Config): Any = ??? method, it does not work. The exception is: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:320) at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79) at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) This exception occurs at the line "val peopleRows = new NewHadoopRDD" when trying to read rows from HBase (0.98). I executed this code in both Scala and Java. Any ideas? What could it depend on? CODE // sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
// Generate the schema based on the string of schema
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "people")
val peopleRows = new NewHadoopRDD(sc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Convert records of the RDD (people) to Rows.
val rowRDD = peopleRows.map // create Rows (name,age)
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " +
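The mapping from HBase Result to Row is only sketched as a comment above; a purely hypothetical version might look like the following, where the column family "cf" and the qualifiers "name" and "age" are invented for illustration:

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql._

val rowRDD = peopleRows.map { case (_, result) =>
  // Both fields are declared StringType in the schema above, so keep them as strings.
  Row(
    Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))),
    Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("age"))))
}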
Accessing posterior probability of Naive Baye's prediction
Hi, I am trying to access the posterior probability of Naive Baye's prediction with MLlib using Java. As the member variables brzPi and brzTheta are private, I applied a hack to access the values through reflection. I am using Java and couldn't find a way to use the breeze library with Java. If I am correct the relevant calculation is given through line number 66 in NaiveBayesModel class, labels(brzArgmax(brzPi + brzTheta * testData.toBreeze)) Here the element-wise additions and multiplication of DenseVectors are given as operators which are not directly accessible in Java. Also, the use of brzArgmax is not very clear with Java for me. Can anyone please help me convert the above mentioned calculation from Scala to Java. PS: I have raised a improvement request on Jira for making these variables directly accessible from outside. Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-posterior-probability-of-Naive-Baye-s-prediction-tp19828.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
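A rough Scala sketch of that calculation (the question asks for Java, but the arithmetic is the same), assuming pi (log class priors) and theta (log conditional probabilities) have already been pulled out into plain arrays via the reflection hack mentioned above. The scores are unnormalized log posteriors; exponentiate and normalize if actual posterior probabilities are needed, and the predicted label is the one at the argmax index:

// pi(k): log prior of class k, theta(k)(j): log P(feature j | class k), x: feature values
def logScores(pi: Array[Double], theta: Array[Array[Double]], x: Array[Double]): Array[Double] =
  Array.tabulate(pi.length) { k =>
    pi(k) + theta(k).zip(x).map { case (t, v) => t * v }.sum
  }

def posterior(pi: Array[Double], theta: Array[Array[Double]], x: Array[Double]): Array[Double] = {
  val s = logScores(pi, theta, x)
  val m = s.max                         // subtract the max for numerical stability
  val e = s.map(v => math.exp(v - m))
  val z = e.sum
  e.map(_ / z)
}

def predictIndex(pi: Array[Double], theta: Array[Array[Double]], x: Array[Double]): Int = {
  val s = logScores(pi, theta, x)
  s.indexOf(s.max)                      // index into the model's labels array
}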
Re: How to do broadcast join in SparkSQL
Hi, Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support. I got the following exceptions: org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) Using the same DDL and Analyze script above. Jianshi On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: It works fine, thanks for the help Michael. Liancheng also told me a trick, using a subquery with LIMIT n. It works in latest 1.2.0 BTW, looks like the broadcast optimization won't be recognized if I do a left join instead of a inner join. Is that true? How can I make it work for left joins? Cheers, Jianshi On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust mich...@databricks.com wrote: Thanks for the input. We purposefully made sure that the config option did not make it into a release as it is not something that we are willing to support long term. That said we'll try and make this easier in the future either through hints or better support for statistics. In this particular case you can get what you want by registering the tables as external tables and setting an flag. Here's a helper function to do what you need. /** * Sugar for creating a Hive external table from a parquet path. */ def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } You'll also need to run this to populate the statistics: ANALYZE TABLE tableName COMPUTE STATISTICS noscan; On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, currently there's cost-based optimization however Parquet statistics is not implemented... What's the good way if I want to join a big fact table with several tiny dimension tables in Spark SQL (1.1)? I wish we can allow user hint for the join. Jianshi On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch. https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I suppose to know the exact bytes of a table? Let me specify the join algorithm is preferred I think. Jianshi On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800 ? e.g. 
see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to do broadcast join in SparkSQL
Oh, I found a explanation from http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/ The error here is a bit misleading, what it really means is that the class parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive. Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as parquet-hive-1.0.jar Is it removed from latest Spark? Jianshi On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support. I got the following exceptions: org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) Using the same DDL and Analyze script above. Jianshi On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: It works fine, thanks for the help Michael. Liancheng also told me a trick, using a subquery with LIMIT n. It works in latest 1.2.0 BTW, looks like the broadcast optimization won't be recognized if I do a left join instead of a inner join. Is that true? How can I make it work for left joins? Cheers, Jianshi On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust mich...@databricks.com wrote: Thanks for the input. We purposefully made sure that the config option did not make it into a release as it is not something that we are willing to support long term. That said we'll try and make this easier in the future either through hints or better support for statistics. In this particular case you can get what you want by registering the tables as external tables and setting an flag. Here's a helper function to do what you need. /** * Sugar for creating a Hive external table from a parquet path. */ def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } You'll also need to run this to populate the statistics: ANALYZE TABLE tableName COMPUTE STATISTICS noscan; On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, currently there's cost-based optimization however Parquet statistics is not implemented... What's the good way if I want to join a big fact table with several tiny dimension tables in Spark SQL (1.1)? I wish we can allow user hint for the join. Jianshi On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch. 
https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I suppose to know the exact bytes of a table? Let me specify the join algorithm is preferred I think. Jianshi On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800 ? e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi
Re: k-means clustering
Pre-processing is major workload before training model. MLlib provide TD-IDF calculation, StandardScaler and Normalizer which is essential for preprocessing and would be great help to the model training. Take a look at this http://spark.apache.org/docs/latest/mllib-feature-extraction.html 2014-11-21 7:18 GMT+08:00 Jun Yang yangjun...@gmail.com: Guys, As to the questions of pre-processing, you could just migrate your logic to Spark before using K-means. I only used Scala on Spark, and haven't used Python binding on Spark, but I think the basic steps must be the same. BTW, if your data set is big with huge sparse dimension feature vector, K-Means may not works as good as you expected. And I think this is still the optimization direction of Spark MLLib. On Wed, Nov 19, 2014 at 2:21 PM, amin mohebbi aminn_...@yahoo.com.invalid wrote: Hi there, I would like to do text clustering using k-means and Spark on a massive dataset. As you know, before running the k-means, I have to do pre-processing methods such as TFIDF and NLTK on my big dataset. The following is my code in python : if __name__ == '__main__': # Cluster a bunch of text documents. import re import sys k = 6 vocab = {} xs = [] ns=[] cat=[] filename='2013-01.csv' with open(filename, newline='') as f: try: newsreader = csv.reader(f) for row in newsreader: ns.append(row[3]) cat.append(row[4]) except csv.Error as e: sys.exit('file %s, line %d: %s' % (filename, newsreader.line_num, e)) remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation)) # regex to remove special characters remove_num = re.compile('[\d]+') #nltk.download() stop_words= nltk.corpus.stopwords.words('english') for a in ns: x = defaultdict(float ) a1 = a.strip().lower() a2 = remove_spl_char_regex.sub( ,a1) # Remove special characters a3 = remove_num.sub(, a2) #Remove numbers #Remove stop words words = a3.split() filter_stop_words = [w for w in words if not w in stop_words] stemed = [PorterStemmer().stem_word(w) for w in filter_stop_words] ws=sorted(stemed) #ws=re.findall(r\w+, a1) for w in ws: vocab.setdefault(w, len(vocab)) x[vocab[w]] += 1 xs.append(x.items()) Can anyone explain to me how can I do the pre-processing step, before running the k-means using spark. Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at university of Malaysia Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com -- yangjun...@gmail.com http://hi.baidu.com/yjpro
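For reference, a compact Scala sketch of the MLlib preprocessing chain mentioned above (the original code is Python); it assumes docs: RDD[Seq[String]] already holds the tokenized, stopword-filtered, stemmed terms produced by your own cleaning step:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.{HashingTF, IDF, Normalizer}

val tf = new HashingTF().transform(docs)           // hashed term-frequency vectors
tf.cache()
val tfidf = new IDF().fit(tf).transform(tf)        // reweight by inverse document frequency
val normalized = new Normalizer().transform(tfidf) // L2-normalize each document vector
val model = KMeans.train(normalized, 6, 20)        // k = 6 clusters, 20 iterations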
RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
Looks like a config issue. I ran spark-pi job and still failing with the same guava error Command ran: .\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.examples.SparkPi --master spark://headnodehost:7077 --executor-memory 1G --num-executors 1 .\lib\spark-examples-1.2.1-SNAPSHOT-hadoop2.4.0.jar 100 Had used the same build steps on spark 1.1 and had no issue. From: Denny Lee [mailto:denny.g@gmail.com] Sent: Tuesday, November 25, 2014 5:47 PM To: Judy Nash; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava To determine if this is a Windows vs. other configuration, can you just try to call the Spark-class.cmd SparkSubmit without actually referencing the Hadoop or Thrift server classes? On Tue Nov 25 2014 at 5:42:09 PM Judy Nash judyn...@exchange.microsoft.commailto:judyn...@exchange.microsoft.com wrote: I traced the code and used the following to call: Spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal --hiveconf hive.server2.thrift.port=1 The issue ended up to be much more fundamental however. Spark doesn’t work at all in configuration below. When open spark-shell, it fails with the same ClassNotFound error. Now I wonder if this is a windows-only issue or the hive/Hadoop configuration that is having this problem. From: Cheng Lian [mailto:lian.cs@gmail.commailto:lian.cs@gmail.com] Sent: Tuesday, November 25, 2014 1:50 AM To: Judy Nash; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Oh so you're using Windows. What command are you using to start the Thrift server then? On 11/25/14 4:25 PM, Judy Nash wrote: Made progress but still blocked. After recompiling the code on cmd instead of PowerShell, now I can see all 5 classes as you mentioned. However I am still seeing the same error as before. Anything else I can check for? From: Judy Nash [mailto:judyn...@exchange.microsoft.com] Sent: Monday, November 24, 2014 11:50 PM To: Cheng Lian; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava This is what I got from jar tf: org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class I seem to have the line that reported missing, but I am missing this file: com/google/inject/internal/util/$Preconditions.class Any suggestion on how to fix this? Very much appreciate the help as I am very new to Spark and open source technologies. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Monday, November 24, 2014 8:24 PM To: Judy Nash; u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hm, I tried exactly the same commit and the build command locally, but couldn’t reproduce this. Usually this kind of errors are caused by classpath misconfiguration. Could you please try this to ensure corresponding Guava classes are included in the assembly jar you built? 
jar tf assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.4.0.jar | grep Preconditions On my machine I got these lines (the first line is the one reported as missing in your case): org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class com/google/inject/internal/util/$Preconditions.class On 11/25/14 6:25 AM, Judy Nash wrote: Thank you Cheng for responding. Here is the commit SHA1 on the 1.2 branch I saw this failure in: commit 6f70e0295572e3037660004797040e026e440dbd Author: zsxwing zsxw...@gmail.com Date: Fri Nov 21 00:42:43 2014 -0800 [SPARK-4472][Shell] Print Spark context available as sc. only when SparkContext is created... ... successfully It's weird that printing Spark context available as sc when creating SparkContext unsuccessfully. Let me know if you need anything else. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Friday, November 21, 2014 8:02 PM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks! On 11/22/14 11:05 AM, Judy Nash wrote: Hi, Thrift server is failing to start for me on latest spark 1.2 branch. I got the error below when I start thrift server. Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
Spark setup on local windows machine
Hi All, I just installed Spark on my laptop and am trying to get spark-shell to work. Here is the error I see: C:\spark\bin>spark-shell Exception in thread "main" java.util.NoSuchElementException: key not found: CLASSPATH at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubmitDriverBootstrapper.scala:49) at org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmitDriverBootstrapper.scala) The classpath seems to be right: C:\spark\bin>compute-classpath.cmd ;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucleus-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar Manually exporting the classpath to include the assembly jar doesn't help either. What could be wrong with this installation? Scala and SBT are installed, in the path, and are working fine. Appreciate your help. regards Sunita
configure to run multiple tasks on a core
I'm running a spark-ec2 cluster. I have a map task that calls a specialized C++ external app. The app doesn't fully utilize the core as it needs to download/upload data as part of the task. Looking at the worker nodes, it appears that there is one task with my app running per core. I'd like to better utilize the cpu resources with the hope of increasing throughput by running multiple tasks (with my app) per core in parallel. I see there is a spark.task.cpus config setting with a default value of 1. It appears though that this is used to go the other way than what I am looking for. Is there a way where I can specify multiple tasks per core rather than multiple cores per task? thanks for any help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/configure-to-run-multiple-tasks-on-a-core-tp19834.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
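One commonly suggested trick for a standalone/spark-ec2 cluster is to make each worker advertise more cores than it physically has, so the scheduler launches that many concurrent tasks per node; this is a hedged sketch rather than a verified recommendation, and it trades CPU headroom for the extra download/upload overlap:

# conf/spark-env.sh on each worker (restart the workers afterwards)
export SPARK_WORKER_CORES=16   # e.g. 2x the physical core count on the node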
Re: Lifecycle of RDD in spark-streaming
Any pointers guys? On Tue, Nov 25, 2014 at 5:32 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hey Experts, I wanted to understand in detail about the lifecycle of rdd(s) in a streaming app. From my current understanding - rdd gets created out of the realtime input stream. - Transform(s) functions are applied in a lazy fashion on the RDD to transform into another rdd(s). - Actions are taken on the final transformed rdds to get the data out of the system. Also rdd(s) are stored in the clusters RAM (disc if configured so) and are cleaned in LRU fashion. So I have the following questions on the same. - How spark (streaming) guarantees that all the actions are taken on each input rdd/batch. - How does spark determines that the life-cycle of a rdd is complete. Is there any chance that a RDD will be cleaned out of ram before all actions are taken on them? Thanks in advance for all your help. Also, I'm relatively new to scala spark so pardon me in case these are naive questions/assumptions. -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com* -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: Submitting job from local to EC2 cluster
Yes, it is possible to submit jobs to a remote spark cluster. Just make sure you follow the below steps. 1. Set spark.driver.host to your local ip (where you run your code; it should be accessible from the cluster) 2. Make sure no firewall/router configurations are blocking/filtering the connection between your windows machine and the cluster. Best way to test would be to ping the windows machine's public ip from your cluster. (And if the pinging is working, then make sure you are port-forwarding the required ports) 3. Also set spark.driver.port if you don't want to open up all the ports on your windows machine (default is random, so stick to one port) Thanks Best Regards On Wed, Nov 26, 2014 at 5:49 AM, Yingkai Hu yingka...@gmail.com wrote: Hi All, I have spark deployed to an EC2 cluster and was able to run jobs successfully when the driver resides within the cluster. However, the job was killed when I tried to submit it from local. My guess is the spark cluster can’t open a connection back to the driver since it is on my machine. I’m wondering if spark actually supports submitting jobs from local? If so, would you please advise? Many thanks in advance! YK - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
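A minimal sketch of the three settings listed above in code, assuming a standalone master on EC2; the master DNS name, the public IP, and the driver port are placeholders for your own values:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:7077")
  .setAppName("remote-submit-test")
  .set("spark.driver.host", "203.0.113.5")  // this machine's publicly reachable IP
  .set("spark.driver.port", "51000")        // fix the port so only one needs forwarding
val sc = new SparkContext(conf)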
Re: Spark setup on local windows machine
You could try following this guidelines http://docs.sigmoidanalytics.com/index.php/How_to_build_SPARK_on_Windows Thanks Best Regards On Wed, Nov 26, 2014 at 12:24 PM, Sunita Arvind sunitarv...@gmail.com wrote: Hi All, I just installed a spark on my laptop and trying to get spark-shell to work. Here is the error I see: C:\spark\binspark-shell Exception in thread main java.util.NoSuchElementException: key not found: CLAS SPATH at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubm itDriverBootstrapper.scala:49) at org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmi tDriverBootstrapper.scala) The classpath seems to be right: C:\spark\bincompute-classpath.cmd ;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar; ;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucle us-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar Manually exporting the classpath to include the assembly jar doesnt help either. What could be wrong with this installation? Scala and SBT are installed, in path and are working fine. Appreciate your help. regards Sunita
Re: do not assemble the spark example jar
Mater, thank you very much! After take your advice, the time for assembly from about 20min down to 6min in my computer. that's a very big improvement. On Wed, Nov 26, 2014 at 12:32 PM, Matei Zaharia matei.zaha...@gmail.com wrote: BTW as another tip, it helps to keep the SBT console open as you make source changes (by just running sbt/sbt with no args). It's a lot faster the second time it builds something. Matei On Nov 25, 2014, at 8:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can do sbt/sbt assembly/assembly to assemble only the main package. Matei On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com wrote: Hi, The spark assembly is time costly. If I only need the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need the spark-examples-1.1.0-hadoop2.3.0.jar. How to configure the spark to avoid assemble the example jar. I know *export SPARK_PREPEND_CLASSES=* *true* method can reduce the assembly, but I do not develop locally. Any advice? -- *Best Wishes!* -- *Best Wishes!* *Li Hu(李浒) | Graduate Student* *Institute for Interdisciplinary Information Sciences(IIIS http://iiis.tsinghua.edu.cn/)* *Tsinghua University, China* *Email: lihu...@gmail.com lihu...@gmail.com* *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/ http://iiis.tsinghua.edu.cn/zh/lihu/*
Re: do not assemble the spark example jar
Matei, sorry for my last typo error. And the tip can improve about 30s in my computer. On Wed, Nov 26, 2014 at 3:34 PM, lihu lihu...@gmail.com wrote: Mater, thank you very much! After take your advice, the time for assembly from about 20min down to 6min in my computer. that's a very big improvement. On Wed, Nov 26, 2014 at 12:32 PM, Matei Zaharia matei.zaha...@gmail.com wrote: BTW as another tip, it helps to keep the SBT console open as you make source changes (by just running sbt/sbt with no args). It's a lot faster the second time it builds something. Matei On Nov 25, 2014, at 8:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can do sbt/sbt assembly/assembly to assemble only the main package. Matei On Nov 25, 2014, at 7:50 PM, lihu lihu...@gmail.com wrote: Hi, The spark assembly is time costly. If I only need the spark-assembly-1.1.0-hadoop2.3.0.jar, do not need the spark-examples-1.1.0-hadoop2.3.0.jar. How to configure the spark to avoid assemble the example jar. I know *export SPARK_PREPEND_CLASSES=* *true* method can reduce the assembly, but I do not develop locally. Any advice? -- *Best Wishes!* -- *Best Wishes!* *Li Hu(李浒) | Graduate Student* *Institute for Interdisciplinary Information Sciences(IIIS http://iiis.tsinghua.edu.cn/)* *Tsinghua University, China* *Email: lihu...@gmail.com lihu...@gmail.com* *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/ http://iiis.tsinghua.edu.cn/zh/lihu/* -- *Best Wishes!* *Li Hu(李浒) | Graduate Student* *Institute for Interdisciplinary Information Sciences(IIIS http://iiis.tsinghua.edu.cn/)* *Tsinghua University, China* *Email: lihu...@gmail.com lihu...@gmail.com* *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/ http://iiis.tsinghua.edu.cn/zh/lihu/*
Re: Spark setup on local windows machine
Hi Sunita, This gitbook may also be useful for you to get Spark running in local mode on your Windows machine: http://blueplastic.gitbooks.io/how-to-light-your-spark-on-a-stick/content/ On Tue, Nov 25, 2014 at 11:09 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You could try following this guidelines http://docs.sigmoidanalytics.com/index.php/How_to_build_SPARK_on_Windows Thanks Best Regards On Wed, Nov 26, 2014 at 12:24 PM, Sunita Arvind sunitarv...@gmail.com wrote: Hi All, I just installed a spark on my laptop and trying to get spark-shell to work. Here is the error I see: C:\spark\binspark-shell Exception in thread main java.util.NoSuchElementException: key not found: CLAS SPATH at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubm itDriverBootstrapper.scala:49) at org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmi tDriverBootstrapper.scala) The classpath seems to be right: C:\spark\bincompute-classpath.cmd ;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar; ;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucle us-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar Manually exporting the classpath to include the assembly jar doesnt help either. What could be wrong with this installation? Scala and SBT are installed, in path and are working fine. Appreciate your help. regards Sunita