Re: How to union RDD and remove duplicated keys

2015-02-13 Thread Boromir Widas
reduceByKey should work, but you need to define the ordering by using some
sort of index.

On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) wrote:

 I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g.

 rdd1:   (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”)

 rdd2:   (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)

 rdd3:   (“id1”, “Long text 1 B”)

 Then I want to merge all the RDDs. If there are duplicated docIds, the later
 RDD should overwrite the earlier one. In the above case, rdd2 will overwrite
 rdd1 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final
 merged RDD should be

 rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”,
 “Long text 3”)

 Note that I have many such RDDs and each RDD has lots of elements. How
 can I do it efficiently?
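A minimal sketch of the index idea, assuming the Spark 1.x RDD API (MergeDocs and mergeLatest are hypothetical names). Each RDD's position in the list becomes its index, all inputs are combined with a single sc.union (one UnionRDD instead of a long chain of pairwise unions), and reduceByKey keeps the value with the highest index for each docId. If one RDD contains the same docId twice, which of those two values survives is not defined.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x; harmless later
import org.apache.spark.rdd.RDD

object MergeDocs {
  // Merges (docId, docText) RDDs; for duplicate docIds, the RDD that comes
  // later in `rdds` wins. The RDD's position is used as the ordering index.
  def mergeLatest(rdds: Seq[RDD[(String, String)]]): RDD[(String, String)] = {
    require(rdds.nonEmpty, "need at least one RDD")
    // tag every value with the index of the RDD it came from
    val tagged = rdds.zipWithIndex.map { case (rdd, idx) =>
      rdd.mapValues(text => (idx, text))
    }
    rdds.head.context.union(tagged)
      .reduceByKey((a, b) => if (a._1 >= b._1) a else b) // keep the higher index
      .mapValues(_._2)                                   // drop the index again
  }
}
```

Called as MergeDocs.mergeLatest(Seq(rdd1, rdd2, rdd3)) on the RDDs above, this should give rddFinal. Because reduceByKey combines on the map side before the shuffle, at most one (index, text) pair per docId and partition crosses the network.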


Re: Building Spark behind a proxy

2015-01-29 Thread Boromir Widas
At least part of it is due to a refused connection. Can you check whether curl
can reach the URL through the proxy?
[FATAL] Non-resolvable parent POM: Could not transfer artifact
org.apache:apache:pom:14 from/to central ( Error transferring file: Connection
refused from

On Thu, Jan 29, 2015 at 11:35 AM, Soumya Simanta

 On Thu, Jan 29, 2015 at 11:05 AM, Arush Kharbanda wrote:

 Does the error change when you build with and without the build options?

 What do you mean by build options? I'm just doing ./sbt/sbt assembly from

 Did you try using Maven, and setting the proxy there?

  No, I've not tried Maven yet. However, I did set proxy settings inside my
 .m2/setting.xml, but it didn't make any difference.

Re: Apache Spark standalone mode: number of cores

2015-01-23 Thread Boromir Widas
The local mode still parallelizes calculations and it is useful for
debugging as it goes through the steps of serialization/deserialization as
a cluster would.

On Fri, Jan 23, 2015 at 5:44 PM, olegshirokikh wrote:

 I'm trying to understand the basics of Spark internals and Spark
 documentation for submitting applications in local mode says for
 spark-submit --master setting:

 local[K] Run Spark locally with K worker threads (ideally, set this to the
 number of cores on your machine).

 local[*] Run Spark locally with as many worker threads as logical cores on
 your machine.
 Since all the data is stored on a single local machine, it does not benefit
 from distributed operations on RDDs.

 How does it benefit and what internally is going on when Spark utilizes
 several logical cores?
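A small sketch for seeing this in practice, assuming Spark 1.x or later on the classpath (LocalModeDemo is a hypothetical name). With local[K], sc.defaultParallelism is K unless spark.default.parallelism is set, parallelize splits the data into that many partitions, and each partition becomes a task that runs on one of the K threads. The closure passed to mapPartitions is serialized and shipped to those tasks, just as it would be on a cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalModeDemo {
  // Runs one small job on local[threads] and returns
  // (defaultParallelism, number of partitions, sum of 1..1000).
  def run(threads: Int): (Int, Int, Long) = {
    val conf = new SparkConf().setMaster(s"local[$threads]").setAppName("local-mode-demo")
    val sc = new SparkContext(conf)
    try {
      // by default, parallelize uses sc.defaultParallelism partitions
      val rdd = sc.parallelize(1L to 1000L)
      // one task per partition; this closure is serialized even in local mode
      val perPartition = rdd.mapPartitions(it => Iterator(it.sum)).collect()
      (sc.defaultParallelism, perPartition.length, perPartition.sum)
    } finally {
      sc.stop()
    }
  }
}
```

On a 4-core machine, LocalModeDemo.run(4) would report 4 partitions, each summed by its own task.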

GroupBy multiple attributes

2015-01-23 Thread Boromir Widas

I am trying to do a groupBy on 5 attributes to get results in a form like a
pivot table in Microsoft Excel. The keys are the attribute tuples and the
values are double arrays (possibly very large). With the code below I get
correct results, but I would like to optimize it further (I have
played around with numPartitions).

The two issues I see are:
1. flatMap is needed to expand the key tuples, but this also duplicates the
values, and as the values are large this increases the shuffle input size
for reduceByKey. Is there a way to avoid the duplication?
2. reduceByKey adds two arrays element-wise and creates a new array
for every addition. Is there a way to reduce without creating a new array
every time (similar to what accumulators do)?

I am pasting a sample code, query plan and output below.


val attributeToFloatArrayRDD = sc.parallelize(Array(
  ("A-1", "B-2", "C-1", "D-1", "E-1")   -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-2", "B-1", "C-1", "D-2", "E-1") -> (5.0 to 1005.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-3", "B-3", "C-1", "D-1", "E-2") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-4", "B-3", "C-1", "D-1", "E-1") -> (8.0 to 1008.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
))

val groupToVaRRDD = attributeToFloatArrayRDD
  // expand each row into one (key, values) pair per grouping level
  .flatMap(x => x._1 match {
    case (t1, t2, t3, t4, t5) =>
      Array((t1 + "_top"), (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5))
        .map(y => (y, x._2))
  })
  // element-wise sum of the arrays for each key
  .reduceByKey((x, y) => {
    require(x.size == y.size)
    (x, y).zipped.map(_ + _)
  })
  // value at index size/20 - 1 of the sorted sums
  .map(x => {
    (x._1, x._2.sorted.take(x._2.size / 20).last)
  })

 Query Plan

(16) MappedRDD[12] at map at GroupByTest.scala:81 []
 |   ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 []
 +-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 []
    |   ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 []


GroupBy VaR
(A-2,B-1)   54.75
(A-2,B-1,C-1,D-2)   54.75
(A-1,B-1)   149.25
(A-1,B-1,C-1,D-1,E-1)   149.25
(A-3,B-3,C-1)   49.75
(A-3,B-3)   49.75
(A-4,B-3,C-1,D-1,E-1)   57.75
(A-2,B-1,C-1)   54.75
(A-1,B-2,C-1,D-1,E-1)   49.75
(A-1,B-1,C-1,D-1)   149.25
(A-3,B-3,C-1,D-1,E-2)   49.75
(A-1,B-2,C-1)   49.75
(A-3,B-3,C-1,D-1)   49.75
(A-4,B-3)   57.75
(A-1,B-1,C-1)   149.25
A-1_top 199.0
(A-4,B-3,C-1,D-1)   57.75
A-2_top 54.75
(A-1,B-2)   49.75
(A-4,B-3,C-1)   57.75
A-3_top 49.75
A-4_top 57.75
(A-2,B-1,C-1,D-2,E-1)   54.75
(A-1,B-2,C-1,D-1)   49.75
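A sketch addressing both issues, assuming Spark 1.1+ (for aggregateByKey) and string attributes as in the output above. VaRRollup and its methods are hypothetical names. For issue 2, aggregateByKey gives every key its own copy of the zero array, and its documentation allows seqOp and combOp to modify and return their first argument, so addInto accumulates in place without touching the input records. For issue 1, instead of emitting five copies of every raw array in flatMap, the rows are first summed at the full 5-attribute key, and each coarser level is built from the already-summed level below it. Whether this actually shrinks the shuffle depends on how many raw rows share a key.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object VaRRollup {
  type Key5 = (String, String, String, String, String)

  // Adds v into acc element-wise and returns acc; no new array is allocated.
  def addInto(acc: Array[Double], v: Array[Double]): Array[Double] = {
    require(acc.length == v.length, "arrays must have the same length")
    var i = 0
    while (i < acc.length) { acc(i) += v(i); i += 1 }
    acc
  }

  // Per-key element-wise sum; each key starts from its own copy of the zero array.
  def sumByKey[K: ClassTag](rdd: RDD[(K, Array[Double])], width: Int): RDD[(K, Array[Double])] =
    rdd.aggregateByKey(new Array[Double](width))(addInto, addInto)

  // Same cut as in the post: the element at index size/20 - 1 of the sorted sums.
  def cut(v: Array[Double]): Double = v.sorted.take(v.length / 20).last

  // Sum at the finest key first, then derive each coarser level from the one below.
  def rollup(rdd: RDD[(Key5, Array[Double])], width: Int): RDD[(Any, Double)] = {
    val l5 = sumByKey(rdd, width).cache()
    val l4 = sumByKey(l5.map { case ((a, b, c, d, _), v) => ((a, b, c, d), v) }, width).cache()
    val l3 = sumByKey(l4.map { case ((a, b, c, _), v) => ((a, b, c), v) }, width).cache()
    val l2 = sumByKey(l3.map { case ((a, b, _), v) => ((a, b), v) }, width).cache()
    val top = sumByKey(l2.map { case ((a, _), v) => (a + "_top", v) }, width)
    def cuts[K](level: RDD[(K, Array[Double])]): RDD[(Any, Double)] =
      level.map { case (k, v) => (k: Any, cut(v)) }
    cuts(l5).union(cuts(l4)).union(cuts(l3)).union(cuts(l2)).union(cuts(top))
  }
}
```

For the data above, VaRRollup.rollup(attributeToFloatArrayRDD, 4001) should yield the same keys and values as groupToVaRRDD, since each array has (1000.0 - 0.0) / 0.25 + 1 = 4001 elements.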

Re: Launching Spark app in client mode for standalone cluster

2015-01-06 Thread Boromir Widas
Thanks for the pointers. The issue was due to route caching by Spray, which
would always return the same value. Other than that, the program is working
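As far as I understand spray's routing DSL, code placed inside a directive block but outside the innermost route (like val numAs = SimpleApp.getA() in the default route of the original post) runs once, when the route is built, rather than on every request; the usual remedy is to do that work inside complete(...). The plain-Scala sketch below imitates the difference without spray: fetch is a hypothetical stand-in for SimpleApp.getA(), and the two builders stand in for the two route layouts.

```scala
object EvaluationTiming {
  private var counter = 0

  // Hypothetical stand-in for SimpleApp.getA(): returns a new number on every call.
  def fetch(): Int = synchronized { counter += 1; counter }

  // Like `val numAs = getA()` outside complete(...):
  // fetch() runs once, when the "route" is constructed.
  def buildEagerRoute(): () => String = {
    val numAs = fetch()
    () => s"num A: $numAs"
  }

  // Like calling getA() inside the handler: fetch() runs on every "request".
  def buildPerRequestRoute(): () => String =
    () => s"num A: ${fetch()}"
}
```

Every call to the eager handler returns the value captured at construction time, which matches the "always returns the same value" symptom.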

On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan wrote:


 You may like to take a look at how we make Spray and Spark work
 together in the PredictionIO project:


 On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work

 Just a guess here, may not be correct.

   Spray needs to start an Akka actor system; the SparkContext also creates an
 Akka actor system. Is it possible there is some conflict?


Re: Re: I think I am almost lost in the internals of Spark

2015-01-06 Thread Boromir Widas
I do not understand Chinese but the diagrams on that page are very helpful.

On Tue, Jan 6, 2015 at 9:46 PM, eric wong wrote:

 A good beginning if you are Chinese.

 2015-01-07 10:13 GMT+08:00

 Thank you, Tobias. I will look into the Spark paper. But it looks like
 the paper has been moved:
 a web page (Resource not found) is returned when I access it.


 *From:* Tobias Pfeiffer
 *Date:* 2015-01-07 09:24
 *To:* Todd
 *CC:* user
 *Subject:* Re: I think I am almost lost in the internals of Spark

 On Tue, Jan 6, 2015 at 11:24 PM, Todd wrote:

 I am a bit new to Spark, except that I tried simple things like word
 count, and the examples given in the spark sql programming guide.
 Now, I am investigating the internals of Spark, but I think I am almost
 lost, because I could not grasp a whole picture what spark does when it
 executes the word count.

 I recommend understanding what an RDD is and how it is processed, using
 and probably also
   (once the server is back).
 Understanding how an RDD is processed is probably most helpful to
 understand the whole of Spark.



Launching Spark app in client mode for standalone cluster

2015-01-04 Thread Boromir Widas

I am trying to launch a Spark app (client mode for a standalone cluster) from
a Spray server, using the following code.

When I run it as

$ java -cp <class paths> SprayServer

the SimpleApp.getA() call from SprayService returns -1 (which means it sees
the logData RDD as null for HTTP requests), but the statements from within get correct values from SimpleApp.getA().

Any idea why the HTTP requests do not see the cached RDD? I have been
trying to debug this for some time but not getting anywhere - any pointers
will be greatly appreciated.



import akka.actor.{Actor, ActorSystem, Props}
import akka.io.IO
import spray.can.Http
import spray.routing.HttpService
import scala.concurrent.ops

object SprayServer {
  def main(args: Array[String]) {
    // we need an ActorSystem to host our service
    implicit val system = ActorSystem()

    // create our service actor
    val service = system.actorOf(Props[SprayServiceActor], "test-service")

    // bind our actor to an HTTP port
    // ("localhost" is assumed; the original interface value was lost from the post)
    IO(Http) ! Http.Bind(service, interface = "localhost", port = 8085)

    ops.spawn {
      SimpleAppLoader.run() // assumed: the body of this block was lost from the post
    }
  }
}

class SprayServiceActor extends SprayService with Actor {
  // the HttpService trait (which SprayService will extend) defines
  // only one abstract member, which connects the services environment
  // to the enclosing actor or test.
  def actorRefFactory = context

  def receive = runRoute(rootRoute)
}

trait SprayService extends HttpService {
  def default = path("") {
    println("handling default route")
    val numAs = SimpleApp.getA()   // DOES NOT WORK
    get { complete(s"num A: $numAs") }
  }

  def pingRoute = path("ping") {
    get { complete("pong!") }
  }

  def pongRoute = path("pong") {
    get { complete("pong!?") }
  }

  def rootRoute = pingRoute ~ pongRoute ~ default
}

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.rdd.RDD

object SimpleApp {
  var resultString: String = "Data not assigned"
  var logData: RDD[String] = null

  def main(args: Array[String]) {
    val logFile = "/home/ovik/src/spark/" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
  }

  // Returns -1 while logData has not been set.
  def getA(): Int = {
    if (null == logData) {
      println("logData is null!")
      -1
    } else {
      val numAs = logData.filter(line => line.contains("a")).count().toInt
      println(s"numAs: $numAs")
      numAs
    }
  }
}

object SimpleAppLoader {
  def main(args: Array[String]) {
    run() // assumed: the body of main was lost from the post
  }

  def run() {
    val clArgs = Array(
      "--deploy-mode", "client"
      , "--total-executor-cores", "2"
      , "--class", "SimpleApp"
      , "--conf", "spark.shuffle.spill=false"
      , "--conf", "spark.master=spark://troika:7077"
      , "--conf", "spark.driver.memory=128m"
      , "--conf", "spark.executor.memory=128m"
      , "--conf", "spark.eventLog.enabled=true"
      , "--conf", "spark.eventLog.dir=/home/ovik/logs"
      , SparkContext.jarOfClass(this.getClass).get)

    SparkSubmit.main(clArgs) // assumed: the launch call was lost from the post

    val numAs = SimpleApp.getA()   // WORKS
    println(s"numAs is $numAs")
  }
}

Re: building spark1.2 meet error

2015-01-03 Thread Boromir Widas
It should be under assembly/target/scala-2.10/, e.g.
 ls assembly/target/scala-2.10/*

On Sat, Jan 3, 2015 at 10:11 PM, j_soft wrote:

- Thanks, it built successfully.
- But where is the built zip file? I can't find a finished .zip or .tar.gz.

 2014-12-31 19:22 GMT+08:00 xhudik [via Apache Spark User List]:

 Hi J_soft,

 For me it is working; I didn't pass the -Dscala-2.10 -X parameters. I got only
 one warning: since I don't have Hadoop 2.5, it didn't activate this profile:

 larix@kovral:~/sources/spark-1.2.0 mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -DskipTests clean package
 Found 0 infos
 Finished in 3 ms
 [INFO]
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM ........................... SUCCESS [ 14.177 s]
 [INFO] Spark Project Networking ........................... SUCCESS [ 14.670 s]
 [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  9.030 s]
 [INFO] Spark Project Core ................................. SUCCESS [04:42 min]
 [INFO] Spark Project Bagel ................................ SUCCESS [ 26.184 s]
 [INFO] Spark Project GraphX ............................... SUCCESS [01:07 min]
 [INFO] Spark Project Streaming ............................ SUCCESS [01:35 min]
 [INFO] Spark Project Catalyst ............................. SUCCESS [01:48 min]
 [INFO] Spark Project SQL .................................. SUCCESS [01:55 min]
 [INFO] Spark Project ML Library ........................... SUCCESS [02:17 min]
 [INFO] Spark Project Tools ................................ SUCCESS [ 15.527 s]
 [INFO] Spark Project Hive ................................. SUCCESS [01:43 min]
 [INFO] Spark Project REPL ................................. SUCCESS [ 45.154 s]
 [INFO] Spark Project YARN Parent POM ...................... SUCCESS [  3.885 s]
 [INFO] Spark Project YARN Stable API ...................... SUCCESS [01:00 min]
 [INFO] Spark Project Assembly ............................. SUCCESS [ 50.812 s]
 [INFO] Spark Project External Twitter ..................... SUCCESS [ 21.401 s]
 [INFO] Spark Project External Flume Sink .................. SUCCESS [ 25.207 s]
 [INFO] Spark Project External Flume ....................... SUCCESS [ 34.734 s]
 [INFO] Spark Project External MQTT ........................ SUCCESS [ 22.617 s]
 [INFO] Spark Project External ZeroMQ ...................... SUCCESS [ 22.444 s]
 [INFO] Spark Project External Kafka ....................... SUCCESS [ 33.566 s]
 [INFO] Spark Project Examples ............................. SUCCESS [01:23 min]
 [INFO] Spark Project YARN Shuffle Service ................. SUCCESS [  4.873 s]
 [INFO]
 [INFO] Total time: 23:20 min
 [INFO] Finished at: 2014-12-31T12:02:32+01:00
 [INFO] Final Memory: 76M/855M
 [INFO]
 [WARNING] The requested profile hadoop-2.5 could not be activated because it does not exist.

 If it doesn't work for you, I'd try deleting all the sources, downloading the source
 code once more and trying again...

 good luck, Tomas

Re: Spark profiler

2014-12-29 Thread Boromir Widas
It would be very helpful if there is any such tool, but the distributed
nature may be difficult to capture.

I had been trying to run a task where merging the accumulators was taking
an inordinately long time, and this was not reflected in the standalone
cluster's web UI.
What I think will be useful is to publish metrics for different lifecycle
stages of a job to a system like Ganglia to zero in on bottlenecks. Perhaps
the user can define some of the metrics in the config.

I have been thinking of tinkering with the metrics publisher to add custom
metrics to get a bigger picture and time breakdown.

On Mon, Dec 29, 2014 at 10:24 AM, rapelly kartheek


 I want to find the time taken for replicating an rdd in spark cluster
 along with the computation time on the replicated rdd.

 Can someone please suggest a suitable spark profiler?

 Thank you
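A minimal sketch of collecting per-stage timings inside the application, assuming Spark's developer-API SparkListener interface (StageTimer is a hypothetical name). It only records the wall-clock time between submission and completion of each stage; publishing those numbers to Ganglia or another metrics system, or timing replication separately, is left out.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import scala.collection.mutable.ArrayBuffer

// Records (stageId, stage name, wall-clock ms) for every completed stage.
class StageTimer extends SparkListener {
  private val timings = ArrayBuffer.empty[(Int, String, Long)]

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // both times are Options; a stage is recorded only if Spark filled in both
    for (start <- info.submissionTime; end <- info.completionTime) {
      synchronized { timings += ((info.stageId, info.name, end - start)) }
    }
  }

  // Copy of the timings so far (listener events arrive on a separate thread).
  def snapshot: List[(Int, String, Long)] = synchronized { timings.toList }
}
```

It would be registered with sc.addSparkListener(new StageTimer) before running the jobs of interest.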

Re: Using more cores on machines

2014-12-22 Thread Boromir Widas
If you are looking to reduce network traffic then setting
to false may help.

On Mon, Dec 22, 2014 at 11:44 AM, Ashic Mahtab wrote:

 Hi Josh,
 I'm not looking to change the 1:1 ratio.

 What I'm trying to do is get both cores on two machines working, rather
 than one core on all four machines. With --total-executor-cores 4, I have 1
 core per machine working for an app. I'm looking for something that'll let
 me use 2 cores per machine on 2 machines (so 4 cores in total) while not
 using the other two machines.


  Date: Mon, 22 Dec 2014 17:36:26 +0100
  Subject: Re: Using more cores on machines

  AFAIK, `--num-executors` is not available for standalone clusters. In
  standalone mode, you must start new workers on your node as it is a
  1:1 ratio of workers to executors.
  On 22 December 2014 at 12:25, Ashic Mahtab wrote:
   Hi Sean,
   Thanks for the response.
   It seems --num-executors is ignored. Specifying --num-executors 2
   --executor-cores 2 is giving the app all 8 cores across 4 machines.
   Date: Mon, 22 Dec 2014 10:57:31 +
   Subject: Re: Using more cores on machines
   I think you want:
   --num-executors 2 --executor-cores 2
   On Mon, Dec 22, 2014 at 10:39 AM, Ashic Mahtab
Say we have 4 nodes with 2 cores each in standalone mode. I'd like to
dedicate 4 cores to a streaming application. I can do this via
spark-submit --total-executor-cores 4
However, this assigns one core per machine. I would like to use 2 cores
on 2 machines instead, leaving the other two machines untouched. Is this
possible? Is there a downside to doing this? My thinking is that I
should be able to reduce quite a bit of network traffic if all machines are

Re: How to emit multiple keys for the same value?

2014-10-20 Thread Boromir Widas
flatMap should help: the function you pass to it returns a sequence for every
input, and the results are flattened into a single RDD.

On Mon, Oct 20, 2014 at 12:31 PM, HARIPRIYA AYYALASOMAYAJULA wrote:


 I am facing a problem with implementing this: my mapper should emit
 multiple keys for the same value. For every input (k, v) it should emit
 (k, v), (k+1, v), (k+2, v), ..., (k+n, v).
 In MapReduce it was pretty straightforward: I used a for loop and
 called context.write within it.

 This is the code I am using in the map function of the MR job:

 for (int k = 0; k < 8; k++)
 {
     key = date + ": " + k;
     context.write(new Text(key), new DoubleWritable(value));
 }

 I tried using the map function, but I'm stuck on how I can
 use the loop within it.

 It would be great if someone could suggest possible ways to
 do it.

 Thanks in advance.

 Haripriya Ayyalasomayajula
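A minimal sketch of the flatMap approach (EmitMany and its methods are hypothetical names). Where the MapReduce mapper calls context.write in a loop, the Spark function returns a collection of all the output pairs for one input, and flatMap flattens them into the result RDD.

```scala
import org.apache.spark.rdd.RDD

object EmitMany {
  // For every (k, v) emit (k, v), (k + 1, v), ..., (k + n, v).
  def shiftKeys(rdd: RDD[(Int, Double)], n: Int): RDD[(Int, Double)] =
    rdd.flatMap { case (k, v) => (0 to n).map(i => (k + i, v)) }

  // Counterpart of the MapReduce loop above: eight records per input,
  // keyed "date: 0" ... "date: 7".
  def dateKeys(rdd: RDD[(String, Double)]): RDD[(String, Double)] =
    rdd.flatMap { case (date, value) => (0 until 8).map(k => (date + ": " + k, value)) }
}
```

Because the per-input collection is built inside flatMap, no loop over the RDD itself is needed.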

Re: object in an rdd: serializable?

2014-10-16 Thread Boromir Widas
Making it a case class should work.

On Thu, Oct 16, 2014 at 8:30 PM, ll wrote:

 I got an exception complaining about serialization. The sample code is:

 class HelloWorld(val count: Int)

 object Test extends App {
   val data = sc.parallelize(List(new HelloWorld(1), new HelloWorld(2)))
 }

 what is the best way to serialize HelloWorld so that it can be contained in
 an RDD?
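Spark's default serializer is Java serialization, which requires java.io.Serializable. A small plain-Scala check without Spark (every name except HelloWorld is hypothetical): a case class extends Serializable automatically, and a regular class can opt in with extends Serializable. One caveat, as far as I know: if the class is defined inside another class (or in some REPL wrappers), the enclosing instance may have to be serializable too.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// The class from the question: a plain class is not java.io.Serializable.
class HelloWorld(val count: Int)

// Case classes extend Product with Serializable automatically.
case class HelloWorldCase(count: Int)

// Alternative if it has to stay a regular class.
class HelloWorldSer(val count: Int) extends Serializable

object SerCheck {
  // True if Java serialization accepts `obj`.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

SerCheck.javaSerializable(new HelloWorld(1)) should be false, while HelloWorldCase(1) and new HelloWorldSer(1) can be put in an RDD.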


Re: Executor and BlockManager memory size

2014-10-10 Thread Boromir Widas
Hey Larry,

I have been trying to figure this out for standalone clusters as well.
has an answer as to what block manager is for.

From the documentation, what I understood was that if you assign X GB to each
executor, (memoryFraction = 0.6) * X is assigned to the
BlockManager and the rest to the JVM itself(?).
However, as you see, 26.8G is assigned to the BM, and assuming a 0.6
memoryFraction, this means the executor sees ~44.7G of memory. I am not
sure what happens to the difference (5.3G).
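A worked version of this arithmetic, assuming the Spark 1.x formula for the block manager's storage pool, which as far as I know also multiplies by spark.storage.safetyFraction (default 0.9): storage = Runtime.maxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction. With 0.6 * 0.9 = 0.54, a 26.8 GB pool implies a heap of about 49.6 GB, which would fit a 50 GB executor because the JVM's reported maximum heap is typically a little below -Xmx. StorageMemory is a hypothetical name.

```scala
object StorageMemory {
  // Assumed Spark 1.x defaults of spark.storage.memoryFraction and
  // spark.storage.safetyFraction.
  val MemoryFraction = 0.6
  val SafetyFraction = 0.9

  // Storage pool the block manager registers, given the JVM's max heap (any unit).
  def storagePool(maxHeap: Double): Double = maxHeap * MemoryFraction * SafetyFraction

  // Inverse: the max heap implied by an observed storage pool.
  def impliedHeap(pool: Double): Double = pool / (MemoryFraction * SafetyFraction)
}
```

By the same formula, the driver's 274.6 MB in the web UI below corresponds to a heap of roughly 508 MB.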

On Thu, Oct 9, 2014 at 9:40 PM, Larry Xiao wrote:

 Hi all,

 I'm confused about Executor and BlockManager, why they have different

 14/10/10 08:50:02 INFO AppClient$ClientActor: Executor added:
 app-20141010085001-/2 on worker-20141010004933-brick6-35657
 (brick6:35657) with 6 cores
 14/10/10 08:50:02 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20141010085001-/2 on hostPort brick6:35657 with 6 cores, 50.0 GB RAM

 14/10/10 08:50:07 INFO BlockManagerMasterActor: Registering block manager
 brick6:53296 with 26.8 GB RAM

 and on the WebUI,

 Executor ID  Address       RDD Blocks  Memory Used         Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Input  Shuffle Read  Shuffle Write
 0            brick3:37607  0           0.0 B / 26.8 GB     0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
 1            brick1:59493  0           0.0 B / 26.8 GB     0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
 2            brick6:53296  0           0.0 B / 26.8 GB     0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
 3            brick5:38543  0           0.0 B / 26.8 GB     0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
 4            brick2:44937  0           0.0 B / 26.8 GB     0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
 5            brick4:46798  0           0.0 B / 26.8 GB     0.0 B      6             0             0               6            0 ms       0.0 B  0.0 B         0.0 B
 driver       brick0:57692  0           0.0 B / 274.6 MB    0.0 B      0             0             0               0            0 ms       0.0 B  0.0 B         0.0 B

 As I understand it, a worker consists of a daemon and an executor, and the
 executor takes charge of both execution and storage.
 So does it mean that 26.8 GB is saved for storage and the rest is for

 Another question is that, throughout execution, it seems that the
 block manager is almost always free.

 14/10/05 14:33:44 INFO BlockManagerInfo: Added broadcast_21_piece0 in
 memory on brick2:57501 (size: 1669.0 B, free: 21.2 GB)

 I don't know what I'm missing here.

 Best regards,

Re: Handling tree reduction algorithm with Spark in parallel

2014-10-03 Thread Boromir Widas
Thanks Matei, will check out the MLlib implementation.

On Wed, Oct 1, 2014 at 2:24 PM, Andy Twigg wrote:

 Yes, that makes sense. It's similar to the all reduce pattern in vw.

 On Wednesday, 1 October 2014, Matei Zaharia

 Some of the MLlib algorithms do tree reduction in 1.1:
 You can check out how they implemented it -- it is a series of reduce


 On Oct 1, 2014, at 11:02 AM, Boromir Widas wrote:

 Thanks a lot Andy and Debashish, your suggestions were of great help.

 On Tue, Sep 30, 2014 at 6:44 PM, Debasish Das

 If the tree is too big, build it on GraphX, but it will need thorough
 analysis so that the partitions are well balanced...

 On Tue, Sep 30, 2014 at 2:45 PM, Andy Twigg

 Hi Boromir,

 Assuming the tree fits in memory, and what you want to do is
 parallelize the computation, the 'obvious' way is the following:

 * broadcast the tree T to each worker (ok since it fits in memory)
 * construct an RDD for the deepest level - each element in the RDD is
 * aggregate this by key (=parent) - RDD[parent,data]
 * map each element (p, data) - (parent(p), data) using T
 * repeat until you have an RDD of size = 1 (assuming T is connected)

 If T cannot fit in memory, or is very deep, then there are more exotic
 techniques, but hopefully this suffices.
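A sketch of the level-by-level scheme above, assuming the tree is given as two small in-memory maps (child to parent, and node to depth with the root at 0) plus an RDD of precomputed leaf arrays; TreeReduce and this map layout are hypothetical. At each depth, from the deepest upwards, the nodes at that depth are mapped to their parent through the broadcast map and summed with reduceByKey, and the new parent sums are unioned back in so the next iteration can use them. Every iteration extends the lineage, so very deep trees may need checkpointing.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x
import org.apache.spark.rdd.RDD

object TreeReduce {
  // Element-wise sum of two equal-length arrays.
  def addArrays(a: Array[Double], b: Array[Double]): Array[Double] = {
    require(a.length == b.length, "arrays must have the same length")
    a.zip(b).map { case (x, y) => x + y }
  }

  // parent: child name -> parent name (the root has no entry)
  // depth:  node name -> depth, root = 0
  // leaves: (leaf name, precomputed array)
  // Returns (node, summed array) for every leaf and every internal node.
  def reduceTree(sc: SparkContext,
                 parent: Map[String, String],
                 depth: Map[String, Int],
                 leaves: RDD[(String, Array[Double])]): RDD[(String, Array[Double])] = {
    val parentB = sc.broadcast(parent) // the tree fits in memory, so ship it once
    val depthB = sc.broadcast(depth)
    var known = leaves.cache()
    for (level <- depth.values.max to 1 by -1) {
      // nodes at this depth are complete: their children were summed one step earlier
      val parentSums = known
        .filter { case (node, _) => depthB.value(node) == level }
        .map { case (node, values) => (parentB.value(node), values) }
        .reduceByKey(addArrays)
      known = known.union(parentSums).cache()
    }
    known
  }
}
```

For the example in the original question, the sum for E is produced from G and H in the first iteration, and B's sum from E in the next.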





Handling tree reduction algorithm with Spark in parallel

2014-09-30 Thread Boromir Widas
Hello Folks,

I have been trying to implement a tree reduction algorithm recently in
spark but could not find suitable parallel operations. Assuming I have a
general tree like the following -

I have to do the following:
1) Do some computation at each leaf node to get an array of doubles. (This
can be precomputed.)
2) For each non-leaf node, starting with the root node, compute the sum of
these arrays for all child nodes. So to get the array for node B, I need to
get the array for E, which is the sum of G + H.

// Start Snippet
case class Node(name: String, children: Array[Node], values: Array[Double]) {
  def isLeafNode: Boolean = children.isEmpty
}

// read in the tree here

// Element-wise sum of the arrays of all leaves below `node`;
// a leaf returns its own values.
def getSumOfChildren(node: Node): Array[Double] = {
  if (node.isLeafNode) {
    node.values
  } else {
    // can use an accumulator here
    node.children.map(getSumOfChildren).reduce { (a, b) =>
      a.zip(b).map { case (x, y) => x + y }
    }
  }
}
// End Snippet

Any pointers to how this can be done in parallel to use all cores will be
greatly appreciated.


Re: Memory under-utilization

2014-09-16 Thread Boromir Widas
Perhaps your job does not use more than 9g. Even though the dashboard shows
64g, the process only uses what it needs and can grow to at most 64g.

On Tue, Sep 16, 2014 at 5:40 PM, francisco wrote:

 Hi, I'm a Spark newbie.

 We had installed spark-1.0.2-bin-cdh4 on a 'super machine' with 256gb
 and 48 cores.

 Tried to allocate a task with 64gb memory but for whatever reason Spark is
 only using around 9gb max.

 Submitted spark job with the following command:
 /bin/spark-submit --class SimpleApp --master local[16] --executor-memory 64G
 /var/tmp/simple-project_2.10-1.0.jar /data/lucene/ns.gz

 When I run 'top' command I see only 9gb of memory is used by the spark

 3047005 fran  30  10 8785m 703m  18m S 112.9  0.3  48:19.63 java

 Any idea why this is happening? I've also tried to set the memory
 programmatically using
  new SparkConf().set("spark.executor.memory", "64g") but that also didn't
 do anything.

 Is there some limitation when running in 'local' mode?
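If the top line above uses top's default column order (PID USER PR NI VIRT RES SHR ...), then 8785m is VIRT (virtual address space) and 703m is RES (resident memory), so the JVM held well under 9 GB of physical memory at that moment. In local mode the tasks run inside the driver's JVM, so, as far as I know, the heap ceiling is set by spark-submit's --driver-memory rather than --executor-memory. A plain-Scala sketch (HeapReport is a hypothetical name) that asks the JVM directly for its heap ceiling and current use; it can be called in the driver, or inside a task to see the executor's view.

```scala
object HeapReport {
  private val MiB = 1024L * 1024L

  // (maximum heap the JVM may grow to, heap currently in use), both in MiB.
  def heapMiB(): (Long, Long) = {
    val rt = Runtime.getRuntime
    val max = rt.maxMemory / MiB
    val used = (rt.totalMemory - rt.freeMemory) / MiB
    (max, used)
  }
}
```

A max close to 64 GB with a much smaller used value would mean the limit is in place and the job simply has not needed more heap yet.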


Re: Memory under-utilization

2014-09-16 Thread Boromir Widas
I see, what does http://localhost:4040/executors/ show for memory usage?

I personally find it easier to work with a standalone cluster with a single
worker by using the sbin/ and then connecting to the master.

On Tue, Sep 16, 2014 at 6:04 PM, francisco wrote:

 Thanks for the reply.

 I doubt that's the case though ...  the executor kept having to do a file
 dump because memory is full.

 14/09/16 15:00:18 WARN ExternalAppendOnlyMap: Spilling in-memory map of 67
 MB to disk (668 times so far)
 14/09/16 15:00:21 WARN ExternalAppendOnlyMap: Spilling in-memory map of 66
 MB to disk (669 times so far)
 14/09/16 15:00:24 WARN ExternalAppendOnlyMap: Spilling in-memory map of 70
 MB to disk (670 times so far)
 14/09/16 15:00:31 WARN ExternalAppendOnlyMap: Spilling in-memory map of 127
 MB to disk (671 times so far)
 14/09/16 15:00:43 WARN ExternalAppendOnlyMap: Spilling in-memory map of 67
 MB to disk (672 times so far)

Compiler issues for multiple map on RDD

2014-09-15 Thread Boromir Widas
Hello Folks,

I am trying to chain a couple of map operations, and it seems the second map
fails with a type mismatch in its argument (even though the compiler prints both
types as the same). I checked the function and variable types using :t and they
look OK to me.

Have you seen this earlier? I am posting the code, data and output below.

Any pointers will be greatly appreciated.


val data = sc.textFile("data/testpv.csv")

case class KVV(key: String, valvec: Array[Double])

def mapToKV(line: String) : KVV = {
  val splits = line.split(",")
  val key = splits(0).trim
  val valvec = splits.drop(1).map(x => x.trim.toDouble)

  val kvv = KVV(key, valvec)
  return kvv
}

val kvs = data.map(line => mapToKV(line))

def mapKVtoKVL(kvv: KVV) : KVV = {
  return kvv
}
val tvar = kvs.map(x => mapKVtoKVL(x))

/// SAMPLE DATA in testpv.csv

scala> val data = sc.textFile("data/testpv.csv")
14/09/15 10:53:23 INFO MemoryStore: ensureFreeSpace(146579) called with
curMem=0, maxMem=308713881
14/09/15 10:53:23 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 143.1 KB, free 294.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at

scala> case class KVV(key: String, valvec: Array[Double])
defined class KVV

scala> def mapToKV(line: String) : KVV = {
     | val splits = line.split(",")
     | val key = splits(0).trim
     | val valvec = splits.drop(1).map(x => x.trim.toDouble)
     | val kvv = KVV(key, valvec)
     | return kvv
     | }
mapToKV: (line: String)KVV

scala> val kvs = data.map(line => mapToKV(line))
kvs: org.apache.spark.rdd.RDD[KVV] = MappedRDD[2] at map at <console>:18

scala> def mapKVtoKVL(kvv: KVV) : KVV = {
     | return kvv
     | }
mapKVtoKVL: (kvv: KVV)KVV

scala> val tvar = kvs.map(x => mapKVtoKVL(x))
<console>:22: error: type mismatch;
 found   : KVV
 required: KVV
       val tvar = kvs.map(x => mapKVtoKVL(x))
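A "found: KVV, required: KVV" error in the shell may come from the way spark-shell wraps each line it compiles, so that KVV ends up referring to two different wrapper classes. One workaround is to define the case class and the helper functions together, either in a single :paste block in the REPL or compiled into the application jar. A sketch of the compiled form, assuming plain Scala (KVParsing is a hypothetical name):

```scala
// Compile into the application jar so every map step sees the same KVV class.
object KVParsing {
  case class KVV(key: String, valvec: Array[Double])

  // "key, v1, v2, ..." -> KVV(key, Array(v1, v2, ...))
  def mapToKV(line: String): KVV = {
    val splits = line.split(",")
    KVV(splits(0).trim, splits.drop(1).map(_.trim.toDouble))
  }

  // The identity step from the post, kept to show the second map.
  def mapKVtoKVL(kvv: KVV): KVV = kvv
}
```

The chain would then read sc.textFile("data/testpv.csv").map(KVParsing.mapToKV).map(KVParsing.mapKVtoKVL).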