Implementing FIRST_VALUE, LEAD, LAG in Spark

2015-02-13 Thread Dmitry Tolpeko
Hello,

To convert existing Map Reduce jobs to Spark, I need to implement window
functions such as FIRST_VALUE, LEAD, LAG and so on. For example,
FIRST_VALUE function:

Source (1st column is key):

A, A1
A, A2
A, A3
B, B1
B, B2
C, C1

and the result should be

A, A1, A1
A, A2, A1
A, A3, A1
B, B1, B1
B, B2, B1
C, C1, C1

You can see that the first value in a group is repeated in each row.

My current Spark/Scala code:

def firstValue(b: Iterable[String]): List[(String, String)] = {
  val c = scala.collection.mutable.MutableList[(String, String)]()
  var f = ""
  b.foreach(d => { if (f.isEmpty()) f = d; c += d -> f })
  c.toList
}

val data = sc.parallelize(List(
   ("A", "A1"),
   ("A", "A2"),
   ("A", "A3"),
   ("B", "B1"),
   ("B", "B2"),
   ("C", "C1")))

data.groupByKey().map{ case (a, b) => (a, firstValue(b)) }.collect

So I create a new list after groupByKey. Is it right approach to do this in
Spark? Are there any other options? Please point me to any drawbacks.
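
(A sketch of one alternative, not from the original post: since the key is not needed inside the helper, mapValues after groupByKey avoids rebuilding the pairs by hand, and the same pattern extends to LAG. The helper names below are made up, and the ordering inside a group is only whatever groupByKey happens to produce, so sort explicitly if the window order matters.)

import org.apache.spark.SparkContext._   // pair-RDD functions (not needed in the spark-shell)

// FIRST_VALUE: pair every element of a group with the group's first element.
def firstValueVals(b: Iterable[String]): List[(String, String)] = {
  val first = b.head
  b.map(v => (v, first)).toList
}

// LAG(1): pair every element with its predecessor ("" for the first row of a group).
def lagVals(b: Iterable[String]): List[(String, String)] = {
  val vs = b.toList
  vs.zip("" :: vs.init)
}

val firstValues = data.groupByKey().mapValues(firstValueVals)
val lags        = data.groupByKey().mapValues(lagVals)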

Thanks,

Dmitry


Interact with streams in a non-blocking way

2015-02-13 Thread jamborta
Hi all,

I am trying to come up with a workflow where I can query streams
asynchronously. The problem I have is that the ssc.awaitTermination() line blocks
the whole thread, so it is not clear to me whether it is possible
to get hold of objects from streams once they are started. Any suggestions on
the best way to implement this?

thanks,






Re: Interact with streams in a non-blocking way

2015-02-13 Thread Sean Owen
You call awaitTermination() in the main thread, and indeed it blocks
there forever. From there Spark Streaming takes over, and is invoking
the operations you set up. Your operations have access to the data of
course. That's the model; you don't make external threads that reach
in to Spark Streaming's objects, but can easily create operations that
take whatever actions you want and invoke them in Streaming.

On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I am trying to come up with a workflow where I can query streams
 asynchronously. The problem I have is a ssc.awaitTermination() line blocks
 the whole thread, so it is not straightforward to me whether it is possible
 to get hold of objects from streams once they are started. any suggestion on
 what is the best way to implement this?

 thanks,






Re: Can spark job server be used to visualize streaming data?

2015-02-13 Thread Mayur Rustagi
Frankly, there is no good/standard way to visualize streaming data. So far I have
found HBase to be a good intermediate store for data coming from streams &
visualize it with a Play-based framework & d3.js.
Regards
Mayur
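
(A sketch of the pattern being described here -- push each micro-batch somewhere a UI can read it; publish() is a hypothetical stand-in for the HBase/queue/REST client, not a real API:)

import org.apache.spark.streaming.dstream.DStream

def publish(batchTime: Long, counts: Array[(String, Long)]): Unit = ???   // hypothetical sink

def pushBatches(wordCounts: DStream[(String, Long)]): Unit =
  wordCounts.foreachRDD { (rdd, time) =>
    // collect() only if each batch is small; otherwise write from the executors
    // with rdd.foreachPartition instead of collecting to the driver.
    publish(time.milliseconds, rdd.collect())
  }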


On Fri Feb 13 2015 at 4:22:58 PM Kevin (Sangwoo) Kim kevin...@apache.org
wrote:

 I'm not very sure for CDH 5.3,
 but now Zeppelin works for Spark 1.2 as spark-repl has been published in
 Spark 1.2.1
 Please try again!

 On Fri Feb 13 2015 at 3:55:19 PM Su She suhsheka...@gmail.com wrote:

 Thanks Kevin for the link, I have had issues trying to install zeppelin
 as I believe it is not yet supported for CDH 5.3, and Spark 1.2. Please
 correct me if I am mistaken.

 On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org
  wrote:

 Apache Zeppelin also has a scheduler and then you can reload your chart
 periodically,
 Check it out:
 http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html




 On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   One method I’ve used is to publish each batch to a message bus or
 queue with a custom UI listening on the other end, displaying the results
 in d3.js or some other app. As far as I’m aware there isn’t a tool that
 will directly take a DStream.

  Spark Notebook seems to have some support for updating graphs
 periodically. I haven’t used it myself yet so not sure how well it works.
 See here: https://github.com/andypetrella/spark-notebook

   From: Su She
 Date: Thursday, February 12, 2015 at 1:55 AM
 To: Felix C
 Cc: Kelvin Chu, user@spark.apache.org

 Subject: Re: Can spark job server be used to visualize streaming data?

   Hello Felix,

  I am already streaming in very simple data using Kafka (few messages
 / second, each record only has 3 columns...really simple, but looking to
 scale once I connect everything). I am processing it in Spark Streaming and
 am currently writing word counts to hdfs. So the part where I am confused
 is...

 Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
 Spark Word Count -> *How do I visualize?*

  is there a viz tool that I can set up to visualize JavaPairDStreams?
 or do I have to write to hbase/hdfs first?

  Thanks!

 On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com
 wrote:

  What kind of data do you have? Kafka is a popular source to use with
 spark streaming.
 But, spark streaming also support reading from a file. Its called
 basic source

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

 --- Original Message ---

 From: Su She suhsheka...@gmail.com
 Sent: February 11, 2015 10:23 AM
 To: Felix C felixcheun...@hotmail.com
 Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Thank you Felix and Kelvin. I think I'll def be using the k-means
 tools in MLlib.

  It seems the best way to stream data is by storing in hbase and then
 using an api in my viz to extract data? Does anyone have any thoughts on
 this?

   Thanks!


 On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com
 wrote:

  Checkout

 https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

 In there are links to how that is done.


 --- Original Message ---

 From: Kelvin Chu 2dot7kel...@gmail.com
 Sent: February 10, 2015 12:48 PM
 To: Su She suhsheka...@gmail.com
 Cc: user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Hi Su,

  Out of the box, no. But, I know people integrate it with Spark
 Streaming to do real-time visualization. It will take some work though.

  Kelvin

 On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote:

  Hello Everyone,

  I was reading this blog post:
 http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

  and was wondering if this approach can be taken to visualize
 streaming data...not just historical data?

  Thank you!

  -Suh








SparkSQL and star schema

2015-02-13 Thread Paolo Platter
Hi,

is SparkSQL + Parquet suitable to replicate a star schema ?

Paolo Platter
AgileLab CTO



Re: An interesting and serious problem I encountered

2015-02-13 Thread Sean Owen
A number of comments:

310GB is probably too large for an executor. You probably want many
smaller executors per machine. But this is not your problem.

You didn't say where the OutOfMemoryError occurred. Executor or driver?

Tuple2 is a Scala type, and a general type. It is appropriate for
general pairs. You're asking about optimizing for a primitive array,
yes, but of course Spark handles other types.

I don't quite understand your test result. An array doesn't change
size because it's referred to in a Tuple2. You are still dealing with
a primitive array.

There is no general answer to your question. Usually you have to
consider the overhead of Java references, which does matter
significantly, but there is no constant multiplier of course. It's up
to you if it matters to implement more efficient data structures. Here
however you're using just about the most efficient rep of an array of
integers.

I think you have plenty of memory in general, so the question is what
was throwing the memory error? I'd also confirm that the configuration
your executors actually used is what you expect to rule out config
problems.

On Fri, Feb 13, 2015 at 6:26 AM, Landmark fangyixiang...@gmail.com wrote:
 Hi foks,

 My Spark cluster has 8 machines, each of which has 377GB physical memory,
 and thus the total maximum memory can be used for Spark is more than
 2400+GB. In my program, I have to deal with 1 billion of (key, value) pairs,
 where the key is an integer and the value is an integer array with 43
 elements.  Therefore, the memory cost of this raw dataset is [(1+43) *
 10^9 * 4] / (1024 * 1024 * 1024) = 164GB.

 Since I have to use this dataset repeatedly, I have to cache it in memory.
 Some key parameter settings are:
 spark.storage.fraction=0.6
 spark.driver.memory=30GB
 spark.executor.memory=310GB.

 But it failed on running a simple countByKey() and the error message is
 "java.lang.OutOfMemoryError: Java heap space". Does this mean a Spark
 cluster of 2400+GB memory cannot keep 164GB raw data in memory?

 The codes of my program is as follows:

 def main(args: Array[String]):Unit = {
 val sc = new SparkContext(new SparkConfig());

 val rdd = sc.parallelize(0 until 1000000000, 25600).map(i => (i, new
 Array[Int](43))).cache();
 println("The number of keys is " + rdd.countByKey());

 //some other operations following here ...
 }




 To figure out the issue, I evaluated the memory cost of key-value pairs and
 computed their memory cost using SizeOf.jar. The codes are as follows:

 val arr = new Array[Int](43);
 println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr)));

 val tuple = (1, arr.clone);
 println(SizeOf.humanReadable(SizeOf.deepSizeOf(tuple)));

 The output is:
 192.0b
 992.0b


 *Hard to believe, but it is true!! This result means, to store a key-value
 pair, Tuple2 needs more than 5+ times memory than the simplest method with
 array. Even though it may take 5+ times memory, its size is less than
 1000GB, which is still much less than the total memory size of my cluster,
 i.e., 2400+GB. I really do not understand why this happened.*

 BTW, if the number of pairs is 1 million, it works well. If the arr contains
 only 1 integer, to store a pair, Tuples needs around 10 times memory.

 So I have some questions:
 1. Why does Spark choose such a poor data structure, Tuple2, for key-value
 pairs? Is there any better data structure for storing (key, value)  pairs
 with less memory cost ?
 2. Given a dataset with size of M, in general Spark how many times of memory
 to handle it?


 Best,
 Landmark







Spark Distributed Join

2015-02-13 Thread Ashish Mukherjee
Hello,

I have the following scenario and was wondering if I can use Spark to
address it.

I want to query two different data stores (say, ElasticSearch and MySQL)
and then merge the two result sets based on a join key between the two. Is
it appropriate to use Spark to do this join, if the intermediate data sets
are large? (This is a No-ETL scenario)

I was thinking of two possibilities -

1) Send the intermediate data sets to Spark through a stream and get Spark
to do the join. The complexity here is that there would be multiple
concurrent streams to deal with. If I don't use streams, there would be
intermediate disk writes and data transfer to the Spark master.

2) Don't use Spark and do the same with some in-memory distributed engine
like MemSQL or Redis.
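
(As a rough sketch of option 1 without streams, assuming both result sets have already been loaded into Spark as keyed RDDs; the loading code for ElasticSearch/MySQL is omitted and the field names are made up -- see the sketch below before the question:)

import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x
import org.apache.spark.rdd.RDD

case class EsRow(id: Long, title: String)       // illustrative schema
case class MysqlRow(id: Long, price: Double)    // illustrative schema

def mergeResults(esRows: RDD[EsRow], mysqlRows: RDD[MysqlRow]): RDD[(Long, (String, Double))] = {
  // Key both sides by the join key; Spark shuffles both sides by that key
  // and performs the join in a distributed fashion.
  val left  = esRows.map(r => (r.id, r.title))
  val right = mysqlRows.map(r => (r.id, r.price))
  left.join(right)   // inner join; leftOuterJoin / rightOuterJoin also available
}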

What's the experts' view on this?

Regards,
Ashish


Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-13 Thread Sean Owen
It is a wrapper whose API is logically the same, but whose method
signature make more sense in Java. You can call the Scala API in Java
without too much trouble, but it gets messy when you have to manually
grapple with ClassTag from Java for example.

There is not an implicit conversion since it is used from Java, which
doesn't have implicits.
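
(For reference, a minimal Spark 1.x sketch of the fix mentioned below -- the SparkContext._ import is what adds saveAsHadoopFile and the other pair-RDD methods via the implicit conversion to PairRDDFunctions; the output path and TextOutputFormat choice are just examples:)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._            // implicit RDD[(K, V)] -> PairRDDFunctions
import org.apache.hadoop.mapred.TextOutputFormat

object SaveExample {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("SaveExample"))
    val rdd = sc.parallelize(Seq(("a", "1"), ("b", "2")))
    // Compiles only because of the SparkContext._ import above.
    rdd.saveAsHadoopFile("/tmp/out", classOf[String], classOf[String],
      classOf[TextOutputFormat[String, String]])
    sc.stop()
  }
}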

On Fri, Feb 13, 2015 at 5:57 AM, Vladimir Protsenko
protsenk...@gmail.com wrote:
 Thanks for the reply. I solved the problem by importing
 org.apache.spark.SparkContext._, as Imran Rashid suggested.

 Out of interest, is JavaPairRDD intended for use from Java? What
 is the purpose of this class? Is my RDD implicitly converted to it in some
 circumstances?




Re: Can spark job server be used to visualize streaming data?

2015-02-13 Thread Kevin (Sangwoo) Kim
I'm not very sure for CDH 5.3,
but now Zeppelin works for Spark 1.2 as spark-repl has been published in
Spark 1.2.1
Please try again!



Tuning number of partitions per CPU

2015-02-13 Thread Igor Petrov
Hello,

In Spark programming guide
(http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a
recommendation:
Typically you want 2-4 partitions for each CPU in your cluster.

We have a Spark Master and two Spark workers each with 18 cores and 18 GB of
RAM.
In our application we use JdbcRDD to load data from a DB and then cache it.
We load entities from a single table; we currently have 76 million entities
(entity size in memory is about 160 bytes). We call count() during
application startup to force the entities to load. Here are our measurements for
count() operation (cores x partitions = time):
36x36 = 6.5 min
36x72 = 7.7 min
36x108 = 9.4 min

So despite recommendations the most efficient setup is one partition per
core. What is the reason for above recommendation?

Java 8, Apache Spark 1.1.0







Re: Tuning number of partitions per CPU

2015-02-13 Thread Sean Owen
18 cores or 36? It probably doesn't matter.
For this case where you have some overhead per partition of setting up
the DB connection, it may indeed not help to chop up the data more
finely than your total parallelism. Although that would imply quite an
overhead. Are you doing any other expensive initialization per
partition in your code?
You might check some other basic things, like, are you bottlenecked on
the DB (probably not) and are there task stragglers drawing out the
completion time.
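
(For reference, a sketch of how the partition count enters the picture with JdbcRDD; the JDBC URL, table and column names are made up, and each partition opens its own DB connection, which is the per-partition setup cost discussed above:)

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

def loadEntities(sc: SparkContext, numPartitions: Int) =
  new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:mysql://dbhost/db", "user", "password"),
    "SELECT id, payload FROM entities WHERE id >= ? AND id <= ?",  // the two '?' bound each partition's key range
    1L, 76000000L,
    numPartitions,
    (rs: ResultSet) => (rs.getLong(1), rs.getString(2))
  ).cache()

// e.g. compare loadEntities(sc, 36).count() against loadEntities(sc, 72).count()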

On Fri, Feb 13, 2015 at 11:06 AM, Igor Petrov igorpetrov...@gmail.com wrote:
 Hello,

 In Spark programming guide
 (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a
 recommendation:
 Typically you want 2-4 partitions for each CPU in your cluster.

 We have a Spark Master and two Spark workers each with 18 cores and 18 GB of
 RAM.
 In our application we use JdbcRDD to load data from a DB and then cache it.
 We load entities from a single table, now we have 76 million of entities
 (entity size in memory is about 160 bytes). We call count() during
 application startup to force entities loading. Here are our measurements for
 count() operation (cores x partitions = time):
 36x36 = 6.5 min
 36x72 = 7.7 min
 36x108 = 9.4 min

 So despite recommendations the most efficient setup is one partition per
 core. What is the reason for above recommendation?

 Java 8, Apache Spark 1.1.0







Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark

2015-02-13 Thread Jianshi Huang
Get it. Thanks Reynold and Andrew!

Jianshi

On Thu, Feb 12, 2015 at 12:25 AM, Andrew Or and...@databricks.com wrote:

 Hi Jianshi,

 For YARN, there may be an issue with how a recently patch changes the
 accessibility of the shuffle files by the external shuffle service:
 https://issues.apache.org/jira/browse/SPARK-5655. It is likely that you
 will hit this with 1.2.1, actually. For this reason I would have to
 recommend that you use 1.2.2 when it is released, but for now you should
 use 1.2.0 for this specific use case.

 -Andrew

 2015-02-10 23:38 GMT-08:00 Reynold Xin r...@databricks.com:

 I think we made the binary protocol compatible across all versions, so you
 should be fine with using any one of them. 1.2.1 is probably the best since
 it is the most recent stable release.

 On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I need to use branch-1.2 and sometimes master builds of Spark for my
 project. However the officially supported Spark version by our Hadoop admin
 is only 1.2.0.

 So, my question is which version/build of spark-yarn-shuffle.jar should
 I use that works for all four versions? (1.2.0, 1.2.1, 1.2.2, 1.3.0)

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Shuffle on joining two RDDs

2015-02-13 Thread Karlson
Does that mean partitioning does not work in Python? Or does this only 
effect joining?


On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in 
Python.


On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com 
wrote:

I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com 
wrote:


ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. 
 It
could be that pyspark doesn't properly support narrow dependencies, 
or maybe
you need to be more explicit about the partitioner.  I am looking 
into the
pyspark api but you might have some better guesses here than I 
thought.


My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those 
fields
are hidden deeper inside and are not user-visible.  But I think a 
better way
(in scala, anyway) is to look at rdd.dependencies.  its a little 
tricky,

though, you need to look deep into the lineage (example at the end).

Sean -- yes it does require both RDDs have the same partitioner, but 
that
should happen naturally if you just specify the same number of 
partitions,
you'll get equal HashPartitioners.  There is a little difference in 
the

scala & python api that I missed here.  For partitionBy in scala, you
actually need to specify the partitioner, but not in python.  However 
I

thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from 
python.
Hopefully somebody knows a simpler way to confirm narrow 
dependencies??


val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)

val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")


//unfortunately, just looking at the immediate dependencies of joined &
//badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a
//OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@1cf356cc)






 //so lets make a helper function to get all the dependencies 
recursively


def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}


//full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at
console:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at
console:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at
console:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at
console:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at
console:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
console:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at
console:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at
console:12,org.apache.spark.OneToOneDependency@36b5f6f2)



//full dependencies of the bad join -- notice the ShuffleDependency!

scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at
console:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at
console:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at
console:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at
console:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at
console:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
console:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at
console:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at
console:12,org.apache.spark.OneToOneDependency@15c98005)




On Thu, Feb 12, 2015 at 10:05 AM, Karlson ksonsp...@siberie.de 
wrote:


Hi Imran,

thanks for your quick reply.

Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = 

Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-13 Thread Vladimir Protsenko
Thank you for clarification, Sean.

2015-02-13 14:16 GMT+04:00 Sean Owen so...@cloudera.com:

 It is a wrapper whose API is logically the same, but whose method
 signature make more sense in Java. You can call the Scala API in Java
 without too much trouble, but it gets messy when you have to manually
 grapple with ClassTag from Java for example.

 There is not an implicit conversion since it is used from Java, which
 doesn't have implicits.

 On Fri, Feb 13, 2015 at 5:57 AM, Vladimir Protsenko
 protsenk...@gmail.com wrote:
  Thanks for the reply. I solved the problem by importing
  org.apache.spark.SparkContext._, as Imran Rashid suggested.
 
  Out of interest, is JavaPairRDD intended for use from Java? What
  is the purpose of this class? Is my RDD implicitly converted to it in some
  circumstances?



Re: Interact with streams in a non-blocking way

2015-02-13 Thread Tamas Jambor
Thanks for the reply, I am trying to setup a streaming as a service
approach, using the framework that is used for spark-jobserver. for that I
would need to handle asynchronous  operations that are initiated from
outside of the stream. Do you think it is not possible?

On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote:

 You call awaitTermination() in the main thread, and indeed it blocks
 there forever. From there Spark Streaming takes over, and is invoking
 the operations you set up. Your operations have access to the data of
 course. That's the model; you don't make external threads that reach
 in to Spark Streaming's objects, but can easily create operations that
 take whatever actions you want and invoke them in Streaming.

 On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote:
  Hi all,
 
  I am trying to come up with a workflow where I can query streams
  asynchronously. The problem I have is a ssc.awaitTermination() line
 blocks
  the whole thread, so it is not straightforward to me whether it is
 possible
  to get hold of objects from streams once they are started. any
 suggestion on
  what is the best way to implement this?
 
  thanks,
 
 
 
 



Re: Interact with streams in a non-blocking way

2015-02-13 Thread Sean Owen
Sure it's possible, but you would use Streaming to update some shared
state, and create another service that accessed that shared state too.
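
(A minimal sketch of that pattern, assuming each batch result is small enough to collect to the driver; the object and method names are made up:)

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.streaming.dstream.DStream

object LatestCounts {
  // Shared state written by the streaming job and read by any other driver-side thread,
  // e.g. a spark-jobserver HTTP handler.
  val ref = new AtomicReference[Map[String, Long]](Map.empty)
}

def wireUp(counts: DStream[(String, Long)]): Unit =
  counts.foreachRDD { rdd =>
    LatestCounts.ref.set(rdd.collect().toMap)
  }

// Called asynchronously from outside the stream, while awaitTermination() blocks its own thread:
def currentCounts(): Map[String, Long] = LatestCounts.ref.get()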

On Fri, Feb 13, 2015 at 11:57 AM, Tamas Jambor jambo...@gmail.com wrote:
 Thanks for the reply, I am trying to setup a streaming as a service
 approach, using the framework that is used for spark-jobserver. for that I
 would need to handle asynchronous  operations that are initiated from
 outside of the stream. Do you think it is not possible?

 On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote:

 You call awaitTermination() in the main thread, and indeed it blocks
 there forever. From there Spark Streaming takes over, and is invoking
 the operations you set up. Your operations have access to the data of
 course. That's the model; you don't make external threads that reach
 in to Spark Streaming's objects, but can easily create operations that
 take whatever actions you want and invoke them in Streaming.

 On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote:
  Hi all,
 
  I am trying to come up with a workflow where I can query streams
  asynchronously. The problem I have is a ssc.awaitTermination() line
  blocks
  the whole thread, so it is not straightforward to me whether it is
  possible
  to get hold of objects from streams once they are started. any
  suggestion on
  what is the best way to implement this?
 
  thanks,
 
 
 



Re: Shuffle on joining two RDDs

2015-02-13 Thread Karlson
In 
https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, 
wouldn't it help to change the lines


vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))

to

vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))

?
As I understand, this would preserve the original partitioning.
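
(A small Scala illustration of the point -- the thread is about PySpark, but the Scala behaviour is the reference here: mapValues keeps the upstream partitioner, while a plain map over the whole pair discards it. Runnable in the spark-shell:)

import org.apache.spark.HashPartitioner

val base = sc.parallelize(1 to 100).map(x => x -> x).partitionBy(new HashPartitioner(8))

val viaMap       = base.map { case (k, v) => (k, (1, v)) }
val viaMapValues = base.mapValues(v => (1, v))

println(viaMap.partitioner)        // None -- partitioning information is lost
println(viaMapValues.partitioner)  // Some(HashPartitioner) -- the original partitioner is kept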


On 2015-02-13 12:43, Karlson wrote:

Does that mean partitioning does not work in Python? Or does this only
effect joining?

On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in 
Python.


how to get SchemaRDD SQL exceptions i.e. table not found exception

2015-02-13 Thread sachin Singh
Hi,
can someone explain how to trap the SQL exception for a query executed using a
SchemaRDD? I mean, for example, when the table is not found.
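
(A sketch, not an authoritative answer: in Spark 1.2 an analysis error such as a missing table surfaces as an exception thrown by sql() or by the first action, so one option is simply to wrap the call; the exact exception type is not guaranteed here, hence the broad NonFatal handler:)

import scala.util.control.NonFatal
import org.apache.spark.sql.SQLContext

def tryQuery(sqlContext: SQLContext, query: String): Unit = {
  try {
    val schemaRdd = sqlContext.sql(query)
    schemaRdd.collect().foreach(println)
  } catch {
    case NonFatal(e) =>
      // The message usually names the problem, e.g. the missing table.
      System.err.println(s"Query failed: ${e.getMessage}")
  }
}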

thanks in advance,






RE: Tuning number of partitions per CPU

2015-02-13 Thread Puneet Kumar Ojha
Use the configuration below if you are using version 1.2:

SET spark.shuffle.consolidateFiles=true;
SET spark.rdd.compress=true;
SET spark.default.parallelism=1000;
SET spark.deploy.defaultCores=54;

Thanks
Puneet.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Friday, February 13, 2015 4:46 PM
To: Igor Petrov
Cc: user@spark.apache.org
Subject: Re: Tuning number of partitions per CPU

18 cores or 36? It probably doesn't matter.
For this case where you have some overhead per partition of setting up the DB 
connection, it may indeed not help to chop up the data more finely than your 
total parallelism. Although that would imply quite an overhead. Are you doing 
any other expensive initialization per partition in your code?
You might check some other basic things, like, are you bottlenecked on the DB 
(probably not) and are there task stragglers drawing out the completion time.

On Fri, Feb 13, 2015 at 11:06 AM, Igor Petrov igorpetrov...@gmail.com wrote:
 Hello,

 In Spark programming guide
 (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a
 recommendation:
 Typically you want 2-4 partitions for each CPU in your cluster.

 We have a Spark Master and two Spark workers each with 18 cores and 18 
 GB of RAM.
 In our application we use JdbcRDD to load data from a DB and then cache it.
 We load entities from a single table, now we have 76 million of 
 entities (entity size in memory is about 160 bytes). We call count() 
 during application startup to force entities loading. Here are our 
 measurements for
 count() operation (cores x partitions = time):
 36x36 = 6.5 min
 36x72 = 7.7 min
 36x108 = 9.4 min

 So despite recommendations the most efficient setup is one partition 
 per core. What is the reason for above recommendation?

 Java 8, Apache Spark 1.1.0







Columnar-Oriented RDDs

2015-02-13 Thread Night Wolf
Hi all,

I'd like to build/use column oriented RDDs in some of my Spark code. A
normal Spark RDD is stored as row oriented object if I understand
correctly.

I'd like to leverage some of the advantages of a columnar memory format.
Shark (used to) and SparkSQL uses a columnar storage format using primitive
arrays for each column.

I'd be interested to know more about this approach and how I could build my
own custom columnar-oriented RDD which I can use outside of Spark SQL.

Could anyone give me some pointers on where to look to do something like
this, either from scratch or using whats there in the SparkSQL libs or
elsewhere. I know Evan Chan in a presentation made mention of building a
custom RDD of column-oriented blocks of data.
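
(A rough sketch of that "RDD of column-oriented blocks" idea, not Spark SQL's actual columnar code; the row and block types are made up. Each partition of row objects is collapsed into one block holding a primitive array per column:)

import org.apache.spark.rdd.RDD

case class Record(id: Int, price: Double, label: String)

case class ColumnBlock(ids: Array[Int], prices: Array[Double], labels: Array[String])

def toColumnar(rows: RDD[Record]): RDD[ColumnBlock] =
  rows.mapPartitions { it =>
    val buf = it.toArray
    Iterator.single(ColumnBlock(
      buf.map(_.id),      // primitive Int array, no per-row object overhead
      buf.map(_.price),
      buf.map(_.label)))
  }

// Column-wise operations then run over primitive arrays, e.g.:
def sumPrices(cols: RDD[ColumnBlock]): Double =
  cols.map(_.prices.sum).reduce(_ + _)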

Cheers,
~N


Re: running spark project using java -cp command

2015-02-13 Thread Christophe Préaud
You can also export the variable SPARK_PRINT_LAUNCH_COMMAND before launching a 
spark-submit command to display the java command that will be launched, e.g.:

export SPARK_PRINT_LAUNCH_COMMAND=1
/opt/spark/bin/spark-submit --master yarn --deploy-mode cluster --class 
kelkoo.SparkAppTemplate --jars 
hdfs://prod-cluster/user/preaudc/jars/apps/joda-convert-1.6.jar,hdfs://prod-cluster/user/preaudc/jars/apps/joda-time-2.3.jar,hdfs://prod-cluster/user/preaudc/jars/apps/logReader-1.0.22.jar
 --driver-memory 512M --driver-library-path /opt/hadoop/lib/native 
--driver-class-path /usr/share/java/mysql-connector-java.jar --executor-memory 
1G --executor-cores 1 --queue spark-batch --num-executors 2 
hdfs://prod-cluster/user/preaudc/jars/apps/logProcessing-1.0.10.jar --log_dir 
/user/kookel/logs --country fr a b c
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: /usr/lib/jvm/java-openjdk/bin/java -cp 
:/usr/share/java/mysql-connector-java.jar:/opt/spark/conf:/opt/spark/lib/spark-assembly-hadoop.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/etc/hadoop:/etc/hadoop
 -XX:MaxPermSize=128m -Djava.library.path=/opt/hadoop/lib/native -Xms512M 
-Xmx512M org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode 
cluster --class kelkoo.SparkAppTemplate --jars 
hdfs://prod-cluster/user/preaudc/jars/apps/joda-convert-1.6.jar,hdfs://prod-cluster/user/preaudc/jars/apps/joda-time-2.3.jar,hdfs://prod-cluster/user/preaudc/jars/apps/logReader-1.0.22.jar
 --driver-memory 512M --driver-library-path /opt/hadoop/lib/native 
--driver-class-path /usr/share/java/mysql-connector-java.jar --executor-memory 
1G --executor-cores 1 --queue spark-batch --num-executors 2 
hdfs://prod-cluster/user/preaudc/jars/apps/logProcessing-1.0.10.jar --log_dir 
/user
/kookel/logs --country fr a b c

(...)



Christophe.

On 10/02/2015 07:26, Akhil Das wrote:
Yes like this:

/usr/lib/jvm/java-7-openjdk-i386/bin/java -cp 
::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit 
--class org.apache.spark.repl.Main spark-shell

It launches spark-shell.


Thanks
Best Regards

On Tue, Feb 10, 2015 at 11:36 AM, Hafiz Mujadid 
hafizmujadi...@gmail.com wrote:
hi experts!

Is there any way to run spark application using java -cp command ?


thanks








Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 158 Ter Rue du Temple 75003 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended solely for their 
addressees. If you are not the intended recipient of this message, please delete 
it and notify the sender.


Documentation error in MLlib - Clustering?

2015-02-13 Thread Emre Sevinc
Hello,

I was trying the streaming kmeans clustering example in the official
documentation at:

   http://spark.apache.org/docs/1.2.0/mllib-clustering.html

But I've got a type error when I tried to compile the code:

[error]  found   : org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
[error]  required: org.apache.spark.streaming.dstream.DStream[(?, org.apache.spark.mllib.linalg.Vector)]
[error]     model.predictOnValues(testData).print()
[error]                           ^
[error] one error found
[error] (compile:compile) Compilation failed


And it seems like the solution is to use

   model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

as shown in
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

instead of

   model.predictOnValues(testData).print()

as written in the documentation.

I just wanted to draw the attention to this, so that one of the maintainers
can fix the documentation.

-- 
Emre Sevinç


Re: Counters in Spark

2015-02-13 Thread Mark Hamstra
Except that transformations don't have an exactly-once guarantee, so this
way of doing counters may produce different answers across various forms of
failures and speculative execution.
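
(For completeness, a sketch of the safer pattern under that caveat, assuming the Spark 1.x accumulator API: increment inside an action rather than a transformation, since the documented behaviour is that accumulator updates made inside actions are applied only once per task:)

import org.apache.spark.{SparkConf, SparkContext}

object CounterExample {
  def main(args: Array[String]): Unit = {
    val sc       = new SparkContext(new SparkConf().setAppName("CounterExample"))
    val counter  = sc.accumulator(0, "Counter")
    val inputRDD = sc.textFile(args(0))

    // foreach is an action, so this actually runs and the updates are applied once per task.
    inputRDD.foreach(_ => counter += 1)

    // Accumulator values are only reliably readable on the driver, after the action.
    println(counter.value)
    sc.stop()
  }
}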

On Fri, Feb 13, 2015 at 8:56 AM, Sean McNamara sean.mcnam...@webtrends.com
wrote:

  .map is just a transformation, so no work will actually be performed
 until something takes action against it.  Try adding a .count(), like so:

  inputRDD.map { x => {
  counter += 1
} }.count()

  In case it is helpful, here are the docs on what exactly the
 transformations and actions are:
 http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations
 http://spark.apache.org/docs/1.2.0/programming-guide.html#actions

  Cheers,

  Sean


  On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote:

 I am trying to implement counters in Spark and I guess Accumulators are the
 way to do it.

 My motive is to update a counter in map function and access/reset it in the
 driver code. However the /println/ statement at the end still yields value
 0(It should 9). Am I doing something wrong?

 def main(args : Array[String]){

    val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
    val sc = new SparkContext(conf)
    var counter = sc.accumulable(0, "Counter")
    var inputFilePath = args(0)
    val inputRDD = sc.textFile(inputFilePath)

    inputRDD.map { x => {
      counter += 1
    } }
    println(counter.value)
 }









Re: Documentation error in MLlib - Clustering?

2015-02-13 Thread Sean Owen
Agree, it's correct in the linear methods doc page too. You can open a
PR for this simple 'typo' fix; I imagine it just wasn't updated or
fixed with the others.

On Fri, Feb 13, 2015 at 4:26 PM, Emre Sevinc emre.sev...@gmail.com wrote:
 Hello,

 I was trying the streaming kmeans clustering example in the official
 documentation at:

http://spark.apache.org/docs/1.2.0/mllib-clustering.html

 But I've got a type error when I tried to compile the code:

 [error]  found   :
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
 [error]  required: org.apache.spark.streaming.dstream.DStream[(?,
 org.apache.spark.mllib.linalg.Vector)]
 [error] model.predictOnValues(testData).print()
 [error]   ^
 [error] one error found
 [error] (compile:compile) Compilation failed


 And it seems like the solution is to use

model.predictOnValues(testData.map(lp = (lp.label,
 lp.features))).print()

 as shown in
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

 instead of

model.predictOnValues(testData).print()

 as written in the documentation.

 I just wanted to draw the attention to this, so that one of the maintainers
 can fix the documentation.

 --
 Emre Sevinç




Counters in Spark

2015-02-13 Thread nitinkak001
I am trying to implement counters in Spark and I guess Accumulators are the
way to do it.

My motive is to update a counter in a map function and access/reset it in the
driver code. However the /println/ statement at the end still yields the value
0 (it should be 9). Am I doing something wrong?

def main(args : Array[String]){

    val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
    val sc = new SparkContext(conf)
    var counter = sc.accumulable(0, "Counter")
    var inputFilePath = args(0)
    val inputRDD = sc.textFile(inputFilePath)

    inputRDD.map { x => {
      counter += 1
    } }
    println(counter.value)
}







Re: Counters in Spark

2015-02-13 Thread Sean McNamara
.map is just a transformation, so no work will actually be performed until 
something takes action against it.  Try adding a .count(), like so:

inputRDD.map { x => {
 counter += 1
   } }.count()

In case it is helpful, here are the docs on what exactly the transformations 
and actions are:
http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations
http://spark.apache.org/docs/1.2.0/programming-guide.html#actions

Cheers,

Sean


On Feb 13, 2015, at 9:50 AM, nitinkak001 
nitinkak...@gmail.com wrote:

I am trying to implement counters in Spark and I guess Accumulators are the
way to do it.

My motive is to update a counter in map function and access/reset it in the
driver code. However the /println/ statement at the end still yields value
0(It should 9). Am I doing something wrong?

def main(args : Array[String]){

   val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
   val sc = new SparkContext(conf)
   var counter = sc.accumulable(0, "Counter")
   var inputFilePath = args(0)
   val inputRDD = sc.textFile(inputFilePath)

   inputRDD.map { x => {
     counter += 1
   } }
   println(counter.value)
}








Re: Cluster launch

2015-02-13 Thread Brad
I'm playing around with Spark on Windows and have two worker nodes running by
starting them manually using a script that contains the following

set SPARK_HOME=C:\dev\programs\spark-1.2.0
set SPARK_MASTER_IP=master.brad.com
spark-class org.apache.spark.deploy.worker.Worker
spark://master.brad.com:7077

I then create a copy of this script with a different SPARK_HOME defined to
run my second worker from.

This works, however I am unable to configure a different log4j.properties
file for each worker which is a pain as they then both log to the same log
file. The issue seems to be that it's the conf directory on the master that
sends the JVM arguments to each worker to execute a Task. Therefore the same
-Dlog4j.configuration=file:... argument is sent to both.

Does anyone know how the JVM command sent to the workers from the master can
read an environment variable, so this can be defined per worker?






Re: An interesting and serious problem I encountered

2015-02-13 Thread Ye Xianjin
Hi, 

I believe SizeOf.jar may calculate the wrong size for you.
 Spark has a utility called SizeEstimator, located in 
org.apache.spark.util.SizeEstimator, and someone has extracted it out in 
https://github.com/phatak-dev/java-sizeof/blob/master/src/main/scala/com/madhukaraphatak/sizeof/SizeEstimator.scala
You can try that out in the scala repl. 
The size for Array[Int](43) is 192 bytes (12 bytes object header + 4 bytes length 
field + 43 * 4 bytes, rounded up to 176 bytes).
 And the size for (1, Array[Int](43)) is 240 bytes {
   Tuple2 object: 12 bytes object header + 4 bytes field _1 + 4 bytes field _2 = 
rounds to 24 bytes
   1 => java.lang.Number: 12 bytes, rounds to 16 bytes -> java.lang.Integer: 
16 bytes + 4 bytes int = rounds to 24 bytes (Integer extends Number. I thought 
Scala's Tuple2 would specialize Int so this would be 4 bytes, but it seems not)
   Array => 192 bytes
}

So, 24 + 24 + 192 = 240 bytes.
This is my calculation based on the spark SizeEstimator. 
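
(A quick way to reproduce these numbers, as a sketch: the estimate() call below assumes the extracted estimator linked above, which mirrors Spark's own org.apache.spark.util.SizeEstimator -- the Spark one is private[spark], so it is not directly callable from user code:)

import com.madhukaraphatak.sizeof.SizeEstimator

object SizeCheck {
  def main(args: Array[String]): Unit = {
    val arr = new Array[Int](43)
    println(SizeEstimator.estimate(arr))        // expected ~192 bytes, as computed above
    println(SizeEstimator.estimate((1, arr)))   // expected ~240 bytes, as computed above
  }
}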

However I am not sure what an Integer will occupy on a 64-bit JVM with 
compressedOops on. It should be 12 + 4 = 16 bytes, and that would mean the 
SizeEstimator gives the wrong result. @Sean what do you think?
-- 
Ye Xianjin
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)






How to union RDD and remove duplicated keys

2015-02-13 Thread Wang, Ningjun (LNG-NPV)

I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g.

rdd1:   ("id1", "Long text 1"), ("id2", "Long text 2"), ("id3", "Long text 3")
rdd2:   ("id1", "Long text 1 A"), ("id2", "Long text 2 A")
rdd3:   ("id1", "Long text 1 B")

Then, I want to merge all RDDs. If there are duplicated docIds, the later RDD should 
overwrite the previous RDD. In the above case, rdd2 will overwrite rdd1 for "id1" 
and "id2", then rdd3 will overwrite rdd2 for "id1". The final merged rdd should 
be

rddFinal: ("id1", "Long text 1 B"), ("id2", "Long text 2 A"), ("id3", "Long 
text 3")

Note that I have many such RDDs and each rdd have lots of elements. How can I 
do it efficiently?


Ningjun



Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread soila
I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the
following exception when the size of the broadcast variable exceeds 2GB. Any
ideas on how I can resolve this issue?

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
at
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
at
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
at
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to union RDD and remove duplicated keys

2015-02-13 Thread Boromir Widas
reduceByKey should work, but you need to define the ordering by using some
sort of index.
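
A minimal sketch of that approach, where the ordering index is simply each RDD's
position in the list (app name, master and sample data below are illustrative
placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object UnionLatestSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-latest").setMaster("local[*]"))

    val rdd1 = sc.parallelize(Seq(("id1", "Long text 1"), ("id2", "Long text 2"), ("id3", "Long text 3")))
    val rdd2 = sc.parallelize(Seq(("id1", "Long text 1 A"), ("id2", "Long text 2 A")))
    val rdd3 = sc.parallelize(Seq(("id1", "Long text 1 B")))

    // Tag every value with the index of the RDD it came from, union everything,
    // then keep the value carrying the highest index for each key.
    val merged = Seq(rdd1, rdd2, rdd3).zipWithIndex
      .map { case (rdd, idx) => rdd.mapValues(v => (idx, v)) }
      .reduce(_ union _)
      .reduceByKey((a, b) => if (a._1 >= b._1) a else b)
      .mapValues(_._2)

    merged.collect().foreach(println)  // (id1,Long text 1 B), (id2,Long text 2 A), (id3,Long text 3)
    sc.stop()
  }
}

Because the merge is a single reduceByKey over the unioned data, each element is
shuffled only once no matter how many input RDDs there are.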

On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:



 I have multiple RDD[(String, String)] that store (docId, docText) pairs,
 e.g.



 rdd1:   (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text
 3”)

 rdd2:   (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)

 rdd3:   (“id1”, “Long text 1 B”)



 Then, I want to merge all RDDs. If there is duplicated docids, later RDD
 should overwrite previous RDD. In the above case, rdd2 will overwrite rddd1
 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final
 merged rdd should be



 rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”,
 “Long text 3”)



 Note that I have many such RDDs and each rdd have lots of elements. How
 can I do it efficiently?





 Ningjun





Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread Sean Owen
I think you've hit the nail on the head. Since the serialization
ultimately creates a byte array, and arrays can have at most ~2
billion elements in the JVM, the broadcast can be at most ~2GB.

At that scale, you might consider whether you really have to broadcast
these values, or want to handle them as RDDs and join and so on.

Or consider whether you can break it up into several broadcasts?


On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote:
 I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the
 following exception when the size of the broadcast variable exceeds 2GB. Any
 ideas on how I can resolve this issue?

 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
 at
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
 at
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
 at
 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
 at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
 at
 org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
 at
 org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle on joining two RDDs

2015-02-13 Thread Imran Rashid
yeah I thought the same thing at first too, I suggested something
equivalent w/ preservesPartitioning = true, but that isn't enough.  the
join is done by union-ing the two transformed rdds, which is very different
from the way it works under the hood in scala to enable narrow
dependencies.  It really needs a bigger change to pyspark.  I filed this
issue: https://issues.apache.org/jira/browse/SPARK-5785

(and the somewhat related issue about documentation:
https://issues.apache.org/jira/browse/SPARK-5786)

partitioning should still work in pyspark, you still need some notion of
distributing work, and the pyspark functions have a partitionFunc to decide
that.  But, I am not an authority on pyspark, so perhaps there are more
holes I'm not aware of ...

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson ksonsp...@siberie.de wrote:

 In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
 wouldn't it help to change the lines

 vs = rdd.map(lambda (k, v): (k, (1, v)))
 ws = other.map(lambda (k, v): (k, (2, v)))

 to

 vs = rdd.mapValues(lambda v: (1, v))
 ws = other.mapValues(lambda v: (2, v))

 ?
 As I understand, this would preserve the original partitioning.



 On 2015-02-13 12:43, Karlson wrote:

 Does that mean partitioning does not work in Python? Or does this only
 effect joining?

 On 2015-02-12 19:27, Davies Liu wrote:

 The feature works as expected in Scala/Java, but not implemented in
 Python.

 On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com
 wrote:

 I wonder if the issue is that these lines just need to add
 preservesPartitioning = true
 ?

 https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

 I am getting the feeling this is an issue w/ pyspark


 On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com
 wrote:


 ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.
 It
 could be that pyspark doesn't properly support narrow dependencies, or
 maybe
 you need to be more explicit about the partitioner.  I am looking into
 the
 pyspark api but you might have some better guesses here than I thought.

 My suggestion to do

 joinedRdd.getPartitions.foreach{println}

 was just to see if the partition was a NarrowCoGroupSplitDep or a
 ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
 fields
 are hidden deeper inside and are not user-visible.  But I think a
 better way
 (in scala, anyway) is to look at rdd.dependencies.  its a little
 tricky,
 though, you need to look deep into the lineage (example at the end).

 Sean -- yes it does require both RDDs have the same partitioner, but
 that
 should happen naturally if you just specify the same number of
 partitions,
 you'll get equal HashPartitioners.  There is a little difference in the
 scala & python api that I missed here.  For partitionBy in scala, you
 actually need to specify the partitioner, but not in python.  However I
 thought it would work like groupByKey, which does just take an int.


 Here's a code example in scala -- not sure what is available from
 python.
 Hopefully somebody knows a simpler way to confirm narrow dependencies??

  val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
 val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
 x}.groupByKey(64)
 scala> d.partitioner == d2.partitioner
 res2: Boolean = true
 val joined = d.join(d2)
 val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
 x}.groupByKey(100)
 val badJoined = d.join(d3)

 d.setName("d")
 d2.setName("d2")
 d3.setName("d3")
 joined.setName("joined")
 badJoined.setName("badJoined")


 //unfortunately, just looking at the immediate dependencies of joined &
 badJoined is misleading, b/c join actually creates
 // one more step after the shuffle
 scala> joined.dependencies
 res20: Seq[org.apache.spark.Dependency[_]] =
 List(org.apache.spark.OneToOneDependency@74751ac8)
 //even with the join that does require a shuffle, we still see a
 OneToOneDependency, but that's just a simple flatMap step
 scala> badJoined.dependencies
 res21: Seq[org.apache.spark.Dependency[_]] =
 List(org.apache.spark.OneToOneDependency@1cf356cc)





  //so lets make a helper function to get all the dependencies
 recursively

 def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
   val deps = rdd.dependencies
   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
 }


 //full dependencies of the good join

 scala> flattenDeps(joined).foreach{println}
 (joined FlatMappedValuesRDD[9] at join at
 console:16,org.apache.spark.OneToOneDependency@74751ac8)
 (MappedValuesRDD[8] at join at
 console:16,org.apache.spark.OneToOneDependency@623264af)
 (CoGroupedRDD[7] at join at
 console:16,org.apache.spark.OneToOneDependency@5a704f86)
 (CoGroupedRDD[7] at join at
 console:16,org.apache.spark.OneToOneDependency@37514cd)
 (d ShuffledRDD[3] at groupByKey at
 console:12,org.apache.spark.ShuffleDependency@7ba8a080)
 (MappedRDD[2] at map at
 

Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread Imran Rashid
unfortunately this is a known issue:
https://issues.apache.org/jira/browse/SPARK-1476

as Sean suggested, you need to think of some other way of doing the same
thing, even if its just breaking your one big broadcast var into a few
smaller ones
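
As a rough sketch of the split-it-up workaround (the names, types and chunking
scheme below are made up for illustration), the lookup data can be partitioned by
key and broadcast in pieces, so no single serialized broadcast approaches the 2GB
array limit:

import org.apache.spark.{SparkConf, SparkContext}

object ChunkedBroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("chunked-broadcast").setMaster("local[*]"))

    // Placeholder standing in for the real multi-GB lookup data.
    val bigMap: Map[Long, String] = (0L until 1000L).map(k => k -> s"value-$k").toMap

    // Split the keys into chunks and broadcast each chunk separately.
    val numChunks = 8
    val chunks = bigMap.groupBy { case (k, _) => (k % numChunks).toInt }
    val broadcasts = (0 until numChunks).map(i => sc.broadcast(chunks.getOrElse(i, Map.empty[Long, String])))

    val keys = sc.parallelize(0L until 2000L)
    val looked = keys.map { k =>
      // Pick the broadcast that holds this key's chunk, then look it up locally.
      val chunk = broadcasts((k % numChunks).toInt).value
      (k, chunk.get(k))
    }
    println(looked.filter(_._2.isDefined).count())   // 1000 of the 2000 keys are found
    sc.stop()
  }
}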

On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

 I think you've hit the nail on the head. Since the serialization
 ultimately creates a byte array, and arrays can have at most ~2
 billion elements in the JVM, the broadcast can be at most ~2GB.

 At that scale, you might consider whether you really have to broadcast
 these values, or want to handle them as RDDs and join and so on.

 Or consider whether you can break it up into several broadcasts?


 On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote:
  I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get
 the
  following exception when the size of the broadcast variable exceeds 2GB.
 Any
  ideas on how I can resolve this issue?
 
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
  org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
  at
  org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
  at
  org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
  at
  org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at
  org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at
  org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
  at
 
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
  at
 
 org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
  at
 
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at
 
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
  at
 
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at
 org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkSQL doesn't seem to like $'s in column names

2015-02-13 Thread Michael Armbrust
Try using `backticks` to escape non-standard characters.
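
For example, a minimal sketch against the query from the original message (it
assumes a table named myTable with that column has already been registered with
the SQLContext):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object BacktickEscapeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("backtick-escape").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Backticks escape the '$' in the column name.
    val rows = sqlContext.sql(
      "SELECT * FROM myTable WHERE `locations_$homeAddress` = '123 Elm St'")
    rows.collect().foreach(println)
    sc.stop()
  }
}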

On Fri, Feb 13, 2015 at 11:30 AM, Corey Nolet cjno...@gmail.com wrote:

 I don't remember Oracle ever enforcing that I couldn't include a $ in a
 column name, but I also don't think I've ever tried.

 When using sqlContext.sql(...), I have a SELECT * from myTable WHERE
 locations_$homeAddress = '123 Elm St'

 It's telling me $ is invalid. Is this a bug?



Spark standalone and HDFS 2.6

2015-02-13 Thread Grandl Robert
Hi guys,
Probably a dummy question. Do you know how to compile Spark 0.9 to easily 
integrate with HDFS 2.6.0 ? 

I was trying
sbt/sbt -Pyarn -Phadoop-2.6 assembly
or
mvn -Dhadoop.version=2.6.0 -DskipTests clean package
but none of these approaches succeeded.

Thanks,
Robert


SparkSQL doesn't seem to like $'s in column names

2015-02-13 Thread Corey Nolet
I don't remember Oracle ever enforcing that I couldn't include a $ in a
column name, but I also don't think I've ever tried.

When using sqlContext.sql(...), I have a SELECT * from myTable WHERE
locations_$homeAddress = '123 Elm St'

It's telling me $ is invalid. Is this a bug?


New ColumnType For Decimal Caching

2015-02-13 Thread Manoj Samel
Thanks Michael for the pointer. Sorry for the delayed reply.

Taking a quick inventory of the scope of the change - is the new column type for
Decimal caching needed only in the caching layer (4 files
in org.apache.spark.sql.columnar - ColumnAccessor.scala,
ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala)?

Or do other SQL components also need to be touched?

Hoping for quick feedback off the top of your head ...

Thanks,



On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com
wrote:

 You could add a new ColumnType
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
 .

 PRs welcome :)

 On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi Michael,

 As a test, I have the same data loaded as another parquet table - except with the 2
 decimal(14,4) columns replaced by double. With this, the on-disk size is ~345MB,
 the in-memory size is 2GB (vs. 12 GB) and the cached query runs in 1/2 the
 time of the uncached query.

 Would it be possible for Spark to store in-memory decimal in some form of
 long with decoration ?

 For the immediate future, is there any hook that we can use to provide
 custom caching / processing for the decimal type in RDD so other semantic
 does not changes ?

 Thanks,




 On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Could you share which data types are optimized in the in-memory storage
 and how are they optimized ?

 On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com
  wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works.  We don't optimize decimals in the in-memory columnar
 storage, so you are paying expensive serialization there likely.

 On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Flat data of types String, Int and couple of decimal(14,4)

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com
  wrote:

 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10
 partitions) with zero disk usage. Tasks for a subsequent select on this
 cached table show minimal overheads (GC, queueing, shuffle write,
 etc.),
 so overhead is not the issue. However, it is still twice as slow as reading
 the uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer =
 org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB. For
 mapPartition phase of query on uncached table shows input size of 351 
 MB.
 However, after the table is cached, the storage shows the cache size as
 12GB. So the in-memory representation seems much bigger than on-disk, 
 even
 with the compression options turned on. Any thoughts on this ?

 The mapPartition phase of the same query on the cached table shows an input size of
 12GB (the full size of the cached table) and takes twice as long as the mapPartition
 for the uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Check the storage tab.  Does the table actually fit in memory?
 Otherwise you are rebuilding column buffers in addition to reading the 
 data
 off of the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel 
 manojsamelt...@gmail.com wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test 2

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  -- seed cache. First time
 slow since it is loading the cache?
 select a, sum(b), sum(c) from table  -- Second time it should be
 faster as it should be reading from the cache, not HDFS. But it is slower
 than
 test 1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,












Re: Columnar-Oriented RDDs

2015-02-13 Thread Michael Armbrust
Shark's in-memory code was ported to Spark SQL and is used by default when
you run .cache on a SchemaRDD or CACHE TABLE.

I'd also look at Parquet, which is more efficient and handles nested data
better.
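
A small sketch of both options using the Spark 1.2-era API (the case class, table
names and path below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ColumnarCacheSketch {
  case class Record(id: Int, value: Double)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("columnar-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD   // implicit RDD[case class] -> SchemaRDD conversion

    val data = sc.parallelize(1 to 1000).map(i => Record(i, i * 0.5))
    data.registerTempTable("records")

    // In-memory columnar caching (the code path that came from Shark).
    sqlContext.cacheTable("records")
    sqlContext.sql("SELECT COUNT(*) FROM records").collect().foreach(println)

    // Parquet: a columnar on-disk format that also handles nested data.
    data.saveAsParquetFile("/tmp/records.parquet")             // placeholder path
    val fromParquet = sqlContext.parquetFile("/tmp/records.parquet")
    fromParquet.registerTempTable("records_parquet")
    sc.stop()
  }
}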

On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf nightwolf...@gmail.com wrote:

 Hi all,

 I'd like to build/use column oriented RDDs in some of my Spark code. A
 normal Spark RDD is stored as row-oriented objects, if I understand
 correctly.

 I'd like to leverage some of the advantages of a columnar memory format.
 Shark (used to) and SparkSQL uses a columnar storage format using primitive
 arrays for each column.

 I'd be interested to know more about this approach and how I could build
 my own custom columnar-oriented RDD which I can use outside of Spark SQL.

 Could anyone give me some pointers on where to look to do something like
 this, either from scratch or using what's there in the SparkSQL libs or
 elsewhere. I know Evan Chan in a presentation made mention of building a
 custom RDD of column-oriented blocks of data.

 Cheers,
 ~N



Has Spark 1.2.0 changed EC2 persistent-hdfs?

2015-02-13 Thread Joe Wass
I've updated to Spark 1.2.0 and the EC2 persistent-hdfs behaviour
appears to have changed.

My launch script is

spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5
--ebs-vol-size=1000 launch myproject

When I ssh into master I get:

$ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  2.9G  5.0G  37% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.3G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G   33M 1000G   1% /vol0

that /vol0 is the place I want (and assume) persistent-hdfs to go. But when
I look at the size I get:

$ persistent-hdfs/bin/start-all.sh
$ persistent-hdfs/bin/hadoop dfsadmin -report
Warning: $HADOOP_HOME is deprecated.

Configured Capacity: 42275430400 (39.37 GB)
Present Capacity: 2644878 (24.63 GB)
DFS Remaining: 26448601088 (24.63 GB)
DFS Used: 143360 (140 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-
Datanodes available: 5 (5 total, 0 dead)

Name: 10.46.11.156:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165372416 (2.95 GB)
DFS Remaining: 5289684992(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.41.51.155:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165364224 (2.95 GB)
DFS Remaining: 5289693184(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.38.30.254:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165249536 (2.95 GB)
DFS Remaining: 5289807872(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.204.134.84:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165343744 (2.95 GB)
DFS Remaining: 5289713664(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.33.15.134:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165356032 (2.95 GB)
DFS Remaining: 5289701376(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


That's tiny. My suspicions are aroused when I see:

$ ls /vol
persistent-hdfs

/vol is on the small /dev/xvda1 not the large EBS /dev/xvds

I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
the volume:

<property>
  <name>hadoop.tmp.dir</name>
  <value>/vol0/persistent-hdfs</value>  <!-- was /vol/persistent-hdfs -->
</property>

And then

persistent-hdfs/bin/stop-all.sh  persistent-hdfs/bin/start-all.sh

but when I do that, the persistent HDFS won't start for whatever reason. I
can't run

$ persistent-hdfs/bin/hadoop dfsadmin -report

15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 0 time(s).
15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 1 time(s).
15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 2 time(s).

So, it looks like I can't use the EBS for persistent-hdfs. I was doing it
before, so something must have changed in the last couple of weeks (last
time I was using 1.1.0).

Is this a bug? Has the behaviour of AWS changed? Am I doing something
stupid? How do I fix it?

Thanks in advance!

Joe


Re: Spark standalone and HDFS 2.6

2015-02-13 Thread Grandl Robert
I am trying to run BlinkDB (https://github.com/sameeragarwal/blinkdb), which 
seems to work only with Spark 0.9. However, if I want to access HDFS I need to 
compile Spark against the Hadoop version which is running on my cluster (2.6.0). 
Hence the versions problem ...



 On Friday, February 13, 2015 11:28 AM, Sean Owen so...@cloudera.com 
wrote:
   

 Oh right, you said Spark 0.9. Those profiles won't exist back then. I
don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles
were introduced later to fix up some compatibility. Why not use 1.2.1?

On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote:
 Thanks Sean for your prompt response.

 I was trying to compile as following:
 mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package

 but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and
 all hadoop jars are in .m2 repository.

 Do you have any idea what might happens ?

 Robert

 [WARNING] Class com.google.protobuf.Parser not found - continuing with a
 stub.
 [ERROR] error while loading RpcResponseHeaderProto, class file
 '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)'
 is broken
 (class java.lang.NullPointerException/null)
 [WARNING] one warning found
 [ERROR] one error found
 [INFO]
 
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM .. SUCCESS [2.537s]
 [INFO] Spark Project Core  FAILURE [25.917s]
 [INFO] Spark Project Bagel ... SKIPPED
 [INFO] Spark Project GraphX .. SKIPPED
 [INFO] Spark Project ML Library .. SKIPPED
 [INFO] Spark Project Streaming ... SKIPPED
 [INFO] Spark Project Tools ... SKIPPED
 [INFO] Spark Project REPL  SKIPPED
 [INFO] Spark Project Assembly  SKIPPED
 [INFO] Spark Project External Twitter  SKIPPED
 [INFO] Spark Project External Kafka .. SKIPPED
 [INFO] Spark Project External Flume .. SKIPPED
 [INFO] Spark Project External ZeroMQ . SKIPPED
 [INFO] Spark Project External MQTT ... SKIPPED
 [INFO] Spark Project Examples  SKIPPED
 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 30.002s
 [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015
 [INFO] Final Memory: 49M/1226M
 [INFO]
 
 [WARNING] The requested profile hadoop-2.4 could not be activated because
 it does not exist.
 [ERROR] Failed to execute goal
 net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on
 project spark-core_2.10: Execution scala-compile-first of goal
 net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed -
 [Help 1]



 On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com wrote:


 If you just need standalone mode, you don't need -Pyarn. There is no
 -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set
 -Dhadoop.version=2.6.0. That should be it.

 If that still doesn't work, define doesn't succeed.

 On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert
 rgra...@yahoo.com.invalid wrote:
 Hi guys,

 Probably a dummy question. Do you know how to compile Spark 0.9 to easily
 integrate with HDFS 2.6.0 ?

 I was trying
 sbt/sbt -Pyarn -Phadoop-2.6 assembly
 or
 mvn -Dhadoop.version=2.6.0 -DskipTests clean package

 but none of these approaches succeeded.

 Thanks,
 Robert




 
 


Re: NaiveBayes classifier causes ShuffleDependency class cast exception

2015-02-13 Thread Tathagata Das
You cannot have two Spark Contexts in the same JVM active at the same time.
Just create one SparkContext and then use it for both purposes.

TD
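
A minimal Scala sketch of that setup (host, port, names and batch interval are
placeholders; the original code was Java, but the structure is the same):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SingleContextSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("streamer-and-classifier").setMaster("local[2]")
    val sc = new SparkContext(conf)                  // use this one for MLlib calls such as NaiveBayes.train
    val ssc = new StreamingContext(sc, Seconds(10))  // and build the streaming context on top of it

    val lines = ssc.socketTextStream("localhost", 9999)  // placeholder host/port
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}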

On Fri, Feb 6, 2015 at 8:49 PM, VISHNU SUBRAMANIAN 
johnfedrickena...@gmail.com wrote:

 Can you try creating just a single SparkContext and then running your code?
 If you want to use it for streaming, pass the same SparkContext object
 instead of the conf.

 Note: Instead of just replying to me , try to use reply to all so that the
 post is visible for the community . That way you can expect immediate
 responses.


 On Fri, Feb 6, 2015 at 6:09 AM, aanilpala aanilp...@gmail.com wrote:

 I have the following code:


 SparkConf conf = new
 SparkConf().setAppName("streamer").setMaster("local[2]");
 conf.set("spark.driver.allowMultipleContexts", "true");
 JavaStreamingContext ssc = new JavaStreamingContext(conf, new
 Duration(batch_interval));
 ssc.checkpoint("/tmp/spark/checkpoint");

 SparkConf conf2 = new
 SparkConf().setAppName("classifier").setMaster("local[1]");
 conf2.set("spark.driver.allowMultipleContexts", "true");
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaReceiverInputDStream<String> stream =
 ssc.socketTextStream("localhost", );

 // String to Tuple3 Conversion
 JavaDStream<Tuple3<Long, String, String>> tuple_stream =
 stream.map(new Function<String, Tuple3<Long, String, String>>() {
  ... });

 JavaPairDStream<Integer, DictionaryEntry>
 raw_dictionary_stream =
 tuple_stream.filter(new Function<Tuple3<Long, String, String>,
 Boolean>()
 {

 @Override
 public Boolean call(Tuple3<Long, String, String>
 tuple) throws Exception {
 if((tuple._1()/Time.scaling_factor %
 training_interval) < training_dur)
 NaiveBayes.train(sc.parallelize(training_set).rdd());

 return true;
 }


 }).

 I am working on a text mining project and I want to use
 NaiveBayesClassifier
 of MLlib to classify some stream items. So, I have two Spark contexts one
 of
 which is a streaming context. The call to NaiveBayes.train causes the
 following exception.

 Any ideas?


  Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
 Lost task 0.0 in stage 0.0 (TID 0, localhost):
 java.lang.ClassCastException:
 org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to
 org.apache.spark.ShuffleDependency
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at

 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 

Re: Counters in Spark

2015-02-13 Thread Imran Rashid
this is more-or-less the best you can do now, but as has been pointed out,
accumulators don't quite fit the bill for counters.  There is an open issue
to do something better, but no progress on that so far

https://issues.apache.org/jira/browse/SPARK-603
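
For reference, a minimal sketch of the accumulator-as-counter pattern discussed in
this thread (the input path and names are placeholders), with the caveats noted as
comments:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorCounterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("counter-sketch").setMaster("local[*]"))
    val counter = sc.accumulator(0, "lineCounter")

    val inputRDD = sc.textFile("/tmp/input.txt")   // placeholder input path
    inputRDD.foreach(_ => counter += 1)            // foreach is an action, so the increments actually run

    // Read the accumulated value back on the driver.
    println(s"Counted ${counter.value} lines")

    // Caveat from the thread: updates made inside transformations, or inside
    // stages that get re-executed due to failures or speculation, are not
    // exactly-once, so treat this as a metric rather than a precise count.
    sc.stop()
  }
}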

On Fri, Feb 13, 2015 at 11:12 AM, Mark Hamstra m...@clearstorydata.com
wrote:

 Except that transformations don't have an exactly-once guarantee, so this
 way of doing counters may produce different answers across various forms of
 failures and speculative execution.

 On Fri, Feb 13, 2015 at 8:56 AM, Sean McNamara 
 sean.mcnam...@webtrends.com wrote:

  .map is just a transformation, so no work will actually be performed
 until something takes action against it.  Try adding a .count(), like so:

  inputRDD.map { x => {
  counter += 1
} }.count()

  In case it is helpful, here are the docs on what exactly the
 transformations and actions are:
 http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations
 http://spark.apache.org/docs/1.2.0/programming-guide.html#actions

  Cheers,

  Sean


  On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote:

 I am trying to implement counters in Spark and I guess Accumulators are
 the
 way to do it.

 My motive is to update a counter in a map function and access/reset it in
 the
 driver code. However the /println/ statement at the end still yields the value
 0 (it should be 9). Am I doing something wrong?

 def main(args : Array[String]){

val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
val sc = new SparkContext(conf)
var counter = sc.accumulable(0, "Counter")
var inputFilePath = args(0)
val inputRDD = sc.textFile(inputFilePath)

inputRDD.map { x => {
  counter += 1
} }
println(counter.value)
 }




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Counters-in-Spark-tp21646.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Spark standalone and HDFS 2.6

2015-02-13 Thread Sean Owen
If you just need standalone mode, you don't need -Pyarn. There is no
-Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set
-Dhadoop.version=2.6.0. That should be it.

If that still doesn't work, define "doesn't succeed".

On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert
rgra...@yahoo.com.invalid wrote:
 Hi guys,

 Probably a dummy question. Do you know how to compile Spark 0.9 to easily
 integrate with HDFS 2.6.0 ?

 I was trying
 sbt/sbt -Pyarn -Phadoop-2.6 assembly
 or
 mvn -Dhadoop.version=2.6.0 -DskipTests clean package

 but none of these approaches succeeded.

 Thanks,
 Robert

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Interact with streams in a non-blocking way

2015-02-13 Thread Tathagata Das
Here is an example of how you can do it. Let's say myDStream contains the
data that you may want to asynchronously query, say using Spark SQL.

val sqlContext = new SQLContext(streamingContext.sparkContext)

myDStream.foreachRDD { rdd =>   // rdd is an RDD of a case class
   sqlContext.registerRDDAsTable(rdd, "mytable")
}

This will make sure that the table "mytable" always refers to the latest
RDD generated by the DStream.
Then from a different thread you can asynchronously query

sqlContext.sql("select * from mytable")

Hope this helps.

TD
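
Building on that snippet, the asynchronous side could be as simple as a scheduled
task on its own thread (a sketch only; sqlContext, streamingContext and the
registered table "mytable" are the ones from the example above):

import java.util.concurrent.{Executors, TimeUnit}

// Query the most recently registered "mytable" every 10 seconds from a separate thread.
val queryPool = Executors.newSingleThreadScheduledExecutor()
queryPool.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    sqlContext.sql("select count(*) from mytable").collect().foreach(println)
  }
}, 10, 10, TimeUnit.SECONDS)

streamingContext.start()
streamingContext.awaitTermination()   // blocks only the main thread; the scheduled queries keep running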



On Fri, Feb 13, 2015 at 3:59 AM, Sean Owen so...@cloudera.com wrote:

 Sure it's possible, but you would use Streaming to update some shared
 state, and create another service that accessed that shared state too.

 On Fri, Feb 13, 2015 at 11:57 AM, Tamas Jambor jambo...@gmail.com wrote:
  Thanks for the reply, I am trying to setup a streaming as a service
  approach, using the framework that is used for spark-jobserver. for that
 I
  would need to handle asynchronous  operations that are initiated from
  outside of the stream. Do you think it is not possible?
 
  On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote:
 
  You call awaitTermination() in the main thread, and indeed it blocks
  there forever. From there Spark Streaming takes over, and is invoking
  the operations you set up. Your operations have access to the data of
  course. That's the model; you don't make external threads that reach
  in to Spark Streaming's objects, but can easily create operations that
  take whatever actions you want and invoke them in Streaming.
 
  On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote:
   Hi all,
  
   I am trying to come up with a workflow where I can query streams
   asynchronously. The problem I have is a ssc.awaitTermination() line
   blocks
   the whole thread, so it is not straightforward to me whether it is
   possible
   to get hold of objects from streams once they are started. any
   suggestion on
   what is the best way to implement this?
  
   thanks,
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Interact-with-streams-in-a-non-blocking-way-tp21640.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark standalone and HDFS 2.6

2015-02-13 Thread Sean Owen
OK, from scanning the pom.xml, I think you would try:

-Pyarn -Dhadoop.version=2.6.0

If it doesn't package or pass tests, then I'd assume it's not supported :(

On Fri, Feb 13, 2015 at 7:33 PM, Grandl Robert rgra...@yahoo.com wrote:
 I am trying to run BlinkDB(https://github.com/sameeragarwal/blinkdb) which
 seems to work only with Spark 0.9. However, if I want to access HDFS I need
 to compile Spark against Hadoop version which is running on my
 cluster(2.6.0). Hence, the versions problem ...



 On Friday, February 13, 2015 11:28 AM, Sean Owen so...@cloudera.com wrote:


 Oh right, you said Spark 0.9. Those profiles won't exist back then. I
 don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles
 were introduced later to fix up some compatibility. Why not use 1.2.1?

 On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote:
 Thanks Sean for your prompt response.

 I was trying to compile as following:
 mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package

 but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly,
 and
 all hadoop jars are in .m2 repository.

 Do you have any idea what might happens ?

 Robert

 [WARNING] Class com.google.protobuf.Parser not found - continuing with a
 stub.
 [ERROR] error while loading RpcResponseHeaderProto, class file

 '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)'
 is broken
 (class java.lang.NullPointerException/null)
 [WARNING] one warning found
 [ERROR] one error found
 [INFO]
 
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM .. SUCCESS
 [2.537s]
 [INFO] Spark Project Core  FAILURE
 [25.917s]
 [INFO] Spark Project Bagel ... SKIPPED
 [INFO] Spark Project GraphX .. SKIPPED
 [INFO] Spark Project ML Library .. SKIPPED
 [INFO] Spark Project Streaming ... SKIPPED
 [INFO] Spark Project Tools ... SKIPPED
 [INFO] Spark Project REPL  SKIPPED
 [INFO] Spark Project Assembly  SKIPPED
 [INFO] Spark Project External Twitter  SKIPPED
 [INFO] Spark Project External Kafka .. SKIPPED
 [INFO] Spark Project External Flume .. SKIPPED
 [INFO] Spark Project External ZeroMQ . SKIPPED
 [INFO] Spark Project External MQTT ... SKIPPED
 [INFO] Spark Project Examples  SKIPPED
 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 30.002s
 [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015
 [INFO] Final Memory: 49M/1226M
 [INFO]
 
 [WARNING] The requested profile hadoop-2.4 could not be activated
 because
 it does not exist.
 [ERROR] Failed to execute goal
 net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first)
 on
 project spark-core_2.10: Execution scala-compile-first of goal
 net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed
 -
 [Help 1]



 On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com
 wrote:


 If you just need standalone mode, you don't need -Pyarn. There is no
 -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set
 -Dhadoop.version=2.6.0. That should be it.

 If that still doesn't work, define doesn't succeed.

 On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert
 rgra...@yahoo.com.invalid wrote:
 Hi guys,

 Probably a dummy question. Do you know how to compile Spark 0.9 to easily
 integrate with HDFS 2.6.0 ?

 I was trying
 sbt/sbt -Pyarn -Phadoop-2.6 assembly
 or
 mvn -Dhadoop.version=2.6.0 -DskipTests clean package

 but none of these approaches succeeded.

 Thanks,
 Robert

Re: Has Spark 1.2.0 changed EC2 persistent-hdfs?

2015-02-13 Thread Joe Wass
Looks like this is caused by issue SPARK-5008:
https://issues.apache.org/jira/browse/SPARK-5008

On 13 February 2015 at 19:04, Joe Wass jw...@crossref.org wrote:

 I've updated to Spark 1.2.0 and the EC2 and the persistent-hdfs behaviour
 appears to have changed.

 My launch script is

 spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5
 --ebs-vol-size=1000 launch myproject

 When I ssh into master I get:

 $ df -h
 FilesystemSize  Used Avail Use% Mounted on
 /dev/xvda17.9G  2.9G  5.0G  37% /
 tmpfs 7.3G 0  7.3G   0% /dev/shm
 /dev/xvdb  37G  1.3G   34G   4% /mnt
 /dev/xvdc  37G  177M   35G   1% /mnt2
 /dev/xvds1000G   33M 1000G   1% /vol0

 that /vol0 is the place I want (and assume) persistent-hdfs to go. But
 when I look at the size I get:

 $ persistent-hdfs/bin/start-all.sh
 $ persistent-hdfs/bin/hadoop dfsadmin -report
 Warning: $HADOOP_HOME is deprecated.

 Configured Capacity: 42275430400 (39.37 GB)
 Present Capacity: 2644878 (24.63 GB)
 DFS Remaining: 26448601088 (24.63 GB)
 DFS Used: 143360 (140 KB)
 DFS Used%: 0%
 Under replicated blocks: 0
 Blocks with corrupt replicas: 0
 Missing blocks: 0

 -
 Datanodes available: 5 (5 total, 0 dead)

 Name: 10.46.11.156:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165372416 (2.95 GB)
 DFS Remaining: 5289684992(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.41.51.155:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165364224 (2.95 GB)
 DFS Remaining: 5289693184(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.38.30.254:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165249536 (2.95 GB)
 DFS Remaining: 5289807872(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.204.134.84:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165343744 (2.95 GB)
 DFS Remaining: 5289713664(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.33.15.134:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165356032 (2.95 GB)
 DFS Remaining: 5289701376(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 That's tiny. My suspicions are aroused when I see:

 $ ls /vol
 persistent-hdfs

 /vol is on the small /dev/xvda1 not the large EBS /dev/xvds

 I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
 the volume:

 <property>
   <name>hadoop.tmp.dir</name>
   <value>/vol0/persistent-hdfs</value>  <!-- was /vol/persistent-hdfs -->
 </property>

 And then

 persistent-hdfs/bin/stop-all.sh  persistent-hdfs/bin/start-all.sh

 but when I do that, the persistent HDFS won't start for whatever reason. I
 can't run

 $ persistent-hdfs/bin/hadoop dfsadmin -report

 15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
 ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
 Already tried 0 time(s).
 15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
 ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
 Already tried 1 time(s).
 15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
 ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
 Already tried 2 time(s).

 So, it looks like I can't use the EBS for persistent-hdfs. I was doing it
 before, so something must have changed in the last couple of weeks (last
 time I was using 1.1.0).

 Is this a bug? Has the behaviour of AWS changed? Am I doing something
 stupid? How do I fix it?

 Thanks in advance!

 Joe





RE: How to union RDD and remove duplicated keys

2015-02-13 Thread Wang, Ningjun (LNG-NPV)
Do you mean to first union all RDDs together and then do a reduceByKey()? Suppose 
my unioned RDD is

rdd:  (“id1”, “text 1”), (“id1”, “text 2”), (“id1”, “text 3”)

How can I use reduceByKey to return (“id1”, “text 3”)? I mean to take the 
last entry for the same key.

A code snippet is appreciated because I am new to Spark.

Ningjun

From: Boromir Widas [mailto:vcsub...@gmail.com]
Sent: Friday, February 13, 2015 1:28 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to union RDD and remove duplicated keys

reduceByKey should work, but you need to define the ordering by using some sort 
of index.

On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote:

I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g.

rdd1:   (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”)
rdd2:   (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)
rdd3:   (“id1”, “Long text 1 B”)

Then, I want to merge all RDDs. If there is duplicated docids, later RDD should 
overwrite previous RDD. In the above case, rdd2 will overwrite rddd1 for “id1” 
and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final merged rdd should 
be

rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long 
text 3”)

Note that I have many such RDDs and each rdd have lots of elements. How can I 
do it efficiently?


Ningjun




Re: Spark standalone and HDFS 2.6

2015-02-13 Thread Grandl Robert
Thanks Sean for your prompt response. 

I was trying to compile as follows:
mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package
but I got a bunch of errors (see below). Hadoop 2.6.0 compiled correctly, and 
all Hadoop jars are in the .m2 repository.
Do you have any idea what might be happening?
Robert

[WARNING] Class com.google.protobuf.Parser not found - continuing with a stub.
[ERROR] error while loading RpcResponseHeaderProto, class file 
'/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)'
 is broken
(class java.lang.NullPointerException/null)
[WARNING] one warning found
[ERROR] one error found
[INFO] 
[INFO] Reactor Summary:
[INFO] 
[INFO] Spark Project Parent POM .. SUCCESS [2.537s]
[INFO] Spark Project Core  FAILURE [25.917s]
[INFO] Spark Project Bagel ... SKIPPED
[INFO] Spark Project GraphX .. SKIPPED
[INFO] Spark Project ML Library .. SKIPPED
[INFO] Spark Project Streaming ... SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project REPL  SKIPPED
[INFO] Spark Project Assembly  SKIPPED
[INFO] Spark Project External Twitter  SKIPPED
[INFO] Spark Project External Kafka .. SKIPPED
[INFO] Spark Project External Flume .. SKIPPED
[INFO] Spark Project External ZeroMQ . SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project Examples  SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 30.002s
[INFO] Finished at: Fri Feb 13 11:21:36 PST 2015
[INFO] Final Memory: 49M/1226M
[INFO] 
[WARNING] The requested profile hadoop-2.4 could not be activated because it 
does not exist.
[ERROR] Failed to execute goal 
net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on 
project spark-core_2.10: Execution scala-compile-first of goal 
net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed - 
[Help 1]

 

 On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com 
wrote:
   

 If you just need standalone mode, you don't need -Pyarn. There is no
-Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set
-Dhadoop.version=2.6.0. That should be it.

If that still doesn't work, define doesn't succeed.

On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert
rgra...@yahoo.com.invalid wrote:
 Hi guys,

 Probably a dummy question. Do you know how to compile Spark 0.9 to easily
 integrate with HDFS 2.6.0 ?

 I was trying
 sbt/sbt -Pyarn -Phadoop-2.6 assembly
 or
 mvn -Dhadoop.version=2.6.0 -DskipTests clean package

 but none of these approaches succeeded.

 Thanks,
 Robert




Re: Spark standalone and HDFS 2.6

2015-02-13 Thread Sean Owen
Oh right, you said Spark 0.9. Those profiles won't exist back then. I
don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles
were introduced later to fix up some compatibility. Why not use 1.2.1?

On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote:
 Thanks Sean for your prompt response.

 I was trying to compile as following:
 mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package

 but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and
 all hadoop jars are in .m2 repository.

 Do you have any idea what might happens ?

 Robert

 [WARNING] Class com.google.protobuf.Parser not found - continuing with a
 stub.
 [ERROR] error while loading RpcResponseHeaderProto, class file
 '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)'
 is broken
 (class java.lang.NullPointerException/null)
 [WARNING] one warning found
 [ERROR] one error found
 [INFO]
 
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM .. SUCCESS [2.537s]
 [INFO] Spark Project Core  FAILURE [25.917s]
 [INFO] Spark Project Bagel ... SKIPPED
 [INFO] Spark Project GraphX .. SKIPPED
 [INFO] Spark Project ML Library .. SKIPPED
 [INFO] Spark Project Streaming ... SKIPPED
 [INFO] Spark Project Tools ... SKIPPED
 [INFO] Spark Project REPL  SKIPPED
 [INFO] Spark Project Assembly  SKIPPED
 [INFO] Spark Project External Twitter  SKIPPED
 [INFO] Spark Project External Kafka .. SKIPPED
 [INFO] Spark Project External Flume .. SKIPPED
 [INFO] Spark Project External ZeroMQ . SKIPPED
 [INFO] Spark Project External MQTT ... SKIPPED
 [INFO] Spark Project Examples  SKIPPED
 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 30.002s
 [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015
 [INFO] Final Memory: 49M/1226M
 [INFO]
 
 [WARNING] The requested profile hadoop-2.4 could not be activated because
 it does not exist.
 [ERROR] Failed to execute goal
 net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on
 project spark-core_2.10: Execution scala-compile-first of goal
 net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed -
 [Help 1]



 On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com wrote:


 If you just need standalone mode, you don't need -Pyarn. There is no
 -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set
 -Dhadoop.version=2.6.0. That should be it.

 If that still doesn't work, define doesn't succeed.

 On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert
 rgra...@yahoo.com.invalid wrote:
 Hi guys,

 Probably a dummy question. Do you know how to compile Spark 0.9 to easily
 integrate with HDFS 2.6.0 ?

 I was trying
 sbt/sbt -Pyarn -Phadoop-2.6 assembly
 or
 mvn -Dhadoop.version=2.6.0 -DskipTests clean package

 but none of these approaches succeeded.

 Thanks,
 Robert



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread Soila Pertet Kavulya
Thanks Sean and Imran,

I'll try splitting the broadcast variable into smaller ones.

I had tried a regular join but it was failing due to high garbage
collection overhead during the shuffle. One of the RDDs is very large
and has a skewed distribution where a handful of keys account for 90%
of the data. Do you have any pointers on how to handle skewed key
distributions during a join?

Soila

On Fri, Feb 13, 2015 at 10:49 AM, Imran Rashid iras...@cloudera.com wrote:
 unfortunately this is a known issue:
 https://issues.apache.org/jira/browse/SPARK-1476

 as Sean suggested, you need to think of some other way of doing the same
 thing, even if its just breaking your one big broadcast var into a few
 smaller ones

 On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

 I think you've hit the nail on the head. Since the serialization
 ultimately creates a byte array, and arrays can have at most ~2
 billion elements in the JVM, the broadcast can be at most ~2GB.

 At that scale, you might consider whether you really have to broadcast
 these values, or want to handle them as RDDs and join and so on.

 Or consider whether you can break it up into several broadcasts?


 On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote:
  I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get
  the
  following exception when the size of the broadcast variable exceeds 2GB.
  Any
  ideas on how I can resolve this issue?
 
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
  at
  org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
  org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
  org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
  at
  org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
  at
  org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
  at
  org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at
 
  org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at
  org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
  at
 
  org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
  at
 
  org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
  at
 
  org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at
 
  org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
  at
 
  org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at
  org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 







Learning GraphX Questions

2015-02-13 Thread Matthew Bucci
Hello, 

I was looking at GraphX as I believe it can be useful in my research on
temporal data and I had a number of questions about the system:

1) How do you actually run programs in GraphX? At the moment I've been doing
everything live through the shell, but I'd obviously like to be able to work
on it by writing and running scripts. 

2) Is there a way to check the status of the partitions of a graph? For
example, for starters I want to determine whether the number of partitions
requested is always created - e.g., if I ask for 8 partitions but only have 4
cores, what happens?

3) Would I be able to partition by vertex instead of edges, even if I had to
write it myself? I know partitioning by edges is favored in a majority of
the cases, but for the sake of research I'd like to be able to do both.

4) Is there a better way to time processes outside of using built-in unix
timing through the logs or something?

Thank you very much for your insight,
Matthew Bucci



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Learning-GraphX-Questions-tp21649.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Why are there different parts in my CSV?

2015-02-13 Thread Su She
Thanks Akhil for the suggestion, it is now only giving me one part file.
Is there any way I can just create a file rather than a directory? It
doesn't seem like there is a saveAsTextFile option for
JavaPairRecieverDstream.

Also, for the copy/merge api, how would I add that to my spark job?

Thanks Akhil!

Best,

Su
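
For the copy/merge question, a minimal sketch of calling Hadoop's
FileUtil.copyMerge from the driver once a batch directory is complete (the paths
below are illustrative, not from the original job):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

// One saveAsHadoopFiles output directory and the single merged target file.
val srcDir  = new Path("hdfs:///user/ec2-user/prefix-1423872000000.csv")
val dstFile = new Path("hdfs:///user/ec2-user/merged/batch-1423872000000.csv")

// Concatenates all part-* files in srcDir into dstFile; the last argument is
// an optional separator string written between files (null for none).
FileUtil.copyMerge(fs, srcDir, fs, dstFile, false, conf, null)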

On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 For a streaming application, for every batch it will create a new directory
 and put the data in it. If you don't want to have multiple part-* files
 inside the directory, then you can do a repartition before the saveAs*
 call.

 messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);


 Thanks
 Best Regards

 On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
  messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
  String.class, (Class) TextOutputFormat.class);

  1) However, every 2 seconds I'm getting a new *directory* that is titled as
  a csv. So I'll have test.csv, which will be a directory that has two files
  inside of it called part-0 and part-1 (something like that). This obviously
  makes it very hard for me to read the data stored in the csv files. I am
  wondering if there is a better way to store the JavaPairRecieverDStream
  and JavaPairDStream?

  2) I know there is a copy/merge Hadoop API for merging files... can this
  be done inside Java? I am not sure about the logic behind this API if I am
  using Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!





Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-13 Thread Davies Liu
This is fixed in 1.2.1,  could you upgrade to 1.2.1?

On Thu, Feb 12, 2015 at 4:55 AM, Rok Roskar rokros...@gmail.com wrote:
 Hi again,

 I narrowed down the issue a bit more -- it seems to have to do with the Kryo
 serializer. When I use it, then this results in a Null Pointer:

 rdd = sc.parallelize(range(10))
 d = {}
 from random import random
 for i in range(10) :
 d[i] = random()

 rdd.map(lambda x: d[x]).collect()

 ---
 Py4JJavaError Traceback (most recent call last)
 <ipython-input-97-7cd5df24206c> in <module>()
 ----> 1 rdd.map(lambda x: d[x]).collect()

 /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in
 collect(self)
 674 
 675 with SCCallSiteSync(self.context) as css:
 --> 676 bytesInJava = self._jrdd.collect().iterator()
 677 return
 list(self._collect_iterator_through_file(bytesInJava))
 678

 /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer, self.gateway_client,
 --> 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:

 /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling {0}{1}{2}.\n'.
 --> 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling o768.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0
 (TID 87, e1305.hpc-lca.ethz.ch): java.lang.NullPointerException
 at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 If I use a dictionary 

Re: Columnar-Oriented RDDs

2015-02-13 Thread Koert Kuipers
I wrote a proof of concept to automatically store any RDD of tuples or case
classes in columnar format using arrays (and strongly typed, so you get the
benefit of primitive arrays). See:
https://github.com/tresata/spark-columnar

On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust mich...@databricks.com
wrote:

 Shark's in-memory code was ported to Spark SQL and is used by default when
 you run .cache on a SchemaRDD or CACHE TABLE.

 I'd also look at parquet which is more efficient and handles nested data
 better.
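
For illustration, a short sketch of leaning on Spark SQL's columnar support
instead of hand-rolling a columnar RDD (the case class, table name and paths are
made up for the example):

import org.apache.spark.sql._

case class Record(id: Int, name: String, score: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD

val records = sc.parallelize(1 to 1000).map(i => Record(i, s"name$i", i * 0.1))
records.registerTempTable("records")

// Cached tables are stored in Spark SQL's in-memory columnar format.
sqlContext.cacheTable("records")
sqlContext.sql("SELECT AVG(score) FROM records").collect()

// Parquet gives a columnar on-disk format as well.
records.saveAsParquetFile("/tmp/records.parquet")
val fromParquet = sqlContext.parquetFile("/tmp/records.parquet")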

 On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf nightwolf...@gmail.com
 wrote:

 Hi all,

  I'd like to build/use column-oriented RDDs in some of my Spark code. A
  normal Spark RDD is stored as row-oriented objects, if I understand
  correctly.

 I'd like to leverage some of the advantages of a columnar memory format.
 Shark (used to) and SparkSQL uses a columnar storage format using primitive
 arrays for each column.

 I'd be interested to know more about this approach and how I could build
 my own custom columnar-oriented RDD which I can use outside of Spark SQL.

 Could anyone give me some pointers on where to look to do something like
 this, either from scratch or using whats there in the SparkSQL libs or
 elsewhere. I know Evan Chan in a presentation made mention of building a
 custom RDD of column-oriented blocks of data.

 Cheers,
 ~N





SQLContext.applySchema strictness

2015-02-13 Thread Justin Pihony
Per the documentation:

  It is important to make sure that the structure of every Row of the
provided RDD matches the provided schema. Otherwise, there will be runtime
exception.

However, it appears that this is not being enforced. 

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
val struct = StructType(List(StructField("test", BooleanType, true)))
val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
val schemaData = sqlContext.applySchema(myData, struct) // No error
schemaData.collect()(0).getBoolean(0) // Only now will I receive an error

Is this expected or a bug?

Thanks,
Justin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to union RDD and remove duplicated keys

2015-02-13 Thread Boromir Widas
I have not run the following, but will be on these lines -

rdd.zipWithIndex().map(x => (x._1._1, (x._1._2, x._2))).reduceByKey((a, b)
=> { if (a._2 > b._2) a else b }).map(x => (x._1, x._2._1))
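
Expanding on that for the many-RDDs case, a minimal sketch that tags each RDD
with its position so later RDDs win on duplicate keys (the helper name and
sample data are illustrative):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdd1 = sc.parallelize(Seq(("id1", "Long text 1"), ("id2", "Long text 2"), ("id3", "Long text 3")))
val rdd2 = sc.parallelize(Seq(("id1", "Long text 1 A"), ("id2", "Long text 2 A")))
val rdd3 = sc.parallelize(Seq(("id1", "Long text 1 B")))

// Tag each RDD with its index, union them, and keep the value with the
// highest tag for every key.
def mergeLatest(rdds: Seq[RDD[(String, String)]]): RDD[(String, String)] = {
  val tagged = rdds.zipWithIndex.map { case (rdd, i) =>
    rdd.map { case (k, v) => (k, (v, i)) }
  }
  tagged.reduce(_ union _)
    .reduceByKey((a, b) => if (a._2 >= b._2) a else b)
    .mapValues { case (v, _) => v }
}

val merged = mergeLatest(Seq(rdd1, rdd2, rdd3))
// merged: ("id1", "Long text 1 B"), ("id2", "Long text 2 A"), ("id3", "Long text 3")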

On Fri, Feb 13, 2015 at 3:27 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Do you mean first union all RDDs together and then do a reduceByKey()?
 Suppose my unioned RDD is



 rdd :  (“id1”, “text 1”),  (“id1”, “text 2”), (“id1”, “text 3”)

 How can I use reduceByKey to return  (“id1”, “text 3”)? I mean to take
 the last entry for the same key.

 Code snippet is appreciated because I am new to Spark.

 Ningjun



 *From:* Boromir Widas [mailto:vcsub...@gmail.com]
 *Sent:* Friday, February 13, 2015 1:28 PM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* user@spark.apache.org
 *Subject:* Re: How to union RDD and remove duplicated keys



  reduceByKey should work, but you need to define the ordering by using some
  sort of index.



 On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:



 I have multiple RDD[(String, String)] that store (docId, docText) pairs,
 e.g.



 rdd1:   (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text
 3”)

 rdd2:   (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”)

 rdd3:   (“id1”, “Long text 1 B”)



 Then, I want to merge all RDDs. If there is duplicated docids, later RDD
 should overwrite previous RDD. In the above case, rdd2 will overwrite rddd1
 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final
 merged rdd should be



 rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”,
 “Long text 3”)



  Note that I have many such RDDs and each RDD has lots of elements. How
  can I do it efficiently?





 Ningjun







Re: SQLContext.applySchema strictness

2015-02-13 Thread Yin Huai
Hi Justin,

It is expected. We do not check if the provided schema matches rows since
all rows need to be scanned to give a correct answer.

Thanks,

Yin
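
For anyone who wants to fail fast anyway, a hedged sketch of validating rows
against the schema up front (this is not built-in behaviour, and the type cases
shown cover only this example):

import org.apache.spark.sql._

// Check that each field's runtime type matches the declared DataType.
def conforms(row: Row, schema: StructType): Boolean =
  row.size == schema.fields.size && schema.fields.zipWithIndex.forall { case (f, i) =>
    val v = row(i)
    v == null || (f.dataType match {
      case BooleanType => v.isInstanceOf[Boolean]
      case IntegerType => v.isInstanceOf[Int]
      case StringType  => v.isInstanceOf[String]
      case _           => true // extend for the types you actually use
    })
  }

val struct = StructType(List(StructField("test", BooleanType, true)))
val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
val badRows = myData.filter(r => !conforms(r, struct)).count()
require(badRows == 0, s"$badRows row(s) do not match the schema")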

On Fri, Feb 13, 2015 at 1:33 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 Per the documentation:

   It is important to make sure that the structure of every Row of the
 provided RDD matches the provided schema. Otherwise, there will be runtime
 exception.

 However, it appears that this is not being enforced.

  import org.apache.spark.sql._
  val sqlContext = new SQLContext(sc)
  val struct = StructType(List(StructField("test", BooleanType, true)))
  val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
  val schemaData = sqlContext.applySchema(myData, struct) // No error
  schemaData.collect()(0).getBoolean(0) // Only now will I receive an error

 Is this expected or a bug?

 Thanks,
 Justin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: SQLContext.applySchema strictness

2015-02-13 Thread Justin Pihony
OK, but what about on an action, like collect()? Shouldn't it be able to
determine the correctness at that time?

On Fri, Feb 13, 2015 at 4:49 PM, Yin Huai yh...@databricks.com wrote:

 Hi Justin,

 It is expected. We do not check if the provided schema matches rows since
 all rows need to be scanned to give a correct answer.

 Thanks,

 Yin

 On Fri, Feb 13, 2015 at 1:33 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 Per the documentation:

   It is important to make sure that the structure of every Row of the
 provided RDD matches the provided schema. Otherwise, there will be runtime
 exception.

 However, it appears that this is not being enforced.

  import org.apache.spark.sql._
  val sqlContext = new SQLContext(sc)
  val struct = StructType(List(StructField("test", BooleanType, true)))
  val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
  val schemaData = sqlContext.applySchema(myData, struct) // No error
  schemaData.collect()(0).getBoolean(0) // Only now will I receive an error

 Is this expected or a bug?

 Thanks,
 Justin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: SparkSQL doesn't seem to like $'s in column names

2015-02-13 Thread Corey Nolet
This doesn't seem to have helped.

On Fri, Feb 13, 2015 at 2:51 PM, Michael Armbrust mich...@databricks.com
wrote:

 Try using `backticks` to escape non-standard characters.

 On Fri, Feb 13, 2015 at 11:30 AM, Corey Nolet cjno...@gmail.com wrote:

  I don't remember Oracle ever enforcing that I couldn't include a $ in a
  column name, but I also don't think I've ever tried.

 When using sqlContext.sql(...), I have a SELECT * from myTable WHERE
 locations_$homeAddress = '123 Elm St'

 It's telling me $ is invalid. Is this a bug?





Re: Boolean values as predicates in SQL string

2015-02-13 Thread Corey Nolet
Never mind - I think I may have had a schema-related issue (sometimes
booleans were represented as strings and sometimes as raw booleans, but when
I populated the schema one or the other was chosen).



On Fri, Feb 13, 2015 at 8:03 PM, Corey Nolet cjno...@gmail.com wrote:

 Here are the results of a few different SQL strings (let's assume the
 schemas are valid for the data types used):

  SELECT * from myTable where key1 = true  -> no filters are pushed to my
  PrunedFilteredScan
  SELECT * from myTable where key1 = true and key2 = 5 -> 1 filter (key2) is
  pushed to my PrunedFilteredScan
  SELECT * from myTable where key1 = false and key3 = 'val3' -> 1 filter
  (key3) is pushed to my PrunedFilteredScan
  SELECT * from myTable where key1 = 'false' -> (as expected) it passed down
  an EqualTo Filter that's matching 'false' as a string.


 I was going to file a bug for this but before I did that I wanted to make
 sure I wasn't missing something (like a special way to handle booleans).
 I'm assuming the filter is being optimized out because it's being assigned
 the literal true but in this case I really want to match against a
 boolean datatype which is true.





Re: SparkException: Task not serializable - Jackson Json

2015-02-13 Thread jamckelvey
I'm having the same problem with the same sample code. Any progress on this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkException-Task-not-serializable-Jackson-Json-tp21347p21651.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Boolean values as predicates in SQL string

2015-02-13 Thread Corey Nolet
Here are the results of a few different SQL strings (let's assume the
schemas are valid for the data types used):

SELECT * from myTable where key1 = true  -> no filters are pushed to my
PrunedFilteredScan
SELECT * from myTable where key1 = true and key2 = 5 -> 1 filter (key2) is
pushed to my PrunedFilteredScan
SELECT * from myTable where key1 = false and key3 = 'val3' -> 1 filter
(key3) is pushed to my PrunedFilteredScan
SELECT * from myTable where key1 = 'false' -> (as expected) it passed down
an EqualTo Filter that's matching 'false' as a string.


I was going to file a bug for this but before I did that I wanted to make
sure I wasn't missing something (like a special way to handle booleans).
I'm assuming the filter is being optimized out because it's being assigned
the literal true but in this case I really want to match against a
boolean datatype which is true.


Re: Learning GraphX Questions

2015-02-13 Thread Ankur Dave
At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote:
 1) How do you actually run programs in GraphX? At the moment I've been doing
 everything live through the shell, but I'd obviously like to be able to work
 on it by writing and running scripts.

You can create your own projects that build against Spark and GraphX through a 
Maven dependency [1], then run those applications using the bin/spark-submit 
script included with Spark [2].

These guides assume you already know how to do this using your preferred build 
tool (SBT or Maven). In short, here's how to do it with SBT:

1. Install SBT locally (`brew install sbt` on OS X).

2. Inside your project directory, create a build.sbt file listing Spark and
GraphX as dependencies, as in [3]; a minimal sketch also follows below.

3. Run `sbt package` in a shell.

4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit.

[1] 
http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark
[2] http://spark.apache.org/docs/latest/submitting-applications.html
[3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4
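
A minimal build.sbt sketch along those lines (the project name and version
numbers are placeholders, and it is not necessarily identical to the gist in [3]):

name := "my-graphx-app"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"
)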

 2) Is there a way to check the status of the partitions of a graph? For
 example, I want to determine for starters if the number of partitions
 requested are always made, like if I ask for 8 partitions but only have 4
 cores what happens?

You can look at `graph.vertices` and `graph.edges`, which are both RDDs, so you 
can do for example: graph.vertices.partitions
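
For example, a quick check in the shell (the edge-list path and the requested
partition count of 8 are illustrative):

import org.apache.spark.graphx._

// Args: SparkContext, path, canonicalOrientation = false, 8 requested edge partitions.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt", false, 8)
println(s"edge partitions:   ${graph.edges.partitions.length}")
println(s"vertex partitions: ${graph.vertices.partitions.length}")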

 3) Would I be able to partition by vertex instead of edges, even if I had to
 write it myself? I know partitioning by edges is favored in a majority of
 the cases, but for the sake of research I'd like to be able to do both.

If you pass PartitionStrategy.EdgePartition1D, this will partition edges by 
their source vertices, so all edges with the same source will be 
co-partitioned, and the communication pattern will be similar to 
vertex-partitioned (edge-cut) systems like Giraph.

 4) Is there a better way to time processes outside of using built-in unix
 timing through the logs or something?

I think the options are Unix timing, log file timestamp parsing, looking at the 
web UI, or writing timing code within your program (System.currentTimeMillis 
and System.nanoTime).
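
A small in-program timing helper along those lines (wall-clock only; the
PageRank call is just an illustrative workload on the `graph` from the sketch
above):

def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(f"$label took ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

// e.g. timing a GraphX job on an existing Graph called `graph`:
val rankedVertexCount = timed("PageRank") { graph.pageRank(0.0001).vertices.count() }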

Ankur




Re: An interesting and serious problem I encountered

2015-02-13 Thread Landmark
Thanks for Ye Xianjin's suggestions.

The SizeOf.jar may indeed have some problems. I did a simple test as
follows. The code is:

 val n = 1; //5; //10; //100; //1000;
 val arr1 = new Array[(Int, Array[Int])](n);
 for (i <- 0 until arr1.length) {
   arr1(i) = (i, new Array[Int](43));
 }
 println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr1)));

 val arr2 = new Array[Array[Int]](n);
 for (i <- 0 until arr2.length) {
   arr2(i) = new Array[Int](43);
 }
 println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr2)));


I changed the value of n, and its results are 
n=1
1016.0b
216.0b
 
n=5
1.9140625Kb
1000.0b

n=10
3.0625Kb
1.9296875Kb

n=100
23.8046875Kb
19.15625Kb

n=1000
231.2265625Kb
191.421875Kb


As suggested by Ye Xianjin, I tried to use SizeEstimator
(https://github.com/phatak-dev/java-sizeof/blob/master/src/main/scala/com/madhukaraphatak/sizeof/SizeEstimator.scala)
The results are 
n=1
264
216

n=5
1240
1000

n=10
2456
1976

n=100
24416
19616

n=1000
227216
182576

It seems that SizeEstimator computes the memory correctly.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/An-interesting-and-serious-problem-I-encountered-tp21637p21652.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
