Re: spark - reading hdfs files every 5 minutes
Spark Streaming (https://spark.apache.org/docs/latest/streaming-programming-guide.html) is the best fit for this use case. Basically, you create a streaming context pointing to that directory, and you can set the streaming interval (in your case, 5 minutes). Spark Streaming will only process new files that have not been processed already. At the end of every operation you could move the processed files to another directory, so that if the application crashes it doesn't process everything from the beginning.

Example (prints the contents of the files in the HDFS directory /sigmoid):

    val ssc = new StreamingContext("spark://akhldz:7077", "Streaming Job", Seconds(300),
      "/home/akhld/mobi/spark-streaming/spark-0.8.0-incubating",
      List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
    val logData = ssc.textFileStream("hdfs://127.0.0.1:54310/sigmoid/")
    logData.print()

Thanks
Best Regards

On Tue, Aug 19, 2014 at 2:53 AM, salemi alireza.sal...@udo.edu wrote:
Hi,
My data source stores the incoming data every 10 seconds to hdfs. The naming convention is save-timestamp.csv (see below):

    drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396065000.csv
    drwxr-xr-x ali supergroup 0 B 0 0 B save-140839607.csv
    drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396075000.csv
    drwxr-xr-x ali supergroup 0 B 0 0 B save-140839608.csv

I would like to periodically (every 5 min) read the files and process them. Is there a good example out there of how to implement this? How do I know what part of the data I have already processed?
Thanks,
Ali
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-reading-hfds-files-every-5-minutes-tp12325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
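For comparison, the constructor in the example above is from the old 0.8 API; a minimal sketch of the same job against the Spark 1.x streaming API looks roughly like this (paths and app name are illustrative, not from the original post):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("HdfsPoller")
    // Poll the directory every 5 minutes; textFileStream only picks up files
    // that appear after the job starts, so already-seen files are not reprocessed.
    val ssc = new StreamingContext(conf, Seconds(300))
    ssc.textFileStream("hdfs://127.0.0.1:54310/sigmoid/").print()
    ssc.start()
    ssc.awaitTermination()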
Problem in running a job on more than one worker
Hello, I am trying to run a job on two workers. I have a cluster of 3 computers where one is the master and the other two are workers. I am able to successfully register the separate physical machines as workers in the cluster. When I run a job with a single worker connected, it runs successfully and calculates the value of Pi (I am running the Scala program SparkPi). But when I connect two workers and run the same program, it fails saying 'Master removed our application : FAILED'. I have the same user account on all computers and the installation of Spark is also in the same location on all computers.
The version of Spark is 1.0.0 (prebuilt Hadoop1) on all computers.
Operating System: Ubuntu 12.04 LTS

Following is the master log:

    mpiuser@ashwini-pc:~/spark-1.0.0-bin-hadoop1$ sudo ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://10.0.0.2:7077 \
      --deploy-mode client \
      --verbose \
      file:///home/mpiuser/spark-1.0.0-bin-hadoop1/lib/spark-examples-1.0.0-hadoop1.0.4.jar 1

    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    Using properties file: /home/mpiuser/spark-1.0.0-bin-hadoop1/conf/spark-defaults.conf
    Adding default property: spark.master=spark://10.0.0.2:7077
    Using properties file: /home/mpiuser/spark-1.0.0-bin-hadoop1/conf/spark-defaults.conf
    Adding default property: spark.master=spark://10.0.0.2:7077
    Parsed arguments:
      master                 spark://10.0.0.2:7077
      deployMode             client
      executorMemory         null
      executorCores          null
      totalExecutorCores     null
      propertiesFile         /home/mpiuser/spark-1.0.0-bin-hadoop1/conf/spark-defaults.conf
      driverMemory           null
      driverCores            null
      driverExtraClassPath   null
      driverExtraLibraryPath null
      driverExtraJavaOptions null
      supervise              false
      queue                  null
      numExecutors           null
      files                  null
      pyFiles                null
      archives               null
      mainClass              org.apache.spark.examples.SparkPi
      primaryResource        file:///home/mpiuser/spark-1.0.0-bin-hadoop1/lib/spark-examples-1.0.0-hadoop1.0.4.jar
      name                   org.apache.spark.examples.SparkPi
      childArgs              [1]
      jars                   null
      verbose                true
    Default properties from /home/mpiuser/spark-1.0.0-bin-hadoop1/conf/spark-defaults.conf:
      spark.master -> spark://10.0.0.2:7077
    Using properties file: /home/mpiuser/spark-1.0.0-bin-hadoop1/conf/spark-defaults.conf
    Adding default property: spark.master=spark://10.0.0.2:7077
    Main class: org.apache.spark.examples.SparkPi
    Arguments: 1
    System properties:
      SPARK_SUBMIT -> true
      spark.app.name -> org.apache.spark.examples.SparkPi
      spark.jars -> file:///home/mpiuser/spark-1.0.0-bin-hadoop1/lib/spark-examples-1.0.0-hadoop1.0.4.jar
      spark.master -> spark://10.0.0.2:7077
    Classpath elements:
      file:///home/mpiuser/spark-1.0.0-bin-hadoop1/lib/spark-examples-1.0.0-hadoop1.0.4.jar

    14/08/19 10:59:41 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/08/19 10:59:41 INFO SecurityManager: Changing view acls to: root
    14/08/19 10:59:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
    14/08/19 10:59:42 INFO Slf4jLogger: Slf4jLogger started
    14/08/19 10:59:42 INFO Remoting: Starting remoting
    14/08/19 10:59:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@ashwini-pc:34438]
    14/08/19 10:59:42 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@ashwini-pc:34438]
    14/08/19 10:59:42 INFO SparkEnv: Registering MapOutputTracker
    14/08/19 10:59:42 INFO SparkEnv: Registering BlockManagerMaster
    14/08/19 10:59:42 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140819105942-1287
    14/08/19 10:59:42 INFO MemoryStore: MemoryStore started with capacity 294.6 MB.
    14/08/19 10:59:42 INFO ConnectionManager: Bound socket to port 32777 with id = ConnectionManagerId(ashwini-pc,32777)
    14/08/19 10:59:42 INFO BlockManagerMaster: Trying to register BlockManager
    14/08/19 10:59:42 INFO BlockManagerInfo: Registering block manager ashwini-pc:32777 with 294.6 MB RAM
    14/08/19 10:59:42 INFO BlockManagerMaster: Registered BlockManager
    14/08/19 10:59:42 INFO HttpServer: Starting HTTP Server
    14/08/19 10:59:42 INFO HttpBroadcast: Broadcast server started at http://10.0.0.2:49024
    14/08/19 10:59:42 INFO HttpFileServer: HTTP File server directory is /tmp/spark-041bb9af-0703-4505-8c9e-cf355f857692
    14/08/19 10:59:42 INFO HttpServer: Starting HTTP Server
    14/08/19 10:59:48 INFO SparkUI: Started SparkUI at http://ashwini-pc:4040
    14/08/19
Naive Bayes
I'm trying the Naive Bayes classifier for the Higgs Boson challenge on Kaggle: http://www.kaggle.com/c/higgs-boson Here's the source code I'm working on: https://github.com/dnprock/SparkHiggBoson/blob/master/src/main/scala/KaggleHiggBosonLabel.scala

Training data looks like this:

    10,138.47,51.655,97.827,27.98,0.91,124.711,2.666,3.064,41.928,197.76,1.582,1.396,0.2,32.638,1.017,0.381,51.626,2.273,-2.414,16.824,-0.277,258.733,2,67.435,2.15,0.444,46.062,1.24,-2.475,113.497,s
    11,160.937,68.768,103.235,48.146,-999,-999,-999,3.473,2.078,125.157,0.879,1.414,-999,42.014,2.039,-3.011,36.918,0.501,0.103,44.704,-1.916,164.546,1,46.226,0.725,1.158,-999,-999,-999,46.226,b

My problem is that the Naive Bayes classifier always outputs 0 (for b) for all test data. I appreciate any help.
--
Phuoc Do
https://vida.io/dnprock
Performance problem on collect
Hi all, I'm a total newbie on Spark, so my question may be a dumb one. I tried Spark to compute values, and on this side all works perfectly (and it's fast :) ). At the end of the process, I have an RDD with Key (String) / Values (Array of String), and from this I want to get only one entry, like this:

    myRdd.filter(t => t._1.equals(param))

If I make a collect to get the only « tuple », it takes about 12 seconds to execute. I imagine that's because Spark may be meant to be used differently...
Best regards,
Emmanuel
Re: Naive Bayes
What is the ratio of examples labeled `s` to those labeled `b`? Also, Naive Bayes doesn't work on negative feature values. It assumes term frequencies as the input. We should throw an exception on negative feature values. -Xiangrui

On Tue, Aug 19, 2014 at 12:07 AM, Phuoc Do phu...@vida.io wrote:
I'm trying the Naive Bayes classifier for the Higgs Boson challenge on Kaggle: http://www.kaggle.com/c/higgs-boson Here's the source code I'm working on: https://github.com/dnprock/SparkHiggBoson/blob/master/src/main/scala/KaggleHiggBosonLabel.scala
Training data looks like this:

    10,138.47,51.655,97.827,27.98,0.91,124.711,2.666,3.064,41.928,197.76,1.582,1.396,0.2,32.638,1.017,0.381,51.626,2.273,-2.414,16.824,-0.277,258.733,2,67.435,2.15,0.444,46.062,1.24,-2.475,113.497,s
    11,160.937,68.768,103.235,48.146,-999,-999,-999,3.473,2.078,125.157,0.879,1.414,-999,42.014,2.039,-3.011,36.918,0.501,0.103,44.704,-1.916,164.546,1,46.226,0.725,1.158,-999,-999,-999,46.226,b

My problem is that the Naive Bayes classifier always outputs 0 (for b) for all test data. I appreciate any help.
--
Phuoc Do
https://vida.io/dnprock
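For concreteness, a minimal preprocessing sketch along the lines Xiangrui suggests (column layout assumed from the sample rows above; the path is a placeholder, and zeroing the -999 sentinel and clamping negatives is a crude illustration, not a recommendation):

    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Assumed layout: first column is an event id, last column is the s/b label.
    val data = sc.textFile("hdfs:///path/to/training.csv").map { line =>
      val parts = line.split(",")
      val label = if (parts.last == "s") 1.0 else 0.0
      // NaiveBayes rejects negative features, so map the -999 missing-value
      // sentinel to 0.0 and clamp any other negative value.
      val features = parts.init.tail
        .map(_.toDouble)
        .map(v => if (v == -999.0) 0.0 else math.max(v, 0.0))
      LabeledPoint(label, Vectors.dense(features))
    }
    val model = NaiveBayes.train(data, lambda = 1.0)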
Re: Performance problem on collect
You can use the function lookup() to accomplish this too; it may be a bit faster. It will never be as efficient as a database lookup, since this is implemented by scanning through all of the data. There is no index or anything.

On Tue, Aug 19, 2014 at 8:43 AM, Emmanuel Castanier emmanuel.castan...@gmail.com wrote:
Hi all, I'm a total newbie on Spark, so my question may be a dumb one. I tried Spark to compute values, and on this side all works perfectly (and it's fast :) ). At the end of the process, I have an RDD with Key (String) / Values (Array of String), and from this I want to get only one entry, like this:

    myRdd.filter(t => t._1.equals(param))

If I make a collect to get the only « tuple », it takes about 12 seconds to execute. I imagine that's because Spark may be meant to be used differently...
Best regards,
Emmanuel
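For reference, a one-line sketch of the lookup() route, assuming the RDD from the original post:

    // myRdd: RDD[(String, Array[String])]. lookup() still scans the data,
    // but avoids collecting the entire RDD just to keep one entry.
    val values: Seq[Array[String]] = myRdd.lookup(param)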
Re: Problem in running a job on more than one worker
Hello, On the web UI of the master, even though there are two workers shown, there is only one executor. There is an executor for machine1 but no executor for machine2. Hence if only machine1 is added as a worker the program runs, but if only machine2 is added, it fails with the same error 'Master removed our application : FAILED'. How can I give the master more than one executor? I tried searching, and one place says I should have more than one executor to run the job in parallel, but I seem to have only one executor. How do I get more executors? Thank you.
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-a-job-on-more-than-one-workers-tp12361p12368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Performance problem on collect
Thanks for your answer. In my case that's sad, because we have only 60 entries in the final RDD; I was thinking it would be fast to get the needed one.

On 19 August 2014 at 09:58, Sean Owen so...@cloudera.com wrote:
You can use the function lookup() to accomplish this too; it may be a bit faster. It will never be as efficient as a database lookup, since this is implemented by scanning through all of the data. There is no index or anything.

On Tue, Aug 19, 2014 at 8:43 AM, Emmanuel Castanier emmanuel.castan...@gmail.com wrote:
Hi all, I'm a total newbie on Spark, so my question may be a dumb one. I tried Spark to compute values, and on this side all works perfectly (and it's fast :) ). At the end of the process, I have an RDD with Key (String) / Values (Array of String), and from this I want to get only one entry, like this:

    myRdd.filter(t => t._1.equals(param))

If I make a collect to get the only « tuple », it takes about 12 seconds to execute. I imagine that's because Spark may be meant to be used differently...
Best regards,
Emmanuel
Re: Processing multiple files in parallel
sc.textFile already returns just one RDD for all of your files. The sc.union is unnecessary, although I don't know if it's adding any overhead. The data is certainly processed in parallel, and how it is parallelized depends on where the data is -- how many InputSplits Hadoop produces for them.
If you're willing to tolerate a little bit of approximation, use countApproxDistinctByKey instead of a groupBy and map. You can set relativeSD to trade off speed and accuracy.
If not, you can probably do better than collecting all of the keys and then making a set. You can use aggregateByKey to build up a Set in the first place.

On Tue, Aug 19, 2014 at 2:14 AM, SK skrishna...@gmail.com wrote:
Hi, I have a piece of code that reads all the (csv) files in a folder. For each file, it parses each line, extracts the first 2 elements from each row of the file, groups the tuple by the key, and finally outputs the number of unique values for each key.

    val conf = new SparkConf().setAppName("App")
    val sc = new SparkContext(conf)
    val user_time = sc.union(sc.textFile("/directory/*")) // union of all files in the directory
      .map(line => {
        val fields = line.split(",")
        (fields(1), fields(0)) // extract first 2 elements
      })
      .groupByKey // group by timestamp
      .map(g => (g._1, g._2.toSet.size)) // get the number of unique ids per timestamp

I have a lot of files in the directory (several hundreds). The program takes a long time. I am not sure if the union operation is preventing the files from being processed in parallel. Is there a better way to parallelize the above code? For example, the first two operations (reading each file and extracting the first 2 columns from each file) can be done in parallel, but I am not sure if that is how Spark schedules the above code.
thanks
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Processing-multiple-files-in-parallel-tp12336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
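A sketch of both alternatives Sean mentions, with the field positions assumed from the quoted code:

    val pairs = sc.textFile("/directory/*").map { line =>
      val fields = line.split(",")
      (fields(1), fields(0)) // (timestamp, id)
    }

    // Approximate: relativeSD trades accuracy for speed and memory.
    val approxCounts = pairs.countApproxDistinctByKey(relativeSD = 0.05)

    // Exact, building the per-key Set directly instead of via groupByKey:
    val exactCounts = pairs.aggregateByKey(Set.empty[String])(
      (set, id) => set + id, // fold each id into a partition-local set
      (a, b) => a ++ b       // merge sets across partitions
    ).mapValues(_.size)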
Re: Performance problem on collect
In that case, why not collectAsMap() and have the whole result as a simple Map in memory? Then lookups are trivial. RDDs aren't distributed maps.

On Tue, Aug 19, 2014 at 9:17 AM, Emmanuel Castanier emmanuel.castan...@gmail.com wrote:
Thanks for your answer. In my case that's sad, because we have only 60 entries in the final RDD; I was thinking it would be fast to get the needed one.

On 19 August 2014 at 09:58, Sean Owen so...@cloudera.com wrote:
You can use the function lookup() to accomplish this too; it may be a bit faster. It will never be as efficient as a database lookup, since this is implemented by scanning through all of the data. There is no index or anything.

On Tue, Aug 19, 2014 at 8:43 AM, Emmanuel Castanier emmanuel.castan...@gmail.com wrote:
Hi all, I'm a total newbie on Spark, so my question may be a dumb one. I tried Spark to compute values, and on this side all works perfectly (and it's fast :) ). At the end of the process, I have an RDD with Key (String) / Values (Array of String), and from this I want to get only one entry, like this:

    myRdd.filter(t => t._1.equals(param))

If I make a collect to get the only « tuple », it takes about 12 seconds to execute. I imagine that's because Spark may be meant to be used differently...
Best regards,
Emmanuel
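A minimal sketch of the collectAsMap() route for a result this small (~60 entries):

    // One job to bring everything to the driver; later lookups are plain
    // local map accesses instead of Spark jobs.
    val asMap: scala.collection.Map[String, Array[String]] = myRdd.collectAsMap()
    val entry: Option[Array[String]] = asMap.get(param)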
RE: Cannot run program Rscript using SparkR
Thanks Shivaram, this was the issue. Now I have installed Rscript on all the nodes in the Spark cluster and it works, both from the script as well as from the R prompt.
Thanks
Stuti Awasthi

From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
Sent: Tuesday, August 19, 2014 1:17 PM
To: Stuti Awasthi
Cc: user@spark.apache.org
Subject: Re: Cannot run program Rscript using SparkR

Hi Stuti, could you check if Rscript is installed on all of the worker machines in the Spark cluster? You can ssh into the machines and check if Rscript can be found in $PATH.
Thanks
Shivaram

On Mon, Aug 18, 2014 at 10:05 PM, Stuti Awasthi stutiawas...@hcl.com wrote:
Hi All,
I am using R 3.1 and Spark 0.9 and installed SparkR successfully. Now when I execute the "pi.R" example using the Spark master as local, the script executes fine. But when I try to execute the same example using the cluster master, it throws an Rscript error:

    java.io.IOException: Cannot run program Rscript: java.io.IOException: error=2, No such file or directory
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:475)
        at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:113)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)

I have checked, Rscript is present on my system and I also exported it in the CLASSPATH and PATH variables. The script is given permission 777 as there are multiple users of the clusters.

    $ which Rscript
    /usr/local/bin/Rscript
    $ type -a Rscript
    Rscript is /usr/local/bin/Rscript
    $ echo $PATH
    /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/var/local/maven3/bin/:/var/local/ant/bin/:/usr/lib/jvm/java-6-openjdk:/usr/local/bin/Rscript
    $ echo $CLASSPATH
    :/usr/local/bin/Rscript

I also get the same error if I open an R prompt and execute the commands one after another, or if I execute the script. Please suggest.
Thanks
Stuti Awasthi
Re: type issue: found RDD[T] expected RDD[A]
Hi Amit, I think the type of the data contained in your RDD needs to be a known case class and not abstract for createSchemaRDD. This makes sense when you consider that it needs to know about the fields in the object to create the schema. I had the same issue when I used an abstract base class for a collection of types I had.
Best regards, Patrick

On 6 August 2014 07:58, Amit Kumar kumarami...@gmail.com wrote:
Hi All, I am having some trouble trying to write generic code that uses sqlContext and RDDs. Can you suggest what might be wrong?

    class SparkTable[T : ClassTag](val sqlContext: SQLContext, val extractor: (String) => T) {
      private[this] var location: Option[String] = None
      private[this] var name: Option[String] = None
      private[this] val sc = sqlContext.sparkContext
      ...
      def makeRDD(sqlQuery: String): SchemaRDD = {
        require(this.location != None)
        require(this.name != None)
        import sqlContext._
        val rdd: RDD[String] = sc.textFile(this.location.get)
        val rddT: RDD[T] = rdd.map(extractor)
        val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
        schemaRDD.registerAsTable(name.get)
        val all = sqlContext.sql(sqlQuery)
        all
      }
    }

I use it as below:

    def extractor(line: String): POJO = {
      val splits = line.split(pattern).toList
      POJO(splits(0), splits(1), splits(2), splits(3))
    }

    val pojoTable: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor)
    val identityData: SchemaRDD = pojoTable.atLocation("hdfs://location/table")
      .withName("pojo")
      .makeRDD("SELECT * FROM pojo")

I get a compilation failure:

    inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product]
    [error] val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
    [error]                                            ^
    [error] SparkTable.scala:37: type mismatch;
    [error]  found   : org.apache.spark.rdd.RDD[T]
    [error]  required: org.apache.spark.rdd.RDD[A]
    [error] val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
    [error]                                            ^
    [error] two errors found

I am probably missing something basic, either in scala reflection/types or implicits? Any hints would be appreciated.
Thanks
Amit
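A sketch of the constraint Patrick describes, for the Spark 1.0-era SQL API (createSchemaRDD is declared over A <: Product with a TypeTag, so the bounds have to travel with the generic class):

    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe.TypeTag
    import org.apache.spark.sql.SQLContext

    // T must be a concrete case class: Product satisfies createSchemaRDD's
    // bound, and TypeTag lets Spark SQL inspect the fields at compile time.
    class SparkTable[T <: Product : ClassTag : TypeTag](
        val sqlContext: SQLContext,
        val extractor: String => T) {
      // ... body as in Amit's version; createSchemaRDD(rddT) now compiles.
    }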
Partitioning under Spark 1.0.x
Hi guys, I want to create two RDD[(K, V)] objects and then collocate partitions with the same K on one node. When the same partitioner is used for two RDDs, partitions with the same K end up being on different nodes. Here is a small example that illustrates this:

    // Let's say I have 10 nodes
    val partitioner = new HashPartitioner(10)
    // Create RDD
    val rdd = sc.parallelize(0 until 10).map(k => (k, computeValue(k)))
    // Partition twice using the same partitioner
    rdd.partitionBy(partitioner).foreach { case (k, v) => println("Dummy1 - k = " + k) }
    rdd.partitionBy(partitioner).foreach { case (k, v) => println("Dummy2 - k = " + k) }

The output on one node is:

    Dummy1 - k = 2
    Dummy2 - k = 7

I was expecting to see the same keys on each node. That was happening under Spark 0.9.2, but not under Spark 1.0.x. Does anyone have an idea what has changed in the meantime? Or how to get corresponding partitions on one node?
Thanks, Milos
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partitioning-under-Spark-1-0-x-tp12375.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark error when distinct on more than one column
sql: SELECT app_id,COUNT(DISTINCT app_id, macaddr) cut from object group by app_id

Error log:

    14/08/19 17:58:26 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 158.6 KB, free 294.7 MB)
    Exception in thread main java.lang.RuntimeException: [1.36] failure: ``)'' expected but `,' found

    SELECT app_id,COUNT(DISTINCT app_id, macaddr) cut from object group by app_id
                                       ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:47)
        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:70)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:150)
        at com.testbird.sparkapi.APILearn1$.main(APILearn1.scala:30)
        at com.testbird.sparkapi.APILearn1.main(APILearn1.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can anyone help me? Thanks.
Executor Memory, Task hangs
Hi all, I'm doing some testing on a small dataset (HadoopRDD, 2GB, ~10M records) with a cluster of 3 nodes. Simple calculations like count take approximately 5s when using the default value of executor.memory (512MB). When I scale this up to 2GB, several tasks take 1m or more (while most still take 1s), and tasks hang indefinitely if I set it to 4GB or higher. While these worker nodes aren't very powerful, they seem to have enough RAM to handle this: running 'free -m' shows I have 7GB free on each worker. Any tips on why these jobs would hang when given more available RAM?
Thanks
Ben
Re: Executor Memory, Task hangs
Looks like 1 worker is doing the job. Can you repartition the RDD? Also, what is the number of cores that you allocated? Things like this you can easily identify by looking at the worker's web UI (default worker:8081).
Thanks
Best Regards

On Tue, Aug 19, 2014 at 6:35 PM, Laird, Benjamin benjamin.la...@capitalone.com wrote:
Hi all, I'm doing some testing on a small dataset (HadoopRDD, 2GB, ~10M records) with a cluster of 3 nodes. Simple calculations like count take approximately 5s when using the default value of executor.memory (512MB). When I scale this up to 2GB, several tasks take 1m or more (while most still take 1s), and tasks hang indefinitely if I set it to 4GB or higher. While these worker nodes aren't very powerful, they seem to have enough RAM to handle this: running 'free -m' shows I have 7GB free on each worker. Any tips on why these jobs would hang when given more available RAM?
Thanks
Ben
Re: Executor Memory, Task hangs
Given a fixed amount of memory allocated to your workers, more memory per executor means fewer executors can execute in parallel. This means it takes longer to finish all of the tasks. Set it high enough, and your executors can find no worker with enough memory, and so they are all stuck waiting for resources. The reason the tasks seem to take longer is really that they spend time waiting for an executor, rather than spending more time running. That's my first guess.
If you want Spark to use more memory on your machines, give workers more memory. It sounds like there is no value in increasing executor memory, as it only means you are underutilizing the CPU of your cluster by not running as many tasks in parallel as would be optimal.

Hi all, I'm doing some testing on a small dataset (HadoopRDD, 2GB, ~10M records) with a cluster of 3 nodes. Simple calculations like count take approximately 5s when using the default value of executor.memory (512MB). When I scale this up to 2GB, several tasks take 1m or more (while most still take 1s), and tasks hang indefinitely if I set it to 4GB or higher. While these worker nodes aren't very powerful, they seem to have enough RAM to handle this: running 'free -m' shows I have 7GB free on each worker. Any tips on why these jobs would hang when given more available RAM?
Thanks
Ben
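For concreteness, these are the two standalone-mode knobs in play here (the values are illustrative only, not a recommendation):

    # conf/spark-env.sh on each worker: the total memory that worker may hand out
    export SPARK_WORKER_MEMORY=6g

    # conf/spark-defaults.conf (or --executor-memory): the slice each executor
    # asks a worker for; larger slices mean fewer executors fit at once
    spark.executor.memory 2g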
Re: Executor Memory, Task hangs
Thanks Akhil and Sean. All three workers are doing the work, and tasks stall simultaneously on all three. I think Sean hit on my issue. I've been under the impression that each application has one executor process per worker machine (not per core per machine). Is that incorrect? If an executor is running on each core, that would totally make sense why things are stalling.
Akhil, I'm running 8 cores per machine, and tasks are stalling on all three machines simultaneously. Also, no other Spark contexts are running, so I didn't think this was an issue of spark.executor.memory vs SPARK_WORKER_MEMORY (which is default currently).

App UI:
ID: app-20140819101355-0001 (http://tc1-master:8080/app?appId=app-20140819101355-0001)
Name: Spark shell (http://tc1-master:4040/)
Cores: 24
Memory per Node: 2.0 GB

Worker UI:
ExecutorID: 2, Cores: 8, State: RUNNING, Memory: 2.0 GB

Tasks when it stalls:

    129  SUCCESS  NODE_LOCAL  worker01  8/19/14 10:16  0.1 s  1 ms
    130  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
    131  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
    132  SUCCESS  NODE_LOCAL  worker02  8/19/14 10:16  0.1 s  1 ms
    133  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
    134  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
    135  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
    136  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
    137  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
    138  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
    139  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
    140  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
    141  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
    142  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
    143  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
    144  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
    145  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s

From: Sean Owen so...@cloudera.com
Date: Tuesday, August 19, 2014 at 9:23 AM
To: Capital One benjamin.la...@capitalone.com
Cc: user@spark.apache.org
Subject: Re: Executor Memory, Task hangs

Given a fixed amount of memory allocated to your workers, more memory per executor means fewer executors can execute in parallel. This means it takes longer to finish all of the tasks. Set it high enough, and your executors can find no worker with enough memory, and so they are all stuck waiting for resources. The reason the tasks seem to take longer is really that they spend time waiting for an executor, rather than spending more time running. That's my first guess.
If you want Spark to use more memory on your machines, give workers more memory. It sounds like there is no value in increasing executor memory, as it only means you are underutilizing the CPU of your cluster by not running as many tasks in parallel as would be optimal.

Hi all, I'm doing some testing on a small dataset (HadoopRDD, 2GB, ~10M records) with a cluster of 3 nodes. Simple calculations like count take approximately 5s when using the default value of executor.memory (512MB). When I scale this up to 2GB, several tasks take 1m or more (while most still take 1s), and tasks hang indefinitely if I set it to 4GB or higher. While these worker nodes aren't very powerful, they seem to have enough RAM to handle this: running 'free -m' shows I have 7GB free on each worker. Any tips on why these jobs would hang when given more available RAM?
Thanks
Ben
Re: Bug or feature? Overwrite broadcasted variables.
Unfortunately, after some research I found it's just a side effect of how a closure containing a var works in Scala: http://stackoverflow.com/questions/11657676/how-does-scala-maintains-the-values-of-variable-when-the-closure-was-defined
The closure keeps referring to the broadcast-variable wrapper as a pointer until it is shipped to the nodes, which is only triggered lazily. So you can't do this after shipping has already started (e.g. change the broadcasted value in a new thread while an action is running). It's neither a feature nor a bug, just an illusion. I would really like to see a non-blocking Broadcast.set() being implemented; it would make a lot of stochastic algorithms easier to write.
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
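A tiny plain-Scala sketch of the capture behavior that Stack Overflow answer describes (no Spark needed):

    var factor = 2
    val multiply = (x: Int) => x * factor // the closure captures `factor` by reference
    factor = 10
    println(multiply(1)) // prints 10, not 2: the var is read when the closure runs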
How to configure SPARK_EXECUTOR_URI to access files from maprfs
We use MapR Hadoop, and I have configured mesos-0.18.1 and spark-1.0.1 to work together on top of the nodes running MapR Hadoop. I would like to configure Spark to access files from the MapR filesystem (maprfs://), and I'm starting by configuring the SPARK_EXECUTOR_URI environment variable in the spark-env.sh file to access the precompiled Spark version for our environment. I have tried the following in spark-env.sh:

    export SPARK_EXECUTOR_URI=maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz

but when I submit a job from the spark-shell going through the mesos master, I see this in my stderr log:

    WARNING: Logging before InitGoogleLogging() is written to STDERR
    I0819 11:09:59.404290 27027 fetcher.cpp:73] Fetching URI 'maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz'
    E0819 11:09:59.404392 27027 fetcher.cpp:142] A relative path was passed for the resource but the environment variable MESOS_FRAMEWORKS_HOME is not set. Please either specify this config option or avoid using a relative path
    Failed to fetch: maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz

I then tried setting the MESOS_FRAMEWORKS_HOME environment variable in /etc/environment to MESOS_FRAMEWORKS_HOME=/usr/local/mesos-0.18.1, and now I see this in the stderr log:

    WARNING: Logging before InitGoogleLogging() is written to STDERR
    I0819 11:05:12.430522 4774 fetcher.cpp:73] Fetching URI 'maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz'
    I0819 11:05:12.430616 4774 fetcher.cpp:138] Prepended environment variable MESOS_FRAMEWORKS_HOME to relative path, making it: '/usr/local/mesos-0.18.1/maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz'
    I0819 11:05:12.430635 4774 fetcher.cpp:160] Copying resource from '/usr/local/mesos-0.18.1/maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz' to '/tmp/mesos/slaves/20140815-101817-3334820618-5050-32618-0/frameworks/20140819-101702-3334820618-5050-16778-0001/executors/20140815-101817-3334820618-5050-32618-0/runs/e56fffbe-942d-4b15-a798-a00401387927'
    cp: cannot stat `/usr/local/mesos-0.18.1/maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz': No such file or directory
    E0819 11:05:12.433964 4774 fetcher.cpp:165] Failed to copy '/usr/local/mesos-0.18.1/maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz': Exit status 256
    Failed to fetch: maprfs:///mapr/CLUSTER1/MAIN/tmp/spark-1.0.1-bin-mapr3.tgz

Has anyone gotten Spark to interact successfully with maprfs?
Re: Viewing web UI after fact
Hi, is there any way to view the history of application statistics in the master UI after restarting the master server? I have all the logs in /tmp/spark-events/, but when I start the history server on this directory it says "No Completed Applications Found". Maybe I could copy these logs to the directory used by the master server, but I couldn't find it. Or maybe I'm doing something wrong launching the history server. Do you have any idea how to solve it?
Thanks, Grzegorz

On Thu, Aug 14, 2014 at 10:53 AM, Grzegorz Białek grzegorz.bia...@codilime.com wrote:
Hi, thank you both for your answers. Browsing using the Master UI works fine. Unfortunately the History Server shows "No Completed Applications Found" even if logs exist under the given directory, but using the Master UI is enough for me.
Best regards, Grzegorz

On Wed, Aug 13, 2014 at 8:09 PM, Andrew Or and...@databricks.com wrote:
The Spark UI isn't available through the same address; otherwise new applications won't be able to bind to it. Once the old application finishes, the standalone Master renders the after-the-fact application UI and exposes it under a different URL. To see this, go to the Master UI (master-url:8080) and click on your application in the Completed Applications table.

2014-08-13 10:56 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com:
Take a look at http://spark.apache.org/docs/latest/monitoring.html -- you need to launch a history server to serve the logs.
Matei

On August 13, 2014 at 2:03:08 AM, grzegorz-bialek (grzegorz.bia...@codilime.com) wrote:
Hi, I wanted to access the Spark web UI after the application stops. I set spark.eventLog.enabled to true and the logs are available in JSON format in /tmp/spark-events, but the web UI isn't available under the address http://driver-node:4040. I'm running Spark in standalone mode. What should I do to access the web UI after the application ends?
Thanks, Grzegorz
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Viewing-web-UI-after-fact-tp12023.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
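For reference, a minimal setup along the lines of the monitoring docs (paths illustrative; the applications and the history server must point at the same event-log directory):

    # conf/spark-defaults.conf for the applications
    spark.eventLog.enabled true
    spark.eventLog.dir file:///tmp/spark-events

    # then serve those logs after the fact:
    ./sbin/start-history-server.sh file:///tmp/spark-events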
Re: Writing to RabbitMQ
Thanks for the quick and clear response! I now have a better understanding of what is going on regarding the driver and worker nodes which will help me greatly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p12386.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark on disk executions
Hi, we have ~1TB of data to process, but our cluster doesn't have sufficient memory for such a data set (we have a 5-10 machine cluster). Is it possible to process 1TB of data using ON DISK options with Spark? If yes, where can I read about the configuration for ON DISK execution?
Thanks
Oleg.
Updating shared data structure between executors
Hi, I am writing some Scala code to normalize a stream of logs using an input configuration file (multiple regex patterns). To avoid re-starting the job, I can read in a new config file using fileStream and then turn the config file to a map. But I am unsure about how to update a shared map (since broadcast vars cannot be updated)? Any help or pointers will be appreciated. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Python script runs fine in local mode, errors in other modes
Hello, I have a relatively simple Python program that works just fine in local mode (--master local) but produces a strange error when I try to run it via Yarn (--deploy-mode client --master yarn) or just execute the code through pyspark.
Here's the code:

    sc = SparkContext(appName="foo")
    input = sc.textFile("hdfs://[valid hdfs path]")
    mappedToLines = input.map(lambda myline: myline.split(","))

The third line yields this error:

    TypeError: 'bool' object is not callable

But myline seems to be a valid string if I look at it this way:

    >>> mappedToLines = input.map(lambda myline: len(myline))
    >>> mappedToLines.collect()
    [84, 104, 109, 89, 108, 92, 89, 90, 93, 102]

I just now have access to a Hadoop cluster with Spark installed, so hopefully I'm running into some simple issues that I never had to deal with when testing in my own sandbox in purely local mode before. Any help would be appreciated, thanks!
-Aaron
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-script-runs-fine-in-local-mode-errors-in-other-modes-tp12390.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python script runs fine in local mode, errors in other modes
Could you post the complete stacktrace?

On Tue, Aug 19, 2014 at 10:47 AM, Aaron aaron.doss...@target.com wrote:
Hello, I have a relatively simple Python program that works just fine in local mode (--master local) but produces a strange error when I try to run it via Yarn (--deploy-mode client --master yarn) or just execute the code through pyspark.
Here's the code:

    sc = SparkContext(appName="foo")
    input = sc.textFile("hdfs://[valid hdfs path]")
    mappedToLines = input.map(lambda myline: myline.split(","))

The third line yields this error:

    TypeError: 'bool' object is not callable

But myline seems to be a valid string if I look at it this way:

    >>> mappedToLines = input.map(lambda myline: len(myline))
    >>> mappedToLines.collect()
    [84, 104, 109, 89, 108, 92, 89, 90, 93, 102]

I just now have access to a Hadoop cluster with Spark installed, so hopefully I'm running into some simple issues that I never had to deal with when testing in my own sandbox in purely local mode before. Any help would be appreciated, thanks!
-Aaron
View this message in context: Python script runs fine in local mode, errors in other modes
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Failed jobs show up as succeeded in YARN?
We see this all the time as well; I don't believe there is much of a relationship between the Spark job status and what Yarn shows as the status.

On Mon, Aug 11, 2014 at 3:17 PM, Shay Rojansky r...@roji.org wrote:
Spark 1.0.2, Python, Cloudera 5.1 (Hadoop 2.3.0). It seems that Python jobs I'm sending to YARN show up as succeeded even if they failed... Am I doing something wrong? Is this a known issue?
Thanks, Shay
Re: Python script runs fine in local mode, errors in other modes
Sure thing, this is the stacktrace from pyspark. It's repeated a few times, but I think this is the unique stuff:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/pyspark/rdd.py", line 583, in collect
        bytesInJava = self._jrdd.collect().iterator()
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o24.collect.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent failure: Exception failure in TID 5 on host [hostname redacted]: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
      File "<stdin>", line 1, in <lambda>
    TypeError: 'bool' object is not callable

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:118)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:148)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:81)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-script-runs-fine-in-local-mode-errors-in-other-modes-tp12390p12392.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: a noob question for how to implement setup and cleanup in Spark map
Sean, would this work --

    rdd.mapPartitions { partition => Iterator(partition) }.foreach { partition =>
      // Some setup code here
      // save partition to DB
      // Some cleanup code here
    }

I tried a pretty simple example... I can see that the setup and cleanup are executed on the executor node, once per partition (I used mapPartitionsWithIndex instead of mapPartitions to track this a little better). It seems like an easier solution than Tobias's, but I'm wondering if it's perhaps incorrect.

On Mon, Aug 18, 2014 at 3:29 AM, Henry Hung ythu...@winbond.com wrote:
I slightly modified the code to use while (partitions.hasNext) { } instead of partitions.map(func). I suppose this can eliminate the uncertainty from lazy execution.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Monday, August 18, 2014 3:10 PM
To: MA33 YTHung1
Cc: user@spark.apache.org
Subject: Re: a noob question for how to implement setup and cleanup in Spark map

I think this was a more comprehensive answer recently. Tobias is right that it is not quite that simple:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote:
Hi All, please ignore my question; I found a way to implement it via old archive mails:
http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E
Best regards, Henry

From: MA33 YTHung1
Sent: Monday, August 18, 2014 2:42 PM
To: user@spark.apache.org
Subject: a noob question for how to implement setup and cleanup in Spark map

Hi All, I'm new to Spark and Scala, just recently using this language and love it, but there is a small coding problem when I want to convert my existing map reduce code from Java to Spark...
In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and override the setup(), map() and cleanup() methods. But in Spark, there is no method called setup(), so I write the setup() code into map(), but it performs badly. The reason is that I create the database connection in setup() once, run() executes the SQL query, and then cleanup() closes the connection. Could someone tell me how to do it in Spark?
Best regards,
Henry Hung
EC2 instances missing SSD drives randomly?
Hi, using the spark 1.0.1 ec2 script I launched 35 m3.2xlarge instances (I was using the Singapore region). Some of the instances came up without the ephemeral internal (non-EBS) SSD devices that are supposed to be connected to them. Some of them have these drives but not all, and there is no sign from the outside; one can only notice this by ssh-ing into the instances and typing `df -l`, thus it seems to be a bug to me. I am not sure if Amazon is not providing the drives or if the Spark AMI configures something wrong. Do you have any idea what is going on? I have never faced this issue before. It is not that the drives are not formatted/mounted (as was the case with the new r3 instances); they are not present physically. (Though /mnt and /mnt2 are configured properly in fstab.) I did several tries and the result was the same: some of the instances launched with the drives, some without. Please let me know if you have some ideas what to do with this inconsistent behaviour.
András
Re: a noob question for how to implement setup and cleanup in Spark map
I think you're looking for foreachPartition(). You've kinda hacked it out of mapPartitions(). Your case has a simple solution, yes. After saving to the DB, you know you can close the connection, since you know the use of the connection has definitely just finished.
But it's not a simpler solution for mapPartitions(), since that's not really what you are using :) In general, mapPartitions creates an Iterator from another Iterator. Of course you could consume the input iterator, open the connection, perform operations, close the connection, and return an iterator over the result. That works, but requires reading the entire input no matter what, and reading it into memory. These may not be OK in all cases.
Where possible, it's nicest to return an Iterator that accesses the source Iterator only as needed to produce elements. This means returning that Iterator before any work has been done. So you have to close the connection later, when the Iterator has been exhausted. Really, Tobias's method is trying to shim a cleanup() lifecycle method into the Iterator. I suppose it could be done a little more cleanly using Guava's Iterator library, which would give you a more explicit way to execute something when done.

On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote:
Sean, would this work --

    rdd.mapPartitions { partition => Iterator(partition) }.foreach { partition =>
      // Some setup code here
      // save partition to DB
      // Some cleanup code here
    }

I tried a pretty simple example... I can see that the setup and cleanup are executed on the executor node, once per partition (I used mapPartitionsWithIndex instead of mapPartitions to track this a little better). It seems like an easier solution than Tobias's, but I'm wondering if it's perhaps incorrect.
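A minimal sketch of the foreachPartition() pattern for the save-to-DB case (the connection helpers here are hypothetical):

    rdd.foreachPartition { iter =>
      val conn = createConnection()  // setup, once per partition (hypothetical helper)
      try {
        iter.foreach(record => conn.save(record))  // consume the whole iterator
      } finally {
        conn.close()  // cleanup runs even if a save fails
      }
    }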
Re: Python script runs fine in local mode, errors in other modes
These three lines of python code cause the error for me:

    sc = SparkContext(appName="foo")
    input = sc.textFile("hdfs://[valid hdfs path]")
    mappedToLines = input.map(lambda myline: myline.split(","))

The file I'm loading is a simple CSV.
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-script-runs-fine-in-local-mode-errors-in-other-modes-tp12390p12398.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
noob: how to extract different members of a VertexRDD
I'm a Scala / Spark / GraphX newbie, so I may be missing something obvious. I have a set of edges that I read into a graph. For an iterative community-detection algorithm, I want to assign each vertex to a community with the name of the vertex. Intuitively it seems like I should be able to pull the vertexID out of the VertexRDD and build a new VertexRDD with 2 Int attributes. Unfortunately I'm not finding the recipe to unpack the VertexRDD into the vertexID and attribute pieces.
The code snippet that builds the graph looks like:

    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    val G = GraphLoader.edgeListFile(sc, "[[...]]clique_5_2_3.edg")

Poking at G to see what it looks like, I see:

    scala> :type G.vertices
    org.apache.spark.graphx.VertexRDD[Int]

    scala> G.vertices.collect()
    res1: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((10002,1), (4,1), (10001,1), (1,1), (0,1), (1,1), (10003,1), (3,1), (10004,1), (2,1))

I've tried several ways to pull out just the first element of each tuple into a new variable, with no success.

    scala> var (x: Int) = G.vertices
    <console>:21: error: type mismatch;
     found   : org.apache.spark.graphx.VertexRDD[Int]
     required: Int
           var (x: Int) = G.vertices
                            ^

    scala> val x: Int = G.vertices._1
    <console>:21: error: value _1 is not a member of org.apache.spark.graphx.VertexRDD[Int]
           val x: Int = G.vertices._1
                                   ^

What am I missing?
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/noob-how-to-extract-different-members-of-a-VertexRDD-tp12399.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python script runs fine in local mode, errors in other modes
This script runs fine without your CSV file. Could you download your CSV file to local disk and narrow down to the lines that trigger this issue? On Tue, Aug 19, 2014 at 12:02 PM, Aaron aaron.doss...@target.com wrote: These three lines of python code cause the error for me: sc = SparkContext(appName="foo") input = sc.textFile("hdfs://[valid hdfs path]") mappedToLines = input.map(lambda myline: myline.split(",")) The file I'm loading is a simple CSV.
Re: spark - reading hfds files every 5 minutes
Thank you, but how do you convert the stream to a Parquet file? Ali
Re: noob: how to extract different members of a VertexRDD
(+user) On Tue, Aug 19, 2014 at 12:05 PM, spr s...@yarcdata.com wrote: I want to assign each vertex to a community with the name of the vertex. As I understand it, you want to set the vertex attributes of a graph to the corresponding vertex ids. You can do this using Graph#mapVertices [1] as follows:

val g = ...
val newG = g.mapVertices((id, attr) => id)
// newG.vertices has type VertexRDD[VertexId], or RDD[(VertexId, VertexId)]

If you only want to do this for a VertexRDD without constructing a new graph using the modified vertices, you can use VertexRDD#mapValues [2] in a similar fashion.

[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.VertexRDD@mapValues[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):VertexRDD[VD2]

Ankur http://www.ankurdave.com/
Re: EC2 instances missing SSD drives randomly?
I think you have to explicitly list the ephemeral disks in the device map when launching the EC2 instance. http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html On Tue, Aug 19, 2014 at 11:54 AM, Andras Barjak andras.bar...@lynxanalytics.com wrote: Hi, Using the spark 1.0.1 ec2 script I launched 35 m3.2xlarge instances. (I was using the Singapore region.) Some of the instances we got without the ephemeral internal (non-EBS) SSD devices that are supposed to be connected to them. Some of them have these drives but not all, and there is no sign from the outside; one can only notice this by ssh-ing into the instances and typing `df -l`, thus it seems to be a bug to me. I am not sure if Amazon is not providing the drives or the Spark AMI configures something wrong. Do you have any idea what is going on? I never faced this issue before. It is not as if the drive is not formatted/mounted (as was the case with the new r3 instances); they are not present physically. (Though mnt and mnt2 are configured properly in fstab.) I did several tries and the result was the same: some of the instances launched with the drives, some without. Please let me know if you have some ideas what to do with this inconsistent behaviour. András
Re: noob: how to extract different members of a VertexRDD
ankurdave wrote:
val g = ...
val newG = g.mapVertices((id, attr) => id)
// newG.vertices has type VertexRDD[VertexId], or RDD[(VertexId, VertexId)]

Yes, that worked perfectly. Thanks much. One follow-up question. If I just wanted to get those values into a vanilla variable (not a VertexRDD or Graph or ...) so I could easily look at them in the REPL, what would I do? Are the aggregate data structures inside the VertexRDD/Graph/... Arrays or Lists or what, or do I even need to know/care? Thanks. Steve
pyspark/yarn and inconsistent number of executors
I've set up a YARN (Hadoop 2.4.1) cluster with Spark 1.0.1 and I've been seeing some inconsistencies with out of memory errors (java.lang.OutOfMemoryError: unable to create new native thread) when increasing the number of executors for a simple job (wordcount). The general format of my submission is:

spark-submit \
  --master yarn-client \
  --num-executors=$EXECUTORS \
  --executor-cores 1 \
  --executor-memory 2G \
  --driver-memory 3G \
  count.py input output

If I run without specifying the number of executors, it defaults to two (3 containers: 2 executors, 1 driver). Is there any mechanism to let a spark application scale to the capacity of the YARN cluster automatically? Similarly, for low numbers of executors I get what I asked for (e.g., 10 executors results in 11 containers running, 20 executors results in 21 containers, etc) until a particular threshold... when I specify 50 containers, Spark seems to start asking for more and more containers until all the memory in the cluster is allocated and the job gets killed. I don't understand that particular behavior; if anyone has any thoughts, it would be great if you could share your experiences. Wouldn't it be preferable to have Spark stop requesting containers if the cluster is at capacity rather than kill the job or error out? Does anyone have any recommendations on how to tweak the number of executors in an automated manner? Thanks, Calvin
RDD Grouping
Hi, is there a way to group items in an RDD together such that I can process them using parallelize/map? Let's say I have data items with keys 1...1000, e.g. loading RDD = sc.newAPIHadoopFile(...).cache(). Now, I would like them to be processed in chunks of, e.g., tens: chunk1=[0..9], chunk2=[10..19], ..., chunk100=[990..999] sc.parallelize([chunk1,...,chunk100]).map(process my chunk) I thought I could use groupBy() or something like that, but the return type is PipelinedRDD, which is not iterable. Anybody an idea? Thanks in advance, Tassilo
Re: RDD Grouping
groupBy seems to be exactly what you want.

val data = sc.parallelize(1 to 200)
data.groupBy(_ % 10).values.map(...)

This would let you process 10 Iterable[Int] in parallel, each of which is 20 ints in this example. It may not make sense to do this in practice, as you'd be shuffling a lot of data around just to make the chunks. If you want to map chunks of data at once, look at mapPartitions(), which will tend to respect data locality. groupBy returns an RDD -- it looks like ShuffledRDD actually, but that may depend on what comes before. It shouldn't matter though; it's an RDD and that's what you need, not an Iterable. On Tue, Aug 19, 2014 at 9:02 PM, TJ Klein tjkl...@gmail.com wrote: Hi, is there a way to group items in an RDD together such that I can process them using parallelize/map? Let's say I have data items with keys 1...1000, e.g. loading RDD = sc.newAPIHadoopFile(...).cache(). Now, I would like them to be processed in chunks of, e.g., tens: chunk1=[0..9], chunk2=[10..19], ..., chunk100=[990..999] sc.parallelize([chunk1,...,chunk100]).map(process my chunk) I thought I could use groupBy() or something like that, but the return type is PipelinedRDD, which is not iterable. Anybody an idea? Thanks in advance, Tassilo
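For the mapPartitions() route, a minimal sketch (processChunk is a hypothetical per-chunk function, and the partition count is an assumption):

val data = sc.parallelize(1 to 1000, 100)     // 100 partitions of roughly 10 elements each
val results = data.mapPartitions { chunk =>
  Iterator(processChunk(chunk.toSeq))         // hypothetical: handle one chunk at a time
}

This avoids the shuffle entirely, since each partition already is a chunk.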
Re: noob: how to extract different members of a VertexRDD
At 2014-08-19 12:47:16 -0700, spr s...@yarcdata.com wrote: One follow-up question. If I just wanted to get those values into a vanilla variable (not a VertexRDD or Graph or ...) so I could easily look at them in the REPL, what would I do? Are the aggregate data structures inside the VertexRDD/Graph/... Arrays or Lists or what, or do I even need to know/care? The vertex values are internally stored in hash tables within each partition (see VertexPartitionBase if you're curious), but to access them all from the REPL, you can just use RDD#collect as in your first mail. If you want just the vertex ids, you can use RDD#map first:

val verts: VertexRDD[Int] = ...
val pairs: Array[(VertexId, Int)] = verts.collect()
val ids: Array[VertexId] = verts.map(kv => kv._1).collect()

Ankur
Re: Naive Bayes
Hi Xiangrui, Training data: 42945 s out of 124659. Test data: 42722 s out of 125341. The ratio is very much the same. I tried Decision Tree. It outputs 0 to 1 decimals. I don't quite understand it yet. Would feature scaling make it work for Naive Bayes? Phuoc Do On Tue, Aug 19, 2014 at 12:51 AM, Xiangrui Meng men...@gmail.com wrote: What is the ratio of examples labeled `s` to those labeled `b`? Also, Naive Bayes doesn't work on negative feature values. It assumes term frequencies as the input. We should throw an exception on negative feature values. -Xiangrui On Tue, Aug 19, 2014 at 12:07 AM, Phuoc Do phu...@vida.io wrote: I'm trying a Naive Bayes classifier for the Higgs Boson challenge on Kaggle: http://www.kaggle.com/c/higgs-boson Here's the source code I'm working on: https://github.com/dnprock/SparkHiggBoson/blob/master/src/main/scala/KaggleHiggBosonLabel.scala Training data looks like this: 10,138.47,51.655,97.827,27.98,0.91,124.711,2.666,3.064,41.928,197.76,1.582,1.396,0.2,32.638,1.017,0.381,51.626,2.273,-2.414,16.824,-0.277,258.733,2,67.435,2.15,0.444,46.062,1.24,-2.475,113.497,s 11,160.937,68.768,103.235,48.146,-999,-999,-999,3.473,2.078,125.157,0.879,1.414,-999,42.014,2.039,-3.011,36.918,0.501,0.103,44.704,-1.916,164.546,1,46.226,0.725,1.158,-999,-999,-999,46.226,b My problem is the Naive Bayes classifier always outputs 0 (for b) for all test data. I appreciate any help. -- Phuoc Do https://vida.io/dnprock -- Phuoc Do https://vida.io/dnprock
Only master is really busy at KMeans training
When trying to use KMeans.train with some large data and 5 worker nodes, it would fail due to BlockManagers shutting down because of a timeout. I was able to prevent that by adding spark.storage.blockManagerSlaveTimeoutMs 300 to the spark-defaults.conf. However, with 1 million feature vectors, the stage takeSample at KMeans.scala:263 runs for about 50 minutes. In this time, about half of the tasks are done, then I lose the executors and Spark starts a new repartitioning stage. I also noticed that in the takeSample stage, a task would run for about 2.5 minutes until suddenly it finished and its duration (previously those 2.5 min) changed to 2s, with 0.9s GC time. The training data is supplied in this form:

var vectors2 = vectors.repartition(1000).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
var broadcastVector = sc.broadcast(vectors2)

The 1000 partitions is something that could probably be optimized, but too few will cause OOM errors. Using Ganglia, I can see that the master node is the only one that is properly busy regarding CPU, and that most (600-700 of 800 total percent CPU) is used by the master. The workers on each node only use 1 core, i.e. 100% CPU. What would be the most likely cause for such an inefficient use of the cluster, and how to prevent it? Number of partitions, way of caching, ...? I'm trying to find out myself with tests, but ideas from someone with more experience are very welcome. Best regards, Simon
saveAsTextFile hangs with hdfs
I have a simple spark job that seems to hang when saving to hdfs. When looking at the spark web ui, the job reached 97 of 100 tasks completed. I need some help determining why the job appears to hang. The job hangs on the saveAsTextFile() call. https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png The job is pretty simple:

JavaRDD<String> analyticsLogs = context
    .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
    .map(line -> {
      try {
        AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
        AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
        flyweight.ipAddress = log.getIpAddress();
        flyweight.time = log.getTime();
        flyweight.trackingId = log.getTrackingId();
        return flyweight;
      } catch (Exception e) {
        LOG.error("error parsing json", e);
        return null;
      }
    });

JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
    .filter(log -> log != null);

JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
    .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2<>(log.trackingId, log))
    .partitionBy(new HashPartitioner(100)).cache();

Ordering<AnalyticsLogFlyweight> ordering =
    Ordering.natural().nullsFirst().onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
      public Long apply(AnalyticsLogFlyweight log) {
        return log.time;
      }
    });

JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD = partitioned.groupByKey();

JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringIterableJavaPairRDD.mapToPair((log) -> {
  List<AnalyticsLogFlyweight> sorted = Lists.newArrayList(log._2());
  sorted.forEach(l -> LOG.info("sorted {}", l));
  return new Tuple2<>(log._1(), sorted.size());
});

String outputPath = "/summarized/groupedByTrackingId4";
hdfs.rm(outputPath, true);
stringIntegerJavaPairRDD.saveAsTextFile(String.format("%s/%s", hdfs.getUrl(), outputPath));

Thanks in advance, David
Re: OpenCV + Spark : Where to put System.loadLibrary ?
Reviving this thread hoping I might be able to get an exact snippet for the correct way to do this in Scala. I had a solution for OpenCV that I thought was correct, but half the time the library was not loaded by the time it was needed. Keep in mind that I am completely new at Scala, so you're going to have to be pretty explicit.
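Not from this thread, but one common pattern is to guard the native load with a per-JVM lazy initializer so each executor loads the library exactly once before first use. A minimal sketch, where the library name "opencv_java" and processWithOpenCV are assumptions:

object OpenCVLoader {
  // a lazy val initializes at most once per JVM, i.e. once per executor
  lazy val loaded: Unit = System.loadLibrary("opencv_java")
}

rdd.mapPartitions { iter =>
  OpenCVLoader.loaded                // force the native load on this executor
  iter.map(processWithOpenCV)        // hypothetical per-element OpenCV work
}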
Re: Spark RuntimeException due to Unsupported datatype NullType
Hi Rafeeq, I think the following part triggered the bug https://issues.apache.org/jira/browse/SPARK-2908: [{*href:null*,rel:me}] It has been fixed. Can you try spark master and see if the error gets resolved? Thanks, Yin

On Mon, Aug 11, 2014 at 3:53 AM, rafeeq s rafeeq.ec...@gmail.com wrote: Hi, Spark throws a RuntimeException due to *Unsupported datatype NullType* when saving null primitives in a jsonRDD with saveAsParquetFile(). Code: I am trying to store a jsonRDD into a Parquet file using saveAsParquetFile with the code below.

JavaRDD<String> javaRDD = ssc.sparkContext().parallelize(jsonData);
JavaSchemaRDD schemaObject = sqlContext.jsonRDD(javaRDD);
schemaObject.saveAsParquetFile("tweets/tweet" + time.toString().replace(" ms", "") + ".parquet");

Input: The JSON input below has some *null values* which are not supported by spark, throwing the error *Unsupported datatype NullType*.

{id:tag:search.twitter.com ,2005:11,objectType:activity,actor:{objectType:person,id:id: twitter.com:111,link:http://www.twitter.com/funtubevids,displayName:مشاهد حول العالم,postedTime:2014-05-01T06:14:51.000Z,image: https://pbs.twimg.com/profile_images/111/VORNn-Df_normal.png, *summary*:*null*,links:[{*href:null* ,rel:me}],friendsCount:0,followersCount:49,listedCount:0,statusesCount:61, *twitterTimeZone:null*,verified:false*,utcOffset:null* ,preferredUsername:funtubevids,languages:[en],favoritesCount:0},verb:post,postedTime:2014-05-27T17:33:54.000Z,generator:{displayName:web,link: http://twitter.com },provider:{objectType:service,displayName:Twitter,link: http://www.twitter.com},link: http://twitter.com/funtubevids/statuses/1,body:القيادة في مدرج الطيران #مهبط #مدرج #مطار #هبوط #قيادة #سيارة #طائرة #airport #plane #car https://t.co/gnn7LKE6pC,object:urls:[{url: https://t.co/gnn7LKE6pC,expanded_url: https://www.youtube.com/watch?v=J-j6RSRMvRo ,expanded_status:200}],klout_score:10,language:{value:ar}}}

ERROR scheduler.JobScheduler: Error running job streaming job 140774119 ms.0
java.lang.RuntimeException: Unsupported datatype NullType
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:267)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:243)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:235)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:243)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$3.apply(ParquetTypes.scala:287)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$3.apply(ParquetTypes.scala:286)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Seems https://issues.apache.org/jira/browse/SPARK-2846 is the jira tracking this issue. On Mon, Aug 18, 2014 at 6:26 PM, cesararevalo ce...@zephyrhealthinc.com wrote: Thanks, Zhan for the follow up. But, do you know how I am supposed to set that table name on the jobConf? I don't have access to that object from my client driver?
Re: spark error when distinct on more than one cloume
Hi, The SQLParser used by SQLContext is pretty limited. Instead, can you try HiveContext? Thanks, Yin On Tue, Aug 19, 2014 at 7:57 AM, wan...@testbird.com wan...@testbird.com wrote:

sql: SELECT app_id,COUNT(DISTINCT app_id, macaddr) cut from object group by app_id

Error log:
14/08/19 17:58:26 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 158.6 KB, free 294.7 MB)
Exception in thread "main" java.lang.RuntimeException: [1.36] failure: ``)'' expected but `,' found
SELECT app_id,COUNT(DISTINCT app_id, macaddr) cut from object group by app_id
                                   ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:47)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:70)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:150)
at com.testbird.sparkapi.APILearn1$.main(APILearn1.scala:30)
at com.testbird.sparkapi.APILearn1.main(APILearn1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can anyone help me? Thanks.
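For reference, a minimal sketch of the HiveContext route (assuming a Spark build compiled with Hive support; on 1.0.x the entry point is hql):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val result = hiveContext.hql(
  "SELECT app_id, COUNT(DISTINCT app_id, macaddr) cut FROM object GROUP BY app_id")

Hive's parser accepts the multi-column COUNT(DISTINCT ...) that the basic SqlParser rejects above.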
spark streaming - how to prevent an empty dstream from being written out to hdfs
Hi All, I have the following code, and if the dstream is empty, spark streaming writes empty files to hdfs. How can I prevent it?

val ssc = new StreamingContext(sparkConf, Minutes(1))
val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String], classOf[String], classOf[TextOutputFormat[String,String]], ssc.sparkContext.hadoopConfiguration)

Thanks, Ali
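One possible workaround, sketched below under the assumption that switching from saveAsNewAPIHadoopFiles to foreachRDD is acceptable, is to write only the batches that actually contain data (the take(1) emptiness check and the output naming are assumptions, not a tested recipe):

dStream.foreachRDD { (rdd, time) =>
  if (rdd.take(1).nonEmpty) {   // skip writing empty batches entirely
    rdd.saveAsTextFile(hdfsDataUrl + "/batch-" + time.milliseconds)
  }
}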
Re: NullPointerException when connecting from Spark to a Hive table backed by HBase
Thanks! Yeah, it may be related to that. I'll check out that pull request that was sent and hopefully that fixes the issue. I'll let you know; after fighting with this issue yesterday I had decided to just leave it on the side and return to it later, so it may take me a while to get back to you. -Cesar On Tue, Aug 19, 2014 at 2:04 PM, Yin Huai huaiyin@gmail.com wrote: Seems https://issues.apache.org/jira/browse/SPARK-2846 is the jira tracking this issue. On Mon, Aug 18, 2014 at 6:26 PM, cesararevalo ce...@zephyrhealthinc.com wrote: Thanks, Zhan for the follow up. But, do you know how I am supposed to set that table name on the jobConf? I don't have access to that object from my client driver? -- Cesar Arevalo Software Engineer ❘ Zephyr Health 450 Mission Street, Suite #201 ❘ San Francisco, CA 94105 m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth https://twitter.com/zephyrhealth o: +1 415-529-7649 ❘ f: +1 415-520-9288 http://www.zephyrhealth.com
Re: saveAsTextFile hangs with hdfs
Update: it hangs even when not writing to hdfs. I changed the code to avoid saveAsTextFile() and instead do a foreachPartition and log the results. This time it got to 96/100 tasks, but it still hangs. I changed the saveAsTextFile to:

stringIntegerJavaPairRDD.foreachPartition(p -> {
  while (p.hasNext()) {
    LOG.info("{}", p.next());
  }
});

Thanks, David.
Re: saveAsTextFile hangs with hdfs
Not sure if this is helpful or not, but in one executor stderr log, I found this:

14/08/19 20:17:04 INFO CacheManager: Partition rdd_5_14 not found, computing it
14/08/19 20:17:04 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/08/19 20:17:04 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 16251 non-empty blocks out of 25435 blocks
14/08/19 20:17:04 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 123 ms
14/08/19 20:34:00 INFO SendingConnection: Initiating connection to [localhost/127.0.0.1:39840]
14/08/19 20:34:00 WARN SendingConnection: Error finishing connection to localhost/127.0.0.1:39840
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
14/08/19 20:34:00 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(localhost,39840)
14/08/19 20:34:00 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(localhost,39840)
14/08/19 20:34:08 INFO SendingConnection: Initiating connection to [localhost/127.0.0.1:39840]
14/08/19 20:34:08 WARN SendingConnection: Error finishing connection to localhost/127.0.0.1:39840
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
14/08/19 20:34:08 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(localhost,39840)
14/08/19 20:34:08 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(localhost,39840)
spark-submit with Yarn
Is there more documentation on using spark-submit with Yarn? Trying to launch a simple job does not seem to work. My run command is as follows:

/opt/cloudera/parcels/CDH/bin/spark-submit \
  --master yarn \
  --deploy-mode client \
  --executor-memory 10g \
  --driver-memory 10g \
  --num-executors 50 \
  --class $MAIN_CLASS \
  --verbose \
  $JAR \
  $@

The verbose logging correctly parses the arguments:

System properties:
spark.executor.memory - 10g
spark.executor.instances - 50
SPARK_SUBMIT - true
spark.master - yarn-client

But when I view the job 4040 page, SparkUI, there is a single executor (just the driver node) and I see the following in the environment: spark.master - local[24] Also, when I run with yarn-cluster, how can I access the SparkUI page? Thanks, Arun
Re: spark-submit with Yarn
On Tue, Aug 19, 2014 at 2:34 PM, Arun Ahuja aahuj...@gmail.com wrote: /opt/cloudera/parcels/CDH/bin/spark-submit \ --master yarn \ --deploy-mode client \ This should be enough. But when I view the job 4040 page, SparkUI, there is a single executor (just the driver node) and I see the following in the environment: spark.master - local[24] Hmmm. Are you sure the app itself is not overwriting spark.master before creating the SparkContext? That's the only explanation I can think of. Also, when I run with yarn-cluster, how can I access the SparkUI page? You can click on the link in the RM application list. The address is also printed to the AM logs, which are also available through the RM web ui. Finally, the link is printed to the output of the launcher process (look for appTrackingUrl). -- Marcelo
Re: spark-submit with Yarn
Yes, the application is overwriting it - I need to pass it as an argument to the application, otherwise it will be set as local. Thanks for the quick reply! Also, yes, now the appTrackingUrl is set properly as well; before it just said unassigned. Thanks! Arun On Tue, Aug 19, 2014 at 5:47 PM, Marcelo Vanzin van...@cloudera.com wrote: On Tue, Aug 19, 2014 at 2:34 PM, Arun Ahuja aahuj...@gmail.com wrote: /opt/cloudera/parcels/CDH/bin/spark-submit \ --master yarn \ --deploy-mode client \ This should be enough. But when I view the job 4040 page, SparkUI, there is a single executor (just the driver node) and I see the following in the environment: spark.master - local[24] Hmmm. Are you sure the app itself is not overwriting spark.master before creating the SparkContext? That's the only explanation I can think of. Also, when I run with yarn-cluster, how can I access the SparkUI page? You can click on the link in the RM application list. The address is also printed to the AM logs, which are also available through the RM web ui. Finally, the link is printed to the output of the launcher process (look for appTrackingUrl). -- Marcelo
Re: RDD Grouping
Thanks a lot. Yes, mapPartitions seems like a better way of dealing with this problem, since with groupBy() I would need to collect() the data before applying parallelize(), which is expensive.
Re: sqlContext.parquetFile(path) fails if path is a file but succeeds if a directory
It is definitely a bug; sqlContext.parquetFile should accept both a directory and a single file as its parameter. This if-check for isDir makes no sense after this commit: https://github.com/apache/spark/pull/1370/files#r14967550 I opened a ticket for this issue: https://issues.apache.org/jira/browse/SPARK-3138 The ticket shows how to reproduce the bug.
Re: spark-submit with Yarn
The --master should override any other ways of setting the Spark master. Ah yes, actually you can set spark.master directly in your application through SparkConf. Thanks Marcelo. 2014-08-19 14:47 GMT-07:00 Marcelo Vanzin van...@cloudera.com: On Tue, Aug 19, 2014 at 2:34 PM, Arun Ahuja aahuj...@gmail.com wrote: /opt/cloudera/parcels/CDH/bin/spark-submit \ --master yarn \ --deploy-mode client \ This should be enough. But when I view the job 4040 page, SparkUI, there is a single executor (just the driver node) and I see the following in the environment: spark.master - local[24] Hmmm. Are you sure the app itself is not overwriting spark.master before creating the SparkContext? That's the only explanation I can think of. Also, when I run with yarn-cluster, how can I access the SparkUI page? You can click on the link in the RM application list. The address is also printed to the AM logs, which are also available through the RM web ui. Finally, the link is printed to the output of the launcher process (look for appTrackingUrl). -- Marcelo
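In other words, something along these lines in the application itself (a sketch; "MyApp" is a placeholder) leaves the master to be filled in by spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp")   // note: no setMaster() here
val sc = new SparkContext(conf)                  // --master yarn-client now takes effect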
Re: Processing multiple files in parallel
Without the sc.union, my program crashes with the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

thanks
Re: Task's Scheduler Delay in web ui
Scheduling Delay is the time required to assign a task to an available resource. if you're seeing large scheduler delays, this likely means that other jobs/tasks are using up all of the resources. here's some more info on how to setup Fair Scheduling versus the default FIFO Scheduler: https://spark.apache.org/docs/latest/job-scheduling.html of course, increasing the cluster size would help assuming resources are being allocated fairly. also, delays can vary depending on the cluster resource manager that you're using (spark standalone, yarn, mesos). -chris On Tue, Jul 8, 2014 at 4:14 AM, haopu hw...@qilinsoft.com wrote: What's the meaning of a Task's Scheduler Delay in the web ui? And what could cause that delay? Thanks!
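For example, once spark.scheduler.mode is set to FAIR, jobs submitted from a given thread can be routed to a named pool (the pool name "interactive" here is just an assumption):

// all jobs submitted from this thread are scheduled in the "interactive" pool
sc.setLocalProperty("spark.scheduler.pool", "interactive")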
First Bay Area Tachyon meetup: August 25th, hosted by Yahoo! (Limited Space)
Hi folks, We've posted the first Tachyon meetup, which will be on August 25th and is hosted by Yahoo! (Limited Space): http://www.meetup.com/Tachyon/events/200387252/ . Hope to see you there! Best, Haoyuan -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
Re: First Bay Area Tachyon meetup: August 25th, hosted by Yahoo! (Limited Space)
Fantastic! Sent while mobile. Pls excuse typos etc. On Aug 19, 2014 4:09 PM, Haoyuan Li haoyuan...@gmail.com wrote: Hi folks, We've posted the first Tachyon meetup, which will be on August 25th and is hosted by Yahoo! (Limited Space): http://www.meetup.com/Tachyon/events/200387252/ . Hope to see you there! Best, Haoyuan -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?
this would be awesome. did a jira get created for this? I searched, but didn't find one. thanks! -chris On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Thanks a lot Xiangrui. This will help. On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote: Hi Rahul, We plan to add online model updates with Spark Streaming, perhaps in v1.1, starting with linear methods. Please open a JIRA for Naive Bayes. For Naive Bayes, we need to update the priors and conditional probabilities, which means we should also remember the number of observations for the updates. Best, Xiangrui On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I am using the MLlib Naive Bayes for a text classification problem. I have very less amount of training data. And then the data will be coming continuously and I need to classify it as either A or B. I am training the MLlib Naive Bayes model using the training data but next time when data comes, I want to predict its class and then incorporate that also in the model for next time prediction of new data(I think that is obvious). So I am not able to figure out what is the way to do that using MLlib Naive Bayes. Is it that I have to train the model on the whole data every time new data comes in?? Thanks in Advance! -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Decision tree: categorical variables
Hi All, Is there any example of MLlib decision tree handling categorical variables? My dataset includes a few categorical variables (20 out of 100 features), so I was interested in knowing how I can use the current version of the decision tree implementation to handle this situation. I looked at LabeledData and am not sure if that's the way to go.
High-Level Implementation Documentation
Hey all, Other than reading the source (not a bad idea in and of itself; something I will get to soon), I was hoping to find some high-level implementation documentation. Can anyone point me to such documents? Thank you in advance. -Kenny -- :SIG:!0x1066BA71A5F56C58!:
Re: spark-submit with HA YARN
Hi Matt, I checked in the YARN code and I don't see any references to yarn.resourcemanager.address. Have you made sure that your YARN client configuration on the node you're launching from contains the right configs? -Sandy On Mon, Aug 18, 2014 at 4:07 PM, Matt Narrell matt.narr...@gmail.com wrote: Hello, I have an HA enabled YARN cluster with two resource managers. When submitting jobs via "spark-submit --master yarn-cluster", it appears that the driver is looking explicitly for the "yarn.resourcemanager.address" property rather than round-robining through the resource managers via the "yarn.client.failover-proxy-provider" property set to "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider". If I explicitly set the "yarn.resourcemanager.address" to the active resource manager, jobs will submit fine. Is there a manner to set "spark-submit --master yarn-cluster" to respect the failover proxy? Thanks in advance, Matt
Re: slower worker node in the cluster
perhaps creating Fair Scheduler Pools might help? there's no way to pin certain nodes to a pool, but you can specify minShares (CPUs). not sure if that would help, but worth looking into. On Tue, Jul 8, 2014 at 7:37 PM, haopu hw...@qilinsoft.com wrote: In a standalone cluster, is there a way to specify a stage to be run on a faster worker? That stage is reading an HDFS file and then doing some filter operations. The tasks are assigned to the slower worker also, but the slower worker delays launching because it's running some tasks from other stages. So I think it may be better to assign the stage to a worker. Any suggestions? And will running the cluster on Yarn help?
Re: Decision tree: categorical variables
The categorical features must be encoded into indices starting from 0: 0, 1, ..., numCategories - 1. Then you can provide the categoricalFeatureInfo map to specify which columns contain categorical features and the number of categories in each. Joseph is updating the user guide. But if you want to try something now, you can take look at the docs of DecisionTree.trainClassifier and trainRegressor: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala#L360 -Xiangrui On Tue, Aug 19, 2014 at 4:24 PM, Sameer Tilak ssti...@live.com wrote: Hi All, Is there any example of MLlib decision tree handling categorical variables? My dataset includes a few categorical variables (20 out of 100 features), so I was interested in knowing how I can use the current version of the decision tree implementation to handle this situation. I looked at LabeledData and am not sure if that's the way to go.
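A minimal sketch of that API (assuming trainingData: RDD[LabeledPoint], and assuming feature 0 has 3 categories and feature 5 has 4; all other features are treated as continuous):

import org.apache.spark.mllib.tree.DecisionTree

// map of feature index -> number of categories for the categorical columns
val categoricalFeaturesInfo = Map(0 -> 3, 5 -> 4)
val model = DecisionTree.trainClassifier(trainingData, 2, categoricalFeaturesInfo, "gini", 5, 32)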
Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?
No. Please create one but it won't be able to catch the v1.1 train. -Xiangrui On Tue, Aug 19, 2014 at 4:22 PM, Chris Fregly ch...@fregly.com wrote: this would be awesome. did a jira get created for this? I searched, but didn't find one. thanks! -chris On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Thanks a lot Xiangrui. This will help. On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote: Hi Rahul, We plan to add online model updates with Spark Streaming, perhaps in v1.1, starting with linear methods. Please open a JIRA for Naive Bayes. For Naive Bayes, we need to update the priors and conditional probabilities, which means we should also remember the number of observations for the updates. Best, Xiangrui On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I am using the MLlib Naive Bayes for a text classification problem. I have very less amount of training data. And then the data will be coming continuously and I need to classify it as either A or B. I am training the MLlib Naive Bayes model using the training data but next time when data comes, I want to predict its class and then incorporate that also in the model for next time prediction of new data (I think that is obvious). So I am not able to figure out what is the way to do that using MLlib Naive Bayes. Is it that I have to train the model on the whole data every time new data comes in?? Thanks in Advance! -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: Multiple column families vs Multiple tables
ö_ö You should send this message to the hbase user list, not the spark user list... But I can give you some personal advice about this: keep column families as few as possible! At the least, using some prefix of the column qualifier could also be an idea, but read performance may then be worse for your use case of searching for a row with value X in column family A and value Y in column family B. So it depends on which workload is important for you. If your use case is very read-heavy and you really want to use multiple column families while keeping good read performance, you should try to disable region splits, adjust the compaction interval carefully, and so on. There is a good slide for this: http://photo.weibo.com/1431095941/wbphotos/large/mid/3735178188435939/pid/554cca85gw1eiloddlqa5j20or0ik77z More slides about hbase + coprocessor, hbase + hive and hbase + spark: http://www.weibo.com/1431095941/BeL90zozx
Re: Only master is really busy at KMeans training
There are only 5 worker nodes, so please try to reduce the number of partitions to the number of available CPU cores. 1000 partitions is too many, because the driver needs to collect the task result from each partition. -Xiangrui On Tue, Aug 19, 2014 at 1:41 PM, durin m...@simon-schaefer.net wrote: When trying to use KMeans.train with some large data and 5 worker nodes, it would fail due to BlockManagers shutting down because of a timeout. I was able to prevent that by adding spark.storage.blockManagerSlaveTimeoutMs 300 to the spark-defaults.conf. However, with 1 million feature vectors, the stage takeSample at KMeans.scala:263 runs for about 50 minutes. In this time, about half of the tasks are done, then I lose the executors and Spark starts a new repartitioning stage. I also noticed that in the takeSample stage, a task would run for about 2.5 minutes until suddenly it finished and its duration changed to 2s, with 0.9s GC time. The training data is supplied in this form: var vectors2 = vectors.repartition(1000).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) var broadcastVector = sc.broadcast(vectors2) The 1000 partitions is something that could probably be optimized, but too few will cause OOM errors. Using Ganglia, I can see that the master node is the only one that is properly busy regarding CPU, and that most (600-700 of 800 total percent CPU) is used by the master. The workers on each node only use 1 core, i.e. 100% CPU. What would be the most likely cause for such an inefficient use of the cluster, and how to prevent it? Number of partitions, way of caching, ...? I'm trying to find out myself with tests, but ideas from someone with more experience are very welcome. Best regards, Simon
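Concretely, a sketch of that change (40 is an assumption for 5 workers with 8 cores each; adjust to your actual core count):

val vectors2 = vectors
  .repartition(40)   // roughly the total number of worker cores, instead of 1000
  .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)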
Re: Naive Bayes
The ratio should be okay. Could you try to pre-process the data and map -999.0 to 0 before calling NaiveBayes? Btw, I added a check to ensure nonnegative feature values: https://github.com/apache/spark/pull/2038 -Xiangrui On Tue, Aug 19, 2014 at 1:39 PM, Phuoc Do phu...@vida.io wrote: Hi Xiangrui, Training data: 42945 s out of 124659. Test data: 42722 s out of 125341. The ratio is very much the same. I tried Decision Tree. It outputs 0 to 1 decimals. I don't quite understand it yet. Would feature scaling make it work for Naive Bayes? Phuoc Do On Tue, Aug 19, 2014 at 12:51 AM, Xiangrui Meng men...@gmail.com wrote: What is the ratio of examples labeled `s` to those labeled `b`? Also, Naive Bayes doesn't work on negative feature values. It assumes term frequencies as the input. We should throw an exception on negative feature values. -Xiangrui On Tue, Aug 19, 2014 at 12:07 AM, Phuoc Do phu...@vida.io wrote: I'm trying a Naive Bayes classifier for the Higgs Boson challenge on Kaggle: http://www.kaggle.com/c/higgs-boson Here's the source code I'm working on: https://github.com/dnprock/SparkHiggBoson/blob/master/src/main/scala/KaggleHiggBosonLabel.scala Training data looks like this: 10,138.47,51.655,97.827,27.98,0.91,124.711,2.666,3.064,41.928,197.76,1.582,1.396,0.2,32.638,1.017,0.381,51.626,2.273,-2.414,16.824,-0.277,258.733,2,67.435,2.15,0.444,46.062,1.24,-2.475,113.497,s 11,160.937,68.768,103.235,48.146,-999,-999,-999,3.473,2.078,125.157,0.879,1.414,-999,42.014,2.039,-3.011,36.918,0.501,0.103,44.704,-1.916,164.546,1,46.226,0.725,1.158,-999,-999,-999,46.226,b My problem is the Naive Bayes classifier always outputs 0 (for b) for all test data. I appreciate any help. -- Phuoc Do https://vida.io/dnprock -- Phuoc Do https://vida.io/dnprock
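A sketch of that suggested pre-processing (assuming data: RDD[LabeledPoint] with dense features):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val cleaned = data.map { lp =>
  // replace the -999.0 sentinel with 0.0 so all feature values are nonnegative
  val vals = lp.features.toArray.map(v => if (v == -999.0) 0.0 else v)
  LabeledPoint(lp.label, Vectors.dense(vals))
}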
Re: Multiple column families vs Multiple tables
bq. does not do well with anything above two or three column families Current hbase releases, such as 0.98.x, would do better than the above. 5 column families should be accommodated. Cheers On Tue, Aug 19, 2014 at 3:06 PM, Wei Liu wei@stellarloyalty.com wrote: We are doing schema design for our application, One thing we are not so clear about is multiple column families (more than 3, probably 5 - 8) vs multiple tables. In our use case, we will have the same number of rows in all these column families, but some column families may be modified more often than others, and some column families will have more columns than others (thousands vs several). The reason we are thinking about multiple column families is that it probably can give us better performance if we need to do a search with data from multiple column families. For example, search for a row with value x in column family A and with value Y in column family B. On the other hand, we saw the following paragraph in the user guide which is scary to us: HBase currently does not do well with anything above two or three column families so keep the number of column families in your schema low. Currently, flushing and compactions are done on a per Region basis so if one column family is carrying the bulk of the data bringing on flushes, the adjacent families will also be flushed though the amount of data they carry is small. When many column families the flushing and compaction interaction can make for a bunch of needless i/o loading (To be addressed by changing flushing and compaction to work on a per column family basis). For more information on compactions, see Section 9.7.6.7, “Compaction” http://hbase.apache.org/book.html#compaction. Can any one please shed some light on this topic? Thanks in advance. Thanks, Wei
Re: Multiple column families vs Multiple tables
Chutium, thanks for your advice. I will check out your links. I sent the email to the wrong email address! Sorry for the spam. Wei On Tue, Aug 19, 2014 at 4:49 PM, chutium teng@gmail.com wrote: ö_ö You should send this message to the hbase user list, not the spark user list... But I can give you some personal advice about this: keep column families as few as possible! At the least, using some prefix of the column qualifier could also be an idea, but read performance may then be worse for your use case of searching for a row with value X in column family A and value Y in column family B. So it depends on which workload is important for you. If your use case is very read-heavy and you really want to use multiple column families while keeping good read performance, you should try to disable region splits, adjust the compaction interval carefully, and so on. There is a good slide for this: http://photo.weibo.com/1431095941/wbphotos/large/mid/3735178188435939/pid/554cca85gw1eiloddlqa5j20or0ik77z More slides about hbase + coprocessor, hbase + hive and hbase + spark: http://www.weibo.com/1431095941/BeL90zozx
Re: Is hive UDF are supported in HiveContext
There is no collect_list in Hive 0.12. Try this after this ticket is done: https://issues.apache.org/jira/browse/SPARK-2706 I am also looking forward to this.
Re: type issue: found RDD[T] expected RDD[A]
Hi, On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com wrote: I think the type of the data contained in your RDD needs to be a known case class and not abstract for createSchemaRDD. This makes sense when you think it needs to know about the fields in the object to create the schema. Exactly this. The actual message pointing to that is: inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product] All case classes are automatically subclasses of Product, but otherwise you will have to extend Product and add the required methods yourself. Tobias
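For instance, a minimal sketch (Record is an invented example; uses the Spark 1.0.x implicit):

case class Record(id: Int, name: String)   // case classes extend Product automatically

val rdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
import sqlContext.createSchemaRDD          // brings the implicit conversion into scope
val schemaRDD = createSchemaRDD(rdd)       // reflection reads Record's fields
schemaRDD.registerAsTable("records")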
Re: Naive Bayes
I replaced -999.0 with 0. Predictions still have the same label. Maybe the negative features really messed it up. On Tue, Aug 19, 2014 at 4:51 PM, Xiangrui Meng men...@gmail.com wrote: The ratio should be okay. Could you try to pre-process the data and map -999.0 to 0 before calling NaiveBayes? Btw, I added a check to ensure nonnegative feature values: https://github.com/apache/spark/pull/2038 -Xiangrui On Tue, Aug 19, 2014 at 1:39 PM, Phuoc Do phu...@vida.io wrote: Hi Xiangrui, Training data: 42945 s out of 124659. Test data: 42722 s out of 125341. The ratio is very much the same. I tried Decision Tree. It outputs 0 to 1 decimals. I don't quite understand it yet. Would feature scaling make it work for Naive Bayes? Phuoc Do On Tue, Aug 19, 2014 at 12:51 AM, Xiangrui Meng men...@gmail.com wrote: What is the ratio of examples labeled `s` to those labeled `b`? Also, Naive Bayes doesn't work on negative feature values. It assumes term frequencies as the input. We should throw an exception on negative feature values. -Xiangrui On Tue, Aug 19, 2014 at 12:07 AM, Phuoc Do phu...@vida.io wrote: I'm trying a Naive Bayes classifier for the Higgs Boson challenge on Kaggle: http://www.kaggle.com/c/higgs-boson Here's the source code I'm working on: https://github.com/dnprock/SparkHiggBoson/blob/master/src/main/scala/KaggleHiggBosonLabel.scala Training data looks like this: 10,138.47,51.655,97.827,27.98,0.91,124.711,2.666,3.064,41.928,197.76,1.582,1.396,0.2,32.638,1.017,0.381,51.626,2.273,-2.414,16.824,-0.277,258.733,2,67.435,2.15,0.444,46.062,1.24,-2.475,113.497,s 11,160.937,68.768,103.235,48.146,-999,-999,-999,3.473,2.078,125.157,0.879,1.414,-999,42.014,2.039,-3.011,36.918,0.501,0.103,44.704,-1.916,164.546,1,46.226,0.725,1.158,-999,-999,-999,46.226,b My problem is the Naive Bayes classifier always outputs 0 (for b) for all test data. I appreciate any help. -- Phuoc Do https://vida.io/dnprock -- Phuoc Do https://vida.io/dnprock
Re: type issue: found RDD[T] expected RDD[A]
That might not be enough. Reflection is used to determine what the fields are, thus your class might actually need to have members corresponding to the fields in the table. I heard that a more generic method of inputting stuff is coming. On Tue, Aug 19, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com wrote: I think the type of the data contained in your RDD needs to be a known case class and not abstract for createSchemaRDD. This makes sense when you think it needs to know about the fields in the object to create the schema. Exactly this. The actual message pointing to that is: inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product] All case classes are automatically subclasses of Product, but otherwise you will have to extend Product and add the required methods yourself. Tobias
What about implementing various hypothesis tests for Logistic Regression in MLlib
Hi, From the documentation I think only the model-fitting part is implemented. What about the various hypothesis tests and performance indexes used to evaluate the model fit? Regards, Xiaobo Gu
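As far as I know, MLlib at this point does not ship hypothesis tests, but simple performance indexes can be computed by hand. A hedged sketch against the MLlib 1.0 API, assuming `training` and `test` are RDD[LabeledPoint]s prepared elsewhere:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def evaluate(training: RDD[LabeledPoint], test: RDD[LabeledPoint]): Double = {
  val model = LogisticRegressionWithSGD.train(training, 100)  // 100 iterations
  // Accuracy as a basic performance index: the fraction of test points
  // whose predicted label matches the true label.
  val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
  predictionAndLabel.filter { case (pred, label) => pred == label }
    .count.toDouble / test.count
}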
Re: type issue: found RDD[T] expected RDD[A]
Hi Evan, Patrick and Tobias,

So, it worked for what I needed it to do. I followed Yana's suggestion of using the parameterized type [T <: Product : ClassTag : TypeTag]. More concretely, I was trying to make the query process a bit more fluent. Here is some pseudocode, but with correct types:

val table: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor: String => POJO)
val data = table.atLocation("hdfs://...")
  .withName(tableName)
  .makeRDD("SELECT * FROM tableName")

class SparkTable[T <: Product : ClassTag : TypeTag](val sqlContext: SQLContext, val extractor: (String) => T) {

  private[this] var location: Option[String] = None
  private[this] var name: Option[String] = None
  private[this] val sc = sqlContext.sparkContext

  def withName(name: String): SparkTable[T] = { .. }
  def atLocation(path: String): SparkTable[T] = { .. }

  def makeRDD(sqlQuery: String): SchemaRDD = {
    ...
    import sqlContext._
    val rdd: RDD[String] = sc.textFile(this.location.get)
    val rddT: RDD[T] = rdd.map(extractor)
    val schemaRDD = createSchemaRDD(rddT)
    schemaRDD.registerAsTable(name.get)
    val all = sqlContext.sql(sqlQuery)
    all
  }
}

Best, Amit

On Tue, Aug 19, 2014 at 9:13 PM, Evan Chan velvia.git...@gmail.com wrote:
That might not be enough. Reflection is used to determine what the fields are, so your class might actually need to have members corresponding to the fields in the table. I heard that a more generic method of inputting stuff is coming.

On Tue, Aug 19, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:
Hi,

On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com wrote:
I think the type of the data contained in your RDD needs to be a known case class, not an abstract type, for createSchemaRDD. This makes sense when you consider that it needs to know about the fields in the object to create the schema.

Exactly this. The actual compiler message pointing to that is: inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product]. All case classes are automatically subclasses of Product; otherwise you will have to extend Product and add the required methods yourself.

Tobias
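The two builder methods are elided in the pseudocode above; one plausible body for them, as a sketch of the fluent pattern rather than the author's actual code:

def atLocation(path: String): SparkTable[T] = {
  this.location = Some(path)
  this  // return this so calls can be chained
}
def withName(n: String): SparkTable[T] = {
  this.name = Some(n)
  this
}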
Re: OutOfMemory Error
Hi, any further info on this? Do you think it would be useful to have an in-memory buffer that stores the contents of the new RDD and, when the buffer reaches a configured threshold, spills its contents to local disk? This would save us from the OutOfMemory error. Appreciate any suggestions in this regard. Many thanks, Ghousia.

On Mon, Aug 18, 2014 at 4:05 PM, Ghousia ghousia.ath...@gmail.com wrote:
But this would be applicable only to operations that have a shuffle phase. It might not be applicable to a simple map operation where a record is mapped to a new huge value, resulting in an OutOfMemory error.

On Mon, Aug 18, 2014 at 12:34 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
I believe spark.shuffle.memoryFraction is the one you are looking for:

spark.shuffle.memoryFraction: Fraction of Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction.

You can give it a try. Thanks, Best Regards

On Mon, Aug 18, 2014 at 12:21 PM, Ghousia ghousia.ath...@gmail.com wrote:
Thanks for the answer, Akhil. We are right now getting rid of this issue by increasing the number of partitions, and we are persisting RDDs to DISK_ONLY. But the issue is with heavy computations within an RDD. It would be better if we had the option of spilling the intermediate transformation results to local disk (only in case memory consumption is high). Do we have any such option available with Spark? If increasing the partitions is the only way, one might still end up with OutOfMemory errors when working with algorithms where the intermediate result is huge.

On Mon, Aug 18, 2014 at 12:02 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Hi Ghousia, you can try the following (a configuration sketch follows this list):
1. Increase the heap size: https://spark.apache.org/docs/0.9.0/configuration.html
2. Increase the number of partitions: http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
3. Persist the RDD with DISK_ONLY: http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
Thanks, Best Regards

On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj ghousia.ath...@gmail.com wrote:
Hi, I am trying to implement machine learning algorithms on Spark. I am working on a 3-node cluster, with each node having 5GB of memory. Whenever I work with a slightly larger number of records, I end up with an OutOfMemory error. The problem is that even if the number of records is only slightly higher, the intermediate result of a transformation is huge and this results in an OutOfMemory error. To overcome this, we are partitioning the data such that each partition has only a few records. Is there any better way to fix this issue, something like spilling the intermediate data to local disk? Thanks, Ghousia.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-Error-tp12275.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
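A sketch of the three suggestions from the thread in one place, against the Spark 1.0 API; the application name, memory sizes, fraction, path, partition count, and expensiveTransform stub are all illustrative, not prescriptions:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

def expensiveTransform(line: String): Array[Double] = ???  // hypothetical heavy map

val conf = new SparkConf()
  .setAppName("HeavyMLJob")
  .set("spark.executor.memory", "4g")          // suggestion 1: a larger heap
  .set("spark.shuffle.memoryFraction", "0.4")  // more room for shuffle maps,
                                               // at the expense of storage.memoryFraction
val sc = new SparkContext(conf)

val records = sc.textFile("hdfs:///ml/input", 200)  // suggestion 2: more partitions
val huge = records.map(expensiveTransform)
huge.persist(StorageLevel.DISK_ONLY)                // suggestion 3: keep results on disk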
JVM heap and native allocation questions
I'm trying to use Spark to process some data using native functions I've integrated via JNI, and I pass around a lot of memory I've allocated inside these functions. I'm not very familiar with the JVM, so I have a couple of questions.

(1) Performance seemed terrible until I LD_PRELOAD'ed libtcmalloc. Will this break any JVM functionality?

(2) Spark workers seem to OOM pretty readily. How does Spark choose when to write back its results (in my case to s3:// via saveAsObjectFile)? I'm guessing that I can't keep the JVM heap size set to the full system memory, since I need to save space for the native allocations, but a heap size that is too small doesn't seem to work either. Is there a way I can get it to write back earlier than usual so that I have more memory to spare? I tried to use repartition, but that generates a shuffle. In Hadoop I could just turn the number of mappers up and it would compute the splits accordingly; I don't see why a shuffle has to be involved.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-heap-and-native-allocation-questions-tp12453.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
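On the last point: instead of calling repartition() after the fact, Spark lets you request more input splits when the RDD is created, which involves no shuffle and is the closest analogue to raising the mapper count in Hadoop. A sketch under the assumption that the input is a text file and there is an existing SparkContext sc; the paths and the callNativeFunction wrapper are hypothetical:

def callNativeFunction(line: String): Array[Byte] = ???  // hypothetical JNI wrapper

// Ask for more partitions up front via textFile's second argument --
// the input is simply split finer, so no shuffle is needed.
val input = sc.textFile("s3n://bucket/input", 512)
val processed = input.map(callNativeFunction)
processed.saveAsObjectFile("s3n://bucket/output")

Smaller partitions also mean each task holds less data at once, which leaves more headroom for the native allocations outside the JVM heap.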