Re: How to union RDD and remove duplicated keys

2015-02-13 Thread Boromir Widas
reduceByKey should work, but you need to define the ordering by using some
sort of index.

On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:



 I have multiple RDD[(String, String)] that store (docId, docText) pairs,
 e.g.



 rdd1:   (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text
 3”)

 rdd2:   (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)

 rdd3:   (“id1”, “Long text 1 B”)



 Then I want to merge all the RDDs. If there are duplicate docIds, a later RDD
 should overwrite an earlier RDD. In the above case, rdd2 will overwrite rdd1
 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final
 merged RDD should be



 rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”,
 “Long text 3”)



 Note that I have many such RDDs and each RDD has lots of elements. How
 can I do this efficiently?





 Ningjun





Re: How to union RDD and remove duplicated keys

2015-02-13 Thread Boromir Widas
I have not run the following, but it will be along these lines (using the
zipWithIndex position to keep the last entry per key):

rdd.zipWithIndex()
  .map(x => (x._1._1, (x._1._2, x._2)))
  .reduceByKey((a, b) => if (a._2 > b._2) a else b)
  .map(x => (x._1, x._2._1))
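
A fuller sketch along these lines, untested, assuming the rdd1/rdd2/rdd3 from
the original question and an existing SparkContext sc: tag each RDD with its
position in the sequence so that entries from later RDDs win.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdds: Seq[RDD[(String, String)]] = Seq(rdd1, rdd2, rdd3)   // later RDDs take precedence
val tagged = rdds.zipWithIndex.map { case (rdd, i) =>
  rdd.map { case (id, text) => (id, (text, i)) }               // attach the RDD's position to each value
}
val rddFinal = sc.union(tagged)
  .reduceByKey((a, b) => if (a._2 > b._2) a else b)            // keep the value from the latest RDD
  .map { case (id, (text, _)) => (id, text) }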

On Fri, Feb 13, 2015 at 3:27 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Do you mean to first union all the RDDs together and then do a reduceByKey()?
 Suppose my unioned RDD is



 rdd :  (“id1”, “text 1”),  (“id1”, “text 2”), (“id1”, “text 3”)

 How can I use reduceByKey to return (“id1”, “text 3”)? I mean to take
 the last entry for the same key.

 A code snippet would be appreciated because I am new to Spark.

 Ningjun



 *From:* Boromir Widas [mailto:vcsub...@gmail.com]
 *Sent:* Friday, February 13, 2015 1:28 PM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* user@spark.apache.org
 *Subject:* Re: How to union RDD and remove duplicated keys



 reduceByKey should work, but you need to define the ordering by using some
 sort of index.



 On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:



 I have multiple RDD[(String, String)] that store (docId, docText) pairs,
 e.g.



 rdd1:   (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text
 3”)

 rdd2:   (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)

 rdd3:   (“id1”, “Long text 1 B”)



 Then I want to merge all the RDDs. If there are duplicate docIds, a later RDD
 should overwrite an earlier RDD. In the above case, rdd2 will overwrite rdd1
 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final
 merged RDD should be



 rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”,
 “Long text 3”)



 Note that I have many such RDDs and each RDD has lots of elements. How
 can I do this efficiently?





 Ningjun







Re: Building Spark behind a proxy

2015-01-29 Thread Boromir Widas
At least a part of it is due to the connection being refused - can you check
whether curl can reach the URL with the proxies set?
[FATAL] Non-resolvable parent POM: Could not transfer artifact
org.apache:apache:pom:14 from/to central (
http://repo.maven.apache.org/maven2): Error transferring file: Connection
refused from
http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom

On Thu, Jan 29, 2015 at 11:35 AM, Soumya Simanta soumya.sima...@gmail.com
wrote:



 On Thu, Jan 29, 2015 at 11:05 AM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 Does  the error change on build with and without the built options?

 What do you mean by build options? I'm just doing ./sbt/sbt assembly from
 $SPARK_HOME


 Did you try using Maven and doing the proxy settings there?


  No, I've not tried Maven yet. However, I did set proxy settings inside my
 .m2/setting.xml, but it didn't make any difference.





Re: Apache Spark standalone mode: number of cores

2015-01-23 Thread Boromir Widas
Local mode still parallelizes calculations, and it is useful for debugging
because it goes through the same serialization/deserialization steps as a
cluster would.
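
As a minimal illustration (assuming a spark-shell started with --master
local[4], so sc already exists), the 8 partitions below are processed by up to
4 worker threads at once, and each task still goes through the normal
scheduling and serialization path:

val rdd = sc.parallelize(1 to 1000, 8)          // 8 partitions spread over 4 local threads
val doubledSum = rdd.map(_ * 2).reduce(_ + _)   // map tasks run in parallel across the threads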

On Fri, Jan 23, 2015 at 5:44 PM, olegshirokikh o...@solver.com wrote:

 I'm trying to understand the basics of Spark internals, and the Spark
 documentation for submitting applications in local mode says, for the
 spark-submit --master setting:

 local[K] Run Spark locally with K worker threads (ideally, set this to the
 number of cores on your machine).

 local[*] Run Spark locally with as many worker threads as logical cores on
 your machine.
 Since all the data is stored on a single local machine, it does not benefit
 from distributed operations on RDDs.

 How does it benefit and what internally is going on when Spark utilizes
 several logical cores?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-standalone-mode-number-of-cores-tp21342.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





GroupBy multiple attributes

2015-01-23 Thread Boromir Widas
Hello,

I am trying to do a groupBy on 5 attributes to get results in a form like a
pivot table in Microsoft Excel. The keys are the attribute tuples and the
values are double arrays (which may be very large). Based on the code below, I
am getting back correct results, but I would like to optimize it further (I
have played around with numPartitions).

The two issues I see are -
1. flatMap is needed to expand the key tuples, but this also duplicates the
values, and as the values are large this increases the shuffle input size
for reduceByKey - is there a way to avoid the duplication?
2. reduceByKey adds two arrays element-wise and creates a new array for
every addition - is there a way to reduce without creating a new array
every time (similar to what accumulators do)? One possibility is sketched
after the output below.

I am pasting a sample code, query plan and output below.

Thanks.

val attributeToFloatArrayRDD = sc.parallelize(Array(
  ("A-1", "B-2", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-2", "B-1", "C-1", "D-2", "E-1") -> (5.0 to 1005.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-3", "B-3", "C-1", "D-1", "E-2") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-4", "B-3", "C-1", "D-1", "E-1") -> (8.0 to 1008.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
))


val groupToVaRRDD = attributeToFloatArrayRDD
  .flatMap(x => x._1 match {
    case (t1, t2, t3, t4, t5) => Array((t1 + "_top"), (t1, t2), (t1, t2, t3),
      (t1, t2, t3, t4), (t1, t2, t3, t4, t5)).map(y => (y, x._2))
  })
  .reduceByKey((x, y) => {
    require(x.size == y.size)
    (x, y).zipped.map(_ + _)
  })
  .map(x => {
    (x._1, x._2.sorted.take(x._2.size / 20).last)
  })


 Query Plan

(16) MappedRDD[12] at map at GroupByTest.scala:81 []

 | ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 []

 +-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 []

| ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 []

 Output


GroupBy VaR
(A-2,B-1)   54.75
(A-2,B-1,C-1,D-2)   54.75
(A-1,B-1)   149.25
(A-1,B-1,C-1,D-1,E-1)   149.25
(A-3,B-3,C-1)   49.75
(A-3,B-3)   49.75
(A-4,B-3,C-1,D-1,E-1)   57.75
(A-2,B-1,C-1)   54.75
(A-1,B-2,C-1,D-1,E-1)   49.75
(A-1,B-1,C-1,D-1)   149.25
(A-3,B-3,C-1,D-1,E-2)   49.75
(A-1,B-2,C-1)   49.75
(A-3,B-3,C-1,D-1)   49.75
(A-4,B-3)   57.75
(A-1,B-1,C-1)   149.25
A-1_top 199.0
(A-4,B-3,C-1,D-1)   57.75
A-2_top 54.75
(A-1,B-2)   49.75
(A-4,B-3,C-1)   57.75
A-3_top 49.75
A-4_top 57.75
(A-2,B-1,C-1,D-2,E-1)   54.75
(A-1,B-2,C-1,D-1)   49.75
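
For issue 2, a hedged, untested sketch of one possibility: use aggregateByKey
with a per-key zero array that is mutated in place, so each key reuses a single
buffer instead of allocating a new array on every addition. Here expandedKeyRDD
stands for the output of the flatMap above, and 4001 is the length of
(0.0 to 1000.0 by 0.25).toArray.

val arrayLen = 4001
val summed = expandedKeyRDD.aggregateByKey(new Array[Double](arrayLen))(
  (acc, v) => { var i = 0; while (i < acc.length) { acc(i) += v(i); i += 1 }; acc },  // add into the buffer in place
  (a, b)   => { var i = 0; while (i < a.length)   { a(i)   += b(i); i += 1 }; a }     // merge two partition buffers
)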


Re: Launching Spark app in client mode for standalone cluster

2015-01-06 Thread Boromir Widas
Thanks for the pointers. The issue was due to route caching by Spray, which
would always return the same value. Other than that the program is working
fine.
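
A hedged sketch of that kind of fix (untested): build the response inside the
route's execution path so that SimpleApp.getA() is evaluated on every request,
rather than once when the route value is constructed.

def default = path("") {
  get { ctx =>
    val numAs = SimpleApp.getA()          // now runs per request
    ctx.complete(s"num A: $numAs")
  }
}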

On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan simonc...@gmail.com wrote:

 Boromir,

 You may like to take a look at how we make Spray and Spark working
 together at the PredictionIO project:
 https://github.com/PredictionIO/PredictionIO



 Simon

 On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com
 wrote:

 Just a guess here, may not be correct.

   Spray needs to start akka actor system; spark context also creates an
 akka actor system, is it possible there are some conflict ?



 Sent from my iPad

 On Jan 4, 2015, at 7:42 PM, Boromir Widas vcsub...@gmail.com wrote:

 Hello,

 I am trying to launch a Spark app (client mode for a standalone cluster)
 from a Spray server, using the following code.

 When I run it as

 $ java -cp <class paths> SprayServer

 the SimpleApp.getA() call from SprayService returns -1 (which means it
 sees the logData RDD as null for HTTP requests), but the statements from
 within SimpleAppLoader.run() get correct values from SimpleApp.getA().

 Any idea why the HTTP requests do not see the cached RDD? I have been
 trying to debug this for some time but am not getting anywhere - any pointers
 will be greatly appreciated.

 Thanks.

 // BEGIN SPRAY SERVER

 import akka.actor.{ActorSystem, Props}
 import akka.io.IO
 import spray.can.Http

 import akka.actor._
 import spray.routing.HttpService
 import scala.concurrent.ops

 object SprayServer {

   def main(args: Array[String]) {
     // we need an ActorSystem to host our service
     implicit val system = ActorSystem()

     // create our service actor
     val service = system.actorOf(Props[SprayServiceActor], "test-service")

     // bind our actor to an HTTP port
     IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)

     ops.spawn {
       SimpleAppLoader.run()
     }
   }
 }

 class SprayServiceActor extends SprayService with Actor {

   // the HttpService trait (which SprayService will extend) defines
   // only one abstract member, which connects the service's environment
   // to the enclosing actor or test.
   def actorRefFactory = context

   def receive = runRoute(rootRoute)
 }

 trait SprayService extends HttpService {

   def default = path("") {
     println("handling default route")
     val numAs = SimpleApp.getA()   // DOES NOT WORK
     get { complete(s"num A: $numAs") }
   }

   def pingRoute = path("ping") {
     get { complete("pong!") }
   }

   def pongRoute = path("pong") {
     get { complete("pong!?") }
   }

   def rootRoute = pingRoute ~ pongRoute ~ default
 }

 // END SPRAY, BEGIN SPARK

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmit
 import org.apache.spark.rdd.RDD

 object SimpleApp {

   var resultString: String = "Data not assigned"
   var logData: RDD[String] = null

   def main(args: Array[String]) {
     val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
     val conf = new SparkConf().setAppName("Simple Application")
     val sc = new SparkContext(conf)
     logData = sc.textFile(logFile, 2).cache()
     val numAs = logData.filter(line => line.contains("a")).count()
     val numBs = logData.filter(line => line.contains("b")).count()
     resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
     println(resultString)
   }

   def getA(): Int = {
     println(resultString)
     if (null == logData) {
       println(" logData is null!")
       -1
     } else {
       val numAs = logData.filter(line => line.contains("a")).count().toInt
       println(s" numAs: $numAs")
       numAs
     }
   }
 }

 object SimpleAppLoader {

   def main(args: Array[String]) {
     run()
   }

   def run() {
     val clArgs = Array(
       "--deploy-mode", "client"
       , "--total-executor-cores", "2"
       , "--class", "SimpleApp"
       , "--conf", "spark.shuffle.spill=false"
       , "--conf", "spark.master=spark://troika:7077"
       , "--conf", "spark.driver.memory=128m"
       , "--conf", "spark.executor.memory=128m"
       , "--conf", "spark.eventLog.enabled=true"
       , "--conf", "spark.eventLog.dir=/home/ovik/logs"
       , SparkContext.jarOfClass(this.getClass).get)

     SparkSubmit.main(clArgs)

     val numAs = SimpleApp.getA()    // WORKS

     println(s"numAs is $numAs")
   }
 }










Re: Re: I think I am almost lost in the internals of Spark

2015-01-06 Thread Boromir Widas
I do not understand Chinese but the diagrams on that page are very helpful.

On Tue, Jan 6, 2015 at 9:46 PM, eric wong win19...@gmail.com wrote:

 A good beginning if you are Chinese.

 https://github.com/JerryLead/SparkInternals/tree/master/markdown

 2015-01-07 10:13 GMT+08:00 bit1...@163.com bit1...@163.com:

 Thank you, Tobias. I will look into the Spark paper. But it looks like
 the paper has been moved:
 http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf.
 A web page saying "Resource not found" is returned when I access it.

 --
 bit1...@163.com


 *From:* Tobias Pfeiffer t...@preferred.jp
 *Date:* 2015-01-07 09:24
 *To:* Todd bit1...@163.com
 *CC:* user user@spark.apache.org
 *Subject:* Re: I think I am almost lost in the internals of Spark
 Hi,

 On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote:

 I am a bit new to Spark, having only tried simple things like word
 count and the examples given in the Spark SQL programming guide.
 Now I am investigating the internals of Spark, but I think I am almost
 lost, because I cannot grasp a whole picture of what Spark does when it
 executes the word count.


 I recommend understanding what an RDD is and how it is processed, using

 http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds
 and probably also
   http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
   (once the server is back).
 Understanding how an RDD is processed is probably most helpful to
 understand the whole of Spark.

 Tobias




 --
 王海华



Launching Spark app in client mode for standalone cluster

2015-01-04 Thread Boromir Widas
Hello,

I am trying to launch a Spark app (client mode for a standalone cluster) from
a Spray server, using the following code.

When I run it as

$ java -cp <class paths> SprayServer

the SimpleApp.getA() call from SprayService returns -1 (which means it sees
the logData RDD as null for HTTP requests), but the statements from within
SimpleAppLoader.run() get correct values from SimpleApp.getA().

Any idea why the HTTP requests do not see the cached RDD? I have been
trying to debug this for some time but am not getting anywhere - any pointers
will be greatly appreciated.

Thanks.

// BEGIN SPRAY SERVER

import akka.actor.{ActorSystem, Props}
import akka.io.IO
import spray.can.Http

import akka.actor._
import spray.routing.HttpService
import scala.concurrent.ops

object SprayServer {

  def main(args: Array[String]) {
    // we need an ActorSystem to host our service
    implicit val system = ActorSystem()

    // create our service actor
    val service = system.actorOf(Props[SprayServiceActor], "test-service")

    // bind our actor to an HTTP port
    IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)

    ops.spawn {
      SimpleAppLoader.run()
    }
  }
}

class SprayServiceActor extends SprayService with Actor {

  // the HttpService trait (which SprayService will extend) defines
  // only one abstract member, which connects the service's environment
  // to the enclosing actor or test.
  def actorRefFactory = context

  def receive = runRoute(rootRoute)
}

trait SprayService extends HttpService {

  def default = path("") {
    println("handling default route")
    val numAs = SimpleApp.getA()   // DOES NOT WORK
    get { complete(s"num A: $numAs") }
  }

  def pingRoute = path("ping") {
    get { complete("pong!") }
  }

  def pongRoute = path("pong") {
    get { complete("pong!?") }
  }

  def rootRoute = pingRoute ~ pongRoute ~ default
}

// END SPRAY, BEGIN SPARK

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.rdd.RDD

object SimpleApp {

  var resultString: String = "Data not assigned"
  var logData: RDD[String] = null

  def main(args: Array[String]) {
    val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
    println(resultString)
  }

  def getA(): Int = {
    println(resultString)
    if (null == logData) {
      println(" logData is null!")
      -1
    } else {
      val numAs = logData.filter(line => line.contains("a")).count().toInt
      println(s" numAs: $numAs")
      numAs
    }
  }
}

object SimpleAppLoader {

  def main(args: Array[String]) {
    run()
  }

  def run() {
    val clArgs = Array(
      "--deploy-mode", "client"
      , "--total-executor-cores", "2"
      , "--class", "SimpleApp"
      , "--conf", "spark.shuffle.spill=false"
      , "--conf", "spark.master=spark://troika:7077"
      , "--conf", "spark.driver.memory=128m"
      , "--conf", "spark.executor.memory=128m"
      , "--conf", "spark.eventLog.enabled=true"
      , "--conf", "spark.eventLog.dir=/home/ovik/logs"
      , SparkContext.jarOfClass(this.getClass).get)

    SparkSubmit.main(clArgs)

    val numAs = SimpleApp.getA()    // WORKS

    println(s"numAs is $numAs")
  }
}


Re: building spark1.2 meet error

2015-01-03 Thread Boromir Widas
it should be under
 ls assembly/target/scala-2.10/*

On Sat, Jan 3, 2015 at 10:11 PM, j_soft zsof...@gmail.com wrote:


- Thanks, it built successfully.
- But where is the built package? I cannot find a finished .zip or .tar.gz
archive.


 2014-12-31 19:22 GMT+08:00 xhudik [via Apache Spark User List] [hidden email]:

 Hi J_soft,

 For me it is working; I didn't put the -Dscala-2.10 -X parameters. I got only
 one warning: since I don't have Hadoop 2.5, it didn't activate that profile:







































 larix@kovral:~/sources/spark-1.2.0 mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -DskipTests clean package
 Found 0 infos
 Finished in 3 ms
 [INFO] ------------------------------------------------------------------------
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM ........................... SUCCESS [ 14.177 s]
 [INFO] Spark Project Networking ........................... SUCCESS [ 14.670 s]
 [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  9.030 s]
 [INFO] Spark Project Core ................................. SUCCESS [04:42 min]
 [INFO] Spark Project Bagel ................................ SUCCESS [ 26.184 s]
 [INFO] Spark Project GraphX ............................... SUCCESS [01:07 min]
 [INFO] Spark Project Streaming ............................ SUCCESS [01:35 min]
 [INFO] Spark Project Catalyst ............................. SUCCESS [01:48 min]
 [INFO] Spark Project SQL .................................. SUCCESS [01:55 min]
 [INFO] Spark Project ML Library ........................... SUCCESS [02:17 min]
 [INFO] Spark Project Tools ................................ SUCCESS [ 15.527 s]
 [INFO] Spark Project Hive ................................. SUCCESS [01:43 min]
 [INFO] Spark Project REPL ................................. SUCCESS [ 45.154 s]
 [INFO] Spark Project YARN Parent POM ...................... SUCCESS [  3.885 s]
 [INFO] Spark Project YARN Stable API ...................... SUCCESS [01:00 min]
 [INFO] Spark Project Assembly ............................. SUCCESS [ 50.812 s]
 [INFO] Spark Project External Twitter ..................... SUCCESS [ 21.401 s]
 [INFO] Spark Project External Flume Sink .................. SUCCESS [ 25.207 s]
 [INFO] Spark Project External Flume ....................... SUCCESS [ 34.734 s]
 [INFO] Spark Project External MQTT ........................ SUCCESS [ 22.617 s]
 [INFO] Spark Project External ZeroMQ ...................... SUCCESS [ 22.444 s]
 [INFO] Spark Project External Kafka ....................... SUCCESS [ 33.566 s]
 [INFO] Spark Project Examples ............................. SUCCESS [01:23 min]
 [INFO] Spark Project YARN Shuffle Service ................. SUCCESS [  4.873 s]
 [INFO] ------------------------------------------------------------------------
 [INFO] BUILD SUCCESS
 [INFO] ------------------------------------------------------------------------
 [INFO] Total time: 23:20 min
 [INFO] Finished at: 2014-12-31T12:02:32+01:00
 [INFO] Final Memory: 76M/855M
 [INFO] ------------------------------------------------------------------------
 [WARNING] The requested profile "hadoop-2.5" could not be activated because
 it does not exist.


 If it doesn't work for you, I'd try deleting all sources, downloading the
 source code once more, and trying again ...

 good luck, Tomas





 --
 View this message in context: Re: building spark1.2 meet error
 http://apache-spark-user-list.1001560.n3.nabble.com/building-spark1-2-meet-error-tp20853p20958.html

 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Spark profiler

2014-12-29 Thread Boromir Widas
It would be very helpful if there were such a tool, but the distributed
nature may be difficult to capture.

I had been trying to run a task where merging the accumulators was taking
an inordinately long time, and this was not reflected in the standalone
cluster's web UI.
What I think will be useful is to publish metrics for different lifecycle
stages of a job to a system like Ganglia to zero in on bottlenecks. Perhaps
the user can define some of the metrics in the config.

I have been thinking of tinkering with the metrics publisher to add custom
metrics to get a bigger picture and time breakdown.

On Mon, Dec 29, 2014 at 10:24 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi,

 I want to find the time taken for replicating an rdd in spark cluster
 along with the computation time on the replicated rdd.

 Can someone please suggest a suitable spark profiler?

 Thank you



Re: Using more cores on machines

2014-12-22 Thread Boromir Widas
If you are looking to reduce network traffic then setting
spark.deploy.spreadOut
to false may help.

On Mon, Dec 22, 2014 at 11:44 AM, Ashic Mahtab as...@live.com wrote:

 Hi Josh,
 I'm not looking to change the 1:1 ratio.

 What I'm trying to do is get both cores on two machines working, rather
 than one core on all four machines. With --total-executor-cores 4, I have 1
 core per machine working for an app. I'm looking for something that'll let
 me use 2 cores per machine on 2 machines (so 4 cores in total) while not
 using the other two machines.

 Regards,
 Ashic.

  From: j...@soundcloud.com
  Date: Mon, 22 Dec 2014 17:36:26 +0100
  Subject: Re: Using more cores on machines
  To: as...@live.com
  CC: so...@cloudera.com; user@spark.apache.org

 
  AFAIK, `--num-executors` is not available for standalone clusters. In
  standalone mode, you must start new workers on your node as it is a
  1:1 ratio of workers to executors.
 
 
  On 22 December 2014 at 12:25, Ashic Mahtab as...@live.com wrote:
   Hi Sean,
   Thanks for the response.
  
   It seems --num-executors is ignored. Specifying --num-executors 2
   --executor-cores 2 is giving the app all 8 cores across 4 machines.
  
   -Ashic.
  
   From: so...@cloudera.com
   Date: Mon, 22 Dec 2014 10:57:31 +
   Subject: Re: Using more cores on machines
   To: as...@live.com
   CC: user@spark.apache.org
  
  
   I think you want:
  
   --num-executors 2 --executor-cores 2
  
   On Mon, Dec 22, 2014 at 10:39 AM, Ashic Mahtab as...@live.com
 wrote:
Hi,
Say we have 4 nodes with 2 cores each in stand alone mode. I'd like
 to
dedicate 4 cores to a streaming application. I can do this via spark
submit
by:
   
spark-submit  --total-executor-cores 4
   
However, this assigns one core per machine. I would like to use 2
 cores
on 2
machines instead, leaving the other two machines untouched. Is this
possible? Is there a downside to doing this? My thinking is that I
should be
able to reduce quite a bit of network traffic if all machines are
 not
involved.
   
   
Thanks,
Ashic.
  
 



Re: How to emit multiple keys for the same value?

2014-10-20 Thread Boromir Widas
flatMap should help; it returns a Seq for every input.
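
A minimal, untested sketch, assuming the input is an RDD[(String, Double)]
named rdd, keyed the same way as in the MapReduce code below (the names are
illustrative): flatMap emits the whole sequence of keys for each input pair.

val expanded = rdd.flatMap { case (date, value) =>
  (0 until 8).map(k => (date + ":" + k, value))   // emit one (key, value) pair per k
}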

On Mon, Oct 20, 2014 at 12:31 PM, HARIPRIYA AYYALASOMAYAJULA 
aharipriy...@gmail.com wrote:

 Hello,

 I am facing a problem with implementing this: my mapper should emit
 multiple keys for the same value - for every input (k, v) it should emit
 (k, v), (k+1, v), (k+2, v) ... (k+n, v).
 In MapReduce, it was pretty straightforward - I used a for loop and
 performed a Context write within it.

 This is the code I am using in the map function of MR job:

 for (int k = 0; k < 8; k++)
 {
     key = date + ":" + k;
     context.write(new Text(key), new DoubleWritable(value));
 }

 I tried using the map function, but I'm stuck on how I can
 use the loop within it.

 It would be great if someone can suggest me what could be possible ways to
 do it.

 Thanks in advance.

 --
 Regards,
 Haripriya Ayyalasomayajula




Re: object in an rdd: serializable?

2014-10-16 Thread Boromir Widas
Making it a case class should work.
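
A short hedged sketch (assuming an existing SparkContext sc): a case class is
Serializable by default, so its instances can be stored in an RDD; mixing
Serializable into the existing class works as well.

case class HelloWorld(count: Int)
val data = sc.parallelize(List(HelloWorld(1), HelloWorld(2)))

// alternative, keeping the original class:
// class HelloWorld(val count: Int) extends Serializable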

On Thu, Oct 16, 2014 at 8:30 PM, ll duy.huynh@gmail.com wrote:

 I got an exception complaining about serializability. The sample code is
 below...

 class HelloWorld(val count: Int) {
   ...
   ...
 }

 object Test extends App {
   ...
   val data = sc.parallelize(List(new HelloWorld(1), new HelloWorld(2)))
   ...
 }

 what is the best way to serialize HelloWorld so that it can be contained in
 an RDD?

 thanks!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/object-in-an-rdd-serializable-tp16638.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Executor and BlockManager memory size

2014-10-10 Thread Boromir Widas
Hey Larry,

I have been trying to figure this out for standalone clusters as well.
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-Block-Manager-td12833.html
has an answer as to what block manager is for.

From the documentation, what I understood was that if you assign X GB to each
executor, spark.storage.memoryFraction (default 0.6) * X is assigned to the
BlockManager and the rest is for the JVM itself(?).
However, as you see, 26.8 GB is assigned to the BM, and assuming a 0.6
memoryFraction, this means the executor sees ~44.7 GB of memory; I am not
sure what happens to the difference (5.3 GB).
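
One possible explanation - an assumption on my part, not something confirmed in
this thread: Spark 1.x also applies spark.storage.safetyFraction (default 0.9)
on top of spark.storage.memoryFraction when sizing the BlockManager, which
lines up roughly with the reported numbers.

// back-of-the-envelope check, assuming Spark 1.x defaults
val executorHeapGB = 50.0      // the "50.0 GB RAM" granted to the executor
val memoryFraction = 0.6       // spark.storage.memoryFraction
val safetyFraction = 0.9       // spark.storage.safetyFraction (assumed default)
val blockManagerGB = executorHeapGB * memoryFraction * safetyFraction
println(f"$blockManagerGB%.1f GB")   // ~27.0 GB, close to the reported 26.8 GB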


On Thu, Oct 9, 2014 at 9:40 PM, Larry Xiao xia...@sjtu.edu.cn wrote:

 Hi all,

 I'm confused about the Executor and the BlockManager - why do they have
 different amounts of memory?

 14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added:
 app-20141010085001-/2 on worker-20141010004933-brick6-35657
 (brick6:35657) with 6 cores
 14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 50.0 GB RAM

 14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block manager
 brick6:53296 with 26.8 GB RAM

 and on the WebUI,

 Executor IDAddressRDD Blocks  Memory UsedDisk UsedActive
 TasksFailed Tasks  Complete TasksTotal TasksTask TimeInput
   Shuffle ReadShuffle Write
 0brick3:3760700.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 1brick1:5949300.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 2brick6:5329600.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 3brick5:3854300.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 4brick2:4493700.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 5brick4:4679800.0 B / 26.8 GB0.0 B60  060
 ms0.0 B0.0 B0.0 B
 driverbrick0:5769200.0 B / 274.6 MB0.0 B  000
   00 ms0.0 B0.0 B0.0 B

 As I understand it, a worker consists of a daemon and an executor, and the
 executor is in charge of both execution and storage.
 So does that mean 26.8 GB is reserved for storage and the rest is for
 execution?

 Another question: throughout execution, it seems that the BlockManager is
 always almost free.

 14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in
 memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)

 I don't know what I'm missing here.

 Best regards,
 Larry





Re: Handling tree reduction algorithm with Spark in parallel

2014-10-03 Thread Boromir Widas
Thanks Matei, will check out the MLLib implementation.

On Wed, Oct 1, 2014 at 2:24 PM, Andy Twigg andy.tw...@gmail.com wrote:

 Yes, that makes sense. It's similar to the all reduce pattern in vw.


 On Wednesday, 1 October 2014, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Some of the MLlib algorithms do tree reduction in 1.1:
 http://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html.
 You can check out how they implemented it -- it is a series of reduce
 operations.

 Matei

 On Oct 1, 2014, at 11:02 AM, Boromir Widas vcsub...@gmail.com wrote:

 Thanks a lot Andy and Debashish, your suggestions were of great help.

 On Tue, Sep 30, 2014 at 6:44 PM, Debasish Das debasish.da...@gmail.com
 wrote:

 If the tree is too big, build it on GraphX, but it will need thorough
 analysis so that the partitions are well balanced...

 On Tue, Sep 30, 2014 at 2:45 PM, Andy Twigg andy.tw...@gmail.com
 wrote:

 Hi Boromir,

 Assuming the tree fits in memory, and what you want to do is
 parallelize the computation, the 'obvious' way is the following
 (a rough sketch in code follows below):

 * broadcast the tree T to each worker (ok since it fits in memory)
 * construct an RDD for the deepest level - each element in the RDD is
 (parent, data_at_node)
 * aggregate this by key (= parent) -> RDD[parent, data]
 * map each element (p, data) -> (parent(p), data) using T
 * repeat until you have an RDD of size = 1 (assuming T is connected)

 If T cannot fit in memory, or is very deep, then there are more exotic
 techniques, but hopefully this suffices.
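
A rough, untested sketch of those steps in Scala; deepestLevelRDD and
parentOf(name) (a lookup into the broadcast tree T that returns the root itself
for the root node) are assumed to exist and are only illustrative.

import org.apache.spark.rdd.RDD

var level: RDD[(String, Array[Double])] = deepestLevelRDD   // (parentName, dataAtNode) pairs
while (level.count() > 1) {
  level = level
    .reduceByKey((a, b) => (a, b).zipped.map(_ + _))        // sum sibling arrays per parent
    .map { case (node, data) => (parentOf(node), data) }    // re-key one level up the tree
}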

 Andy


 --
 http://www.cs.ox.ac.uk/people/andy.twigg/

 On 30 September 2014 14:12, Boromir Widas vcsub...@gmail.com wrote:

 Hello Folks,

 I have been trying to implement a tree reduction algorithm recently in
 Spark but could not find suitable parallel operations. Assuming I have a
 general tree like the following -



 I have to do the following -
 1) Do some computation at each leaf node to get an array of
 doubles. (This can be precomputed.)
 2) For each non-leaf node, starting with the root node, compute the sum
 of these arrays for all child nodes. So to get the array for node B, I
 need to get the array for E, which is the sum of G + H.

 // Start Snippet
 case class Node(name: String, children: Array[Node], values: Array[Double])

 // read in the tree here

 def getSumOfChildren(node: Node): Array[Double] = {
   if (node.children.isEmpty) {
     node.values
   } else {
     // sum this node's values with those of all children, element-wise
     // (could use an accumulator here)
     node.children.foldLeft(node.values) { (acc, child) =>
       (acc, getSumOfChildren(child)).zipped.map(_ + _)
     }
   }
 }
 // End Snippet

 Any pointers to how this can be done in parallel to use all cores will
 be greatly appreciated.

 Thanks,
 Boromir.








Handling tree reduction algorithm with Spark in parallel

2014-09-30 Thread Boromir Widas
Hello Folks,

I have been trying to implement a tree reduction algorithm recently in
Spark but could not find suitable parallel operations. Assuming I have a
general tree like the following -



I have to do the following -
1) Do some computation at each leaf node to get an array of doubles. (This
can be precomputed.)
2) For each non-leaf node, starting with the root node, compute the sum of
these arrays for all child nodes. So to get the array for node B, I need to
get the array for E, which is the sum of G + H.

// Start Snippet
case class Node(name: String, children: Array[Node], values: Array[Double])

// read in the tree here

def getSumOfChildren(node: Node): Array[Double] = {
  if (node.children.isEmpty) {
    node.values
  } else {
    // sum this node's values with those of all children, element-wise
    // (could use an accumulator here)
    node.children.foldLeft(node.values) { (acc, child) =>
      (acc, getSumOfChildren(child)).zipped.map(_ + _)
    }
  }
}
// End Snippet

Any pointers to how this can be done in parallel to use all cores will be
greatly appreciated.

Thanks,
Boromir.


Re: Memory under-utilization

2014-09-16 Thread Boromir Widas
Perhaps your job does not use more than 9 GB. Even though the dashboard shows
64 GB, the process only uses what is needed and grows to 64 GB at most.

On Tue, Sep 16, 2014 at 5:40 PM, francisco ftanudj...@nextag.com wrote:

 Hi, I'm a Spark newbie.

 We had installed spark-1.0.2-bin-cdh4 on a 'super machine' with 256gb
 memory
 and 48 cores.

 Tried to allocate a task with 64gb memory but for whatever reason Spark is
 only using around 9gb max.

 Submitted spark job with the following command:
 
 /bin/spark-submit -class SimpleApp --master local[16] --executor-memory 64G
 /var/tmp/simple-project_2.10-1.0.jar /data/lucene/ns.gz
 

 When I run 'top' command I see only 9gb of memory is used by the spark
 process

 PID USER  PR  NI  VIRT  RES  SHR S %CPU %MEMTIME+  COMMAND
 3047005 fran  30  10 8785m 703m  18m S 112.9  0.3  48:19.63 java


 Any idea why this is happening? I've also tried to set the memory
 programmatically using
  new SparkConf().set("spark.executor.memory", "64g")  but that also
 didn't do anything.

 Is there some limitation when running in 'local' mode?

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Memory-under-utilization-tp14396.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Memory under-utilization

2014-09-16 Thread Boromir Widas
I see, what does http://localhost:4040/executors/ show for memory usage?

I personally find it easier to work with a standalone cluster with a single
worker by using the sbin/start-master.sh and then connecting to the master.

On Tue, Sep 16, 2014 at 6:04 PM, francisco ftanudj...@nextag.com wrote:

 Thanks for the reply.

 I doubt that's the case though ...  the executor kept having to do a file
 dump because memory is full.

 ...
 14/09/16 15:00:18 WARN ExternalAppendOnlyMap: Spilling in-memory map of 67
 MB to disk (668 times so far)
 14/09/16 15:00:21 WARN ExternalAppendOnlyMap: Spilling in-memory map of 66
 MB to disk (669 times so far)
 14/09/16 15:00:24 WARN ExternalAppendOnlyMap: Spilling in-memory map of 70
 MB to disk (670 times so far)
 14/09/16 15:00:31 WARN ExternalAppendOnlyMap: Spilling in-memory map of 127
 MB to disk (671 times so far)
 14/09/16 15:00:43 WARN ExternalAppendOnlyMap: Spilling in-memory map of 67
 MB to disk (672 times so far)
 ...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Memory-under-utilization-tp14396p14399.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Compiler issues for multiple map on RDD

2014-09-15 Thread Boromir Widas
Hello Folks,

I am trying to chain a couple of map operations, and it seems the second map
fails with a type mismatch (even though the compiler prints the found and
required types as the same). I checked the function and variable types using
:t and they look OK to me.

Have you seen this before? I am posting the code, data and output below.

Any pointers will be greatly appreciated.

Thanks,
Boromir.

/// SCRIPT
val data = sc.textFile("data/testpv.csv")

case class KVV(key: String, valvec: Array[Double])

def mapToKV(line: String): KVV = {
  val splits = line.split(",")
  val key = splits(0).trim
  val valvec = splits.drop(1).map(x => x.trim.toDouble)

  val kvv = KVV(key, valvec)
  return kvv
}

val kvs = data.map(line => mapToKV(line))

def mapKVtoKVL(kvv: KVV): KVV = {
  return kvv
}
val tvar = kvs.map(x => mapKVtoKVL(x))

/// SAMPLE DATA in testpv.csv
x,1.1,1.2,1.3
y,2.1,2.2,2.3

/// REPL OUTPUT
scala> val data = sc.textFile("data/testpv.csv")
14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with
curMem=0, maxMem=308713881
14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 143.1 KB, free 294.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:12

scala> case class KVV(key: String, valvec: Array[Double])
defined class KVV

scala>

scala> def mapToKV(line: String): KVV = {
     |   val splits = line.split(",")
     |   val key = splits(0).trim
     |   val valvec = splits.drop(1).map(x => x.trim.toDouble)
     |
     |   val kvv = KVV(key, valvec)
     |   return kvv
     | }
mapToKV: (line: String)KVV

scala> val kvs = data.map(line => mapToKV(line))
kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at <console>:18

scala>

scala> def mapKVtoKVL(kvv: KVV): KVV = {
     |   return kvv
     | }
mapKVtoKVL: (kvv: KVV)KVV

scala> val tvar = kvs.map(x => mapKVtoKVL(x))
<console>:22: error: type mismatch;
 found   : KVV
 required: KVV
       val tvar = kvs.map(x => mapKVtoKVL(x))
                               ^