Re: How to union RDD and remove duplicated keys
reduceByKey should work, but you need to define the ordering by using some sort of index.

On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g.

rdd1: (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”)
rdd2: (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)
rdd3: (“id1”, “Long text 1 B”)

Then I want to merge all RDDs. If there are duplicated docIds, a later RDD should overwrite an earlier one. In the above case, rdd2 overwrites rdd1 for “id1” and “id2”, then rdd3 overwrites rdd2 for “id1”. The final merged RDD should be

rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long text 3”)

Note that I have many such RDDs and each RDD has lots of elements. How can I do it efficiently?

Ningjun
Re: How to union RDD and remove duplicated keys
I have not run the following, but it will be along these lines:

rdd.zipWithIndex()
  .map(x => (x._1._1, (x._1._2, x._2)))
  .reduceByKey((a, b) => if (a._2 > b._2) a else b)
  .map(x => (x._1, x._2._1))

On Fri, Feb 13, 2015 at 3:27 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

Do you mean to first union all the RDDs together and then do a reduceByKey()? Suppose my unioned RDD is

rdd: (“id1”, “text 1”), (“id1”, “text 2”), (“id1”, “text 3”)

How can I use reduceByKey to return (“id1”, “text 3”)? I mean to take the last entry for the same key. A code snippet is appreciated because I am new to Spark.

Ningjun

From: Boromir Widas [mailto:vcsub...@gmail.com]
Sent: Friday, February 13, 2015 1:28 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to union RDD and remove duplicated keys

reduceByKey should work, but you need to define the ordering by using some sort of index.

On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g.

rdd1: (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”)
rdd2: (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)
rdd3: (“id1”, “Long text 1 B”)

Then I want to merge all RDDs. If there are duplicated docIds, a later RDD should overwrite an earlier one. In the above case, rdd2 overwrites rdd1 for “id1” and “id2”, then rdd3 overwrites rdd2 for “id1”. The final merged RDD should be

rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long text 3”)

Note that I have many such RDDs and each RDD has lots of elements. How can I do it efficiently?

Ningjun
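[Editorial note] Putting the two messages together, a minimal untested sketch of the whole flow; rdd1, rdd2, rdd3 are the RDDs from the question and sc is the SparkContext:

import org.apache.spark.rdd.RDD

val rdds: Seq[RDD[(String, String)]] = Seq(rdd1, rdd2, rdd3)  // later RDDs should win
val merged = sc.union(rdds)
  .zipWithIndex()                                    // ((docId, text), globalIndex); later RDDs get higher indices
  .map { case ((id, text), idx) => (id, (text, idx)) }
  .reduceByKey((a, b) => if (a._2 > b._2) a else b)  // keep the entry with the higher index
  .map { case (id, (text, _)) => (id, text) }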
Re: Building Spark behind a proxy
At least part of it is due to a connection refused error; can you check whether curl can reach the URL through your proxies?

[FATAL] Non-resolvable parent POM: Could not transfer artifact org.apache:apache:pom:14 from/to central (http://repo.maven.apache.org/maven2): Error transferring file: Connection refused from http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom

On Thu, Jan 29, 2015 at 11:35 AM, Soumya Simanta soumya.sima...@gmail.com wrote:

On Thu, Jan 29, 2015 at 11:05 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:
Does the error change on a build with and without the build options?

What do you mean by build options? I'm just doing ./sbt/sbt assembly from $SPARK_HOME.

Did you try using Maven and doing the proxy settings there?

No, I've not tried Maven yet. However, I did set proxy settings inside my .m2/settings.xml, but it didn't make any difference.
Re: Apache Spark standalone mode: number of cores
The local mode still parallelizes calculations, and it is useful for debugging because it goes through the same serialization/deserialization steps as a cluster would.

On Fri, Jan 23, 2015 at 5:44 PM, olegshirokikh o...@solver.com wrote:

I'm trying to understand the basics of Spark internals. The Spark documentation for submitting applications in local mode says, for the spark-submit --master setting:

local[K]  Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*]  Run Spark locally with as many worker threads as logical cores on your machine.

Since all the data is stored on a single local machine, it does not benefit from distributed operations on RDDs. How does it benefit, and what is going on internally when Spark utilizes several logical cores?
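[Editorial note] To see the local parallelism directly, a small illustrative sketch (the object name and partition count are made up): with local[*], the partitions of a parallelized collection are processed concurrently by the local worker threads, so several cores are exercised even without a cluster.

import org.apache.spark.{SparkConf, SparkContext}

object LocalCoresDemo {
  def main(args: Array[String]): Unit = {
    // run with as many worker threads as there are logical cores
    val sc = new SparkContext(new SparkConf().setAppName("LocalCoresDemo").setMaster("local[*]"))
    // 8 partitions are handled by the pool of local worker threads;
    // printing the thread names shows that several threads did the work
    sc.parallelize(1 to 8, numSlices = 8)
      .mapPartitions(it => Iterator(Thread.currentThread.getName -> it.size))
      .collect()
      .foreach(println)
    sc.stop()
  }
}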
GroupBy multiple attributes
Hello,

I am trying to do a groupBy on 5 attributes to get results in a form like a pivot table in Microsoft Excel. The keys are the attribute tuples and the values are arrays of doubles (which may be very large). Based on the code below, I am getting back correct results, but I would like to optimize it further (I played around with numPartitions). The two issues I see are:

1. flatMap is needed to expand the key tuples, but this also duplicates the values, and as the values are large this increases the shuffle input size for reduceByKey - is there a way to avoid the duplication?
2. reduceByKey adds two arrays element-wise and creates a new array for every addition - is there a way to reduce without creating a new array every time (similar to what accumulators do)?

I am pasting sample code, the query plan and the output below. Thanks.

val attributeToFloatArrayRDD = sc.parallelize(Array(
  ("A-1", "B-2", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-2", "B-1", "C-1", "D-2", "E-1") -> (5.0 to 1005.0 by 0.25).toArray,
  ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-3", "B-3", "C-1", "D-1", "E-2") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray,
  ("A-4", "B-3", "C-1", "D-1", "E-1") -> (8.0 to 1008.0 by 0.25).toArray,
  ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
))

val groupToVaRRDD = attributeToFloatArrayRDD
  .flatMap(x => x._1 match {
    case (t1, t2, t3, t4, t5) =>
      Array((t1 + "_top"), (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5)).map(y => (y, x._2))
  })
  .reduceByKey((x, y) => {
    require(x.size == y.size)
    (x, y).zipped.map(_ + _)
  })
  .map(x => {
    (x._1, x._2.sorted.take(x._2.size / 20).last)
  })

Query plan:

(16) MappedRDD[12] at map at GroupByTest.scala:81 []
 |   ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 []
 +-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 []
     |   ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 []

Output (GroupBy, VaR):

(A-2,B-1) 54.75
(A-2,B-1,C-1,D-2) 54.75
(A-1,B-1) 149.25
(A-1,B-1,C-1,D-1,E-1) 149.25
(A-3,B-3,C-1) 49.75
(A-3,B-3) 49.75
(A-4,B-3,C-1,D-1,E-1) 57.75
(A-2,B-1,C-1) 54.75
(A-1,B-2,C-1,D-1,E-1) 49.75
(A-1,B-1,C-1,D-1) 149.25
(A-3,B-3,C-1,D-1,E-2) 49.75
(A-1,B-2,C-1) 49.75
(A-3,B-3,C-1,D-1) 49.75
(A-4,B-3) 57.75
(A-1,B-1,C-1) 149.25
A-1_top 199.0
(A-4,B-3,C-1,D-1) 57.75
A-2_top 54.75
(A-1,B-2) 49.75
(A-4,B-3,C-1) 57.75
A-3_top 49.75
A-4_top 57.75
(A-2,B-1,C-1,D-2,E-1) 54.75
(A-1,B-2,C-1,D-1) 49.75
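[Editorial note] On point 2, one possibility, sketched and untested rather than a drop-in replacement: aggregateByKey (Spark 1.1+) with a mutable zero array lets the element-wise sums accumulate in place per key instead of allocating a new array on every reduce step. The array length 4001 matches the sample ranges above; this does not address the value duplication in flatMap (point 1).

val summed = attributeToFloatArrayRDD
  .flatMap { case ((t1, t2, t3, t4, t5), values) =>
    Seq[Any](t1 + "_top", (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5))
      .map(key => (key, values))
  }
  .aggregateByKey(new Array[Double](4001))(          // the zero value is copied per key, so mutating it is safe
    (acc, v) => { var i = 0; while (i < acc.length) { acc(i) += v(i); i += 1 }; acc },
    (a, b)   => { var i = 0; while (i < a.length)   { a(i)   += b(i); i += 1 }; a }
  )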
Re: Launching Spark app in client mode for standalone cluster
Thanks for the pointers. The issue was due to route caching by Spray, which would always return the same value. Other than that the program is working fine.

On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan simonc...@gmail.com wrote:

Boromir,
You may like to take a look at how we make Spray and Spark work together in the PredictionIO project: https://github.com/PredictionIO/PredictionIO

Simon

On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com wrote:

Just a guess here, may not be correct. Spray needs to start an Akka actor system; the Spark context also creates an Akka actor system. Is it possible there is some conflict?

Sent from my iPad
Re: Re: I think I am almost lost in the internals of Spark
I do not understand Chinese, but the diagrams on that page are very helpful.

On Tue, Jan 6, 2015 at 9:46 PM, eric wong win19...@gmail.com wrote:

A good beginning if you are Chinese: https://github.com/JerryLead/SparkInternals/tree/master/markdown

2015-01-07 10:13 GMT+08:00 bit1...@163.com bit1...@163.com:

Thank you, Tobias. I will look into the Spark paper. But it looks like the paper has been moved: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf returns a "Resource not found" page when I access it.

bit1...@163.com

From: Tobias Pfeiffer t...@preferred.jp
Date: 2015-01-07 09:24
To: Todd bit1...@163.com
CC: user user@spark.apache.org
Subject: Re: I think I am almost lost in the internals of Spark

Hi,

On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote:

I am a bit new to Spark; I have tried simple things like word count and the examples given in the Spark SQL programming guide. Now I am investigating the internals of Spark, but I think I am almost lost, because I could not grasp the whole picture of what Spark does when it executes the word count.

I recommend understanding what an RDD is and how it is processed, using http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds and probably also http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (once the server is back). Understanding how an RDD is processed is probably most helpful to understand the whole of Spark.

Tobias

--
王海华
Launching Spark app in client mode for standalone cluster
Hello,

I am trying to launch a Spark app (client mode for a standalone cluster) from a Spray server, using the following code. When I run it as

  $ java -cp <class paths> SprayServer

the SimpleApp.getA() call from SprayService returns -1 (which means it sees the logData RDD as null for HTTP requests), but the statements from within SimpleAppLoader.run() get correct values from SimpleApp.getA(). Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time but am not getting anywhere - any pointers will be greatly appreciated. Thanks.

// BEGIN SPRAY SERVER
import akka.actor.{ActorSystem, Props}
import akka.io.IO
import spray.can.Http
import akka.actor._
import spray.routing.HttpService
import scala.concurrent.ops

object SprayServer {
  def main(args: Array[String]) {
    // we need an ActorSystem to host our service
    implicit val system = ActorSystem()
    // create our service actor
    val service = system.actorOf(Props[SprayServiceActor], "test-service")
    // bind our actor to an HTTP port
    IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)
    ops.spawn {
      SimpleAppLoader.run()
    }
  }
}

class SprayServiceActor extends SprayService with Actor {
  // the HttpService trait (which SprayService extends) defines only one
  // abstract member, which connects the service's environment to the
  // enclosing actor or test
  def actorRefFactory = context
  def receive = runRoute(rootRoute)
}

trait SprayService extends HttpService {
  def default = path("") {
    println("handling default route")
    val numAs = SimpleApp.getA() // DOES NOT WORK
    get { complete(s"num A: $numAs") }
  }

  def pingRoute = path("ping") { get { complete("pong!") } }
  def pongRoute = path("pong") { get { complete("pong!?") } }

  def rootRoute = pingRoute ~ pongRoute ~ default
}
// END SPRAY, BEGIN SPARK

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.rdd.RDD

object SimpleApp {
  var resultString: String = "Data not assigned"
  var logData: RDD[String] = null

  def main(args: Array[String]) {
    val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
    println(resultString)
  }

  def getA(): Int = {
    println(resultString)
    if (null == logData) {
      println("logData is null!")
      -1
    } else {
      val numAs = logData.filter(line => line.contains("a")).count().toInt
      println(s"numAs: $numAs")
      numAs
    }
  }
}

object SimpleAppLoader {
  def main(args: Array[String]) {
    run()
  }

  def run() {
    val clArgs = Array(
      "--deploy-mode", "client",
      "--total-executor-cores", "2",
      "--class", "SimpleApp",
      "--conf", "spark.shuffle.spill=false",
      "--conf", "spark.master=spark://troika:7077",
      "--conf", "spark.driver.memory=128m",
      "--conf", "spark.executor.memory=128m",
      "--conf", "spark.eventLog.enabled=true",
      "--conf", "spark.eventLog.dir=/home/ovik/logs",
      SparkContext.jarOfClass(this.getClass).get)

    SparkSubmit.main(clArgs)

    val numAs = SimpleApp.getA() // WORKS
    println(s"numAs is $numAs")
  }
}
Re: building spark1.2 meet error
It should be under assembly/target/scala-2.10/ (try ls assembly/target/scala-2.10/*).

On Sat, Jan 3, 2015 at 10:11 PM, j_soft zsof...@gmail.com wrote:

Thanks, the build succeeded, but where is the built zip file? I cannot find a finished .zip or .tar.gz package.

2014-12-31 19:22 GMT+08:00 xhudik [via Apache Spark User List]:

Hi J_soft,

For me it is working; I didn't put the -Dscala-2.10 -X parameters. I got only one warning: since I don't have Hadoop 2.5, it didn't activate that profile.

larix@kovral:~/sources/spark-1.2.0 mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -DskipTests clean package

Found 0 infos
Finished in 3 ms
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] Spark Project Parent POM ........................ SUCCESS [ 14.177 s]
[INFO] Spark Project Networking ........................ SUCCESS [ 14.670 s]
[INFO] Spark Project Shuffle Streaming Service ......... SUCCESS [  9.030 s]
[INFO] Spark Project Core .............................. SUCCESS [04:42 min]
[INFO] Spark Project Bagel ............................. SUCCESS [ 26.184 s]
[INFO] Spark Project GraphX ............................ SUCCESS [01:07 min]
[INFO] Spark Project Streaming ......................... SUCCESS [01:35 min]
[INFO] Spark Project Catalyst .......................... SUCCESS [01:48 min]
[INFO] Spark Project SQL ............................... SUCCESS [01:55 min]
[INFO] Spark Project ML Library ........................ SUCCESS [02:17 min]
[INFO] Spark Project Tools ............................. SUCCESS [ 15.527 s]
[INFO] Spark Project Hive .............................. SUCCESS [01:43 min]
[INFO] Spark Project REPL .............................. SUCCESS [ 45.154 s]
[INFO] Spark Project YARN Parent POM ................... SUCCESS [  3.885 s]
[INFO] Spark Project YARN Stable API ................... SUCCESS [01:00 min]
[INFO] Spark Project Assembly .......................... SUCCESS [ 50.812 s]
[INFO] Spark Project External Twitter .................. SUCCESS [ 21.401 s]
[INFO] Spark Project External Flume Sink ............... SUCCESS [ 25.207 s]
[INFO] Spark Project External Flume .................... SUCCESS [ 34.734 s]
[INFO] Spark Project External MQTT ..................... SUCCESS [ 22.617 s]
[INFO] Spark Project External ZeroMQ ................... SUCCESS [ 22.444 s]
[INFO] Spark Project External Kafka .................... SUCCESS [ 33.566 s]
[INFO] Spark Project Examples .......................... SUCCESS [01:23 min]
[INFO] Spark Project YARN Shuffle Service .............. SUCCESS [  4.873 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 23:20 min
[INFO] Finished at: 2014-12-31T12:02:32+01:00
[INFO] Final Memory: 76M/855M
[INFO] ------------------------------------------------------------------------
[WARNING] The requested profile "hadoop-2.5" could not be activated because it does not exist.

If it doesn't work for you, I'd try to delete all sources, download the source code once more and try again.

Good luck, Tomas
Re: Spark profiler
It would be very helpful if there were such a tool, but the distributed nature may be difficult to capture. I had been trying to run a task where merging the accumulators was taking an inordinately long time, and this was not reflected in the standalone cluster's web UI. What I think would be useful is to publish metrics for the different lifecycle stages of a job to a system like Ganglia to zero in on bottlenecks; perhaps the user could define some of the metrics in the config. I have been thinking of tinkering with the metrics publisher to add custom metrics and get a bigger picture and time breakdown.

On Mon, Dec 29, 2014 at 10:24 AM, rapelly kartheek kartheek.m...@gmail.com wrote:

Hi,
I want to find the time taken for replicating an RDD in a Spark cluster, along with the computation time on the replicated RDD. Can someone please suggest a suitable Spark profiler?

Thank you
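[Editorial note] Not a profiler, but a coarse wall-clock sketch of one way to separate the two times asked about (untested; the input path is a placeholder, and the first count() also includes reading the input, so the replication time measured this way is only an upper bound):

import org.apache.spark.storage.StorageLevel

val data = sc.textFile("hdfs:///some/input")               // hypothetical input
data.persist(StorageLevel.MEMORY_ONLY_2)                   // keep 2 replicas of each cached block

val t0 = System.nanoTime()
data.count()                                               // forces caching + replication (plus the initial read)
println(s"cache + replicate: ${(System.nanoTime() - t0) / 1e6} ms")

val t1 = System.nanoTime()
val numWords = data.flatMap(_.split(" ")).count()          // a computation on the replicated RDD
println(s"compute on cached RDD: ${(System.nanoTime() - t1) / 1e6} ms")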
Re: Using more cores on machines
If you are looking to reduce network traffic, then setting spark.deploy.spreadOut to false may help.

On Mon, Dec 22, 2014 at 11:44 AM, Ashic Mahtab as...@live.com wrote:

Hi Josh,
I'm not looking to change the 1:1 ratio. What I'm trying to do is get both cores on two machines working, rather than one core on all four machines. With --total-executor-cores 4, I have 1 core per machine working for an app. I'm looking for something that'll let me use 2 cores per machine on 2 machines (so 4 cores in total) while not using the other two machines.

Regards,
Ashic.

From: j...@soundcloud.com
Date: Mon, 22 Dec 2014 17:36:26 +0100
Subject: Re: Using more cores on machines
To: as...@live.com
CC: so...@cloudera.com; user@spark.apache.org

AFAIK, --num-executors is not available for standalone clusters. In standalone mode, you must start new workers on your node, as it is a 1:1 ratio of workers to executors.

On 22 December 2014 at 12:25, Ashic Mahtab as...@live.com wrote:

Hi Sean,
Thanks for the response. It seems --num-executors is ignored. Specifying --num-executors 2 --executor-cores 2 is giving the app all 8 cores across 4 machines.

-Ashic.

From: so...@cloudera.com
Date: Mon, 22 Dec 2014 10:57:31 +
Subject: Re: Using more cores on machines
To: as...@live.com
CC: user@spark.apache.org

I think you want: --num-executors 2 --executor-cores 2

On Mon, Dec 22, 2014 at 10:39 AM, Ashic Mahtab as...@live.com wrote:

Hi,
Say we have 4 nodes with 2 cores each in standalone mode. I'd like to dedicate 4 cores to a streaming application. I can do this via spark-submit with:

spark-submit --total-executor-cores 4

However, this assigns one core per machine. I would like to use 2 cores on 2 machines instead, leaving the other two machines untouched. Is this possible? Is there a downside to doing this? My thinking is that I should be able to reduce quite a bit of network traffic if all machines are not involved.

Thanks,
Ashic.
Re: How to emit multiple keys for the same value?
flatMap should help; it returns a Seq for every input.

On Mon, Oct 20, 2014 at 12:31 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

Hello,
I am facing a problem with implementing this: my mapper should emit multiple keys for the same value. For every input (k, v) it should emit (k, v), (k+1, v), (k+2, v) ... (k+n, v). In MapReduce it was pretty straightforward: I used a for loop and performed a context.write within it. This is the code I am using in the map function of the MR job:

for (int k = 0; k < 8; k++) {
  key = date + ":" + k;
  context.write(new Text(key), new DoubleWritable(value));
}

I tried using the map function, but I'm stuck on understanding how I can use the loop within it. It would be great if someone can suggest possible ways to do it. Thanks in advance.

--
Regards,
Haripriya Ayyalasomayajula
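[Editorial note] A minimal Scala sketch of the flatMap approach, mirroring the MapReduce loop above; the sample records are made up:

import org.apache.spark.rdd.RDD

val records: RDD[(String, Double)] = sc.parallelize(Seq(("20141020", 1.5), ("20141021", 2.0)))
// each input record expands into eight (key, value) pairs, one per k in 0..7
val expanded = records.flatMap { case (date, value) =>
  (0 until 8).map(k => (date + ":" + k, value))
}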
Re: object in an rdd: serializable?
Making it a case class should work.

On Thu, Oct 16, 2014 at 8:30 PM, ll duy.huynh@gmail.com wrote:

I got an exception complaining about serializability. The sample code is below...

class HelloWorld(val count: Int) {
  ...
}

object Test extends App {
  ...
  val data = sc.parallelize(List(new HelloWorld(1), new HelloWorld(2)))
  ...
}

What is the best way to serialize HelloWorld so that it can be contained in an RDD? Thanks!
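[Editorial note] Two common fixes, sketched and untested (the second class name is illustrative): declare the class as a case class, which is Serializable by default, or keep the plain class and mix in Serializable.

// Option 1: case classes are Serializable out of the box
case class HelloWorld(count: Int)

// Option 2: keep the plain class but extend Serializable
class HelloWorldSer(val count: Int) extends Serializable

val data = sc.parallelize(List(HelloWorld(1), HelloWorld(2)))
println(data.map(_.count).reduce(_ + _)) // 3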
Re: Executor and BlockManager memory size
Hey Larry,

I have been trying to figure this out for standalone clusters as well. http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-Block-Manager-td12833.html has an answer as to what the block manager is for.

From the documentation, what I understood is that if you assign X GB to each executor, spark.storage.memoryFraction (default 0.6) * X is assigned to the BlockManager and the rest goes to the JVM itself(?). However, as you see, 26.8 GB is assigned to the BlockManager, and assuming a 0.6 memoryFraction, this means the executor sees ~44.7 GB of memory; I am not sure what happens to the difference (5.3 GB).

On Thu, Oct 9, 2014 at 9:40 PM, Larry Xiao xia...@sjtu.edu.cn wrote:

Hi all,

I'm confused about the Executor and the BlockManager: why do they have different memory sizes?

14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added: app-20141010085001-/2 on worker-20141010004933-brick6-35657 (brick6:35657) with 6 cores
14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 50.0 GB RAM
14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block manager brick6:53296 with 26.8 GB RAM

and on the WebUI:

Executor ID | Address      | RDD Blocks | Memory Used      | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0           | brick3:37607 | 0          | 0.0 B / 26.8 GB  | 0.0 B     | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
1           | brick1:59493 | 0          | 0.0 B / 26.8 GB  | 0.0 B     | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
2           | brick6:53296 | 0          | 0.0 B / 26.8 GB  | 0.0 B     | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
3           | brick5:38543 | 0          | 0.0 B / 26.8 GB  | 0.0 B     | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
4           | brick2:44937 | 0          | 0.0 B / 26.8 GB  | 0.0 B     | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
5           | brick4:46798 | 0          | 0.0 B / 26.8 GB  | 0.0 B     | 6 | 0 | 0 | 6 | 0 ms | 0.0 B | 0.0 B | 0.0 B
driver      | brick0:57692 | 0          | 0.0 B / 274.6 MB | 0.0 B     | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

As I understand it, a worker consists of a daemon and an executor, and the executor takes charge of both execution and storage. So does it mean that 26.8 GB is reserved for storage and the rest is for execution?

Another question is that, throughout execution, it seems that the BlockManager is always almost free:

14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)

I don't know what I'm missing here.

Best regards,
Larry
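[Editorial note] A possible explanation for the gap, sketched as arithmetic rather than asserted from the thread: Spark 1.x's static memory management sizes the storage region as maxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction (safetyFraction defaults to 0.9), and the JVM's Runtime.maxMemory is itself somewhat below the configured heap.

// Hedged back-of-the-envelope sketch, assuming Spark 1.x defaults:
//   spark.storage.memoryFraction = 0.6, spark.storage.safetyFraction = 0.9
val executorHeapGB = 50.0
val memoryFraction = 0.6
val safetyFraction = 0.9
val blockManagerGB = executorHeapGB * memoryFraction * safetyFraction
println(f"approx. BlockManager memory: $blockManagerGB%.1f GB") // ~27 GB, close to the reported 26.8 GB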
Re: Handling tree reduction algorithm with Spark in parallel
Thanks Matei, will check out the MLlib implementation.

On Wed, Oct 1, 2014 at 2:24 PM, Andy Twigg andy.tw...@gmail.com wrote:

Yes, that makes sense. It's similar to the allreduce pattern in VW.

On Wednesday, 1 October 2014, Matei Zaharia matei.zaha...@gmail.com wrote:

Some of the MLlib algorithms do tree reduction in 1.1: http://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html. You can check out how they implemented it -- it is a series of reduce operations.

Matei

On Oct 1, 2014, at 11:02 AM, Boromir Widas vcsub...@gmail.com wrote:

Thanks a lot Andy and Debasish, your suggestions were of great help.

On Tue, Sep 30, 2014 at 6:44 PM, Debasish Das debasish.da...@gmail.com wrote:

If the tree is too big, build it on GraphX, but it will need thorough analysis so that the partitions are well balanced...

On Tue, Sep 30, 2014 at 2:45 PM, Andy Twigg andy.tw...@gmail.com wrote:

Hi Boromir,

Assuming the tree fits in memory, and what you want to do is parallelize the computation, the 'obvious' way is the following:

* broadcast the tree T to each worker (OK since it fits in memory)
* construct an RDD for the deepest level - each element in the RDD is (parent, data_at_node)
* aggregate this by key (= parent) to get an RDD of (parent, data)
* map each element (p, data) to (parent(p), data) using T
* repeat until you have an RDD of size 1 (assuming T is connected)

If T cannot fit in memory, or is very deep, then there are more exotic techniques, but hopefully this suffices.

Andy

--
http://www.cs.ox.ac.uk/people/andy.twigg/

On 30 September 2014 14:12, Boromir Widas vcsub...@gmail.com wrote:

Hello Folks,

I have been trying to implement a tree reduction algorithm recently in Spark, but could not find suitable parallel operations. Assuming I have a general tree, I have to do the following:

1) Do some computation at each leaf node to get an array of doubles (this can be precomputed).
2) For each non-leaf node, starting with the root node, compute the sum of these arrays for all child nodes. So to get the array for node B, I need to get the array for E, which is the sum of G + H.

// Start Snippet
case class Node(name: String, children: Array[Node], var values: Array[Double])

// read in the tree here

def getSumOfChildren(node: Node): Array[Double] = {
  if (node.children.isEmpty) {
    return node.values
  }
  for (child <- node.children) { // can use an accumulator here
    node.values = (node.values, getSumOfChildren(child)).zipped.map(_ + _)
  }
  node.values
}
// End Snippet

Any pointers to how this can be done in parallel to use all cores will be greatly appreciated.

Thanks,
Boromir.
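[Editorial note] A hedged, untested sketch of the level-by-level scheme Andy outlines above, reusing the Node case class from the question and assuming the tree fits in driver memory and that all leaves sit at the same depth; parentMap and leaves are illustrative helpers.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// name -> parent name for every non-root node, derived from the in-memory tree
def parentMap(n: Node): Map[String, String] =
  n.children.map(c => c.name -> n.name).toMap ++ n.children.flatMap(parentMap)

def leaves(n: Node): Seq[Node] =
  if (n.children.isEmpty) Seq(n) else n.children.flatMap(leaves)

def sumLevels(sc: SparkContext, root: Node): Map[String, Array[Double]] = {
  val parentOf = sc.broadcast(parentMap(root))
  var results = Map.empty[String, Array[Double]]
  // deepest level: (parentName, valuesAtLeaf)
  var level: RDD[(String, Array[Double])] =
    sc.parallelize(leaves(root).map(l => (parentOf.value(l.name), l.values)))
  while (level.count() > 0) {
    val sums = level.reduceByKey((a, b) => (a, b).zipped.map(_ + _)).collect()
    results ++= sums                                  // arrays for this level of parents
    level = sc.parallelize(sums.toSeq.flatMap { case (name, vals) =>
      parentOf.value.get(name).map(p => (p, vals))    // move one level up; the root has no parent
    })
  }
  results
}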
Handling tree reduction algorithm with Spark in parallel
Hello Folks,

I have been trying to implement a tree reduction algorithm recently in Spark, but could not find suitable parallel operations. Assuming I have a general tree, I have to do the following:

1) Do some computation at each leaf node to get an array of doubles (this can be precomputed).
2) For each non-leaf node, starting with the root node, compute the sum of these arrays for all child nodes. So to get the array for node B, I need to get the array for E, which is the sum of G + H.

// Start Snippet
case class Node(name: String, children: Array[Node], var values: Array[Double])

// read in the tree here

def getSumOfChildren(node: Node): Array[Double] = {
  if (node.children.isEmpty) {
    return node.values
  }
  for (child <- node.children) { // can use an accumulator here
    node.values = (node.values, getSumOfChildren(child)).zipped.map(_ + _)
  }
  node.values
}
// End Snippet

Any pointers to how this can be done in parallel to use all cores will be greatly appreciated.

Thanks,
Boromir.
Re: Memory under-utilization
Perhaps your job does not use more than 9 GB. Even though you ask for 64 GB, the process only uses what it needs and would grow up to the 64 GB maximum.

On Tue, Sep 16, 2014 at 5:40 PM, francisco ftanudj...@nextag.com wrote:

Hi, I'm a Spark newbie.

We had installed spark-1.0.2-bin-cdh4 on a 'super machine' with 256 GB memory and 48 cores. We tried to allocate a task 64 GB of memory, but for whatever reason Spark is only using around 9 GB max.

We submitted the Spark job with the following command:

/bin/spark-submit --class SimpleApp --master local[16] --executor-memory 64G /var/tmp/simple-project_2.10-1.0.jar /data/lucene/ns.gz

When I run the 'top' command I see only 9 GB of memory used by the Spark process:

PID      USER  PR  NI  VIRT   RES   SHR  S  %CPU   %MEM  TIME+     COMMAND
3047005  fran  30  10  8785m  703m  18m  S  112.9  0.3   48:19.63  java

Any idea why this is happening? I've also tried to set the memory programmatically using

new SparkConf().set("spark.executor.memory", "64g")

but that also didn't do anything. Is there some limitation when running in 'local' mode?

Thanks.
Re: Memory under-utilization
I see, what does http://localhost:4040/executors/ show for memory usage? I personally find it easier to work with a standalone cluster with a single worker, by using sbin/start-master.sh and then connecting to the master.

On Tue, Sep 16, 2014 at 6:04 PM, francisco ftanudj...@nextag.com wrote:

Thanks for the reply. I doubt that's the case though... the executor kept having to do a file dump because memory was full.

...
14/09/16 15:00:18 WARN ExternalAppendOnlyMap: Spilling in-memory map of 67 MB to disk (668 times so far)
14/09/16 15:00:21 WARN ExternalAppendOnlyMap: Spilling in-memory map of 66 MB to disk (669 times so far)
14/09/16 15:00:24 WARN ExternalAppendOnlyMap: Spilling in-memory map of 70 MB to disk (670 times so far)
14/09/16 15:00:31 WARN ExternalAppendOnlyMap: Spilling in-memory map of 127 MB to disk (671 times so far)
14/09/16 15:00:43 WARN ExternalAppendOnlyMap: Spilling in-memory map of 67 MB to disk (672 times so far)
...
Compiler issues for multiple map on RDD
Hello Folks,

I am trying to chain a couple of map operations, and it seems the second map fails with a mismatch in arguments (even though the compiler prints them to be the same). I checked the function and variable types using :t and they look OK to me. Have you seen this before? I am posting the code, data and output below. Any pointers will be greatly appreciated.

Thanks,
Boromir.

/// SCRIPT
val data = sc.textFile("data/testpv.csv")

case class KVV(key: String, valvec: Array[Double])

def mapToKV(line: String): KVV = {
  val splits = line.split(",")
  val key = splits(0).trim
  val valvec = splits.drop(1).map(x => x.trim.toDouble)
  val kvv = KVV(key, valvec)
  return kvv
}

val kvs = data.map(line => mapToKV(line))

def mapKVtoKVL(kvv: KVV): KVV = {
  return kvv
}

val tvar = kvs.map(x => mapKVtoKVL(x))

/// SAMPLE DATA in testpv.csv
x,1.1,1.2,1.3
y,2.1,2.2,2.3

/// REPL OUTPUT
scala> val data = sc.textFile("data/testpv.csv")
14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with curMem=0, maxMem=308713881
14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 294.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> case class KVV(key: String, valvec: Array[Double])
defined class KVV

scala> def mapToKV(line: String): KVV = {
     |   val splits = line.split(",")
     |   val key = splits(0).trim
     |   val valvec = splits.drop(1).map(x => x.trim.toDouble)
     |   val kvv = KVV(key, valvec)
     |   return kvv
     | }
mapToKV: (line: String)KVV

scala> val kvs = data.map(line => mapToKV(line))
kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at <console>:18

scala> def mapKVtoKVL(kvv: KVV): KVV = {
     |   return kvv
     | }
mapKVtoKVL: (kvv: KVV)KVV

scala> val tvar = kvs.map(x => mapKVtoKVL(x))
<console>:22: error: type mismatch;
 found   : KVV
 required: KVV
       val tvar = kvs.map(x => mapKVtoKVL(x))
                                          ^