Re: SparkSQL: Nested Query error
Hi, I am getting an error in the query plan when I use the SQL statement exactly as you have suggested. Is that the exact SQL statement I should be using (I am not very familiar with SQL syntax)? I also tried using the SchemaRDD's subtract method to perform this query: usersRDD.subtract(deviceRDD).count(). The count comes out to be 1, but there are many UIDs in tusers that are not in device, so the result is not correct. I would like to know the right way to frame this query in Spark SQL. Thanks
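One detail worth checking (a guess based on the description above, since the exact schemas are not shown): SchemaRDD.subtract compares entire rows, so subtracting two tables with different schemas will not find the missing UIDs. A minimal sketch, assuming the UID is the first column of both tables, projects down to the key before subtracting:

val userIds   = usersRDD.map(_.getString(0))    // assumes uid is column 0 of tusers
val deviceIds = deviceRDD.map(_.getString(0))   // assumes uid is column 0 of device
val missingUids = userIds.subtract(deviceIds)   // uids present in tusers but not in device
println(missingUids.count())

The same result can often be expressed in SQL as a LEFT OUTER JOIN from tusers to device followed by a filter for NULL device ids, if the SQL route is preferred.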
Spark Debugging
Hi, I have installed a 2-node Hadoop cluster (for example, on Unix machines A and B, where A is the master node and a data node, and B is a data node). I am submitting my driver programs through Spark 1.1.0 with bin/spark-submit, from a PuTTY client on my Windows machine. I want to debug my program from Eclipse on my local machine, but I am not able to find a way to do it. Please let me know how to debug my driver program as well as the executor programs. Regards, Naveen.
Re: Spark Debugging
It's called remote debugging. You can read this article http://www.eclipse.org/jetty/documentation/current/enable-remote-debugging.html for more information. You will also have to make sure that your cluster and your Windows machine can communicate with each other over the network. Thanks Best Regards
Re: GC Issues with randomSplit on large dataset
Can you be more specific about numbers? I am not sure that splitting helps so much in the end, since it has the same effect as executing the large number of tasks from the full cartesian join a smaller number at a time. The full join is probably intractable no matter what in this case? The OOM is not necessarily directly related. It depends on where it happened, what else you are doing, how much memory you gave, etc. On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hey all – not writing to necessarily get a fix but more to get an understanding of what’s going on internally here. I wish to take a cross-product of two very large RDDs (using cartesian), the product of which is well in excess of what can be stored on disk. Clearly that is intractable, thus my solution is to do things in batches - essentially I can take the cross product of a small piece of the first data set with the entirety of the other. To do this, I calculate how many items can fit into 1 gig of memory. Next, I use RDD.randomSplit() to partition the first data set. The issue is that I am trying to partition an RDD of several million items into several million partitions. This throws the following error: I would like to understand the internals of what’s going on here so that I can adjust my approach accordingly. Thanks in advance. 14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.<init>(String.java:203) at java.lang.String.substring(String.java:1913) at java.lang.String.subSequence(String.java:1946) at java.util.regex.Matcher.getSubSequence(Matcher.java:1245) at java.util.regex.Matcher.group(Matcher.java:490) at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2675) at java.util.Formatter.parse(Formatter.java:2528) at java.util.Formatter.format(Formatter.java:2469) at java.util.Formatter.format(Formatter.java:2423) at java.lang.String.format(String.java:2790) at scala.collection.immutable.StringLike$class.format(StringLike.scala:266) at scala.collection.immutable.StringOps.format(StringOps.scala:31) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944) at 
org.apache.spark.rdd.RDD.<init>(RDD.scala:1227) at org.apache.spark.rdd.RDD.<init>(RDD.scala:83) at org.apache.spark.rdd.PartitionwiseSampledRDD.<init>(PartitionwiseSampledRDD.scala:47) at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378) at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD.randomSplit(RDD.scala:379)
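For what it's worth, a minimal sketch of the batching idea described above, using a modest number of splits instead of millions of partitions; smallRdd, largeRdd, and keepPair() are placeholders for illustration, not code from the original post:

val numBatches = 100                                   // a modest number, instead of millions of partitions
val weights = Array.fill(numBatches)(1.0 / numBatches)
val batches = smallRdd.randomSplit(weights)

val results = batches.map { batch =>
  batch.cartesian(largeRdd)
       .filter { case (a, b) => keepPair(a, b) }       // keep only the tiny fraction that is needed
       .collect()                                      // one small result per batch, materialized in turn
}

Because randomSplit only needs to produce a handful of sampled RDDs here, the driver does not have to build millions of partition objects at once.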
Re: Algebird using spark-shell
I've tried:

./bin/spark-shell --jars algebird-core_2.10-0.8.1.jar

scala> import com.twitter.algebird._
import com.twitter.algebird._

scala> import HyperLogLog._
import HyperLogLog._

scala> import com.twitter.algebird.HyperLogLogMonoid
import com.twitter.algebird.HyperLogLogMonoid

scala> val hll = new HyperLogLogMonoid(12)
hll: com.twitter.algebird.HyperLogLogMonoid = com.twitter.algebird.HyperLogLogMonoid@7bde289a

https://github.com/twitter/algebird/wiki/Algebird-Examples-with-REPL
NonSerializable Exception in foreachRDD
Hi all, In Spark Streaming, when I do foreachRDD on my DStreams, I get a NonSerializable exception when I try to do something like: DStream.foreachRDD( rdd => { val x = sc.parallelize(Seq(("test", "blah"))) }) Is there any way around that? Thanks, Harold
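A minimal sketch of one common way around this, assuming the serialization error comes from the closure capturing the outer SparkContext: take the context from the RDD handed to foreachRDD instead of closing over sc (dstream and the variable names are illustrative):

dstream.foreachRDD { rdd =>
  // this function runs on the driver for each batch, so a context is available here
  val localSc = rdd.sparkContext
  val extra = localSc.parallelize(Seq(("test", "blah")))
  // ... use or save `extra` here ...
}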
Re: BUG: when running as extends App, closures don't capture variables
Very coincidentally I ran into something equally puzzling yesterday, where something was bizarrely null when it can't have been, in a Spark program that extends App. I also changed it to use main() and it works fine. So there is definitely some issue here. If nobody makes a JIRA before I get home I'll do it. On Oct 29, 2014 11:20 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! This might be a documentation issue as opposed to a coding issue, in that perhaps the correct answer is "don't do that", but as this is not obvious, I am writing. The following code produces output most would not expect: package misc import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object DemoBug extends App { val conf = new SparkConf() val sc = new SparkContext(conf) val rdd = sc.parallelize(List("A", "B", "C", "D")) val str1 = "A" val rslt1 = rdd.filter(x => { x != "A" }).count val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2) } This produces the output: DemoBug: rslt1 = 3 rslt2 = 0 Compiled with sbt: libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0" Run on an EC2 EMR instance with a recent image (hadoop 2.4.0, spark 1.1.0). If instead there is a proper main(), it works as expected. Thank you. Sincerely, Mike
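For reference, a sketch of the same program restructured with an explicit main(), which the poster reports behaves as expected (presumably rslt1 = 3 and rslt2 = 3):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object DemoBug {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List("A", "B", "C", "D"))
    val str1 = "A"
    // str1 is initialized before the closures are shipped, so both filters behave the same
    val rslt1 = rdd.filter(x => x != "A").count
    val rslt2 = rdd.filter(x => str1 != null && x != "A").count
    println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
  }
}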
sharing RDDs between PySpark and Scala
I'm processing some data using PySpark and I'd like to save the RDDs to disk (they are (k, v) RDDs of strings and SparseVector types) and then read them in using Scala to run them through some other analysis. Is this possible? Thanks, Rok
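One low-tech approach is to have the PySpark job write the pairs out as plain text and parse them back in Scala. A sketch of the Scala reading side, assuming each line was written as key, vector size, comma-separated indices, and comma-separated values separated by tabs (the path and the line format are assumptions for illustration, not an established interchange format):

import org.apache.spark.mllib.linalg.Vectors

val parsed = sc.textFile("hdfs:///shared/sparse_vectors.txt").map { line =>
  val Array(key, size, idx, vals) = line.split("\t", -1)
  val indices = if (idx.isEmpty) Array.empty[Int] else idx.split(",").map(_.toInt)
  val values  = if (vals.isEmpty) Array.empty[Double] else vals.split(",").map(_.toDouble)
  (key, Vectors.sparse(size.toInt, indices, values))
}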
Re: Spark Debugging
Thanks Best Regards On Thu, Oct 30, 2014 at 1:43 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Awesome. Thanks Best Regards On Thu, Oct 30, 2014 at 1:30 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Thanks Akhil, it is working. Regards, Naveen *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Thursday, October 30, 2014 1:24 PM *To:* Naveen Kumar Pokala *Subject:* Re: Spark Debugging Yes you can, but the values might not give you a complete clue. Give it a try. Thanks Best Regards On Thu, Oct 30, 2014 at 1:20 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Do you mean I can't debug line by line? Thanks and Regards, Naveen *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Thursday, October 30, 2014 1:19 PM *To:* Naveen Kumar Pokala *Subject:* Re: Spark Debugging The master machine. The execution will pause in Eclipse regardless of where it is executing the program; you might not get a complete flow, but you can see what's happening. Thanks Best Regards On Thu, Oct 30, 2014 at 1:10 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi Akhil, With the below configuration I am successfully able to connect to Node A (the master node), where I am running the spark-submit program, but Spark may give the driver program to another node B also. Which machine's IP should I use to debug the programs from Eclipse? Thanks, Naveen *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Thursday, October 30, 2014 12:35 PM *To:* Naveen Kumar Pokala *Subject:* Re: Spark Debugging Hi Naveen, You should be able to connect to that port from your Windows machine also. Thanks Best Regards On Thu, Oct 30, 2014 at 12:32 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi Akhil, I have gone through the article. But I am not starting my Java program with the java command, I am submitting through spark-submit, and I have added the options like below - is that ok? bin/spark-submit --class sample.spark.test.SparkJob --master yarn-cluster --driver-memory 4g --executor-memory --conf spark.executor.extraJavaOptions=-Xdebug -agentlib:jdwp=transport=dt_socket,address=,server=y,suspend=n /home/npokala/data/test.jar I am able to connect to the cluster from my Windows machine only through PuTTY. I am able to access the cluster information and Spark information from my browser. Is that sufficient? Regards, Naveen.
Spark + Tableau
I'm testing the beta driver from Databricks for Tableau, and unfortunately I encountered some issues. While a beeline connection works without problems, Tableau can't connect to the Spark thrift server. Error from the driver (Tableau): Unable to connect to the ODBC Data Source. Check that the necessary drivers are installed and that the connection properties are valid. [Simba][SparkODBC] (34) Error from Spark: ETIMEDOUT. Unable to connect to the server test.server.com. Check that the server is running and that you have access privileges to the requested database. Unable to connect to the server. Check that the server is running and that you have access privileges to the requested database. Exception on the Thrift server: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) Caused by: org.apache.thrift.transport.TTransportException at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:182) at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more Is anyone else testing this driver, or did anyone see this message? Best regards Bojan Kostić
Getting vector values
Hi, I'm new to MLlib and Spark. I'm trying to use tf-idf and use those values for term ranking. I'm getting the tf values in vector format, but how can I get the values out of the vector?

val sc = new SparkContext(conf)
val documents: RDD[Seq[String]] = sc.textFile("/home/andrejs/Datasets/dbpedia/test.txt").map(_.split(" ").toSeq)
documents.foreach(println(_))
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.foreach(println(_))

My output is:

WrappedArray(a, a, b, c)
WrappedArray(e, a, c, d)
(1048576,[97,99,100,101],[1.0,1.0,1.0,1.0])
(1048576,[97,98,99],[2.0,1.0,1.0])

How can I get [97,99,100,101] out, and [1.0,1.0,1.0,1.0]? And how can I map that, e.g. 100 => 1.0? Some help is greatly appreciated, Andrejs
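A minimal sketch of pulling those two lists out: HashingTF produces sparse vectors, and a SparseVector's indices and values arrays are exactly the two bracketed lists printed above, so zipping them gives the index -> count mapping:

import org.apache.spark.mllib.linalg.SparseVector

val termCounts = tf.map { v =>
  val sv = v.asInstanceOf[SparseVector]
  sv.indices.zip(sv.values).toMap   // e.g. Map(97 -> 2.0, 98 -> 1.0, 99 -> 1.0)
}
termCounts.foreach(println)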
Re: GC Issues with randomSplit on large dataset
The split is something like 30 million items into 2 million partitions. The reason that it becomes tractable is that after I perform the cartesian on the split data and operate on it, I don't keep the full results - I actually only keep a tiny fraction of the generated dataset - which makes the overall dataset tractable (I neglected to mention this in the first email). The way the code is structured, I have forced linear execution until this point, so at the time of execution the split is the only thing happening. In terms of memory, I have assigned 23 GB of memory and 17 GB of heap.
Returned type of Broadcast variable is byte array
As a template for creating a broadcast variable, the following code snippet from mllib was used: val bcIdf = dataset.context.broadcast(idf) dataset.mapPartitions { iter => val thisIdf = bcIdf.value The new code follows that model: import org.apache.spark.mllib.linalg.{Vector => MVector} .. assert(crows.isInstanceOf[Array[MVector]]) val bcRows = sc.broadcast(crows) val GU = mat.rows.zipWithIndex.mapPartitions { case dataIter => val arrayVect = bcRows.value // bcRows.value is seen in the debugger to be of type Array[Byte] .. ?? That last line is unhappy: java.lang.ClassCastException: [B cannot be cast to [Lorg.apache.spark.mllib.linalg.Vector; So the compiler is aware that the return type of the broadcast's value method should be an array of vectors (which it should). However the actual type is Array[Byte]. Any insights on this?
Re: Spark + Tableau
What ODBC driver are you using? We recently got the Hortonworks JODBC drivers working on a Windows box but were having issues on a Mac. Sent from my iPhone
Re: Spark + Tableau
I use the beta SQL ODBC driver from Databricks.
Using a Database to persist and load data from
Hi Ladies and Gents, I would like to know what options I have for leveraging the Spark code I have already written to use a DB (Vertica) as its store/datasource. The data is of a tabular nature, so any relational DB could essentially be used. Do I need to develop a context? If yes, how, and where can I get a good example? Thank you, Asaf
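One option that does not require a new context is Spark's built-in JdbcRDD, which reads a bounded query in parallel over a plain JDBC connection. A sketch, with the Vertica connection string, table, and column names as placeholders (the Vertica JDBC driver jar would need to be on the classpath):

import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

def connect(): Connection =
  DriverManager.getConnection("jdbc:vertica://dbhost:5433/mydb", "user", "pass")

val rows = new JdbcRDD(
  sc,
  connect _,                                                 // connection factory
  "SELECT id, amount FROM sales WHERE id >= ? AND id <= ?",  // the two ? bounds are required
  1L, 1000000L, 10,                                          // lower bound, upper bound, partitions
  (rs: ResultSet) => (rs.getLong("id"), rs.getDouble("amount"))
)
rows.take(5).foreach(println)

Writing back is usually done with foreachPartition and a batched JDBC insert per partition, rather than collecting to the driver.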
issue on applying SVM to 5 million examples.
Hi, Previously we applied the SVM algorithm in MLlib to 5 million records (600 MB), and it takes more than 25 minutes to finish. The Spark version we are using is 1.0 and we were running this program on a 4-node cluster. Each node has 4 CPU cores and 11 GB RAM. The 5 million records only have two distinct records (one positive and one negative); the others are all duplicates. Does anyone have any idea why it takes so long on such small data? Thanks, Best, Peng
Re: Algebird using spark-shell
What's the error with the 2.10 version of algebird?
Re: problem with start-slaves.sh
Roberto, I don't think Shark is an issue -- I have a Shark server running on a node that also acts as a worker. What you can do is turn off the Shark server and just run start-all to start your Spark cluster. Then you can try bin/spark-shell --master yourmasterip and see if you can successfully run some hello-world stuff. This will verify you have a working Spark cluster. Shark is just an application on top of it, so I can't imagine that's what's causing interference. But stopping it is the simplest way to check. On Wed, Oct 29, 2014 at 10:54 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: hi Yana, in my case I did not start any Spark worker. However, Shark was definitely running. Do you think that might be a problem? I will take a look. Thank you, -- *From:* Yana Kadiyska [yana.kadiy...@gmail.com] *Sent:* Wednesday, October 29, 2014 9:45 AM *To:* Pagliari, Roberto *Cc:* user@spark.apache.org *Subject:* Re: problem with start-slaves.sh I see this when I start a worker and then try to start it again, forgetting it's already running (I don't use start-slaves; I start the slaves individually with start-slave.sh). All this is telling you is that there is already a running process on that machine. You can see it if you do a ps -aef | grep worker. You can also look at the Spark UI and see if your master shows this machine as connected to it already. If it doesn't, you might want to kill the worker process and restart it. On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I built with the -Phive option to be able to interface with Hive). I’m getting this: ip_address: org.apache.spark.deploy.worker.Worker running as process . Stop it first. Am I doing something wrong? In my specific case, shark+hive is running on the nodes. Does that interfere with Spark? Thank you,
RE: problem with start-slaves.sh
I also didn't realize I was trying to bring up the secondary NameNode as a slave... that might be an issue as well. Thanks,
Re: Spark + Tableau
When you are starting the thrift server service, are you connecting to it locally, or is it on a remote server when you use beeline and/or Tableau? On Thu, Oct 30, 2014 at 8:00 AM, Bojan Kostic blood9ra...@gmail.com wrote: I use the beta SQL ODBC driver from Databricks.
Re: issue on applying SVM to 5 million examples.
Watch the application manager; it should tell you what's running and taking a while... My guess is it's a distinct function on the data. J Sent from my iPhone
Best way to partition RDD
Hi. I am running an application in Spark which first loads data from Cassandra and then performs some map/reduce jobs. val srdd = sqlContext.sql("select * from mydb.mytable") I noticed that the srdd only has one partition, no matter how big the data loaded from Cassandra is. So I performed repartition on the RDD, and then I did the map/reduce functions. But the main problem is that repartition takes so much time (almost 2 min), which is not acceptable in my use case. Is there any better way to do the repartitioning? best, /Shahab
Re: Spark + Tableau
I'm connecting to it remotely with Tableau/beeline. Sent from my Jolla
Re: GC Issues with randomSplit on large dataset
"GC overhead limit exceeded" is usually a sign of either an inadequate heap size (not the case here) or of an application producing garbage (temporary objects) faster than the garbage collector can collect them - GC consumes most of the CPU cycles. 17 GB of Java heap is quite large for many applications and is above the safe and recommended limit (6-8 GB) for a Java server application. From what I saw in the stack trace I can conclude that some operations in the RDD implementation are heap polluters. I am not an expert in Spark, but it seems that Spark is not yet well optimized to work with reasonably large Java heaps. One option here is to try to reduce the JVM heap size and reduce the data size per JVM instance. -Vladimir Rodionov
Re: Manipulating RDDs within a DStream
Hi, Sorry, there's a typo there: it should be val arr = rdd.toArray Harold
Manipulating RDDs within a DStream
Hi all, I'd like to be able to modify values in a DStream and then send them off to an external store like Cassandra, but I keep getting serialization errors and am not sure how to use the correct design pattern. I was wondering if you could help me. I'd like to be able to do the following: wordCounts.foreachRDD( rdd => { val arr = record.toArray ... }) I would like to use the arr to send back to Cassandra, for instance, like this: val collection = sc.parallelize(Seq(a.head._1, a.head._2)) collection.saveToCassandra() Or something like that, but as you know, I can't do this within foreachRDD, only at the driver level. How do I use the arr variable to do something like that? Thanks for any help, Harold
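A minimal sketch of one way to avoid pulling the data back to the driver at all, assuming the DataStax spark-cassandra-connector is on the classpath, wordCounts is a DStream of (word, count) pairs, and a keyspace "ks" with a table "words" (columns word, count) already exists; all of those names are placeholders:

import com.datastax.spark.connector._

wordCounts.foreachRDD { rdd =>
  // saveToCassandra is a distributed write; nothing is collected to the driver
  rdd.saveToCassandra("ks", "words", SomeColumns("word", "count"))
}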
Re: Algebird using spark-shell
Thanks. I was using Scala 2.11.1 and was able to use algebird-core_2.10-0.1.11.jar with spark-shell.
Re: Best way to partition RDD
Hi Helena, Well... I am just running a toy example. I have one Cassandra node co-located with the Spark master and one of the Spark workers, all on one machine. I have another node which runs the second Spark worker. /Shahab On Thu, Oct 30, 2014 at 6:12 PM, Helena Edelson helena.edel...@datastax.com wrote: Hi Shahab, - How many Spark/Cassandra nodes are in your cluster? - What is your deploy topology for the Spark and Cassandra clusters? Are they co-located? - Helena @helenaedelson
Re: issue on applying SVM to 5 million examples.
Did you cache the data and check the load balancing? How many features? Which API are you using - Scala, Java, or Python? -Xiangrui
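A minimal sketch of the caching suggestion, assuming the input has already been parsed into an RDD[LabeledPoint] called data (the partition count and iteration count are illustrative values for a 4-node, 16-core cluster like the one described above):

import org.apache.spark.mllib.classification.SVMWithSGD

val training = data.repartition(16).cache()   // spread across the cores and keep in memory
training.count()                              // materialize the cache before training starts
val model = SVMWithSGD.train(training, 100)   // numIterations = 100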
Doing RDD.count in parallel, or at least parallelizing it as much as possible?
Hi, I noticed that the count (of an RDD) in many of my queries is the most time-consuming step, as it seems to run in the driver process rather than being done by parallel worker nodes. Is there any way to perform count in parallel, or at least parallelize it as much as possible? best, /Shahab
Re: Best way to partition RDD
Shahab, Regardless, WRT Cassandra and Spark: when using the Spark Cassandra Connector, 'spark.cassandra.input.split.size' passed into the SparkConf configures the approximate number of Cassandra partitions in a Spark partition (default 10). No repartitioning should be necessary with what you have below, but I don't know if you are running on one node or a cluster. This is a good initial guide: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#configuration-options-for-adjusting-reads and https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L26-L37 Cheers, Helena @helenaedelson
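A minimal sketch of passing that setting through the SparkConf; the host, app name, and the value itself are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-read")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.input.split.size", "100")   // per the description above, a smaller value yields more Spark partitions
val sc = new SparkContext(conf)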
Re: MLLib: libsvm - default value initialization
You can remove 0.5 from all non-zeros. -Xiangrui On Wed, Oct 29, 2014 at 9:20 PM, Sameer Tilak ssti...@live.com wrote: Hi All, I have my sparse data in libsvm format. val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt") I am running linear regression. Let us say that my data has the following entry: 1 1:0 4:1 I think it will assume 0 for indices 2 and 3, right? I would like to make the default values 0.5 instead of 0. Is that possible? If not, I will have to switch to dense data and it will significantly increase the data size for me.
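A sketch of one way to read that suggestion: shift every explicitly stored value down by 0.5 so the implicit zeros play the role of 0.5 (for a linear model a uniform shift of the features is absorbed by the intercept). This assumes examples is the RDD[LabeledPoint] from loadLibSVMFile:

import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

val shifted = examples.map { lp =>
  val sv = lp.features.asInstanceOf[SparseVector]   // loadLibSVMFile produces sparse vectors
  LabeledPoint(lp.label, Vectors.sparse(sv.size, sv.indices, sv.values.map(_ - 0.5)))
}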
k-mean - result interpretation
Hello everyone, I'm trying to use MLlib's k-means algorithm. I tried it on raw data; here is an example of a line contained in my input data set: 82.9817 3281.4495 with these parameters: numClusters = 4, numIterations = 20. Result: WSSSE = 6.375371241589461E9. Then I normalized my data: 0.02219046937793337492 0.97780953062206662508 With the same parameters, the result is now: WSSSE = 0.04229916511906393. Is it normal that normalization improves my results? Why isn't the WSSSE normalized? It seems that having smaller values leads to a smaller WSSSE. I'm sure I missed something here! Florent
Re: Returned type of Broadcast variable is byte array
The byte array turns out to be a serialized ObjectOutputStream that contains a Tuple2[ParallelCollectionRDD, Function2]. What, then, should be done differently in the broadcast code (which follows the structure of an example taken from mllib)? assert(crows.isInstanceOf[Array[MVector]]) val bcRows = sc.broadcast(crows) .. val arrayVect = bcRows.value
stage failure: java.lang.IllegalStateException: unread block data
Hi, I got this error when running Spark 1.1.0 to read HBase 0.98.1 through simple Python code on an EC2 cluster. The same program runs correctly in local mode, so this error only happens when running in a real cluster. Here's what I got: 14/10/30 17:51:53 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, node001, ANY, 1265 bytes) 14/10/30 17:51:53 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor node001: java.lang.IllegalStateException (unread block data) [duplicate 1] 14/10/30 17:51:53 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, node001, ANY, 1265 bytes) 14/10/30 17:51:53 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor node001: java.lang.IllegalStateException (unread block data) [duplicate 2] 14/10/30 17:51:53 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, node001, ANY, 1265 bytes) 14/10/30 17:51:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor node001: java.lang.IllegalStateException (unread block data) [duplicate 3] 14/10/30 17:51:53 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 14/10/30 17:51:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 14/10/30 17:51:53 INFO TaskSchedulerImpl: Cancelling stage 0 14/10/30 17:51:53 INFO DAGScheduler: Failed to run first at SerDeUtil.scala:70 Traceback (most recent call last): File "/root/workspace/test/sparkhbase.py", line 22, in <module> conf=conf2) File "/root/spark-1.1.0/python/pyspark/context.py", line 471, in newAPIHadoopRDD jconf, batchSize) File "/root/spark-1.1.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark-1.1.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, node001): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2399) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1378) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1776) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) java.io.ObjectInputStream.readObject(ObjectInputStream.java:368) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:679) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
Re: Doing RDD.count in parallel, or at least parallelizing it as much as possible?
Hi Shahab, Are you running Spark in Local, Standalone, YARN, or Mesos mode? If you're running in Standalone/YARN/Mesos, then the .count() action is indeed automatically parallelized across multiple executors. When you run a .count() on an RDD, it actually distributes tasks to the different executors, each of which does a local count on a local partition; all the tasks then send their sub-counts back to the driver for final aggregation. This sounds like the kind of behavior you're looking for. However, in Local mode everything runs in a single JVM (the driver plus one executor), so there's no parallelization across executors.
Re: Doing RDD.count in parallel, or at least parallelizing it as much as possible?
By the way, in case you haven't done so, do try to .cache() the RDD before running a .count() on it, as that could make a big speed improvement.
Re: Ambiguous references to id: what does it mean?
Found this as I am having the same issue. I have exactly the same usage as shown in Michael's join example. I tried executing a SQL statement against the joined data set with two columns that have the same name, and tried to disambiguate the column name with the table alias, but I would still get an "Unresolved attributes" error back. Is there any way around this short of renaming the columns in the join sources? Thanks, -Terry Michael Armbrust wrote: Yes, but if both tagCollection and selectedVideos have a column named id then Spark SQL does not know which one you are referring to in the where clause. Here's an example with aliases: val x = testData2.as('x) val y = testData2.as('y) val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)) On Wed, Jul 16, 2014 at 2:47 AM, Jaonary Rabarisoa jaonary@... wrote: My query is just a simple query that uses the Spark SQL DSL: tagCollection.join(selectedVideos).where('videoId === 'id) On Tue, Jul 15, 2014 at 6:03 PM, Yin Huai huaiyin.thu@... wrote: Hi Jao, It seems the SQL analyzer cannot resolve the references in the join condition. What is your query? Did you use the Hive parser (your query was submitted through hql(...)) or the basic SQL parser (your query was submitted through sql(...))? Thanks, Yin On Tue, Jul 15, 2014 at 8:52 AM, Jaonary Rabarisoa jaonary@... wrote: Hi all, When running a join operation with Spark SQL I got the following error: Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Ambiguous references to id: (id#303,List()),(id#0,List()), tree: Filter ('videoId = 'id) Join Inner, None ParquetRelation data/tags.parquet Filter (name#1 = P1/cam1) ParquetRelation data/videos.parquet What does it mean? Cheers, Jao
Re: stage failure: java.lang.IllegalStateException: unread block data
The worker side has error message as this, 14/10/30 18:29:00 INFO Worker: Asked to launch executor app-20141030182900-0006/0 for testspark_v1 14/10/30 18:29:01 INFO ExecutorRunner: Launch command: java -cp ::/root/spark-1.1.0/conf:/root/spark-1.1.0/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop2.3.0.jar -XX:MaxPermSize=128m -Dspark.driver.port=52552 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@master:52552/user/CoarseGrainedScheduler 0 node001 4 akka.tcp://sparkWorker@node001:60184/user/Worker app-20141030182900-0006 14/10/30 18:29:03 INFO Worker: Asked to kill executor app-20141030182900-0006/0 14/10/30 18:29:03 INFO ExecutorRunner: Runner thread for executor app-20141030182900-0006/0 interrupted 14/10/30 18:29:03 INFO ExecutorRunner: Killing process! 14/10/30 18:29:03 ERROR FileAppender: Error writing stream to file /root/spark-1.1.0/work/app-20141030182900-0006/0/stderr java.io.IOException: Stream Closed at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:214) at org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70) at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39) at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) at org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38) 14/10/30 18:29:04 INFO Worker: Executor app-20141030182900-0006/0 finished with state KILLED exitStatus 143 14/10/30 18:29:04 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.180.49.228%3A52120-22#1336571562] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
14/10/30 18:29:04 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@node001:60184] - [akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with [akka.tcp://sparkExecutor@node001:37697]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@node001:37697] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node001/10.180.49.228:37697 ] 14/10/30 18:29:04 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@node001:60184] - [akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with [akka.tcp://sparkExecutor@node001:37697]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@node001:37697] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node001/10.180.49.228:37697 ] 14/10/30 18:29:04 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@node001:60184] - [akka.tcp://sparkExecutor@node001:37697]: Error [Association failed with [akka.tcp://sparkExecutor@node001:37697]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@node001:37697] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node001/10.180.49.228:37697 ] Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stage-failure-java-lang-IllegalStateException-unread-block-data-tp17751p17755.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
does updateStateByKey accept a state that is a tuple?
I'm trying to implement a Spark Streaming program to calculate the number of instances of a given key encountered and the minimum and maximum times at which it was encountered. updateStateByKey seems to be just the thing, but when I define the state to be a tuple, I get compile errors I'm not finding a way around. Perhaps it's something simple, but I'm stumped. var lines = ssc.textFileStream(dirArg) var linesArray = lines.map( line = (line.split(\t))) var newState = linesArray.map( lineArray = (lineArray(4), 1, Time((lineArray(0).toDouble*1000).toInt), Time((lineArray(0).toDouble*1000).toInt))) val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state: Option[(Int, Time, Time)]) = { val newCount = newValues.map( x = x._1).sum val newMinTime = newValues.map( x = x._2).min val newMaxTime = newValues.map( x = x._3).max val (count, minTime, maxTime) = state.getOrElse((0, Time(Int.MaxValue), Time(Int.MinValue))) Some((count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max)) //(count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max) } var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState) The error I get is [info] Compiling 3 Scala sources to /Users/spr/Documents/.../target/scala-2.10/classes... [error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int, org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)] [error] var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState) I don't understand why the String is being prepended to the tuple I expect (Int, Time, Time). In the main example (StatefulNetworkWordCount, here https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala ), the data is a stream of (String, Int) tuples created by val wordDstream = words.map(x = (x, 1)) and the updateFunc ignores the String key in its definition val updateFunc = (values: Seq[Int], state: Option[Int]) = { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } Is there some special-casing of a key with a simple (non-tuple) value? How could this work with a tuple value? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
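For what it's worth, a hedged sketch of one way the state can be a plain tuple: updateStateByKey is only defined (through the pair-DStream functions) on a DStream of key/value pairs, so the map has to produce a two-element (key, value) tuple whose value is itself the (Int, Time, Time) tuple, rather than a flat four-element tuple. The field positions and the tab separator below are assumptions carried over from the post:

  import org.apache.spark.streaming.Time
  import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations such as updateStateByKey

  val lines = ssc.textFileStream(dirArg)

  // (key, (count, minTime, maxTime)) -- a 2-tuple, so updateStateByKey applies.
  val newState = lines.map(_.split("\t")).map { fields =>
    val t = Time((fields(0).toDouble * 1000).toLong)
    (fields(4), (1, t, t))
  }

  val updateDhcpState = (newValues: Seq[(Int, Time, Time)],
                         state: Option[(Int, Time, Time)]) => {
    val (count, minTime, maxTime) =
      state.getOrElse((0, Time(Long.MaxValue), Time(Long.MinValue)))
    val newCount   = count + newValues.map(_._1).sum
    val newMinTime = newValues.map(_._2).foldLeft(minTime)(_ min _)
    val newMaxTime = newValues.map(_._3).foldLeft(maxTime)(_ max _)
    Some((newCount, newMinTime, newMaxTime))
  }

  val dhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)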
Re: issue on applying SVM to 5 million examples.
Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Re: Algebird using spark-shell
Algebird 0.8.0 has 2.11 support if you want to run in a 2.11 env. On Thu, Oct 30, 2014 at 10:08 AM, Buntu Dev buntu...@gmail.com wrote: Thanks.. I was using Scala 2.11.1 and was able to use algebird-core_2.10-0.1.11.jar with spark-shell. On Thu, Oct 30, 2014 at 8:22 AM, Ian O'Connell i...@ianoconnell.com wrote: Whats the error with the 2.10 version of algebird? On Thu, Oct 30, 2014 at 12:49 AM, thadude ohpre...@yahoo.com wrote: I've tried: . /bin/spark-shell --jars algebird-core_2.10-0.8.1.jar scala import com.twitter.algebird._ import com.twitter.algebird._ scala import HyperLogLog._ import HyperLogLog._ scala import com.twitter.algebird.HyperLogLogMonoid import com.twitter.algebird.HyperLogLogMonoid scala val hll = new HyperLogLogMonoid(12) hll: com.twitter.algebird.HyperLogLogMonoid = com.twitter.algebird.HyperLogLogMonoid@7bde289a https://github.com/twitter/algebird/wiki/Algebird-Examples-with-REPL -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Algebird-using-spark-shell-tp17701p17714.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
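For reference, a hedged sketch of using the HyperLogLogMonoid above for an approximate distinct count over an RDD; the RDD contents are placeholders, and it assumes an Algebird 0.8.x API where HyperLogLogMonoid exposes create (older releases expose the same operation as apply):

  import com.twitter.algebird._

  // 12 bits of precision, as in the REPL session above (roughly 1-2% error).
  val hll = new HyperLogLogMonoid(12)

  val ids = sc.parallelize(Seq("a", "b", "a", "c"))   // placeholder data

  // Hash every element into a small HLL sketch, combine the sketches with the
  // monoid, and read off the cardinality estimate.
  val combined = ids.map(id => hll.create(id.getBytes("UTF-8"))).reduce(hll.plus(_, _))
  val approxDistinct = combined.approximateSize.estimate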
Re: Out of memory with Spark Streaming
curious about why you're only seeing 50 records max per batch. how many receivers are you running? what is the rate that you're putting data onto the stream? per the default AWS kinesis configuration, the producer can do 1000 PUTs per second with max 50k bytes per PUT and max 1mb per second per shard. on the consumer side, you can only do 5 GETs per second and 2mb per second per shard. my hunch is that the 5 GETs per second is what's limiting your consumption rate. can you verify that these numbers match what you're seeing? if so, you may want to increase your shards and therefore the number of kinesis receivers. otherwise, this may require some further investigation on my part. i wanna stay on top of this if it's an issue. thanks for posting this, aniket! -chris On Fri, Sep 12, 2014 at 5:34 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi all Sorry but this was totally my mistake. In my persistence logic, I was creating async http client instance in RDD foreach but was never closing it leading to memory leaks. Apologies for wasting everyone's time. Thanks, Aniket On 12 September 2014 02:20, Tathagata Das tathagata.das1...@gmail.com wrote: Which version of spark are you running? If you are running the latest one, then could try running not a window but a simple event count on every 2 second batch, and see if you are still running out of memory? TD On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I did change it to be 1 gb. It still ran out of memory but a little later. The streaming job isnt handling a lot of data. In every 2 seconds, it doesn't get more than 50 records. Each record size is not more than 500 bytes. On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com wrote: You could set spark.executor.memory to something bigger than the default (512mb) On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am running a simple Spark Streaming program that pulls in data from Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps data and persists to a store. The program is running in local mode right now and runs out of memory after a while. I am yet to investigate heap dumps but I think Spark isn't releasing memory after processing is complete. I have even tried changing storage level to disk only. Help! Thanks, Aniket
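Since the root cause above was a client created inside foreach and never closed, here is a hedged sketch of the usual cleanup pattern, with MyAsyncHttpClient and persist() standing in for whatever store the job writes to:

  // One client per partition, always closed, so nothing leaks across batches.
  windowedStream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val client = new MyAsyncHttpClient()        // placeholder, non-serializable client
      try {
        records.foreach(record => client.persist(record))
      } finally {
        client.close()                            // the missing close() was the leak
      }
    }
  }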
Re: Do Spark executors restrict native heap vs JVM heap?
No, but, the JVM also does not allocate memory for native code on the heap. I dont think heap has any bearing on whether your native code can't allocate more memory except that of course the heap is also taking memory. On Oct 30, 2014 6:43 PM, Paul Wais pw...@yelp.com wrote: Dear Spark List, I have a Spark app that runs native code inside map functions. I've noticed that the native code sometimes sets errno to ENOMEM indicating a lack of available memory. However, I've verified that the /JVM/ has plenty of heap space available-- Runtime.getRuntime().freeMemory() shows gigabytes free and the native code needs only megabytes. Does spark limit the /native/ heap size somehow? Am poking through the executor code now but don't see anything obvious. Best Regards, -Paul Wais - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: k-mean - result interpretation
Yes that is exactly it. The values are not comparable since normalization is also shrinking all distances. Squared error is not an absolute metric. I haven't thought about this much but maybe you are looking for something like the silhouette coefficient? On Oct 30, 2014 5:35 PM, mgCl2 florent.jouante...@gmail.com wrote: Hello everyone, I'm trying to use MLlib's K-mean algorithm. I tried it on raw data, Here is a example of a line contained in my input data set: 82.9817 3281.4495 with those parameters: *numClusters*=4 *numIterations*=20 results: *WSSSE = 6.375371241589461E9* Then I normalized my data: 0.02219046937793337492 0.97780953062206662508 With the same parameters, result is now: *WSSSE= 0.04229916511906393* Is it normal that normalization improve my results? Why isn't the WSSSE normalized? Because it seems that having smaller values end to a smaller WSSSE I'm sure I missed something here! Florent -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/k-mean-result-interpretation-tp17748.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
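For reference, a minimal sketch of how the WSSSE in this thread is typically computed with MLlib, assuming the two-column space-separated input from the question; the point is that computeCost sums squared distances in the feature space, so normalizing (shrinking) the features shrinks the reported number as well:

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val points = sc.textFile("data/points.txt")      // placeholder path
    .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
    .cache()

  val model = KMeans.train(points, 4, 20)          // numClusters = 4, numIterations = 20

  // Sum of squared distances from each point to its closest center,
  // expressed in (squared) units of the input features.
  val wssse = model.computeCost(points)
  println(s"WSSSE = $wssse")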
akka connection refused bug, fix?
Hi, I saw the same issue as this thread, http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-akka-connection-refused-td9864.html Anyone has a fix for this bug? Please?! The log info in my worker node is like, 14/10/30 20:15:18 INFO Worker: Asked to kill executor app-20141030201514-/0 14/10/30 20:15:18 INFO ExecutorRunner: Runner thread for executor app-20141030201514-/0 interrupted 14/10/30 20:15:18 INFO ExecutorRunner: Killing process! 14/10/30 20:15:18 INFO Worker: Executor app-20141030201514-/0 finished with state KILLED exitStatus 1 14/10/30 20:15:18 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.180.49.228%3A47087-2#-814958390] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/10/30 20:15:18 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@node001:42816] - [akka.tcp://sparkExecutor@node001:35811]: Error [Association failed with [akka.tcp://sparkExecutor@node001:35811]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@node001:35811] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node001/10.180.49.228:35811 ] 14/10/30 20:15:18 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@node001:42816] - [akka.tcp://sparkExecutor@node001:35811]: Error [Association failed with [akka.tcp://sparkExecutor@node001:35811]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@node001:35811] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node001/10.180.49.228:35811 ] 14/10/30 20:15:18 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@node001:42816] - [akka.tcp://sparkExecutor@node001:35811]: Error [Association failed with [akka.tcp://sparkExecutor@node001:35811]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@node001:35811] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: node001/10.180.49.228:35811 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/akka-connection-refused-bug-fix-tp17764.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Registering custom metrics
Hi, I've been exploring the metrics exposed by Spark and I'm wondering whether there's a way to register job-specific metrics that could be exposed through the existing metrics system. Would there be an example somewhere? BTW, documentation about how the metrics work could be improved. I found out about the default servlet and the metrics/json/ endpoint from the code. I could not find any reference to that on the dedicated doc page [1]. Probably something I could contribute if there's nobody on that at the moment. -kr, Gerard. [1] http://spark.apache.org/docs/1.1.0/monitoring.html#Metrics
Creating a SchemaRDD from RDD of thrift classes
I have one job with spark that creates some RDDs of type X and persists them in memory. The type X is an auto generated Thrift java class (not a case class though). Now in another job, I want to convert the RDD to a SchemaRDD using sqlContext.applySchema(). Can I derive a schema from the thrift definitions to convert RDD[X] to SchemaRDD[X]? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-SchemaRDD-from-RDD-of-thrift-classes-tp17766.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Creating a SchemaRDD from RDD of thrift classes
That should be possible, although I'm not super familiar with thrift. You'll probably need access to the generated metadata http://people.apache.org/~thejas/thrift-0.9/javadoc/org/apache/thrift/meta_data/package-frame.html . Shameless plug If you find yourself reading a lot of thrift data you might consider writing a library that goes against the new SQL Data Source API https://github.com/apache/spark/pull/2475, which is about to be merged in. Its essentially applySchema on steroids. This code for avro is possibly a useful reference: https://github.com/marmbrus/sql-avro On Thu, Oct 30, 2014 at 2:13 PM, ankits ankitso...@gmail.com wrote: I have one job with spark that creates some RDDs of type X and persists them in memory. The type X is an auto generated Thrift java class (not a case class though). Now in another job, I want to convert the RDD to a SchemaRDD using sqlContext.applySchema(). Can I derive a schema from the thrift definitions to convert RDD[X] to SchemaRDD[X]? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-SchemaRDD-from-RDD-of-thrift-classes-tp17766.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
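For reference, a hedged sketch of the applySchema route in the meantime, with a hand-written schema; the thrift class X, its getters and the field types are assumptions, and in practice the StructType could be derived from the generated thrift metadata instead:

  import org.apache.spark.sql._

  val sqlContext = new SQLContext(sc)

  // Hand-written schema standing in for one derived from the thrift metadata.
  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("name", StringType, nullable = true)))

  // thriftRDD: RDD[X] is assumed to already exist; map each thrift object to a Row.
  val rowRDD = thriftRDD.map(x => Row(x.getId, x.getName))

  val schemaRDD = sqlContext.applySchema(rowRDD, schema)
  schemaRDD.registerTempTable("xs")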
Re: Best way to partition RDD
Thanks Helena, very useful comment, But is ‘spark.cassandra.input.split.size only effective in Cluster not in Single node? best, /Shahab On Thu, Oct 30, 2014 at 6:26 PM, Helena Edelson helena.edel...@datastax.com wrote: Shahab, Regardless, WRT cassandra and spark when using the spark cassandra connector, ‘spark.cassandra.input.split.size’ passed into the SparkConf configures the approx number of Cassandra partitions in a Spark partition (default 10). No repartitioning should be necessary with what you have below, but I don’t know if you are running on one node or a cluster. This is a good initial guide: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#configuration-options-for-adjusting-reads https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L26-L37 Cheers, Helena @helenaedelson On Oct 30, 2014, at 1:12 PM, Helena Edelson helena.edel...@datastax.com wrote: Hi Shahab, -How many spark/cassandra nodes are in your cluster? -What is your deploy topology for spark and cassandra clusters? Are they co-located? - Helena @helenaedelson On Oct 30, 2014, at 12:16 PM, shahab shahab.mok...@gmail.com wrote: Hi. I am running an application in the Spark which first loads data from Cassandra and then performs some map/reduce jobs. val srdd = sqlContext.sql(select * from mydb.mytable ) I noticed that the srdd only has one partition . no matter how big is the data loaded form Cassandra. So I perform repartition on the RDD , and then I did the map/reduce functions. But the main problem is that repartition takes so much time (almost 2 min), which is not acceptable in my use-case. Is there any better way to do repartitioning? best, /Shahab
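For reference, a small sketch of passing the connector setting through SparkConf; the host and the split size are illustrative values only. With a smaller split size the initial read already produces more Spark partitions, which may make the expensive repartition() unnecessary:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("cassandra-read")
    .set("spark.cassandra.connection.host", "127.0.0.1")   // placeholder host
    // approximate number of Cassandra partitions packed into one Spark partition
    .set("spark.cassandra.input.split.size", "1000")

  val sc = new SparkContext(conf)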
Re: does updateStateByKey accept a state that is a tuple?
I think I understand how to deal with this, though I don't have all the code working yet. The point is that the V of (K, V) can itself be a tuple. So the updateFunc prototype looks something like val updateDhcpState = (newValues: Seq[Tuple1[(Int, Time, Time)]], state: Option[Tuple1[(Int, Time, Time)]]) = Option[Tuple1[(Int, Time, Time)]] { ... } And I'm wondering whether those Tuple1()s are superfluous. Film at 11. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756p17769.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
how idf is calculated
Hi, I'm writing a paper and I need to calculate tf-idf. With your help I managed to get the results I needed, but the problem is that I need to be able to explain how each number was gotten. So I tried to understand how idf was calculated, and the numbers I get don't correspond to those I should get. I have 3 documents (each line a document) a a b c m m e a c d e e d j k l m m c When I calculate tf, I get this (1048576,[99,100,106,107,108,109],[1.0,1.0,1.0,1.0,1.0,2.0]) (1048576,[97,98,99,109],[2.0,1.0,1.0,2.0]) (1048576,[97,99,100,101],[1.0,1.0,1.0,3.0]) idf is supposedly calculated as idf = log((m + 1) / (d(t) + 1)) m - number of documents (3 in my case). d(t) - the number of documents in which the term is present a: log(4/3) = 0.1249387366 b: log(4/2) = 0.3010299957 c: log(4/4) = 0 d: log(4/3) = 0.1249387366 e: log(4/2) = 0.3010299957 l: log(4/2) = 0.3010299957 m: log(4/3) = 0.1249387366 When I output the idf vector ` idf.idf.toArray.filter(_.(0)).distinct.foreach(println(_)) ` I get: 1.3862943611198906 0.28768207245178085 0.6931471805599453 I understand why there are only 3 numbers, because only 3 are unique: log(4/2), log(4/3), log(4/4), but I don't understand how the numbers in idf were calculated Best regards, Andrejs
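One detail worth noting (hedged, but it matches the numbers above): MLlib evaluates log((m + 1) / (d(t) + 1)) with the natural logarithm, not log base 10, so the three distinct values in the output line up with ln(4/1), ln(4/3) and ln(4/2):

  // Natural log (math.log), with m = 3 documents:
  math.log(4.0 / 1.0)   // 1.3862943611198906
  math.log(4.0 / 2.0)   // 0.6931471805599453
  math.log(4.0 / 3.0)   // 0.28768207245178085
  math.log(4.0 / 4.0)   // 0.0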
RE: how idf is calculated
Hi Andrejs, The calculations are a bit different to what I've come across in Mining Massive Datasets (2nd Ed. Ullman et. al., Cambridge Press) available here: http://www.mmds.org/ Their calculation of IDF is as follows: IDFi = log2(N / ni) where N is the number of documents and ni is the number of documents in which the word appears. This looks different to your IDF function. For TF, they use TFij = fij / maxk fkj That is: For document j, the term frequency of the term i in j is the number of times i appears in j divided by the maximum number of times any term appears in j. (Stop words are usually excluded when considering the maximum.) So, in your case, TFa1 = 2 / 2 = 1, TFb1 = 1 / 2 = 0.5, TFc1 = 1/2 = 0.5, TFm1 = 2/2 = 1... IDFa = log2(3 / 2) = 0.585 So, TFa1 * IDFa = 0.585 Wikipedia mentions an adjustment to overcome biases for long documents, by calculating TFij = 0.5 + {(0.5*fij)/maxk fkj}, but that doesn't change anything for TFa1, as the value remains 1. In other words, my calculations don't agree with yours, and neither seem to agree with Spark :) Regards, Ashic. Date: Thu, 30 Oct 2014 22:13:49 + Subject: how idf is calculated From: andr...@sindicetech.com To: u...@spark.incubator.apache.org Hi, I'm writing a paper and I need to calculate tf-idf. With your help I managed to get the results I needed, but the problem is that I need to be able to explain how each number was gotten. So I tried to understand how idf was calculated and the numbers I get don't correspond to those I should get. I have 3 documents (each line a document): a a b c m m / e a c d e e / d j k l m m c. When I calculate tf, I get this (1048576,[99,100,106,107,108,109],[1.0,1.0,1.0,1.0,1.0,2.0]) (1048576,[97,98,99,109],[2.0,1.0,1.0,2.0]) (1048576,[97,99,100,101],[1.0,1.0,1.0,3.0]) idf is supposedly calculated as idf = log((m + 1) / (d(t) + 1)), m - number of documents (3 in my case), d(t) - the number of documents in which the term is present: a: log(4/3) = 0.1249387366, b: log(4/2) = 0.3010299957, c: log(4/4) = 0, d: log(4/3) = 0.1249387366, e: log(4/2) = 0.3010299957, l: log(4/2) = 0.3010299957, m: log(4/3) = 0.1249387366. When I output the idf vector ` idf.idf.toArray.filter(_.(0)).distinct.foreach(println(_)) ` I get: 1.3862943611198906 0.28768207245178085 0.6931471805599453 I understand why there are only 3 numbers, because only 3 are unique: log(4/2), log(4/3), log(4/4), but I don't understand how the numbers in idf were calculated. Best regards, Andrejs
Re: issue on applying SVM to 5 million examples.
Then caching should solve the problem. Otherwise, it is just loading and parsing data from disk for each iteration. -Xiangrui On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote: Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
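A minimal sketch of the caching being suggested, applied to the training RDD from the code above (the parsing itself is unchanged), assuming the parsed points fit in memory across the cluster:

  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors

  val parsedData = sc.textFile(args(0)).map { line =>
    val parts = line.split(',')
    LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
  }.cache()   // keep the parsed points in memory so every SGD iteration reuses them

  val model = SVMWithSGD.train(parsedData, 20)   // numIterations = 20, as in the post

  parsedData.unpersist()   // optionally free the cached partitions afterwards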
SparkSQL + Hive Cached Table Exception
Hi, While testing SparkSQL on top of our Hive metastore, I am getting some java.lang.ArrayIndexOutOfBoundsException while reusing a cached RDD table. Basically, I have a table mtable partitioned by some date field in hive and below is the scala code I am running in spark-shell: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc); val rdd_mtable = sqlContext.sql(select * from mtable where date=20141028); rdd_mtable.registerTempTable(rdd_mtable); sqlContext.cacheTable(rdd_mtable); sqlContext.sql(select count(*) from rdd_mtable).collect(); -- OK sqlContext.sql(select count(*) from rdd_mtable).collect(); -- Exception So the first collect() is working just fine, however running the second collect() which I expect use the cached RDD throws some java.lang.ArrayIndexOutOfBoundsException, see the backtrace at the end of this email. It seems the columnar traversal is crashing for some reasons. FYI, I am using spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14). java.lang.ArrayIndexOutOfBoundsException: 14 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108) at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66) at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87) at org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73) at org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.CacheManager$class.useCachedData(CacheManager.scala:119) at org.apache.spark.sql.SQLContext.useCachedData(SQLContext.scala:49) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:376) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:376) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:377) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:377) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:382) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:380) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:386) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:386) Thanks,
Re: akka connection refused bug, fix?
followed this http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Akka-Error-while-running-Spark-Jobs/td-p/18602 but the problem was not fixed.. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/akka-connection-refused-bug-fix-tp17764p17774.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkContext UI
Hi All, When I load an RDD with: data = sc.textFile(somefile) I don't see the resulting RDD in the SparkContext gui on localhost:4040 in /storage. Is there something special I need to do to allow me to view this? I tried both the scala and python shells but got the same result. Thanks Stuart
Re: SparkContext UI
Hey Stuart, The RDD won't show up under the Storage tab in the UI until it's been cached. Basically Spark doesn't know what the RDD will look like until it's cached, b/c up until then the RDD is just on disk (external to Spark). If you launch some transformations + an action on an RDD that is purely on disk, then Spark will read it from disk, compute against it and then write the results back to disk or show you the results at the scala/python shells. But when you run Spark workloads against purely on disk files, the RDD won't show up in Spark's Storage UI. Hope that makes sense... - Sameer On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Hi All, When I load an RDD with: data = sc.textFile(somefile) I don't see the resulting RDD in the SparkContext gui on localhost:4040 in /storage. Is there something special I need to do to allow me to view this? I tried but scala and python shells but same result. Thanks Stuart
Re: SparkContext UI
Sorry too quick to pull the trigger on my original email. I should have added that I'm tried using persist() and cache() but no joy. I'm doing this: data = sc.textFile(somedata) data.cache data.count() but I still can't see anything in the storage? On 31 October 2014 10:42, Sameer Farooqui same...@databricks.com wrote: Hey Stuart, The RDD won't show up under the Storage tab in the UI until it's been cached. Basically Spark doesn't know what the RDD will look like until it's cached, b/c up until then the RDD is just on disk (external to Spark). If you launch some transformations + an action on an RDD that is purely on disk, then Spark will read it from disk, compute against it and then write the results back to disk or show you the results at the scala/python shells. But when you run Spark workloads against purely on disk files, the RDD won't show up in Spark's Storage UI. Hope that makes sense... - Sameer On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Hi All, When I load an RDD with: data = sc.textFile(somefile) I don't see the resulting RDD in the SparkContext gui on localhost:4040 in /storage. Is there something special I need to do to allow me to view this? I tried but scala and python shells but same result. Thanks Stuart
Re: issue on applying SVM to 5 million examples.
Hi Xiangrui, Can you give me some code example about caching, as I am new to Spark. Thanks, Best, Peng On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Then caching should solve the problem. Otherwise, it is just loading and parsing data from disk for each iteration. -Xiangrui On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote: Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Scaladoc
How do I build the scaladoc html files from the spark source distribution? Alex Bareta
Re: SparkContext UI
Hi Stuart, You're close! Just add a () after the cache, like: data.cache() ...and then run the .count() action on it and you should be good to see it in the Storage UI! - Sameer On Thu, Oct 30, 2014 at 4:50 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Sorry too quick to pull the trigger on my original email. I should have added that I'm tried using persist() and cache() but no joy. I'm doing this: data = sc.textFile(somedata) data.cache data.count() but I still can't see anything in the storage? On 31 October 2014 10:42, Sameer Farooqui same...@databricks.com wrote: Hey Stuart, The RDD won't show up under the Storage tab in the UI until it's been cached. Basically Spark doesn't know what the RDD will look like until it's cached, b/c up until then the RDD is just on disk (external to Spark). If you launch some transformations + an action on an RDD that is purely on disk, then Spark will read it from disk, compute against it and then write the results back to disk or show you the results at the scala/python shells. But when you run Spark workloads against purely on disk files, the RDD won't show up in Spark's Storage UI. Hope that makes sense... - Sameer On Thu, Oct 30, 2014 at 4:30 PM, Stuart Horsman stuart.hors...@gmail.com wrote: Hi All, When I load an RDD with: data = sc.textFile(somefile) I don't see the resulting RDD in the SparkContext gui on localhost:4040 in /storage. Is there something special I need to do to allow me to view this? I tried but scala and python shells but same result. Thanks Stuart
Re: issue on applying SVM to 5 million examples.
sampleRDD. cache() Sent from my iPhone On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote: Hi Xiangrui, Can you give me some code example about caching, as I am new to Spark. Thanks, Best, Peng On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Then caching should solve the problem. Otherwise, it is just loading and parsing data from disk for each iteration. -Xiangrui On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote: Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Confused about class paths in spark 1.1.0
Hi, I've been trying to move up from spark 0.9.2 to 1.1.0. I'm getting a little confused with the setup for a few different use cases, grateful for any pointers... (1) spark-shell + with jars that are only required by the driver (1a) I added spark.driver.extraClassPath /mypath/to.jar to my spark-defaults.conf I launched spark-shell with: ./spark-shell Here I see on the WebUI that spark.driver.extraClassPath has been set, but I am NOT able to access any methods in the jar. (1b) I removed spark.driver.extraClassPath from my spark-default.conf I launched spark-shell with .//spark-shell --driver.class.path /mypath/to.jar Again I see that the WebUI spark.driver.extraClassPath has been set. But this time I am able to access the methods in the jar. Q: Is spark-shell not considered the driver in this case? why does using --driver.class.path on the command line have a different behavior to setting it in spark-defaults.conf ? (2) Rather than adding each jar individually, is there a way to use wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/* but with --driver.class.path it seems to require individual files. tks Shay
Re: issue on applying SVM to 5 million examples.
Thanks Jimmy. I will have a try. Thanks very much for your guys' help. Best, Peng On Thu, Oct 30, 2014 at 8:19 PM, Jimmy ji...@sellpoints.com wrote: sampleRDD. cache() Sent from my iPhone On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote: Hi Xiangrui, Can you give me some code example about caching, as I am new to Spark. Thanks, Best, Peng On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Then caching should solve the problem. Otherwise, it is just loading and parsing data from disk for each iteration. -Xiangrui On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote: Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Re: Confused about class paths in spark 1.1.0
Try using --jars instead of the driver-only options; they should work with spark-shell too but they may be less tested. Unfortunately, you do have to specify each JAR separately; you can maybe use a shell script to list a directory and get a big list, or set up a project that builds all of the dependencies into one assembly JAR. Matei On Oct 30, 2014, at 5:24 PM, Shay Seng s...@urbanengines.com wrote: Hi, I've been trying to move up from spark 0.9.2 to 1.1.0. I'm getting a little confused with the setup for a few different use cases, grateful for any pointers... (1) spark-shell + with jars that are only required by the driver (1a) I added spark.driver.extraClassPath /mypath/to.jar to my spark-defaults.conf I launched spark-shell with: ./spark-shell Here I see on the WebUI that spark.driver.extraClassPath has been set, but I am NOT able to access any methods in the jar. (1b) I removed spark.driver.extraClassPath from my spark-default.conf I launched spark-shell with .//spark-shell --driver.class.path /mypath/to.jar Again I see that the WebUI spark.driver.extraClassPath has been set. But this time I am able to access the methods in the jar. Q: Is spark-shell not considered the driver in this case? why does using --driver.class.path on the command line have a different behavior to setting it in spark-defaults.conf ? (2) Rather than adding each jar individually, is there a way to use wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/* but with --driver.class.path it seems to require individual files. tks Shay - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Confused about class paths in spark 1.1.0
-- jars does indeed work but this causes the jars to also get shipped to the workers -- which I don't want to do for efficiency reasons. I think you are saying that setting spark.driver.extraClassPath in spark-default.conf ought to have the same behavior as providing --driver.class.apth to spark-shell. Correct? If so I will file a bug report since this is definitely not the case. On Thu, Oct 30, 2014 at 5:39 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Try using --jars instead of the driver-only options; they should work with spark-shell too but they may be less tested. Unfortunately, you do have to specify each JAR separately; you can maybe use a shell script to list a directory and get a big list, or set up a project that builds all of the dependencies into one assembly JAR. Matei On Oct 30, 2014, at 5:24 PM, Shay Seng s...@urbanengines.com wrote: Hi, I've been trying to move up from spark 0.9.2 to 1.1.0. I'm getting a little confused with the setup for a few different use cases, grateful for any pointers... (1) spark-shell + with jars that are only required by the driver (1a) I added spark.driver.extraClassPath /mypath/to.jar to my spark-defaults.conf I launched spark-shell with: ./spark-shell Here I see on the WebUI that spark.driver.extraClassPath has been set, but I am NOT able to access any methods in the jar. (1b) I removed spark.driver.extraClassPath from my spark-default.conf I launched spark-shell with .//spark-shell --driver.class.path /mypath/to.jar Again I see that the WebUI spark.driver.extraClassPath has been set. But this time I am able to access the methods in the jar. Q: Is spark-shell not considered the driver in this case? why does using --driver.class.path on the command line have a different behavior to setting it in spark-defaults.conf ? (2) Rather than adding each jar individually, is there a way to use wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/* but with --driver.class.path it seems to require individual files. tks Shay
Re: Confused about class paths in spark 1.1.0
Yeah, I think you should file this as a bug. The problem is that JARs need to also be added into the Scala compiler and REPL class loader, and we probably don't do this for the ones in this driver config property. Matei On Oct 30, 2014, at 6:07 PM, Shay Seng s...@urbanengines.com wrote: -- jars does indeed work but this causes the jars to also get shipped to the workers -- which I don't want to do for efficiency reasons. I think you are saying that setting spark.driver.extraClassPath in spark-default.conf ought to have the same behavior as providing --driver.class.apth to spark-shell. Correct? If so I will file a bug report since this is definitely not the case. On Thu, Oct 30, 2014 at 5:39 PM, Matei Zaharia matei.zaha...@gmail.com mailto:matei.zaha...@gmail.com wrote: Try using --jars instead of the driver-only options; they should work with spark-shell too but they may be less tested. Unfortunately, you do have to specify each JAR separately; you can maybe use a shell script to list a directory and get a big list, or set up a project that builds all of the dependencies into one assembly JAR. Matei On Oct 30, 2014, at 5:24 PM, Shay Seng s...@urbanengines.com mailto:s...@urbanengines.com wrote: Hi, I've been trying to move up from spark 0.9.2 to 1.1.0. I'm getting a little confused with the setup for a few different use cases, grateful for any pointers... (1) spark-shell + with jars that are only required by the driver (1a) I added spark.driver.extraClassPath /mypath/to.jar to my spark-defaults.conf I launched spark-shell with: ./spark-shell Here I see on the WebUI that spark.driver.extraClassPath has been set, but I am NOT able to access any methods in the jar. (1b) I removed spark.driver.extraClassPath from my spark-default.conf I launched spark-shell with .//spark-shell --driver.class.path /mypath/to.jar Again I see that the WebUI spark.driver.extraClassPath has been set. But this time I am able to access the methods in the jar. Q: Is spark-shell not considered the driver in this case? why does using --driver.class.path on the command line have a different behavior to setting it in spark-defaults.conf ? (2) Rather than adding each jar individually, is there a way to use wildcards? Previously with SPARK_CLASS_PATH I was able to use mypath/* but with --driver.class.path it seems to require individual files. tks Shay
Re: [scala-user] Why aggregate is inconsistent?
My other question is: why does Spark not provide foldLeft, def foldLeft[U](zeroValue: U)(op: (U, T) => U): U, but only aggregate? The def fold(zeroValue: T)(op: (T, T) => T): T in Spark is not deterministic either. On Thu, Oct 30, 2014 at 3:50 PM, Jason Zaugg jza...@gmail.com wrote: On Thu, Oct 30, 2014 at 5:39 PM, Xuefeng Wu ben...@gmail.com wrote:

scala> import scala.collection.GenSeq
scala> val seq = GenSeq("This", "is", "an", "example")
scala> seq.aggregate("0")(_ + _, _ + _)
res0: String = 0Thisisanexample
scala> seq.par.aggregate("0")(_ + _, _ + _)
res1: String = 0This0is0an0example

/** Aggregates the results of applying an operator to subsequent elements.
 *
 * This is a more general form of `fold` and `reduce`. It has similar
 * semantics, but does not require the result to be a supertype of the
 * element type. It traverses the elements in different partitions
 * sequentially, using `seqop` to update the result, and then applies
 * `combop` to results from different partitions. The implementation of
 * this operation may operate on an arbitrary number of collection
 * partitions, so `combop` may be invoked an arbitrary number of times.
 * ...
 * @tparam B      the type of accumulated results
 * @param z       the initial value for the accumulated result of the partition - this
 *                will typically be the neutral element for the `seqop` operator (e.g.
 *                `Nil` for list concatenation or `0` for summation) and may be evaluated
 *                more than once
 * @param seqop   an operator used to accumulate results within a partition
 * @param combop  an associative operator used to combine results from different partitions
 */
def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B

The contract of aggregate allows for this; if you need deterministic results you need to choose a z that is the neutral element for combop. In your example, this would be the empty string. -jason -- ~Yours, Xuefeng Wu/吴雪峰 敬上
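For illustration, a minimal sketch of Jason's suggestion in the Scala REPL: with the empty string (the neutral element for string concatenation) as the zero value, the sequential and parallel results agree.

import scala.collection.GenSeq

val seq = GenSeq("This", "is", "an", "example")
seq.aggregate("")(_ + _, _ + _)       // Thisisanexample
seq.par.aggregate("")(_ + _, _ + _)   // Thisisanexample

For Spark's RDD.aggregate the combine function should additionally be commutative, since partition results are not guaranteed to be combined in partition order.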
Re: SparkSQL + Hive Cached Table Exception
Hmmm, this looks like a bug. Can you file a JIRA? On Thu, Oct 30, 2014 at 4:04 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, While testing SparkSQL on top of our Hive metastore, I am getting a java.lang.ArrayIndexOutOfBoundsException when reusing a cached RDD table. Basically, I have a table mtable partitioned by a date field in Hive, and below is the Scala code I am running in spark-shell:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val rdd_mtable = sqlContext.sql("select * from mtable where date=20141028");
rdd_mtable.registerTempTable("rdd_mtable");
sqlContext.cacheTable("rdd_mtable");
sqlContext.sql("select count(*) from rdd_mtable").collect(); -- OK
sqlContext.sql("select count(*) from rdd_mtable").collect(); -- Exception

So the first collect() works just fine; however, running the second collect(), which I expect to use the cached RDD, throws a java.lang.ArrayIndexOutOfBoundsException (see the backtrace at the end of this email). It seems the columnar traversal is crashing for some reason. FYI, I am using Spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14). java.lang.ArrayIndexOutOfBoundsException: 14 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108) at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66) at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87) at org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73) at org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122) at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.CacheManager$class.useCachedData(CacheManager.scala:119) at org.apache.spark.sql.SQLContext.useCachedData(SQLContext.scala:49) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:376) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:376) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:377) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:377) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:382) at
Re: Use RDD like an Iterator
RDD.toLocalIterator returns the partitions one by one, but all elements within a partition are materialized at once, so it is not fully lazy. Given the design of Spark, it is very hard to maintain the state of an iterator across runJob calls.

def toLocalIterator: Iterator[T] = {
  def collectPartition(p: Int): Array[T] = {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
  }
  (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}

Thanks. Zhan Zhang On Oct 29, 2014, at 3:43 AM, Yanbo Liang yanboha...@gmail.com wrote: RDD.toLocalIterator() is the suitable solution. But I doubt whether it conforms with the design principle of Spark and RDD: all RDD transformations are lazily computed until they end with some action. 2014-10-29 15:28 GMT+08:00 Sean Owen so...@cloudera.com: Call RDD.toLocalIterator()? https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html On Wed, Oct 29, 2014 at 4:15 AM, Dai, Kevin yun...@ebay.com wrote: Hi, ALL I have an RDD[T]; can I use it like an iterator? That means I can compute every element of this RDD lazily. Best Regards, Kevin.
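For illustration, a minimal sketch (assuming an existing SparkContext named sc; the sizes are arbitrary) of consuming an RDD from the driver with toLocalIterator. It runs one job per partition, so at most one partition's worth of elements is resident on the driver at a time:

// one job per partition; the driver never holds more than one partition's data
val rdd = sc.parallelize(1 to 1000000, 100).map(_ * 2)
val it: Iterator[Int] = rdd.toLocalIterator
println(it.take(10).toList)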
SizeEstimator in Spark 1.1 and high load/object allocation when reading in data
Hi All, We have recently moved to Spark 1.1 from 0.9 for an application handling a fair number of very large datasets partitioned across multiple nodes. About half of each of these large datasets is stored in off-heap byte arrays and about half in the standard Java heap. While these datasets are being loaded from our custom HDFS 2.3 RDD, and before we are using even a fraction of the available Java heap and the native off-heap memory, the loading slows to an absolute crawl. It is clear from our profiling of the Spark executor that the Spark SizeEstimator is demanding an extremely high CPU load along with a fast and furious allocation of Object[] instances. We do not believe we were seeing this sort of behavior in 0.9, and we have noticed rather significant changes in this part of the BlockManager code going from 0.9 to 1.1 and beyond. A GC run gets rid of all of the Object[] instances. Before we start spending large amounts of time either switching back to 0.9 or tracing further to the root cause of this, I was wondering if anyone out there has enough experience with that part of the code (or has run into the same problem) to help us understand what sort of root causes might lie behind this strange behavior, and even better, what we could do to resolve them. Any help would be very much appreciated. cheers, Erik
Spark Streaming Issue not running 24/7
The problem is simple: I want to stream data 24/7, do some calculations, and save the result in a CSV/JSON file so that I can use it for visualization with dc.js/d3.js. I opted for Spark Streaming on a YARN cluster with Kafka and tried running it 24/7, using groupByKey and updateStateByKey to keep the computed historical data. Initially streaming works fine, but after a few hours I am getting:

14/10/30 23:48:49 ERROR TaskSetManager: Task 2485162.0:3 failed 4 times; aborting job
14/10/30 23:48:50 ERROR JobScheduler: Error running job streaming job 141469227 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2485162.0:3 failed 4 times, most recent failure: Exception failure in TID 478548 on host 172.18.152.36: java.lang.ArrayIndexOutOfBoundsException Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)

I guess it is due to groupByKey and updateStateByKey; I tried groupByKey(100) to increase the number of partitions. Also, when data is in state (say at the 10th second 1,000 records are in state, and at the 100th second 20,000 records are in state, of which 19,000 are no longer being updated), how do I remove those from state? With updateStateByKey returning None: how and when do I do that, how will I know when to return None, and how do I save the data before returning None? I also tried not sending any data for a few hours, but checking the web UI I see the task FINISHED: app-20141030203943- NewApp 0 6.0 GB 2014/10/30 20:39:43 hadoop FINISHED 4.2 h This confuses me: the code calls awaitTermination, but it did not terminate the task. Will streaming stop if no data is received for a significant amount of time? Is there any doc available on how long Spark will keep running when no data is streamed?
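For illustration, a minimal sketch (the state class, field names, and the 30-minute expiry threshold are made up for the example) of an updateStateByKey function that keeps a running count plus a last-seen timestamp per key and returns None for keys that have received no data for a while, which evicts them from the state:

import org.apache.spark.streaming.StreamingContext._

case class KeyState(count: Long, lastSeenMs: Long)

val expiryMs = 30 * 60 * 1000L  // hypothetical idle timeout

def updateFn(newValues: Seq[Long], state: Option[KeyState]): Option[KeyState] = {
  val now = System.currentTimeMillis()
  if (newValues.nonEmpty) {
    Some(KeyState(state.map(_.count).getOrElse(0L) + newValues.sum, now))
  } else if (state.exists(s => now - s.lastSeenMs < expiryMs)) {
    state        // idle but still recent: keep it
  } else {
    None         // returning None removes the key from the state
  }
}

// counts: DStream[(String, Long)] parsed from the Kafka stream (not shown)
// val stateStream = counts.updateStateByKey(updateFn)

Anything that must be persisted (for example, writing the expiring records to the CSV/JSON output) has to happen before the update function returns None, e.g. by filtering the state DStream for entries older than the threshold and saving them in the same batch.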
Re: Doing RDD.count in parallel, or at least parallelize it as much as possible?
Hey Sameer, wouldn't local[x] run count in parallel across the x threads? Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, Oct 30, 2014 at 11:42 PM, Sameer Farooqui same...@databricks.com wrote: Hi Shahab, Are you running Spark in Local, Standalone, YARN or Mesos mode? If you're running in Standalone/YARN/Mesos, then the .count() action is indeed automatically parallelized across multiple Executors. When you run a .count() on an RDD, it actually distributes tasks to different executors, each doing a local count on a local partition, and then all the tasks send their sub-counts back to the driver for final aggregation. This sounds like the kind of behavior you're looking for. However, in Local mode, everything runs in a single JVM (the driver + executor), so there's no parallelization across Executors. On Thu, Oct 30, 2014 at 10:25 AM, shahab shahab.mok...@gmail.com wrote: Hi, I noticed that the count (of an RDD) in many of my queries is the most time-consuming one, as it runs in the driver process rather than being done by parallel worker nodes. Is there any way to perform count in parallel, or at least parallelize it as much as possible? best, /Shahab
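For illustration, a minimal sketch (assuming an existing SparkContext named sc; the input path is illustrative) of what count does under the hood: each task counts its own partition locally, and only the small sub-counts travel back to the driver, so the action parallelizes across however many executor cores (or local[x] threads) are available:

val rdd = sc.textFile("hdfs:///some/large/input")   // path is illustrative

// Spelled out by hand: one local count per partition, summed at the driver.
val manualCount: Long = rdd.mapPartitions(iter => Iterator(iter.size.toLong)).reduce(_ + _)

// The built-in action distributes the work the same way.
val builtinCount: Long = rdd.count()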
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
Hi Preshant, Chester, Mohammed, I switched to Spark's Akka and now it works well. Thanks for the help! (Need to exclude Akka from Spray dependencies, or specify it as provided) Jianshi On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com wrote: I am not sure about that. Can you try a Spray version built with 2.2.x along with Spark 1.1 and include the Akka dependencies in your project’s sbt file? Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 8:58 PM *To:* Mohammed Guller *Cc:* user *Subject:* Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right? Jianshi On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com wrote: Try a version built with Akka 2.2.x Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 3:03 AM *To:* user *Subject:* Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
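For illustration, a minimal build.sbt sketch (the version numbers and module coordinates are illustrative, not taken from this thread) of the exclusion Jianshi describes, so Spray does not drag its own Akka onto the classpath next to the one bundled with Spark:

// build.sbt -- hypothetical fragment; adjust coordinates and versions to your build
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  // Drop Spray's transitive Akka so Spark's Akka is the only one at runtime.
  ("io.spray" %% "spray-client" % "1.3.2")
    .excludeAll(ExclusionRule(organization = "com.typesafe.akka"))
)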
Re: use additional ebs volumes for hdfs storage with spark-ec2
Thanks Akhil. I tried changing /root/ephemeral-hdfs/conf/hdfs-site.xml to have

<property>
  <name>dfs.data.dir</name>
  <value>/vol,/vol0,/vol1,/vol2,/vol3,/vol4,/vol5,/vol6,/vol7,/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data</value>
</property>

and then running

/root/ephemeral-hdfs/bin/stop-all.sh
copy-dir /root/ephemeral-hdfs/conf/
/root/ephemeral-hdfs/bin/start-all.sh

to try to make sure the new configuration takes effect on the entire cluster. I then ran Spark to write to the local HDFS. It failed after filling the original /mnt* mounted drives, without writing anything to the attached /vol* drives. I also tried completely stopping and restarting the cluster, but restarting resets /root/ephemeral-hdfs/conf/hdfs-site.xml to the default state. thanks Daniel On Thu, Oct 30, 2014 at 1:56 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I think you can check in the core-site.xml or hdfs-site.xml file under /root/ephemeral-hdfs/etc/hadoop/ where you can see the data node dir property, which will be a comma-separated list of volumes. Thanks Best Regards On Thu, Oct 30, 2014 at 5:21 AM, Daniel Mahler dmah...@gmail.com wrote: I started my EC2 Spark cluster with ./ec2/spark-ec2 --ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10 launch mycluster I see the additional volumes attached, but they do not seem to be set up for HDFS. How can I check whether they are being utilized on all workers, and how can I get all workers to utilize the extra volumes for HDFS? I do not have experience using Hadoop directly, only through Spark. thanks Daniel
Re: NonSerializable Exception in foreachRDD
Harold, just mentioning it in case you run into it: If you are in a separate thread, there are apparently stricter limits to what you can and cannot serialize:

val someVal
future {
  // be very careful with defining RDD operations using someVal here
  val myLocalVal = someVal
  // use myLocalVal instead
}

On Thu, Oct 30, 2014 at 4:55 PM, Harold Nguyen har...@nexgate.com wrote: In Spark Streaming, when I do foreachRDD on my DStreams, I get a NonSerializable exception when I try to do something like:

DStream.foreachRDD( rdd => { var sc.parallelize(Seq(("test", "blah"))) })

Is this the code you are actually using? var sc.parallelize(...) doesn't really look like valid Scala to me. Tobias
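For illustration, a minimal sketch (the client class and names are made up) of the usual way around a NotSerializableException in foreachRDD: capture only small serializable values in a local val on the driver, and construct any non-serializable resource inside foreachPartition so it is created on the executors rather than shipped to them:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical non-serializable client, defined only for the example.
class HypotheticalClient(endpoint: String) {
  def send(k: String, v: String): Unit = println(s"$endpoint <- $k=$v")
  def close(): Unit = ()
}

def save(stream: DStream[(String, String)], endpoint: String): Unit = {
  stream.foreachRDD { rdd =>
    val localEndpoint = endpoint                          // driver side: small, serializable
    rdd.foreachPartition { records =>
      val client = new HypotheticalClient(localEndpoint)  // executor side, once per partition
      records.foreach { case (k, v) => client.send(k, v) }
      client.close()
    }
  }
}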
Re: Using a Database to persist and load data from
AFAIK, you can read data from a DB with JdbcRDD, but there is no built-in interface for writing to a DB. JdbcRDD has some restrictions, such as requiring the SQL to contain a WHERE clause with placeholders for the lower and upper bounds. For writing to a DB, you can implement it yourself with mapPartitions or foreachPartition. You can refer to this example: http://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala 2014-10-30 23:01 GMT+08:00 Asaf Lahav asaf.la...@gmail.com: Hi Ladies and Gents, I would like to know what options I have if I would like to leverage Spark code I have already written to use a DB (Vertica) as its store/datasource. The data is of a tabular nature, so any relational DB can essentially be used. Do I need to develop a context? If yes, how? Where can I get a good example? Thank you, Asaf
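For illustration, a minimal sketch (the JDBC URL, credentials, table, and column names are made up) of the foreachPartition approach for writing: open one connection per partition on the executors and batch the inserts:

import java.sql.DriverManager

// rows is assumed to be an existing RDD[(Int, String)], e.g.
// val rows = sc.parallelize(Seq((1, "a"), (2, "b")))
rows.foreachPartition { partition =>
  // One connection per partition, created on the executor.
  val conn = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/mydb", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO mytable (id, name) VALUES (?, ?)")
  try {
    partition.foreach { case (id, name) =>
      stmt.setInt(1, id)
      stmt.setString(2, name)
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    stmt.close()
    conn.close()
  }
}

The same pattern works for any relational database, including Vertica, as long as the JDBC driver jar is available on the executors.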