Re: CPU RAM

2014-09-17 Thread Akhil Das
Ganglia does give you cluster-wide and per-machine utilization of
resources, but I don't think it gives you per-Spark-job metrics. If you want
to build something from scratch, you can follow an approach like this:

1. Login to the machine
2. Get the PIDs
3. For network IO per process, you can have a look at
http://nethogs.sourceforge.net/
4. You can make use of the information in /proc/[pid]/stat and /proc/stat
to estimate CPU usage and other metrics


Similarly, you can get any metric of a process once you have its PID.
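
For item 4, a rough sketch of the /proc-based CPU estimate (in Scala for
brevity; field positions follow proc(5), and the PID is whatever you found
in step 2; 12345 below is a placeholder):

import scala.io.Source

// Sample (process jiffies, total jiffies); CPU% comes from two samples.
// Note: naively splitting /proc/[pid]/stat breaks if the comm field
// (field 2) contains spaces.
def cpuJiffies(pid: Int): (Long, Long) = {
  val stat = Source.fromFile(s"/proc/$pid/stat").mkString.trim.split(" ")
  val proc = stat(13).toLong + stat(14).toLong                // utime + stime
  val total = Source.fromFile("/proc/stat").getLines().next() // "cpu user nice ..."
    .split("\\s+").drop(1).map(_.toLong).sum
  (proc, total)
}

val pid = 12345 // placeholder PID
val (p1, t1) = cpuJiffies(pid)
Thread.sleep(1000)
val (p2, t2) = cpuJiffies(pid)
println(s"approx CPU%: ${100.0 * (p2 - p1) / (t2 - t1)}")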


Thanks
Best Regards

On Wed, Sep 17, 2014 at 8:59 AM, VJ Shalish vjshal...@gmail.com wrote:

 Sorry for the confusion Team.
 My requirement is to measure the CPU utilisation, RAM usage, Network IO
 and other metrics of a SPARK JOB using a Java program.
 Please help on the same.

 On Tue, Sep 16, 2014 at 11:23 PM, Amit kumarami...@gmail.com wrote:

 Not particularly related to Spark, but you can check out the SIGAR API. It
 lets you get CPU, memory, network, filesystem, and process-based metrics.

 Amit
 On Sep 16, 2014, at 20:14, VJ Shalish vjshal...@gmail.com wrote:

  Hi
 
  I need to get the CPU utilisation, RAM usage, Network IO and other
 metrics using a Java program. Can anyone help me with this?
 
  Thanks
  Shalish.





Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD

2014-09-17 Thread Davies Liu
PipelinedRDD is an RDD generated by a Python mapper/reducer; for example,
rdd.map(func) will return a PipelinedRDD.

PipelinedRDD is a subclass of RDD, so it should have all the APIs that
RDD has.

>>> sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count()
10

I'm wondering how you triggered this error?

Davies

On Tue, Sep 16, 2014 at 10:03 PM, edmond_huo huoxiang5...@gmail.com wrote:
 Hi,

 I am new to Spark. I tried to run a job like the wordcount example in
 python. But when I tried to get the top 10 popular words in the file, I got
 the message: AttributeError: 'PipelinedRDD' object has no attribute
 'sortByKey'.

 So my question is: what is the difference between PipelinedRDD and RDD? And
 if I want to sort the data in a PipelinedRDD, how can I do it?

 Thanks









Re: collect on hadoopFile RDD returns wrong results

2014-09-17 Thread vasiliy
It also appears in the streaming HDFS fileStream.







Re: collect on hadoopFile RDD returns wrong results

2014-09-17 Thread Akhil Das
Can you dump out a small piece of the data while doing rdd.collect() and
rdd.foreach(println)?

Thanks
Best Regards

On Wed, Sep 17, 2014 at 12:26 PM, vasiliy zadonsk...@gmail.com wrote:

 It also appears in the streaming HDFS fileStream.







Re: permission denied on local dir

2014-09-17 Thread Sean Owen
Yes, that is how it is supposed to work. Apps run as the yarn user and do
not generally expect to depend on local file state that is created
externally.

This directory should be owned by yarn, though, right? Your error does not
show permission denied. It looks like you are unable to list a yarn dir as
your own user, but that's expected. What are you expecting to do?
On Sep 17, 2014 6:23 AM, style95 style9...@gmail.com wrote:

 I am running Spark on a shared YARN cluster.
 My user ID is "online", but I found that when I run my Spark application,
 local directories are created under the yarn user ID.
 So I am unable to delete the local directories, and the application
 finally failed.

 Please refer to my log below:

 14/09/16 21:59:02 ERROR DiskBlockManager: Exception while deleting local
 spark dir:

 /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7
 java.io.IOException: Failed to list files for dir:

 /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7/3a
 at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:580)
 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:592)
 at

 org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:593)
 at

 org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:592)
 at

 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:592)
 at

 org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:163)
 at

 org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:160)
 at

 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:160)
 at

 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:153)
 at

 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:151)
 at

 org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:151)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
 at

 org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:151)


  I am unable to access

  /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7

  e.g.,
  ls /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7
  does not work; "permission denied" occurs.

 I am using spark-1.0.0 and yarn 2.4.0.

 Thanks in advance.







Re: collect on hadoopFile RDD returns wrong results

2014-09-17 Thread vasiliy
full code example:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.serializer.KryoSerializer
  // User is the Avro-generated record class for the schema in user.avro

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ErrorExample").setMaster("local[8]")
      .set("spark.serializer", classOf[KryoSerializer].getName)
    val sc = new SparkContext(conf)

    val rdd = sc.hadoopFile(
      "hdfs://./user.avro",
      classOf[org.apache.avro.mapred.AvroInputFormat[User]],
      classOf[org.apache.avro.mapred.AvroWrapper[User]],
      classOf[org.apache.hadoop.io.NullWritable],
      1)

    val usersRDD = rdd.map { case (u, _) => u.datum() }
    usersRDD.foreach(println)

    println("-")

    val collected = usersRDD.collect()

    collected.foreach(println)
  }


output (without info logging etc.):
{"id": 1, "name": "a"}
{"id": 2, "name": "b"}
{"id": 3, "name": "c"}
{"id": 4, "name": "d"}
{"id": 5, "name": "e"}
{"id": 6, "name": "f"}
-
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}







Re: Configuring Spark for heterogeneous hardware

2014-09-17 Thread Victor Tso-Guillen
I'm supposing that there's no good solution to having heterogeneous hardware
in a cluster. What are the prospects of having something like this in the
future? Am I missing an architectural detail that precludes this
possibility?

Thanks,
Victor

On Fri, Sep 12, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com
wrote:

 Ping...

 On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 So I have a bunch of hardware with different core and memory setups. Is
 there a way to do one of the following:

 1. Express a ratio of cores to memory to retain. The spark worker config
 would represent all of the cores and all of the memory usable for any
 application, and the application would take a fraction that sustains the
 ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker
 take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing
 both out. If there were only 16G with the same ratio requirement, it would
 only take 3 cores and 12G in a single executor and leave the rest.

 2. Have the executor take whole number ratios of what it needs. Say it is
 configured for 2/8G and the worker has 4/20. So we can give the executor
 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of
 the two parameters.

 Either way would allow me to get my heterogeneous hardware all
 participating in the work of my spark cluster, presumably without
 endangering spark's assumption of homogeneous execution environments in the
 dimensions of memory and cores. If there's any way to do this, please
 enlighten me.





Re: Configuring Spark for heterogeneous hardware

2014-09-17 Thread Sean Owen
I thought I answered this ... you can easily accomplish this with YARN
by just telling YARN how much memory / CPU each machine has. This can
be configured in groups too rather than per machine. I don't think you
actually want differently-sized executors, and so don't need ratios.
But you can have differently-sized containers which can fit different
numbers of executors as appropriate.
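
For example, a rough yarn-site.xml sketch for a 4-core / 20G node (the
values are placeholders, not a definitive recipe):

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>18432</value> <!-- leave some headroom for the OS -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value>
</property>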




Re: YARN mode not available error

2014-09-17 Thread Sean Owen
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.scheduler.cluster.YarnClientClusterScheduler

It sounds like you perhaps deployed a custom build of Spark that did
not include YARN support? You need -Pyarn in your build.
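
For reference, a YARN-enabled build of that era looks roughly like this
(the profile and version flags depend on your Hadoop distribution):

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package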

On Wed, Sep 17, 2014 at 4:47 AM, Barrington barrington.he...@me.com wrote:
 Hi,

 I am running Spark in cluster mode with Hadoop YARN as the underlying
 cluster manager. I get this error when trying to initialize the
 SparkContext.


 Exception in thread "main" org.apache.spark.SparkException: YARN mode not
 available ?
 at
 org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:1586)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:310)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:86)
 at LascoScript$.main(LascoScript.scala:24)
 at LascoScript.main(LascoScript.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at 
 com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.scheduler.cluster.YarnClientClusterScheduler
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:190)
 at
 org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:1580)




 My build.sbt file looks like this:

 name := "LascoScript"

 version := "1.0"

 scalaVersion := "2.10.4"

 val excludeJBossNetty = ExclusionRule(organization = "org.jboss.netty")
 val excludeMortbayJetty = ExclusionRule(organization = "org.eclipse.jetty",
   artifact = "jetty-server")
 val excludeAsm = ExclusionRule(organization = "org.ow2.asm")
 val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
 val excludeSLF4J = ExclusionRule(organization = "org.slf4j")
 val excludeOldAsm = ExclusionRule(organization = "asm")
 val excludeServletApi = ExclusionRule(organization = "javax.servlet",
   artifact = "servlet-api")

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
 excludeAll(
   excludeServletApi, excludeMortbayJetty
 )

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.5.1"
 excludeAll(
   excludeJBossNetty, excludeMortbayJetty, excludeAsm, excludeCommonsLogging,
   excludeSLF4J, excludeOldAsm, excludeServletApi
 )

 libraryDependencies += "org.mortbay.jetty" % "servlet-api" % "3.0.20100224"

 libraryDependencies += "org.eclipse.jetty" % "jetty-server" %
   "8.1.16.v20140903"

 unmanagedJars in Compile ++= {
   val base = baseDirectory.value
   val baseDirectories = (base / "lib") +++ (base)
   val customJars = (baseDirectories ** "*.jar")
   customJars.classpath
 }

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"



 How can I fix this issue?

 - Barrington








Re: Configuring Spark for heterogeneous hardware

2014-09-17 Thread Victor Tso-Guillen
Hmm, interesting. I'm using standalone mode but I could consider YARN. I'll
have to simmer on that one. Thanks as always, Sean!

 
 
 



Re: Questions about Spark speculation

2014-09-17 Thread Andrew Ash
Hi Nicolas,

I've had suspicions about speculation causing problems on my cluster but
don't have any hard evidence of it yet.

I'm also interested in why it's turned off by default.
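
For reference, these are the relevant settings and, as far as I know, their
defaults at the time (the comments are mine):

spark.speculation            false  # off by default
spark.speculation.interval   100    # ms between checks for speculatable tasks
spark.speculation.quantile   0.75   # fraction of tasks that must finish first
spark.speculation.multiplier 1.5    # how much slower than the median counts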

On Tue, Sep 16, 2014 at 3:01 PM, Nicolas Mai nicolas@gmail.com wrote:

 Hi, guys

 My current project is using Spark 0.9.1, and after increasing the level of
 parallelism and partitions in our RDDs, stages and tasks seem to complete
 much faster. However it also seems that our cluster becomes more unstable
 after some time:
 - stalled stages still showing under active stages in the Spark app web
 dashboard
 - incomplete stages showing under completed stages
 - stages with failures

 I was thinking about reducing/tuning the number of parallelism, but I was
 also considering using spark.speculation which is currently turned off
 but
 seems promising.

 Questions about speculation:
 - Just wondering why it is turned off by default?
 - Are there any risks using speculation?
  - Is it possible that a speculative task straggles, and would trigger
  another new speculative task to finish the job... and so on (some kind of
  loop until there are no more executors available)?
 - What configuration do you guys usually use for spark.speculation?
 (interval, quantile, multiplier) I guess it depends on the project, it may
 give some ideas about how to use it properly.

 Thank you! :)
 Nicolas







Do I Need to Set Checkpoint Interval for Every DStream?

2014-09-17 Thread Ji ZHANG
Hi,

I'm using Spark Streaming 1.0. I create a DStream with KafkaUtils and
apply some operations on it. There's a reduceByWindow operation at the
end, so I suppose the checkpoint interval should be automatically set
to more than 10 seconds. But what I see is that it still checkpoints every 2
seconds (my batch interval), and in the log I see:

[2014-09-17 16:43:25,096] INFO Checkpoint interval automatically set
to 12000 ms (org.apache.spark.streaming.dstream.ReducedWindowedDStream)
[2014-09-17 16:43:25,105] INFO Checkpoint interval = null
(org.apache.spark.streaming.kafka.KafkaInputDStream)
[2014-09-17 16:43:25,107] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.MappedDStream)
[2014-09-17 16:43:25,108] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.MappedDStream)
[2014-09-17 16:43:25,108] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.FilteredDStream)
[2014-09-17 16:43:25,109] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.FlatMappedDStream)
[2014-09-17 16:43:25,110] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.FlatMappedDStream)
[2014-09-17 16:43:25,110] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.ShuffledDStream)
[2014-09-17 16:43:25,111] INFO Checkpoint interval = 12000 ms
(org.apache.spark.streaming.dstream.ReducedWindowedDStream)
[2014-09-17 16:43:25,111] INFO Checkpoint interval = null
(org.apache.spark.streaming.dstream.ForEachDStream)

So does it mean I have to set the checkpoint interval for all the DStreams?
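
(I know the interval can be set explicitly on a single DStream; a sketch,
using the 12000 ms value from the log above:

import org.apache.spark.streaming.Seconds
reducedStream.checkpoint(Seconds(12)) // reducedStream: the reduceByWindow result

but doing that for every DStream seems tedious.)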

Thanks.




About the Spark Scala API Excption

2014-09-17 Thread churly lin
Hi all:
The Scala API here: http://spark.apache.org/docs/latest/api/scala/#package
I found that it doesn't specify the exceptions thrown by each function or
class. How can I know whether a function throws an exception or not
without reading the source code?


Change RDDs using map()

2014-09-17 Thread Deep Pradhan
Hi,
I want to make the following change to an RDD (create a new RDD from the
existing one to reflect some transformation):
In an RDD of key-value pairs, I want to get the keys for which the values
are 1.
How can I do this using map()?
Thank You


Re: Change RDDs using map()

2014-09-17 Thread Mark Hamstra
You don't.  That's what filter or the partial function version of collect
are for:

val transformedRDD = yourRDD.collect { case (k, v) if v == 1 => k }
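
Equivalently, with filter (a sketch assuming the same key/value types):

val keysWithValueOne = yourRDD.filter { case (_, v) => v == 1 }.keys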




Re: CPU RAM

2014-09-17 Thread VJ Shalish
Hi

I need the same through Java.
Doesn't the Spark API support this?







pyspark on yarn - lost executor

2014-09-17 Thread Oleg Ruchovets
Hi,
  I am executing pyspark on YARN.
I successfully processed the initial dataset, but now I have grown it 10
times larger.

During execution I keep getting this error:
  14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor
68 on UCS-NODE1.sms1.local: remote Akka client disassociated

 Tasks fail and are resubmitted again:

14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29,
32, 33, 48, 75, 86, 91, 93, 94
14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27,
39, 51, 64
14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42,
61, 67, 77, 81, 91
14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29,
34, 40, 46, 67, 69, 86
14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17,
18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79,
92
14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24,
31, 43, 65, 73
14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72,
75, 84



QUESTION: how do I debug / tune the problem?
What can cause such behavior?
I have a 5-machine cluster with 32 GB RAM.
Dataset - 3 GB.

command for execution:


 /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
--master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
--py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
 /input/tad/inpuut.csv  /output/cad_model_500_2


Where can I find a description of these parameters?
--num-executors 12
--driver-memory 4g
--executor-memory 2g

What parameters should be used for tuning?

Thanks
Oleg.


Short Circuit Local Reads

2014-09-17 Thread Gary Malouf
Cloudera had a blog post about this in August 2013:
http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/

Has anyone been using this in production? I'm curious whether it made a
significant difference from a Spark perspective.
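
For context, the knobs that post discusses live in hdfs-site.xml; roughly
(the socket path is a typical value and varies by install):

<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/run/hadoop-hdfs/dn._PORT</value>
</property>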


Number of partitions when saving (pyspark)

2014-09-17 Thread Luis Guerra
Hi everyone,

Is it possible to fix the number of tasks related to a saveAsTextFile in
Pyspark?

I am loading several files from HDFS, fixing the number of partitions to X
(let's say 40 for instance). Then some transformations, like joins and
filters are carried out. The weird thing here is that the number of tasks
involved in these transformations is 80, i.e. double the fixed
number of partitions. However, when the saveAsTextFile action is carried
out, there are only 4 tasks to do this (and I have not been able to
increase that number). My problem here is that those 4 tasks rapidly
increase the used memory and take too long to finish.

I am launching my process from windows to a cluster in ubuntu, with 13
computers (4 cores each) with 32 gb of memory, and using pyspark 1.0.2.

Any clue with this?

Thanks in advance


Re: pyspark on yarn - lost executor

2014-09-17 Thread Eric Friedman
How many partitions do you have in your input rdd?  Are you specifying 
numPartitions in subsequent calls to groupByKey/reduceByKey?  

 
 
 


Adjacency List representation in Spark

2014-09-17 Thread Harsha HN
Hello

We are building an adjacency list to represent a graph. The vertices, edges,
and weights for it have been extracted from HDFS files by a Spark job.
Further, we expect the size of the adjacency list (hash map) to grow to over
20 GB.
How can we represent this in an RDD, so that it will be distributed in
nature?

Basically, we are trying to fit a HashMap (adjacency list) into a Spark RDD.
Is there any other way other than GraphX?
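
For concreteness, the distributed shape we have in mind is roughly the
following (a sketch with assumed Long vertex IDs and Double weights):

import org.apache.spark.rdd.RDD

// One row per vertex, carrying its (neighbor, weight) pairs: the RDD
// analogue of the in-memory HashMap adjacency list. `edges` is assumed
// to be an RDD[(Long, Long, Double)] of (src, dst, weight) from HDFS.
val adjacency: RDD[(Long, Iterable[(Long, Double)])] =
  edges.map { case (src, dst, w) => (src, (dst, w)) }.groupByKey()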

Thanks and Regards,
Harsha





list of documents sentiment analysis - problem with defining proper approach with Spark

2014-09-17 Thread xnts
Hi,

For the last few days I have been working on an exercise where I want to
understand the sentiment of a set of articles.

As input I have an XML file with the articles and the AFINN-111.txt file,
which defines the sentiment of a few hundred words.

What I am able to do without any problem is load the data and put it into
structures (classes for articles, tuples of (word, sentiment-value) for
sentiments).

Then what I think I need to do (from the logical point of view) is:

foreach article
   articleWords = split the body by " "
   join the two lists (articleWords and sentimentWords) together
   calculate the sentiment for the article by summing up the sentiments of
all words that it includes
dump the (article id, sentiment) pairs into a flat file
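
For concreteness, a sketch of this loop as a single Spark pass, assuming
articles is an RDD[(String, String)] of (id, body) and sentiments is the
small word -> score Map[String, Int] from AFINN-111.txt, small enough to
broadcast:

val sentimentMap = sc.broadcast(sentiments)
val articleScores = articles.map { case (id, body) =>
  val score = body.split(" ").map(w => sentimentMap.value.getOrElse(w, 0)).sum
  (id, score)
}
articleScores.saveAsTextFile("article-sentiments") // placeholder output path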

And this is where I am stuck :) I tried multiple combinations of
map/reduceByKey; all of them either didn't make much sense (like getting one
sentiment for all articles combined) or resulted in errors that a function
cannot be serialized. Today I even tried to implement this with a
brute-force approach, doing:

articles.foreach(calculateSentiment)

where calculateSentiment looks like below:

val words = sc.parallelize(post.body.split(" ")) // split body by " "
val wordPairs = words.map(w => (w, 1)).reduceByKey(_ + _, 1) // tuples of
(word, #occurrences in article)
val joinedValues = wordPairs.join(sentiments_) // join

But somehow I had a feeling this is not the best idea, and I think I was
right, since the job has been running for about an hour (and I only have a
few hundred GB to process).

So the question is - what I am doing wrong? Any hints or suggestions for 
direction are really appreciated!

Thank you,
Leszek






Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD

2014-09-17 Thread edmond_huo
Hi Davies,

Thank you for your answer. This is my code. I think it is very similar to
the word count example in Spark:

  lines = sc.textFile(sys.argv[2])
  sie = lines.map(lambda l: (l.strip().split(',')[4],1)).reduceByKey(lambda
a, b: a + b)
  sort_sie = sie.sortByKey(False)

Thanks again.






Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD

2014-09-17 Thread edmond_huo
Hi Davies,

When I run your code in pyspark, I still get the same error:

>>> sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count()
Traceback (most recent call last):
  File stdin, line 1, in module
AttributeError: 'PipelinedRDD' object has no attribute 'sortByKey'

Is it a matter of the Spark version? I am using spark-0.7.3.

Thanks.






Spark and disk usage.

2014-09-17 Thread Макар Красноперов
Hello everyone.

The problem is that Spark writes data to disk very heavily, even when the
application has a lot of free memory (about 3.8 GB).
I've noticed that a folder with a name like
spark-local-20140917165839-f58c contains a lot of other folders with
files like shuffle_446_0_1. The total size of the files in the dir
spark-local-20140917165839-f58c can reach 1.1 GB.
Sometimes its size decreases (are there only temp files in that folder?),
so the total amount of data written to disk is greater than 1.1 GB.

The question is: what kind of data does Spark store there, and can I make
Spark not write it to disk and just keep it in memory if there is enough
free RAM?

I run my job locally with Spark 1.0.1:
./bin/spark-submit --driver-memory 12g --master local[3] --properties-file
conf/spark-defaults.conf --class my.company.Main /path/to/jar/myJob.jar

spark-defaults.conf :
spark.shuffle.spill             false
spark.reducer.maxMbInFlight     1024
spark.shuffle.file.buffer.kb    2048
spark.storage.memoryFraction    0.7

The situation with disk usage is common to many jobs. I have also used ALS
from MLlib and seen similar things.

I have had no success playing with the Spark configuration, and I hope
someone can help me :)


Re: pyspark on yarn - lost executor

2014-09-17 Thread Oleg Ruchovets
Sure, I'll post to the mailing list.

From the docs
(http://spark.apache.org/docs/1.0.2/api/python/pyspark.rdd-pysrc.html#RDD.groupByKey):

    groupByKey(self, numPartitions=None)
    Group the values for each key in the RDD into a single sequence.
    Hash-partitions the resulting RDD with numPartitions partitions.

So instead of using the default I'll provide numPartitions, but what is the
best practice for calculating the number of partitions? And how is the
number of partitions related to my original problem?


Thanks

Oleg.

http://spark.apache.org/docs/1.0.2/api/python/frames.html



On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman eric.d.fried...@gmail.com
wrote:

 Look at the API for text file and groupByKey. Please don't take threads
 off list. Other people have the same questions.

 
 Eric Friedman

 On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 Can you please explain how to configure partitions?
 Thanks
 Oleg

 On Wednesday, September 17, 2014, Eric Friedman eric.d.fried...@gmail.com
 wrote:

 Yeah, you need to increase partitions. You only have one on your text
 file. On groupByKey you're getting the pyspark default, which is too low.

 
 Eric Friedman

 On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 This is a very good question :-).

 Here is my code:

 sc = SparkContext(appName="CAD")
 lines = sc.textFile(sys.argv[1], 1)
 result = lines.map(doSplit).groupByKey().mapValues(lambda vc:
 my_custom_function(vc))
 result.saveAsTextFile(sys.argv[2])

 Should I configure partitioning manually? Where should I configure it?
 Where can I read about partitioning best practices?

 Thanks
 Oleg.








Re: Spark and disk usage.

2014-09-17 Thread Burak Yavuz
Hi,

The files you mentioned are temporary files written by Spark during
shuffling. ALS will write a LOT of those files, as it is a shuffle-heavy
algorithm. Those files will be deleted after your program completes, since
Spark keeps them in case a fault occurs: having those files ready allows
Spark to continue from the stage where the shuffle left off, instead of
starting from the very beginning.

Long story short, it's to your benefit that Spark writes those files to
disk. If you don't want Spark writing to disk, you can specify a checkpoint
directory in HDFS, where Spark will write the current status instead, and it
will clean up the files from disk.
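
A minimal sketch of that (the directory and RDD names are placeholders):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
val grouped = pairs.groupByKey() // some shuffle-heavy step on a pair RDD
grouped.checkpoint()             // truncate lineage; state goes to HDFS
grouped.count()                  // first materialization writes the checkpoint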

Best,
Burak




Re: permission denied on local dir

2014-09-17 Thread style95
However, in my application there is no logic that accesses local files.
So I think Spark is internally using the local file system to cache RDDs.

As per the log, it looks like the error occurred in Spark's internal logic
rather than in my business logic.
It is trying to delete local directories,
and they look like directories for the cache:

/hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7

Is there any way not to use the cache or local directories?
Or a way to access the directories created by the yarn user via "online",
my Spark user?

Thanks and regards,
Dongkyoung.







Re: Stable spark streaming app

2014-09-17 Thread Soumitra Kumar
Hmm, no response to this thread!

Adding to it: please share your experiences of building an enterprise-grade
product based on Spark Streaming.

I am exploring Spark Streaming for enterprise software and am cautiously
optimistic about it. I see huge potential to improve the debuggability of
Spark.

- Original Message -
From: Tim Smith secs...@gmail.com
To: spark users user@spark.apache.org
Sent: Friday, September 12, 2014 10:09:53 AM
Subject: Stable spark streaming app

Hi,

Anyone have a stable streaming app running in production? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim




GroupBy Key and then sort values with the group

2014-09-17 Thread abraham.jacob
Hi Group,

I am quite new to the Spark world. There is a particular use case that I
just cannot figure out how to accomplish in Spark. I am using Cloudera
CDH5/YARN/Java 7.

I have a dataset that has the following characteristics -

A JavaPairRDD that represents the following -

Key = {int ID}
Value = {date effectiveFrom, float value}

Let's say that the data I have is the following -


Partition - 1
[K= 1, V= {09-17-2014, 2.8}]
[K= 1, V= {09-11-2014, 3.9}]
[K= 3, V= {09-18-2014, 5.0}]
[K= 3, V= {09-10-2014, 7.4}]


Partition - 2
[K= 2, V= {09-13-2014, 2.5}]
[K= 4, V= {09-07-2014, 6.2}]
[K= 2, V= {09-12-2014, 1.8}]
[K= 4, V= {09-22-2014, 2.9}]


Grouping by key gives me the following RDD

Partition - 1
[K= 1, V= Iterable({09-17-2014, 2.8}, {09-11-2014, 3.9})]
[K= 3, V= Iterable({09-18-2014, 5.0}, {09-10-2014, 7.4})]

Partition - 2
[K= 2, Iterable({09-13-2014, 2.5}, {09-12-2014, 1.8})]
[K= 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]

Now I would like to sort by the values and the result should look like this -

Partition - 1
[K= 1, V= Iterable({09-11-2014, 3.9}, {09-17-2014, 2.8})]
[K= 3, V= Iterable({09-10-2014, 7.4}, {09-18-2014, 5.0})]

Partition - 2
[K= 2, Iterable({09-12-2014, 1.8}, {09-13-2014, 2.5})]
[K= 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]


What is the best way to do this in spark? If so desired, I can even move the 
effectiveFrom (the field that I want to sort on) into the key field.

A code snippet or some pointers on how to solve this would be very helpful.

Regards,
Abraham


Re: pyspark on yarn - lost executor

2014-09-17 Thread Davies Liu
Maybe the Python workers use too much memory during groupByKey();
groupByKey() with a larger numPartitions can help.

Also, can you upgrade your cluster to 1.1? It can spill the data
to disk if memory cannot hold all the data during groupByKey().

Also, if there is a hot key with dozens of millions of values, the PR [1]
can help; it actually helped someone with large datasets (3 TB).

Davies

[1] https://github.com/apache/spark/pull/1977


Re: Number of partitions when saving (pyspark)

2014-09-17 Thread Davies Liu

saveAsTextFile() is a map-side operation, so its number of tasks is
determined by the number of partitions of the previous RDD.

In Spark 1.0.2, groupByKey() and reduceByKey() take the number of CPUs on
the driver (locally) as the default number of partitions, so it's 4. You
need to change it to 40 or 80 in this case.

BTW, in Spark 1.1, groupByKey() and reduceByKey() will use the number of
partitions of the previous RDD as the default value.

Davies




Re: GroupBy Key and then sort values with the group

2014-09-17 Thread Sean Owen
You just need to call mapValues() to change your Iterable of things
into a sorted Iterable of things for each key-value pair. In that
function you write, it's no different from any other Java program. I
imagine you'll need to copy the input Iterable into an ArrayList
(unfortunately), sort it with whatever Comparator you want, and return
the result.
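
In Scala, for brevity, the same idea is a one-line sketch (the Java version
does exactly the copy-and-Collections.sort described above):

// grouped: RDD[(Int, Iterable[(String, Float)])] after groupByKey.
// Sorting on the raw MM-dd-yyyy string is only illustrative; sort on a
// parsed date in real code.
val sorted = grouped.mapValues(_.toList.sortBy(_._1))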

On Wed, Sep 17, 2014 at 4:37 PM,  abraham.ja...@thomsonreuters.com wrote:
 Hi Group,



 I am quite fresh in the spark world. There is a particular use case that I
 just cannot understand how to accomplish in spark. I am using Cloudera
 CDH5/YARN/Java 7.



 I have a dataset that has the following characteristics –



 A JavaPairRDD that represents the following –



 Key = {int ID}

 Value = {date effectiveFrom, float value}



 Let’s say that the data I have is the following –





 Partition – 1

 [K= 1, V= {09-17-2014, 2.8}]

 [K= 1, V= {09-11-2014, 3.9}]

 [K= 3, V= {09-18-2014, 5.0}]

 [K= 3, V= {09-10-2014, 7.4}]





 Partition – 2

 [K= 2, V= {09-13-2014, 2.5}]

 [K= 4, V= {09-07-2014, 6.2}]

 [K= 2, V= {09-12-2014, 1.8}]

 [K= 4, V= {09-22-2014, 2.9}]





 Grouping by key gives me the following RDD



 Partition – 1

 [K= 1, V= Iterable({09-17-2014, 2.8}, {09-11-2014, 3.9})]

 [K= 3, V= Iterable({09-18-2014, 5.0}, {09-10-2014, 7.4})]



 Partition – 2

 [K= 2, Iterable({09-13-2014, 2.5}, {09-12-2014, 1.8})]

 [K= 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]



 Now I would like to sort by the values and the result should look like this
 –



 Partition – 1

 [K= 1, V= Iterable({09-11-2014, 3.9}, {09-17-2014, 2.8})]

 [K= 3, V= Iterable({09-10-2014, 7.4}, {09-18-2014, 5.0})]



 Partition – 2

 [K= 2, Iterable({09-12-2014, 1.8}, {09-13-2014, 2.5})]

 [K= 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]





 What is the best way to do this in spark? If so desired, I can even move the
 “effectiveFrom” (the field that I want to sort on) into the key field.



 A code snippet or some pointers on how to solve this would be very helpful.



 Regards,

 Abraham

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
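
For reference, a minimal Scala sketch of what Sean describes (the Java version
is the same idea with Collections.sort and a Comparator); the Reading type and
date format are invented to match the example data:

import java.text.SimpleDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

case class Reading(effectiveFrom: String, value: Float)

object SortWithinGroups {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "sort-within-groups")
    val data = sc.parallelize(Seq(
      (1, Reading("09-17-2014", 2.8f)), (1, Reading("09-11-2014", 3.9f)),
      (3, Reading("09-18-2014", 5.0f)), (3, Reading("09-10-2014", 7.4f))))
    val sorted = data.groupByKey().mapValues { vs =>
      val fmt = new SimpleDateFormat("MM-dd-yyyy")
      // Copy the Iterable into a Seq, then sort on the parsed date.
      vs.toSeq.sortBy(r => fmt.parse(r.effectiveFrom).getTime)
    }
    sorted.collect().foreach(println)
  }
}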



RE: GroupBy Key and then sort values with the group

2014-09-17 Thread abraham.jacob
Thanks Sean,

Makes total sense. I guess I was so caught up with RDDs and all the wonderful 
transformations they can do that I did not think about plain old Java 
Collections.sort(list, comparator).

Thanks,

__

Abraham


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, September 17, 2014 9:37 AM
To: Jacob, Abraham (FinancialRisk)
Cc: user@spark.apache.org
Subject: Re: GroupBy Key and then sort values with the group

You just need to call mapValues() to change your Iterable of things into a 
sorted Iterable of things for each key-value pair. In that function you write, 
it's no different from any other Java program. I imagine you'll need to copy 
the input Iterable into an ArrayList (unfortunately), sort it with whatever 
Comparator you want, and return the result.

On Wed, Sep 17, 2014 at 4:37 PM,  abraham.ja...@thomsonreuters.com wrote:
 Hi Group,



 I am quite fresh in the spark world. There is a particular use case 
 that I just cannot understand how to accomplish in spark. I am using 
 Cloudera CDH5/YARN/Java 7.



 I have a dataset that has the following characteristics –



 A JavaPairRDD that represents the following –



 Key = {int ID}

 Value = {date effectiveFrom, float value}



 Let’s say that the data I have is the following –





 Partition – 1

 [K= 1, V= {09-17-2014, 2.8}]

 [K= 1, V= {09-11-2014, 3.9}]

 [K= 3, V= {09-18-2014, 5.0}]

 [K= 3, V= {09-10-2014, 7.4}]





 Partition – 2

 [K= 2, V= {09-13-2014, 2.5}]

 [K= 4, V= {09-07-2014, 6.2}]

 [K= 2, V= {09-12-2014, 1.8}]

 [K= 4, V= {09-22-2014, 2.9}]





 Grouping by key gives me the following RDD



 Partition – 1

 [K= 1, V= Iterable({09-17-2014, 2.8}, {09-11-2014, 3.9})]

 [K= 3, V= Iterable({09-18-2014, 5.0}, {09-10-2014, 7.4})]



 Partition – 2

 [K= 2, Iterable({09-13-2014, 2.5}, {09-12-2014, 1.8})]

 [K= 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]



 Now I would like to sort by the values and the result should look like 
 this –



 Partition – 1

 [K= 1, V= Iterable({09-11-2014, 3.9}, {09-17-2014, 2.8})]

 [K= 3, V= Iterable({09-10-2014, 7.4}, {09-18-2014, 5.0})]



 Partition – 2

 [K= 2, Iterable({09-12-2014, 1.8}, {09-13-2014, 2.5})]

 [K= 4, Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]





 What is the best way to do this in spark? If so desired, I can even 
 move the “effectiveFrom” (the field that I want to sort on) into the key 
 field.



 A code snippet or some pointers on how to solve this would be very helpful.



 Regards,

 Abraham

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Adjacency List representation in Spark

2014-09-17 Thread Andrew Ash
Hi Harsha,

You could look through the GraphX source to see the approach taken there
for ideas in your own.  I'd recommend starting at
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala#L385
to see the storage technique.

Why do you want to avoid using GraphX?

Good luck!
Andrew

On Wed, Sep 17, 2014 at 6:43 AM, Harsha HN 99harsha.h@gmail.com wrote:

 Hello

 We are building an adjacency list to represent a graph. Vertexes, Edges
 and Weights for the same has been extracted from hdfs files by a Spark job.
 Further we expect size of the adjacency list(Hash Map) could grow over
 20Gigs.
 How can we represent this in RDD, so that it will distributed in nature?

 Basically we are trying to fit HashMap(Adjacency List) into Spark RDD. Is
 there any other way other than GraphX?

 Thanks and Regards,
 Harsha
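
For illustration, one way to keep such an adjacency list distributed is to
make it a pair RDD keyed by source vertex, roughly as below; the triple layout
and names are hypothetical:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object AdjacencyListSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "adjacency-list")
    // (srcId, dstId, weight) triples, e.g. parsed from the HDFS files.
    val edges: RDD[(Long, Long, Double)] =
      sc.parallelize(Seq((1L, 2L, 0.5), (1L, 3L, 1.5), (2L, 3L, 2.0)))
    // Group edges by source vertex: each record is one vertex's adjacency
    // list, and the "map" as a whole is partitioned across the cluster.
    val adjacency: RDD[(Long, Iterable[(Long, Double)])] =
      edges.map { case (src, dst, w) => (src, (dst, w)) }.groupByKey()
    // Lookups then become joins against this RDD instead of HashMap.get calls.
    adjacency.collect().foreach(println)
  }
}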



Re: Spark and disk usage.

2014-09-17 Thread Andrew Ash
Hi Burak,

Most discussions of checkpointing in the docs is related to Spark
streaming.  Are you talking about the sparkContext.setCheckpointDir()?
 What effect does that have?

https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

On Wed, Sep 17, 2014 at 7:44 AM, Burak Yavuz bya...@stanford.edu wrote:

 Hi,

 The files you mentioned are temporary files written by Spark during
 shuffling. ALS will write a LOT of those files as it is a shuffle heavy
 algorithm.
 Those files will be deleted after your program completes as Spark looks
 for those files in case a fault occurs. Having those files ready allows
 Spark to
 continue from the stage the shuffle left off, instead of starting from the
 very beginning.

 Long story short, it's to your benefit that Spark writes those files to
 disk. If you don't want Spark writing to disk, you can specify a checkpoint
 directory in
 HDFS, where Spark will write the current status instead and will clean up
 files from disk.

 Best,
 Burak

 - Original Message -
 From: Макар Красноперов connector@gmail.com
 To: user@spark.apache.org
 Sent: Wednesday, September 17, 2014 7:37:49 AM
 Subject: Spark and disk usage.

 Hello everyone.

 The problem is that spark write data to the disk very hard, even if
 application has a lot of free memory (about 3.8g).
 So, I've noticed that folder with name like
 spark-local-20140917165839-f58c contains a lot of other folders with
 files like shuffle_446_0_1. The total size of files in the dir
 spark-local-20140917165839-f58c can reach 1.1g.
 Sometimes its size decreases (are there only temp files in that folder?),
 so the totally amount of data written to the disk is greater than 1.1g.

 The question is what kind of data Spark store there and can I make spark
 not to write it on the disk and just keep it in the memory if there is
 enough RAM free space?

 I run my job locally with Spark 1.0.1:
 ./bin/spark-submit --driver-memory 12g --master local[3] --properties-file
 conf/spark-defaults.conf --class my.company.Main /path/to/jar/myJob.jar

 spark-defaults.conf :
 spark.shuffle.spill false
 spark.reducer.maxMbInFlight 1024
 spark.shuffle.file.buffer.kb 2048
 spark.storage.memoryFraction 0.7

 The situation with disk usage is common for many jobs. I had also used ALS
 from MLIB and saw the similar things.

 I had reached no success by playing with spark configuration and i hope
 someone can help me :)


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark and disk usage.

2014-09-17 Thread Burak Yavuz
Hi Andrew,

Yes, I'm referring to sparkContext.setCheckpointDir(). It has the same effect 
as in Spark Streaming. 
For example, in an algorithm like ALS, the RDDs go through many transformations 
and the lineage of the RDD starts to grow drastically just like 
the lineage of DStreams do in Spark Streaming. You may observe 
StackOverflowErrors in ALS if you set the number of iterations to be very high. 

If you set the checkpointing directory however, the intermediate state of the 
RDDs will be saved in HDFS, and the lineage will pick off from there. 
You won't need to keep the shuffle data before the checkpointed state, 
therefore those can be safely removed (will be removed automatically).
However, checkpoint must be called explicitly as in 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L291
 , just setting the directory will not be enough.

Best,
Burak

- Original Message -
From: Andrew Ash and...@andrewash.com
To: Burak Yavuz bya...@stanford.edu
Cc: Макар Красноперов connector@gmail.com, user 
user@spark.apache.org
Sent: Wednesday, September 17, 2014 10:19:42 AM
Subject: Re: Spark and disk usage.

Hi Burak,

Most discussions of checkpointing in the docs is related to Spark
streaming.  Are you talking about the sparkContext.setCheckpointDir()?
 What effect does that have?

https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

On Wed, Sep 17, 2014 at 7:44 AM, Burak Yavuz bya...@stanford.edu wrote:

 Hi,

 The files you mentioned are temporary files written by Spark during
 shuffling. ALS will write a LOT of those files as it is a shuffle heavy
 algorithm.
 Those files will be deleted after your program completes as Spark looks
 for those files in case a fault occurs. Having those files ready allows
 Spark to
 continue from the stage the shuffle left off, instead of starting from the
 very beginning.

 Long story short, it's to your benefit that Spark writes those files to
 disk. If you don't want Spark writing to disk, you can specify a checkpoint
 directory in
 HDFS, where Spark will write the current status instead and will clean up
 files from disk.

 Best,
 Burak

 - Original Message -
 From: Макар Красноперов connector@gmail.com
 To: user@spark.apache.org
 Sent: Wednesday, September 17, 2014 7:37:49 AM
 Subject: Spark and disk usage.

 Hello everyone.

 The problem is that spark write data to the disk very hard, even if
 application has a lot of free memory (about 3.8g).
 So, I've noticed that folder with name like
 spark-local-20140917165839-f58c contains a lot of other folders with
 files like shuffle_446_0_1. The total size of files in the dir
 spark-local-20140917165839-f58c can reach 1.1g.
 Sometimes its size decreases (are there only temp files in that folder?),
 so the totally amount of data written to the disk is greater than 1.1g.

 The question is what kind of data Spark store there and can I make spark
 not to write it on the disk and just keep it in the memory if there is
 enough RAM free space?

 I run my job locally with Spark 1.0.1:
 ./bin/spark-submit --driver-memory 12g --master local[3] --properties-file
 conf/spark-defaults.conf --class my.company.Main /path/to/jar/myJob.jar

 spark-defaults.conf :
 spark.shuffle.spill false
 spark.reducer.maxMbInFlight 1024
 spark.shuffle.file.buffer.kb 2048
 spark.storage.memoryFraction 0.7

 The situation with disk usage is common for many jobs. I had also used ALS
 from MLIB and saw the similar things.

 I had reached no success by playing with spark configuration and i hope
 someone can help me :)


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and disk usage.

2014-09-17 Thread Andrew Ash
Thanks for the info!

Are there performance impacts with writing to HDFS instead of local disk?
 I'm assuming that's why ALS checkpoints every third iteration instead of
every iteration.

Also I can imagine that checkpointing should be done every N shuffles
instead of every N operations (counting maps), since only the shuffle
leaves data on disk.  Do you have any suggestions on this?

We should write up some guidance on the use of checkpointing in the programming
guide https://spark.apache.org/docs/latest/programming-guide.html - I can
help with this

Andrew


Re: Short Circuit Local Reads

2014-09-17 Thread Matei Zaharia
I'm pretty sure it does help, though I don't have any numbers for it. In any 
case, Spark will automatically benefit from this if you link it to a version of 
HDFS that contains this.

Matei

On September 17, 2014 at 5:15:47 AM, Gary Malouf (malouf.g...@gmail.com) wrote:

Cloudera had a blog post about this in August 2013: 
http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/

Has anyone been using this in production? Curious as to whether it made a 
significant difference from a Spark perspective.

How to ship cython library to workers?

2014-09-17 Thread freedafeng
I have a library written in Cython and C. wondering if it can be shipped to
the workers which don't have cython installed. maybe create an egg package
from this library? how? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-ship-cython-library-to-workers-tp14467.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to group within the messages at a vertex?

2014-09-17 Thread spr
Sorry if this is in the docs someplace and I'm missing it.

I'm trying to implement label propagation in GraphX.  The core step of that
algorithm is 

- for each vertex, find the most frequent label among its neighbors and set
its label to that.

(I think) I see how to get the input from all the neighbors, but I don't see
how to group/reduce those to find the most frequent label.  

var G2 = G.mapVertices((id, attr) => id)
val perSrcCount: VertexRDD[(Long, Long)] = G2.mapReduceTriplets[(Long, Long)](
  edge => Iterator((edge.dstAttr, (edge.srcAttr, 1))),
  (a, b) => ((a._1), (a._2 + b._2))   // this line seems broken
  )

It seems on the broken line above, I don't want to reduce all the values
to a scalar, as this code does, but rather group them first and then reduce
them.  Can I do that all within mapReduceTriples?  If not, how do I build
something that I can then further reduce?  

Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-group-within-the-messages-at-a-vertex-tp14468.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming - batchDuration for streaming

2014-09-17 Thread alJune
hi esteemed Sparkers,
I have some doubt about the StreamingContext batchDuration parameter.
I've already read the excellent explanation by Tathagata Das about the
difference between batch and window duration.
But the issue is still a bit confusing; for example, even without any window,
Spark Streaming effectively applies an implicit window of batchDuration to its
actions.
So,
1. What is a simple strategy to choose the correct batchDuration value?
  My use case is a Flume spooling-directory source for new text files =>
a Spark Streaming analytic app.
  And one more question -
2. What is the reason for batching a stream?
  I used BigInsights Streams and there is no such batch strategy there, as the
stream is a continuous flow. You run Streams with one of many source
connectors to catch the data input flow (stream). If you need to, you can work
with data on a window frame. But in any case, your streams app listens to the
stream/flow continuously.
Perhaps I don't understand some important and powerful characteristic of the
Spark Streaming architecture.
Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-batchDuration-for-streaming-tp14469.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to group within the messages at a vertex?

2014-09-17 Thread Ankur Dave
At 2014-09-17 11:39:19 -0700, spr s...@yarcdata.com wrote:
 I'm trying to implement label propagation in GraphX.  The core step of that
 algorithm is

 - for each vertex, find the most frequent label among its neighbors and set
 its label to that.

 [...]

 It seems on the broken line above, I don't want to reduce all the values
 to a scalar, as this code does, but rather group them first and then reduce
 them.  Can I do that all within mapReduceTriples?  If not, how do I build
 something that I can then further reduce?

Label propagation is actually already implemented in GraphX [1]. The way it 
handles the most frequent label reduce operation is to aggregate a histogram, 
implemented as a map from label to frequency, and then take the most frequent 
element from the map at the end. Something to watch out for is that this can 
create large aggregation messages for high-degree vertices.

Ankur

[1] 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
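
A condensed sketch of that histogram aggregation, along the lines of the
linked implementation (Spark 1.x mapReduceTriplets API); this is illustrative,
not the library code itself:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

object LabelPropStep {
  // One label-propagation step: every vertex adopts the most frequent
  // label among its neighbors, aggregated as a label -> count histogram.
  def step[ED](g: Graph[VertexId, ED]): Graph[VertexId, ED] = {
    val msgs: VertexRDD[Map[VertexId, Long]] = g.mapReduceTriplets(
      // Each endpoint sends its label to the other side as a 1-entry histogram.
      t => Iterator((t.dstId, Map(t.srcAttr -> 1L)),
                    (t.srcId, Map(t.dstAttr -> 1L))),
      // Merge histograms by summing the counts per label.
      (a, b) => (a.keySet ++ b.keySet)
        .map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap)
    // Take the most frequent label; vertices with no messages keep their own.
    g.outerJoinVertices(msgs) { (_, old, hist) =>
      hist.map(_.maxBy(_._2)._1).getOrElse(old)
    }
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "lp-step")
    val g = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), 0L)
      .mapVertices((id, _) => id)  // start each vertex labeled with its own id
    println(step(g).vertices.collect().mkString(", "))
  }
}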



Re: Spark and disk usage.

2014-09-17 Thread Burak Yavuz
Yes, writing to HDFS is more expensive, but I feel it is still a small price to 
pay when compared to having a Disk Space Full error three hours in
and having to start from scratch.

The main goal of checkpointing is to truncate the lineage. Clearing up shuffle 
writes come as a bonus to checkpointing, it is not the main goal. The 
subtlety here is that .checkpoint() is just like .cache(). Until you call an 
action, nothing happens. Therefore, if you're going to do 1000 maps in a 
row and you don't want to checkpoint in the meantime until a shuffle happens, 
you will still get a StackOverflowError, because the lineage is too long.

I went through some of the code for checkpointing. As far as I can tell, it 
materializes the data in HDFS, and resets all its dependencies, so you start 
a fresh lineage. My understanding would be that checkpointing still should be 
done every N operations to reset the lineage. However, an action must be 
performed before the lineage grows too long.

I believe it would be nice to write up checkpointing in the programming guide. 
The reason that it's not there yet I believe is that most applications don't
grow such a long lineage, except in Spark Streaming, and some MLlib algorithms. 
If you can help with the guide, I think it would be a nice feature to have!

Burak


- Original Message -
From: Andrew Ash and...@andrewash.com
To: Burak Yavuz bya...@stanford.edu
Cc: Макар Красноперов connector@gmail.com, user 
user@spark.apache.org
Sent: Wednesday, September 17, 2014 11:04:02 AM
Subject: Re: Spark and disk usage.

Thanks for the info!

Are there performance impacts with writing to HDFS instead of local disk?
 I'm assuming that's why ALS checkpoints every third iteration instead of
every iteration.

Also I can imagine that checkpointing should be done every N shuffles
instead of every N operations (counting maps), since only the shuffle
leaves data on disk.  Do you have any suggestions on this?

We should write up some guidance on the use of checkpointing in the programming
guide https://spark.apache.org/docs/latest/programming-guide.html - I can
help with this

Andrew


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
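
A minimal sketch of the pattern described above; the checkpoint directory and
the every-10-iterations cadence are arbitrary choices:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object CheckpointSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "checkpoint-sketch")
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // hypothetical path
    var rdd = sc.parallelize(1 to 1000)
    for (i <- 1 to 100) {
      rdd = rdd.map(_ + 1)
      if (i % 10 == 0) {
        // Like cache(), checkpoint() is lazy: mark the RDD, then force an
        // action so it is materialized to HDFS and the lineage is truncated.
        rdd.checkpoint()
        rdd.count()
      }
    }
    println(rdd.sum())
  }
}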



Re: OutOfMemoryError with basic kmeans

2014-09-17 Thread st553
Not sure if you resolved this but I had a similar issue and resolved it. In
my case, the problem was the ids of my items were of type Long and could be
very large (even though there are only a small number of distinct ids...
maybe a few hundred of them). KMeans will create a dense vector for the
cluster centers so its important that the dimensionality not be huge. I had
to map my ids to a smaller space and it worked fine. The mapping was
something like...
1001223412 -> 1
1006591779 -> 2
1011232423 -> 3
...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-with-basic-kmeans-tp1651p14472.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
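
A sketch of that id compaction using zipWithIndex; the input layout here is
made up:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object CompactIds {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "compact-ids")
    val ratings = sc.parallelize(Seq((1001223412L, 0.5), (1006591779L, 0.9)))
    // Assign every distinct large id a small dense index: 0, 1, 2, ...
    val idMap = ratings.map(_._1).distinct().zipWithIndex().collectAsMap()
    val bId = sc.broadcast(idMap)
    // Swap in the dense ids before building the vectors fed to KMeans.
    val compact = ratings.map { case (id, v) => (bId.value(id), v) }
    compact.collect().foreach(println)
  }
}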



How to run kmeans after pca?

2014-09-17 Thread st553
I would like to reduce the dimensionality of my data before running kmeans.
The problem I'm having is that both RowMatrix.computePrincipalComponents()
and RowMatrix.computeSVD() return a DenseMatrix whereas KMeans.train()
requires an RDD[Vector]. Does MLlib provide a way to do this conversion?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-kmeans-after-pca-tp14473.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
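
One way to get back to an RDD[Vector] is to project the original rows onto the
principal components with RowMatrix.multiply, whose result exposes .rows. A
sketch with toy data:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object PcaThenKMeans {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "pca-kmeans")
    val rows = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 3.0, 4.0),
      Vectors.dense(2.0, 3.0, 4.0, 5.0),
      Vectors.dense(9.0, 8.0, 7.0, 6.0)))
    val mat = new RowMatrix(rows)
    // Top-2 principal components, returned as a local dense Matrix.
    val pc = mat.computePrincipalComponents(2)
    // Distributed rows times local matrix: a reduced RowMatrix whose .rows
    // is exactly the RDD[Vector] that KMeans.train expects.
    val projected = mat.multiply(pc).rows
    val model = KMeans.train(projected, 2, 20)
    println(model.clusterCenters.mkString(", "))
  }
}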



Re: partitioned groupBy

2014-09-17 Thread Akshat Aranya
Patrick,

If I understand this correctly, I won't be able to do this in the closure
provided to mapPartitions() because that's going to be stateless, in the
sense that a hash map that I create within the closure would only be useful
for one call of MapPartitionsRDD.compute().  I guess I would need to
override mapPartitions() directly within my RDD.  Right?

On Tue, Sep 16, 2014 at 4:57 PM, Patrick Wendell pwend...@gmail.com wrote:

 If each partition can fit in memory, you can do this using
 mapPartitions and then building an inverse mapping within each
 partition. You'd need to construct a hash map within each partition
 yourself.

 On Tue, Sep 16, 2014 at 4:27 PM, Akshat Aranya aara...@gmail.com wrote:
  I have a use case where my RDD is set up such:
 
  Partition 0:
   K1 -> [V1, V2]
   K2 -> [V2]
 
  Partition 1:
   K3 -> [V1]
   K4 -> [V3]
 
  I want to invert this RDD, but only within a partition, so that the
  operation does not require a shuffle.  It doesn't matter if the
 partitions
  of the inverted RDD have non unique keys across the partitions, for
 example:
 
  Partition 0:
   V1 -> [K1]
   V2 -> [K1, K2]
 
  Partition 1:
   V1 -> [K3]
   V3 -> [K4]
 
  Is there a way to do only a per-partition groupBy, instead of shuffling
 the
  entire data?
 



Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
I don't have anything in production yet but I now at least have a
stable (running for more than 24 hours) streaming app. Earlier, the
app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each
node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set, by default. This
might be more of a CDH issue. I noticed that while the Yarn container
and other Spark ancillary tasks had GC options set at launch but none
for the executors. So I played with different GC options and this
worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
-XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java
programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the stdout file in the Yarn
container logs on each node/worker. You can set SPARK_JAVA_OPTS in
spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
specifically, even though you don't run Spark as a service (since you
are using Yarn for RM), you can goto Spark Client Advanced
Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh and
set SPARK_JAVA_OPTS there.

- Set these two params - spark.yarn.executor.memoryOverhead
spark.yarn.driver.memoryOverhead. Earlier, my app would get killed
because the executors running the kafka receivers would get killed by
Yarn for over utilization of memory. Now, these are my memory settings
(I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \

Your total executor JVM will consume executor-memory minus
spark.yarn.executor.memoryOverhead so you should see each executor
JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class myAwesomeApp \
--master yarn myawesomeapp.jar \
--jars 
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 64 \
--spark.akka.frameSize 500 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
--spark.shuffle.consolidateFiles true \
--spark.default.parallelism 528 \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not
scientifically measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as
you see above)
- I map one receive task for each kafka partition
- Instead of doing a union on all the incoming streams and then
repartition() I now repartition() each incoming stream and process
them separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after doing a
union on all incoming dStreams. I now repartition each of the five
streams separately (in a loop) to 24.
- For each RDD, I set storagelevel to MEMORY_AND_DISK_SER
- Process data per partition instead of per RDD: dataout.foreachRDD(
rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create a new Producer, make sure I close
it, else I had a ton of "too many files open" errors (see the sketch after
this list) :)
- Use immutable objects as far as possible. If I use mutable objects
within a method/class then I turn them into immutable before passing
onto another class/method.
- For logging, create a LogService object that I then use for other
object/class declarations. Once instantiated, I can make logInfo
calls from within other Objects/Methods/Classes and output goes to the
stderr file in the Yarn container logs. Good for debugging stream
processing logic.
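
As referenced in the producer bullet above, a sketch of that per-partition
output pattern against the Kafka 0.8 producer API; broker and topic names are
placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

object KafkaOutput {
  def write(dataout: DStream[String]) {
    dataout.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val props = new Properties()
        props.put("metadata.broker.list", "broker1:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        // Created on the executor, once per partition...
        val producer = new Producer[String, String](new ProducerConfig(props))
        records.foreach(r =>
          producer.send(new KeyedMessage[String, String]("out-topic", r)))
        producer.close()  // ...and always closed, avoiding "too many files open"
      }
    }
  }
}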

Currently, my processing delay is lower than my dStream time window so
all is good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
threw an exception

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

Best I understand and have been told, this does not affect data
integrity but may cause un-necessary recomputes.

Hope this helps,

Tim


On Wed, Sep 17, 2014 

Re: Stable spark streaming app

2014-09-17 Thread Soumitra Kumar
Thanks Tim for the detailed email, would help me a lot.

I have:

. 3 nodes CDH5.1 cluster with 16G memory.

. Majority of code in Scala, some part in Java (using Cascading earlier).

. Inputs are a bunch of textFileStream directories.

. Every batch output goes to Parquet files, and to HBase.
  Had to set export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar
to fix some issue during class loading.
  For some reason 'spark.driver.extraClassPath' did not work.

. I started doing a union pretty early in the flow, but it did not work because
not all of my classes are serializable. Each DStream works in isolation, but if
I union them early, I get into serialization issues.

. Took a while to find the log directory, /run/spark/work .

. Deploying at --master spark://mymachine:7077

. Modified '/etc/spark/conf.cloudera.spark/log4j.properties' for logging

. Currently I cannot process a > 1G file with this configuration. I tried
various things but could not succeed yet.

- Original Message -
From: Tim Smith secs...@gmail.com
Cc: spark users user@spark.apache.org
Sent: Wednesday, September 17, 2014 1:11:12 PM
Subject: Re: Stable spark streaming app

I don't have anything in production yet but I now at least have a
stable (running for more than 24 hours) streaming app. Earlier, the
app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each
node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set, by default. This
might be more of a CDH issue. I noticed that while the Yarn container
and other Spark ancillary tasks had GC options set at launch but none
for the executors. So I played with different GC options and this
worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
-XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java
programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the stdout file in the Yarn
container logs on each node/worker. You can set SPARK_JAVA_OPTS in
spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
specifically, even though you don't run Spark as a service (since you
are using Yarn for RM), you can goto Spark Client Advanced
Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh and
set SPARK_JAVA_OPTS there.

- Set these two params - spark.yarn.executor.memoryOverhead
spark.yarn.driver.memoryOverhead. Earlier, my app would get killed
because the executors running the kafka receivers would get killed by
Yarn for over utilization of memory. Now, these are my memory settings
(I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \

Your total executor JVM will consume executor-memory minus
spark.yarn.executor.memoryOverhead so you should see each executor
JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class myAwesomeApp \
--master yarn myawesomeapp.jar \
--jars 
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 64 \
--spark.akka.frameSize 500 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
--spark.shuffle.consolidateFiles true \
--spark.default.parallelism 528 \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not
scientifically measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as
you see above)
- I map one receive task for each kafka partition
- Instead of doing a union on all the incoming streams and then
repartition() I now repartition() each incoming stream and process
them separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after doing a
union on all incoming dStreams. I now repartition each of the five
streams separately (in a loop) to 24.
- For each RDD, 

RE: Change RDDs using map()

2014-09-17 Thread qihong
if you want the result as an RDD of (key, 1):

  new_rdd = rdd.filter(x => x._2 == 1)

if you want the result as an RDD of keys (since you know the values are 1), then:

  new_rdd = rdd.filter(x => x._2 == 1).map(x => x._1)

x._1 and x._2 are the Scala way to access the key and value of a
key/value pair.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Change-RDDs-using-map-tp14436p14481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dealing with Time Series Data

2014-09-17 Thread qihong
what are you trying to do? generate time series from your data in HDFS, or
doing
some transformation and/or aggregation from your time series data in HDFS?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dealing-with-Time-Series-Data-tp14275p14482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Size exceeds Integer.MAX_VALUE in BlockFetcherIterator

2014-09-17 Thread francisco
Hi,

We are running aggregation on a huge data set (few billion rows).
While running the task got the following error (see below). Any ideas?
Running spark 1.1.0 on cdh distribution.

...
14/09/17 13:33:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0).
2083 bytes result sent to driver
14/09/17 13:33:30 INFO CoarseGrainedExecutorBackend: Got assigned task 1
14/09/17 13:33:30 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
14/09/17 13:33:30 INFO TorrentBroadcast: Started reading broadcast variable
2
14/09/17 13:33:30 INFO MemoryStore: ensureFreeSpace(1428) called with
curMem=163719, maxMem=34451478282
14/09/17 13:33:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 1428.0 B, free 32.1 GB)
14/09/17 13:33:30 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/09/17 13:33:30 INFO TorrentBroadcast: Reading broadcast variable 2 took
0.027374294 s
14/09/17 13:33:30 INFO MemoryStore: ensureFreeSpace(2336) called with
curMem=165147, maxMem=34451478282
14/09/17 13:33:30 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 2.3 KB, free 32.1 GB)
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Updating epoch to 1 and
clearing cache
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Don't have map outputs for
shuffle 1, fetching them
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Doing the fetch; tracker
actor =
Actor[akka.tcp://sparkdri...@sas-model1.pv.sv.nextag.com:56631/user/MapOutputTracker#794212052]
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Got the output locations
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 1 non-empty blocks out of 1 blocks
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote fetches in 8 ms
14/09/17 13:33:30 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
Error occurred while fetching local blocks
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:120)
at
org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:358)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:208)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:205)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:205)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:240)
at
org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:583)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:77)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:41)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/09/17 13:33:30 INFO CoarseGrainedExecutorBackend: Got assigned task 2
14/09/17 13:33:30 INFO Executor: Running task 0.0 in stage 1.1 (TID 2)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-in-BlockFetcherIterator-tp14483.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Stable spark streaming app

2014-09-17 Thread abraham.jacob
Nice write-up... very helpful!


-Original Message-
From: Tim Smith [mailto:secs...@gmail.com] 
Sent: Wednesday, September 17, 2014 1:11 PM
Cc: spark users
Subject: Re: Stable spark streaming app

I don't have anything in production yet but I now at least have a stable 
(running for more than 24 hours) streaming app. Earlier, the app would crash 
for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each node (Yarn 
managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:
- The executor JVMs did not have any GC options set, by default. This might be 
more of a CDH issue. I noticed that while the Yarn container and other Spark 
ancillary tasks had GC options set at launch but none for the executors. So I 
played with different GC options and this worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=70
-XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails"

I tried G1GC but for some reason it just didn't work. I am not a Java 
programmer or expert so my conclusion is purely trial and error based.
The GC logs, with these flags, go to the stdout file in the Yarn container 
logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the 
driver node and Yarn will respect these. On CDH/CM specifically, even though 
you don't run Spark as a service (since you are using Yarn for RM), you can 
goto Spark Client Advanced Configuration Snippet (Safety Valve) for 
spark-conf/spark-env.sh and set SPARK_JAVA_OPTS there.

- Set these two params - spark.yarn.executor.memoryOverhead
spark.yarn.driver.memoryOverhead. Earlier, my app would get killed because 
the executors running the kafka receivers would get killed by Yarn for over 
utilization of memory. Now, these are my memory settings (I will paste the 
entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \ --spark.yarn.driver.memoryOverhead 
1024 \

Your total executor JVM will consume executor-memory minus 
spark.yarn.executor.memoryOverhead so you should see each executor JVM 
consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class myAwesomeApp \ --master yarn myawesomeapp.jar \ 
--jars 
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \ 
--spark.rdd.compress true \ --spark.io.compression.codec 
org.apache.spark.io.SnappyCompressionCodec \ --spark.akka.threads 64 \ 
--spark.akka.frameSize 500 \ --spark.task.maxFailures 64 \ 
--spark.scheduler.mode FAIR \ --spark.yarn.executor.memoryOverhead 4096 \ 
--spark.yarn.driver.memoryOverhead 1024 \ --spark.shuffle.consolidateFiles true 
\ --spark.default.parallelism 528 \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log & \
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not scientifically 
measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then use Kryo (as you see 
above)
- I map one receive task for each kafka partition
- Instead of doing a union on all the incoming streams and then
repartition() I now repartition() each incoming stream and process them 
separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after doing a union on all 
incoming dStreams. I now repartition each of the five streams separately (in a 
loop) to 24.
- For each RDD, I set storagelevel to MEMORY_AND_DISK_SER
- Process data per partition instead of per RDD: dataout.foreachRDD( rdd => 
rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create a new Producer, make sure I close
it, else I had a ton of "too many files open" errors :)
- Use immutable objects as far as possible. If I use mutable objects within a 
method/class then I turn them into immutable before passing onto another 
class/method.
- For logging, create a LogService object that I then use for other 
object/class declarations. Once instantiated, I can make logInfo
calls from within other Objects/Methods/Classes and output goes to the stderr 
file in the Yarn container logs. Good for debugging stream processing logic.

Currently, my processing delay is lower than my dStream time window so all is 
good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener threw an 

Re: partitioned groupBy

2014-09-17 Thread Patrick Wendell
If you'd like to re-use the resulting inverted map, you can persist the result:

x = myRdd.mapPartitions(create inverted map).persist()

Your function would create the reverse map and then return an iterator
over the keys in that map.


On Wed, Sep 17, 2014 at 1:04 PM, Akshat Aranya aara...@gmail.com wrote:
 Patrick,

 If I understand this correctly, I won't be able to do this in the closure
 provided to mapPartitions() because that's going to be stateless, in the
 sense that a hash map that I create within the closure would only be useful
 for one call of MapPartitionsRDD.compute().  I guess I would need to
 override mapPartitions() directly within my RDD.  Right?

 On Tue, Sep 16, 2014 at 4:57 PM, Patrick Wendell pwend...@gmail.com wrote:

 If each partition can fit in memory, you can do this using
 mapPartitions and then building an inverse mapping within each
 partition. You'd need to construct a hash map within each partition
 yourself.

 On Tue, Sep 16, 2014 at 4:27 PM, Akshat Aranya aara...@gmail.com wrote:
  I have a use case where my RDD is set up such:
 
  Partition 0:
  K1 -> [V1, V2]
  K2 -> [V2]
 
  Partition 1:
  K3 -> [V1]
  K4 -> [V3]
 
  I want to invert this RDD, but only within a partition, so that the
  operation does not require a shuffle.  It doesn't matter if the
  partitions
  of the inverted RDD have non unique keys across the partitions, for
  example:
 
  Partition 0:
  V1 -> [K1]
  V2 -> [K1, K2]
 
  Partition 1:
  V1 -> [K3]
  V3 -> [K4]
 
  Is there a way to do only a per-partition groupBy, instead of shuffling
  the
  entire data?
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
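
A sketch of that per-partition inversion (no shuffle), with toy data:

import scala.collection.mutable
import org.apache.spark.SparkContext

object InvertPerPartition {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "invert-per-partition")
    val rdd = sc.parallelize(Seq(
      ("K1", Seq("V1", "V2")), ("K2", Seq("V2")),
      ("K3", Seq("V1")), ("K4", Seq("V3"))), 2)
    val inverted = rdd.mapPartitions({ iter =>
      // Build the inverse map locally; it exists only while this partition
      // is computed and is rebuilt on recompute -- hence the persist() below.
      val inv = mutable.HashMap.empty[String, mutable.ListBuffer[String]]
      for ((k, vs) <- iter; v <- vs)
        inv.getOrElseUpdate(v, mutable.ListBuffer.empty[String]) += k
      inv.iterator.map { case (v, ks) => (v, ks.toList) }
    }, preservesPartitioning = true).persist()
    inverted.glom().collect().foreach(p => println(p.mkString(", ")))
  }
}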



Re: Size exceeds Integer.MAX_VALUE in BlockFetcherIterator

2014-09-17 Thread Burak Yavuz
Hi,

Could you try repartitioning the data with .repartition(# of cores on machine),
or, while reading the data, supplying the minimum number of partitions, as in
sc.textFile(path, # of cores on machine)?

It may be that the whole dataset is stored in one block. If it is billions of
rows, indexing into a single block will overflow, giving the Size exceeds
Integer.MAX_VALUE error.

Best,
Burak

- Original Message -
From: francisco ftanudj...@nextag.com
To: u...@spark.incubator.apache.org
Sent: Wednesday, September 17, 2014 3:18:29 PM
Subject: Size exceeds Integer.MAX_VALUE in BlockFetcherIterator

Hi,

We are running aggregation on a huge data set (few billion rows).
While running the task got the following error (see below). Any ideas?
Running spark 1.1.0 on cdh distribution.

...
14/09/17 13:33:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0).
2083 bytes result sent to driver
14/09/17 13:33:30 INFO CoarseGrainedExecutorBackend: Got assigned task 1
14/09/17 13:33:30 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
14/09/17 13:33:30 INFO TorrentBroadcast: Started reading broadcast variable
2
14/09/17 13:33:30 INFO MemoryStore: ensureFreeSpace(1428) called with
curMem=163719, maxMem=34451478282
14/09/17 13:33:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 1428.0 B, free 32.1 GB)
14/09/17 13:33:30 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/09/17 13:33:30 INFO TorrentBroadcast: Reading broadcast variable 2 took
0.027374294 s
14/09/17 13:33:30 INFO MemoryStore: ensureFreeSpace(2336) called with
curMem=165147, maxMem=34451478282
14/09/17 13:33:30 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 2.3 KB, free 32.1 GB)
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Updating epoch to 1 and
clearing cache
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Don't have map outputs for
shuffle 1, fetching them
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Doing the fetch; tracker
actor =
Actor[akka.tcp://sparkdri...@sas-model1.pv.sv.nextag.com:56631/user/MapOutputTracker#794212052]
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Got the output locations
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 1 non-empty blocks out of 1 blocks
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote fetches in 8 ms
14/09/17 13:33:30 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
Error occurred while fetching local blocks
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:120)
at
org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:358)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:208)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:205)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:205)
at
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:240)
at
org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:583)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:77)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:41)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/09/17 13:33:30 INFO 

Re: Spark Streaming - batchDuration for streaming

2014-09-17 Thread qihong
Here's official spark document about batch size/interval:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-size

Spark is batch-oriented processing. As you mentioned, a stream is a
continuous flow, and core Spark cannot handle it directly.

Spark Streaming bridges the gap between the continuous flow and
batch-oriented processing. It generates an RDD from the continuous
data flow/stream every batch interval; Spark can then process
them as normal RDDs.
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-batchDuration-for-streaming-tp14469p14487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
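
A minimal sketch showing where the batch interval enters; the source directory
and the 10-second interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchIntervalSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("batch-interval").setMaster("local[2]")
    // Every 10 seconds the incoming flow is cut into one RDD (one micro-batch).
    val ssc = new StreamingContext(conf, Seconds(10))
    // e.g. the directory a Flume spooling source delivers files into
    val lines = ssc.textFileStream("hdfs:///incoming/")
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The rule of thumb from the linked guide: keep the per-batch processing time
below the batch interval, otherwise batches queue up.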



SPARK BENCHMARK TEST

2014-09-17 Thread VJ Shalish
 I am trying to benchmark spark in a hadoop cluster.
I need to design a sample spark job to test the CPU utilization, RAM usage,
Input throughput, Output throughput and Duration of execution in the
cluster.

I need to test the state of the cluster for :-

A spark job which uses high CPU
A spark job which uses high RAM
A spark job which uses high Input throughput
A spark job which uses high Output throughput
A spark job which takes long time.

These have to be tested individually and a combination of these scenarios
would also be used.

Please help me in understanding the factors of a Spark Job which would
contribute to  CPU utilization, RAM usage, Input throughput, Output
throughput, Duration of execution in the cluster. So that I can design
spark jobs that could be used for testing.



Thanks
Shalish.


Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote:
 Thanks Tim, this is super helpful!

 Question about jars and spark-submit:  why do you provide
 myawesomeapp.jar as the program jar but then include other jars via
 the --jars argument?  Have you tried building one uber jar with all
 dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at
"https://repository.apache.org/content/repositories/releases"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run sbt package to generate myawesomeapp.jar.
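
One common route to an uber jar is the sbt-assembly plugin. A sketch, assuming
sbt 0.13 and a 2014-era plugin version (both are guesses; adjust as needed):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt additions (0.11.x style):
import AssemblyKeys._

assemblySettings

// Mark Spark itself "provided" so the uber jar only carries kafka, zkclient,
// etc. -- the cluster already puts Spark on the classpath:
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

Then sbt assembly writes something like
target/scala-2.10/my-awesome-app-assembly-1.025.jar, which can be passed to
spark-submit without the long --jars list.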


 Also, have you ever seen any issues with Spark caching your app jar
 between runs even if it changes?

Not that I can tell but then maybe because I use Yarn, I might be
shielded from some jar distribution bugs in Spark?



 On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith secs...@gmail.com wrote:
 I don't have anything in production yet but I now at least have a
 stable (running for more than 24 hours) streaming app. Earlier, the
 app would crash for all sorts of reasons. Caveats/setup:
 - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
 - Yarn for RM
 - Input and Output to Kafka
 - CDH 5.1
 - 11 node cluster with 32-cores and 48G max container size for each
 node (Yarn managed)
 - 5 partition Kafka topic - both in and out
 - Roughly, an average of 25k messages per second
 - App written in Scala (warning: I am a Scala noob)

 Few things I had to add/tweak to get the app to be stable:
 - The executor JVMs did not have any GC options set, by default. This
 might be more of a CDH issue. I noticed that while the Yarn container
 and other Spark ancillary tasks had GC options set at launch but none
 for the executors. So I played with different GC options and this
 worked best:
 SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
 -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
 -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
 -XX:+PrintGCDetails"

 I tried G1GC but for some reason it just didn't work. I am not a Java
 programmer or expert so my conclusion is purely trial and error based.
 The GC logs, with these flags, go to the stdout file in the Yarn
 container logs on each node/worker. You can set SPARK_JAVA_OPTS in
 spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
 specifically, even though you don't run Spark as a service (since you
 are using Yarn for RM), you can goto Spark Client Advanced
 Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh and
 set SPARK_JAVA_OPTS there.

 - Set these two params - spark.yarn.executor.memoryOverhead
 spark.yarn.driver.memoryOverhead. Earlier, my app would get killed
 because the executors running the kafka receivers would get killed by
 Yarn for over utilization of memory. Now, these are my memory settings
 (I will paste the entire app launch params later in the email):
 --driver-memory 2G \
 --executor-memory 16G \
 --spark.yarn.executor.memoryOverhead 4096 \
 --spark.yarn.driver.memoryOverhead 1024 \

 Your total executor JVM will consume executor-memory minus
 spark.yarn.executor.memoryOverhead so you should see each executor
 JVM consuming no more than 12G, in this case.

 Here is how I launch my app:
 run=`date +%m-%d-%YT%T`; \
 nohup spark-submit --class myAwesomeApp \
 --master yarn myawesomeapp.jar \
 --jars 
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 \
 --driver-memory 2G \
 --executor-memory 16G \
 --executor-cores 16 \
 --num-executors 10 \
 --spark.serializer org.apache.spark.serializer.KryoSerializer \
 --spark.rdd.compress true \
 --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
 --spark.akka.threads 64 \
 --spark.akka.frameSize 500 \
 --spark.task.maxFailures 64 \
 --spark.scheduler.mode FAIR \
 --spark.yarn.executor.memoryOverhead 4096 \
 --spark.yarn.driver.memoryOverhead 1024 \
 --spark.shuffle.consolidateFiles true \
 --spark.default.parallelism 528 \
 > logs/normRunLog-$run.log \
 2> logs/normRunLogError-$run.log & \
 echo $! > logs/current-run.pid

 Some code optimizations (or, goof ups that I fixed). I did not
 scientifically measure the impact of each but I think they helped:
 - Made all my classes and objects serializable and then use Kryo (as
 you see above)
 - I map one receive task for each kafka partition
 - Instead of doing a union on all the incoming streams and then
 repartition() I now repartition() each incoming stream and process
 them separately. I believe this reduces shuffle.
 - Reduced number of repartitions. I was doing 128 after 

spark-1.1.0-bin-hadoop2.4 java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass

2014-09-17 Thread Andy Davidson
Hi I am new to spark.

I am trying to write a simple java program that process tweets that where
collected and stored in a file. I figured the simplest thing to do would be
to convert the JSON string into a java map. When I submit my jar file I keep
getting the following error

java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass


For the life of me I cannot figure out what the problem is. I am using the
maven shade plugin and checked that the missing class is in my uber jar.

Any suggestions would be greatly appreciated.

Andy

P.S. I should mention that I am running everything on my local machine.

<properties>
    <jackson.version>1.8.8</jackson.version> <!-- 1.9.13 -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>${jackson.version}</version>
    </dependency>
</dependencies>



I am using shade to build an uber jar



$ jar -tvf target/myUber.jar
…
   580 Wed Sep 17 16:17:36 PDT 2014 org/codehaus/jackson/annotate/JsonClass.class
…




14/09/17 16:13:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
	at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
	at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
	at org.codehaus.jackson.map.deser.BasicDeserializerFactory.createMapDeserializer(BasicDeserializerFactory.java:337)
	at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:377)
	at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
	at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
	at org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
	at org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
	at org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1609)
	at com.santacruzintegration.spark.JSONHelper.toJsonMap(JSONHelper.java:40)





public class JSONHelper {

    public static final ObjectMapper mapper = new ObjectMapper();

    static {
        mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        //mapper.configure(Feature.USE_ANNOTATIONS, false);
    }

    …

    public static Map<String, String> toJsonMap(String json) {
        System.err.println("AEDWIP toJsonMap: " + json);
        try {
            TypeReference<HashMap<String, String>> typeRef =
                new TypeReference<HashMap<String, String>>() {};

            //return mapper.readValue(json, new TypeReference<HashMap<String, String>>(){});
            return mapper.readValue(json, typeRef);
            //return mapper.convertValue(json, typeRef);

            //HashMap<String, String> ret =
            //    new ObjectMapper().readValue(json, HashMap.class);
        } catch (Exception e) {
            throw new IOError(e);
        }
    }









problem with HiveContext inside Actor

2014-09-17 Thread Du Li
Hi,

Wondering if anybody has had a similar experience or any suggestions here.

I have an akka Actor that processes database requests sent as high-level messages. 
Inside this Actor, it creates a HiveContext object that does the actual db 
work. The main thread creates the needed SparkContext and passes it in to the 
Actor to create the HiveContext.

When a message is sent to the Actor, it is processed properly except that, when 
the message triggers the HiveContext to create a database, it throws a 
NullPointerException in hive.ql.Driver.java which suggests that its conf 
variable is not initialized.

Ironically, it works fine if my main thread directly calls actor.hiveContext to 
create the database. The spark version is 1.1.0.
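
A minimal sketch of the setup described, assuming Akka and Spark 1.1; the
class and message names are hypothetical:

import akka.actor.Actor
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

case object CreateSomeDB

class DbActor(sc: SparkContext) extends Actor {
  // HiveContext is built inside the Actor from the shared SparkContext
  val hiveContext = new HiveContext(sc)

  def receive = {
    case CreateSomeDB =>
      // the call that hits the NullPointerException when driven by a
      // message rather than invoked directly on actor.hiveContext
      hiveContext.sql("CREATE DATABASE IF NOT EXISTS somedb")
  }
}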

Thanks,
Du


Re: permission denied on local dir

2014-09-17 Thread style95
More clearly, that YARN cluster is managed by another team.
That means I do not have any permission to change the system.

If required, I can send a request to them,
but as of now, I only have permission to manage my Spark application.

So if there is any way to solve this problem by changing configuration in
the Spark application, that would be best.
However, if configuration at the YARN layer is required, I need to ask the
managing team.

Thanks
Regards
Dongkyoung.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Permission-denied-on-local-dir-tp14422p14492.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LZO support in Spark 1.0.0 - nothing seems to work

2014-09-17 Thread Vipul Pandey
It works for me : 


export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native

export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar


I hope you are adding this to the code : 

val conf = sc.hadoopConfiguration
conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
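
For reference, a hedged sketch of then reading an LZO file, assuming the
hadoop-lzo jar's com.hadoop.mapreduce.LzoTextInputFormat is on the
classpath (the path is hypothetical):

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// newAPIHadoopFile picks up the codec configured above
val lzoRdd = sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat](
  "hdfs:///path/to/file.lzo")
println(lzoRdd.count())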



Vipul

On Sep 17, 2014, at 5:40 PM, rogthefrog ro...@amino.com wrote:

 I have a HDFS cluster managed with CDH Manager. Version is CDH 5.1 with
 matching GPLEXTRAS parcel. LZO works with Hive and Pig, but I can't make it
 work with Spark 1.0.0. I've tried:
 
 * Setting this:
 
 HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS
 -Djava.library.path=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native/"
 
 * Setting this in spark-env.sh. I tried with and without export. I tried
 in CDH Manager and manually on the host.
 
 export
 SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar
 export
 SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native/
 
 * Setting this in /etc/spark/conf/spark-defaults.conf:
 
 spark.executor.extraLibraryPath 
 /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native
 spark.spark.executor.extraClassPath
 /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar
 
 * Adding this in CDH manager:
 
 export LD_LIBRARY_PATH=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native
 
 * Hardcoding
 -Djava.library.path=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native in
 the Spark command 
 
 * Symlinking the gpl compression binaries into
 /opt/cloudera/parcels/CDH/lib/hadoop/lib/native
 
 * Symlinking the gpl compression binaries into /usr/lib
 
 And nothing worked. When I run pyspark I get this:
 
 14/09/17 20:38:54 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 
 and when I try to run a simple job on a LZO file in HDFS I get this:
 
 distFile.count()
 14/09/17 13:51:54 ERROR GPLNativeCodeLoader: Could not load native gpl
 library
 java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
   at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
   at java.lang.Runtime.loadLibrary0(Runtime.java:849)
   at java.lang.System.loadLibrary(System.java:1088)
   at
 com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:32)
   at com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:71)
 
 Can anybody help please? Many thanks.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/LZO-support-in-Spark-1-0-0-nothing-seems-to-work-tp14494.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge matrix

2014-09-17 Thread Debasish Das
Hi Reza,

In similarColumns, it seems that with cosine similarity I also need other
numbers such as intersection, Jaccard and other measures...

Right now I modified the code to generate Jaccard, but I had to run it twice
due to the design of RowMatrix / CoordinateMatrix... I feel we should modify
RowMatrix and CoordinateMatrix to be templated on the value type...

Are you considering this in your design ?

Thanks.
Deb
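
For reference, a minimal sketch against the columnSimilarities API that
this work became in MLlib's RowMatrix (the data and threshold here are
arbitrary):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 3.0),
  Vectors.dense(0.0, 2.0, 1.0),
  Vectors.dense(4.0, 1.0, 0.0)))
val mat = new RowMatrix(rows)

// With a positive threshold, pairs whose cosine similarity falls below it
// may be sampled away (DIMSUM); threshold 0.0 corresponds to the
// gamma = PositiveInfinity (brute force) case discussed below.
val sims = mat.columnSimilarities(0.1) // upper-triangular CoordinateMatrix
sims.entries.collect().foreach(println)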


On Tue, Sep 9, 2014 at 9:45 AM, Reza Zadeh r...@databricks.com wrote:

 Better to do it in a PR of your own, it's not sufficiently related to
 dimsum

 On Tue, Sep 9, 2014 at 7:03 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Cool...can I add loadRowMatrix in your PR ?

 Thanks.
 Deb

 On Tue, Sep 9, 2014 at 1:14 AM, Reza Zadeh r...@databricks.com wrote:

 Hi Deb,

 Did you mean to message me instead of Xiangrui?

 For TS matrices, dimsum with positiveinfinity and computeGramian have
 the same cost, so you can do either one. For dense matrices with say, 1m
 columns this won't be computationally feasible and you'll want to start
 sampling with dimsum.

 It would be helpful to have a loadRowMatrix function, I would use it.

 Best,
 Reza

 On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi Xiangrui,

 For tall skinny matrices, if I can pass a similarityMeasure to
 computeGrammian, I could re-use the SVD's computeGrammian for similarity
 computation as well...

 Do you recommend using this approach for tall skinny matrices or just
 use the dimsum's routines ?

 Right now RowMatrix does not have a loadRowMatrix function like the one
 available in LabeledPoint...should I add one ? I want to export the matrix
 out from my stable code and then test dimsum...

 Thanks.
 Deb



 On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh r...@databricks.com wrote:

 I will add dice, overlap, and jaccard similarity in a future PR,
 probably still for 1.2


 On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das debasish.da...@gmail.com
  wrote:

 Awesome...Let me try it out...

 Any plans of putting other similarity measures in future (jaccard is
 something that will be useful) ? I guess it makes sense to add some
 similarity measures in mllib...


 On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh r...@databricks.com
 wrote:

 Yes you're right, calling dimsum with gamma as PositiveInfinity
 turns it into the usual brute force algorithm for cosine similarity; there
 is no sampling. This is by design.


 On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das 
 debasish.da...@gmail.com wrote:

 I looked at the code: similarColumns(Double.posInf) is generating
 the brute force...

 Basically dimsum with gamma as PositiveInfinity will produce the
 exact same result as doing cartesian products of RDD[(product, vector)] 
 and
 computing similarities or there will be some approximation ?

 Sorry I have not read your paper yet. Will read it over the weekend.



 On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh r...@databricks.com
 wrote:

 For 60M x 10K brute force and dimsum thresholding should be fine.

 For 60M x 10M probably brute force won't work depending on the
 cluster's power, and dimsum thresholding should work with appropriate
 threshold.

 Dimensionality reduction should help, and how effective it is will
 depend on your application and domain, it's worth trying if the direct
 computation doesn't work.

 You can also try running KMeans clustering (perhaps after
 dimensionality reduction) if your goal is to find batches of similar 
 points
 instead of all pairs above a threshold.




 On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das 
 debasish.da...@gmail.com wrote:

 Also for tall and wide (rows ~60M, columns 10M), I am considering
 running a matrix factorization to reduce the dimension to say ~60M x 
 50 and
 then run all pair similarity...

 Did you also try similar ideas and saw positive results ?



 On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das 
 debasish.da...@gmail.com wrote:

 Ok...just to make sure I have RowMatrix[SparseVector] where rows
 are ~ 60M and columns are 10M say with billion data points...

 I have another version that's around 60M and ~ 10K...

 I guess for the second one both all pair and dimsum will run
 fine...

 But for tall and wide, what do you suggest ? can dimsum handle
 it ?

 I might need jaccard as well...can I plug that in the PR ?



 On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh r...@databricks.com
 wrote:

 You might want to wait until Wednesday since the interface will
 be changing in that PR before Wednesday, probably over the 
 weekend, so that
 you don't have to redo your code. Your call if you need it before 
 a week.
 Reza


 On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das 
 debasish.da...@gmail.com wrote:

 Ohh cool... all-pairs brute force is also part of this PR?
 Let me pull it in and test on our dataset...

 Thanks.
 Deb


 On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh 
 r...@databricks.com wrote:

 Hi Deb,

 We are adding all-pairs and thresholded all-pairs via dimsum
 in this PR: 

RE: problem with HiveContext inside Actor

2014-09-17 Thread Cheng, Hao
Hi, Du
I am not sure what you mean by "triggers the HiveContext to create a database"; do 
you create a subclass of HiveContext? Just be sure you call 
"HiveContext.sessionState" eagerly, since it will set the proper "hiveconf" 
into the SessionState; otherwise the HiveDriver will always get a null value 
when retrieving the HiveConf.

Cheng Hao
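
A minimal sketch of this suggestion, assuming a HiveContext subclass as in
the original post (sessionState is protected[hive], so the eager touch has
to happen inside the subclass); the names are hypothetical:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

class MyHiveContext(sc: SparkContext) extends HiveContext(sc) {
  // Touch sessionState eagerly at construction time so the HiveConf is
  // installed into the SessionState before any message-driven call runs.
  sessionState

  def createDB(name: String): Unit = {
    sql(s"CREATE DATABASE IF NOT EXISTS $name")
  }
}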

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, September 18, 2014 7:51 AM
To: user@spark.apache.org; d...@spark.apache.org
Subject: problem with HiveContext inside Actor

Hi,

Wondering if anybody has had a similar experience or any suggestions here.

I have an akka Actor that processes database requests sent as high-level messages. 
Inside this Actor, it creates a HiveContext object that does the actual db 
work. The main thread creates the needed SparkContext and passes it in to the 
Actor to create the HiveContext.

When a message is sent to the Actor, it is processed properly except that, when 
the message triggers the HiveContext to create a database, it throws a 
NullPointerException in hive.ql.Driver.java which suggests that its conf 
variable is not initialized.

Ironically, it works fine if my main thread directly calls actor.hiveContext to 
create the database. The spark version is 1.1.0.

Thanks,
Du


Re: Cannot run unit test.

2014-09-17 Thread Jies
When I run `sbt test-only SparkTest` or `sbt test-only SparkTest1`, the tests
pass. But running `sbt test` to run both SparkTest and SparkTest1 fails.

If I merge all cases into one file, the tests pass.
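
If the cause is two suites each creating their own SparkContext in the same
JVM (a guess from the symptom, since only one SparkContext may be active per
JVM), a common pattern is to stop the context after each suite; a minimal
sketch with ScalaTest, names hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class SparkTest extends FunSuite with BeforeAndAfterAll {
  @transient private var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SparkTest"))
  }

  override def afterAll() {
    if (sc != null) sc.stop() // release the JVM-wide context for the next suite
    sc = null
  }

  test("count works") {
    assert(sc.parallelize(1 to 10).count() === 10)
  }
}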



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-run-unit-test-tp14459p14506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



MLLib: LIBSVM issue

2014-09-17 Thread Sameer Tilak
Hi All,

We have a fairly large amount of sparse data. I was following the
following instructions in the manual:

Sparse data: It is very common in practice to have sparse training data. MLlib
supports reading training examples stored in LIBSVM format, which is the
default format used by LIBSVM and LIBLINEAR. It is a text format in which each
line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc,
  "data/mllib/sample_libsvm_data.txt")
I believe that I have formatted my data as per the required LIBSVM format. Here
is a snippet of that (fields are tab-separated in the actual file; tabs are
rendered as spaces here):

1 122:1 1693:1 1771:1 1974:1 2334:1 2378:1 2562:1
1 118:1 1389:1 1413:1 1454:1 1780:1 2562:1 5051:1 5417:1 5548:1 5798:1 5862:1
0 150:1 214:1 468:1 1013:1 1078:1 1092:1 1117:1 1489:1 1546:1 1630:1 1635:1 1827:1 2024:1 2215:1 2478:1 2761:1 5985:1 6115:1 6218:1
0 251:1 5578:1

However, when I use MLUtils.loadLibSVMFile(sc, "path-to-data-file") I get the
following error messages in my spark-shell. Can someone please point me in the
right direction?

java.lang.NumberFormatException: For input string: "150:1 214:1 468:1 1013:1 1078:1 1092:1 1117:1 1489:1 1546:1 1630:1 1635:1 1827:1 2024:1 2215:1 2478:1 2761:1 5985:1 6115:1 6218:1"
	at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
	at java.lang.Double.parseDouble(Double.java:540)
	at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
 

Re: problem with HiveContext inside Actor

2014-09-17 Thread Michael Armbrust
- dev

Is it possible that you are constructing more than one HiveContext in a
single JVM?  Due to global state in Hive code this is not allowed.

Michael

On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Hi, Du

  I am not sure what you mean by “triggers the HiveContext to create a
 database”; do you create a subclass of HiveContext? Just be sure you
 call “HiveContext.sessionState” eagerly, since it will set the proper
 “hiveconf” into the SessionState; otherwise the HiveDriver will always get
 a null value when retrieving the HiveConf.



 Cheng Hao



 *From:* Du Li [mailto:l...@yahoo-inc.com.INVALID]
 *Sent:* Thursday, September 18, 2014 7:51 AM
 *To:* user@spark.apache.org; d...@spark.apache.org
 *Subject:* problem with HiveContext inside Actor



 Hi,



 Wonder anybody had similar experience or any suggestion here.



 I have an akka Actor that processes database requests in high-level
 messages. Inside this Actor, it creates a HiveContext object that does the
 actual db work. The main thread creates the needed SparkContext and passes
 in to the Actor to create the HiveContext.



 When a message is sent to the Actor, it is processed properly except that,
 when the message triggers the HiveContext to create a database, it throws a
 NullPointerException in hive.ql.Driver.java which suggests that its conf
 variable is not initialized.



 Ironically, it works fine if my main thread directly calls
 actor.hiveContext to create the database. The spark version is 1.1.0.



 Thanks,

 Du



Move Spark configuration from SPARK_CLASSPATH to spark-default.conf , HiveContext went wrong with Class com.hadoop.compression.lzo.LzoCodec not found

2014-09-17 Thread Zhun Shen
Hi there,

My product environment is AWS EMR with hadoop2.4.0 and spark1.0.2. I moved the 
spark configuration in SPARK_CLASSPATH to spark-default.conf,  then the 
hiveContext went wrong.
I also found WARN info “WARN DataNucleus.General: Plugin (Bundle) 
org.datanucleus.store.rdbms is already registered. Ensure you dont have 
multiple JAR versions of the same plugin in the classpath. The URL 
file:/home/hadoop/.versions/spark-1.0.2-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar
 is already registered, and you are trying to register an identical plugin 
located at URL file:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.1.jar.”. But 
I do not know where the registration was?

contents of SPARK_CLASSPATH:

export SPARK_MASTER_IP=10.187.25.107
export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3
export SPARK_LOCAL_DIRS=/mnt/spark/
export 
SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
export SPARK_DAEMON_JAVA_OPTS=-verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps


contents of spark-default.conf:
 spark.master            spark://10.187.25.107:7077
 spark.eventLog.enabled  true
# spark.eventLog.dir      hdfs://namenode:8021/directory
 spark.serializer        org.apache.spark.serializer.KryoSerializer
 spark.local.dir         /mnt/spark/
 spark.executor.memory   10g
 spark.executor.extraLibraryPath 
/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
# spark.executor.extraClassPath 
/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar


the error log:

14/09/18 02:28:45 INFO parse.ParseDriver: Parsing command: show tables
14/09/18 02:28:45 INFO parse.ParseDriver: Parse Completed
14/09/18 02:28:45 INFO analysis.Analyzer: Max iterations (2) reached for batch 
MultiInstanceRelations
14/09/18 02:28:45 INFO analysis.Analyzer: Max iterations (2) reached for batch 
CaseInsensitiveAttributeReferences
14/09/18 02:28:45 INFO analysis.Analyzer: Max iterations (2) reached for batch 
Check Analysis
14/09/18 02:28:45 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for 
batch Add exchange
14/09/18 02:28:45 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for 
batch Prepare Expressions
14/09/18 02:28:45 INFO Configuration.deprecation: mapred.input.dir.recursive is 
deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=Driver.run
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=TimeToSubmit
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=compile
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=parse
14/09/18 02:28:45 INFO parse.ParseDriver: Parsing command: show tables
14/09/18 02:28:45 INFO parse.ParseDriver: Parse Completed
14/09/18 02:28:45 INFO ql.Driver: /PERFLOG method=parse start=1411007325561 
end=1411007325561 duration=0
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=semanticAnalyze
14/09/18 02:28:45 INFO ql.Driver: Semantic Analysis Completed
14/09/18 02:28:45 INFO ql.Driver: /PERFLOG method=semanticAnalyze 
start=1411007325561 end=1411007325611 duration=50
14/09/18 02:28:45 INFO exec.ListSinkOperator: Initializing Self 0 OP
14/09/18 02:28:45 INFO exec.ListSinkOperator: Operator 0 OP initialized
14/09/18 02:28:45 INFO exec.ListSinkOperator: Initialization Done 0 OP
14/09/18 02:28:45 INFO ql.Driver: Returning Hive schema: 
Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from 
deserializer)], properties:null)
14/09/18 02:28:45 INFO ql.Driver: /PERFLOG method=compile start=1411007325538 
end=1411007325677 duration=139
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=Driver.execute
14/09/18 02:28:45 INFO Configuration.deprecation: mapred.job.name is 
deprecated. Instead, use mapreduce.job.name
14/09/18 02:28:45 INFO ql.Driver: Starting command: show tables
14/09/18 02:28:45 INFO ql.Driver: /PERFLOG method=TimeToSubmit 
start=1411007325538 end=1411007325692 duration=154
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=runTasks
14/09/18 02:28:45 INFO ql.Driver: PERFLOG method=task.DDL.Stage-0
14/09/18 02:28:45 INFO metastore.HiveMetaStore: 0: Opening raw store with 
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
14/09/18 02:28:45 INFO metastore.ObjectStore: ObjectStore, initialize called
14/09/18 02:28:45 WARN DataNucleus.General: Plugin (Bundle) 
"org.datanucleus.store.rdbms" is already registered. Ensure you dont have 
multiple JAR versions of the same plugin in the classpath. The URL 
file:/home/hadoop/.versions/spark-1.0.2-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar
 is already registered, and you are trying to register an identical plugin 
located at URL file:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.1.jar.
14/09/18 02:28:45 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus" 
is already registered. Ensure you dont have multiple 

Re: MLLib: LIBSVM issue

2014-09-17 Thread Burak Yavuz
Hi,

The spacing between the inputs should be a single space, not a tab. I feel like 
your inputs have tabs between them instead of a single space. Therefore the 
parser
cannot parse the input.

Best,
Burak
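
A minimal sketch of one workaround along those lines, normalizing all
whitespace runs (tabs included) to single spaces before loading; the paths
are hypothetical:

import org.apache.spark.mllib.util.MLUtils

val cleaned = sc.textFile("hdfs:///data/libsvm_raw.txt")
  .map(_.trim.split("\\s+").mkString(" "))
cleaned.saveAsTextFile("hdfs:///data/libsvm_clean")

// loadLibSVMFile accepts the directory of part files written above
val examples = MLUtils.loadLibSVMFile(sc, "hdfs:///data/libsvm_clean")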

- Original Message -
From: Sameer Tilak ssti...@live.com
To: user@spark.apache.org
Sent: Wednesday, September 17, 2014 7:25:10 PM
Subject: MLLib: LIBSVM issue

Hi All,

We have a fairly large amount of sparse data. I was following the
following instructions in the manual:

Sparse data: It is very common in practice to have sparse training data. MLlib
supports reading training examples stored in LIBSVM format, which is the
default format used by LIBSVM and LIBLINEAR. It is a text format in which each
line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc,
  "data/mllib/sample_libsvm_data.txt")

I believe that I have formatted my data as per the required LIBSVM format. Here
is a snippet of that (fields are tab-separated in the actual file; tabs are
rendered as spaces here):

1 122:1 1693:1 1771:1 1974:1 2334:1 2378:1 2562:1
1 118:1 1389:1 1413:1 1454:1 1780:1 2562:1 5051:1 5417:1 5548:1 5798:1 5862:1
0 150:1 214:1 468:1 1013:1 1078:1 1092:1 1117:1 1489:1 1546:1 1630:1 1635:1 1827:1 2024:1 2215:1 2478:1 2761:1 5985:1 6115:1 6218:1
0 251:1 5578:1

However, when I use MLUtils.loadLibSVMFile(sc, "path-to-data-file") I get the
following error messages in my spark-shell. Can someone please point me in the
right direction?

java.lang.NumberFormatException: For input string: "150:1 214:1 468:1 1013:1 1078:1 1092:1 1117:1 1489:1 1546:1 1630:1 1635:1 1827:1 2024:1 2215:1 2478:1 2761:1 5985:1 6115:1 6218:1"
	at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
	at java.lang.Double.parseDouble(Double.java:540)
	at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SchemaRDD and RegisterAsTable

2014-09-17 Thread Addanki, Santosh Kumar
Hi,

We built Spark 1.1.0 with Maven using:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive clean package


And the Thrift Server has been configured to use the Hive Meta Store.

When a SchemaRDD is registered as a table, where does the metadata of this table
get stored? Can it be stored in the configured Hive metastore?

Also, if the Thrift server is not configured to use the Hive metastore, it uses
its own default (probably Derby) metastore. So would the table metadata be
stored in that metastore?

Best Regards
Santosh






Re: SchemaRDD and RegisterAsTable

2014-09-17 Thread Denny Lee
The registered table is stored within the Spark context itself. To make the 
table available for the Thrift server to access, you can save the table via the 
Hive context so that the Thrift server process can see the 
table. If you are using Derby as your metastore, then the Thrift server should 
be accessing this as you would want to utilize the same Hive configuration 
(i.e. hive-site.xml). You may want to migrate your metastore to MySQL or 
Postgres, as they will handle concurrency better than Derby.

HTH!
Denny
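
As a rough sketch of the distinction (Spark 1.1 APIs; the data and table
names are made up): registerTempTable keeps the table inside the current
context, while saveAsTable on a HiveContext-created SchemaRDD persists it
through the metastore, where the Thrift server can find it.

case class Person(name: String, age: Int)

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.createSchemaRDD

val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25)))
val schemaRDD = createSchemaRDD(people)

schemaRDD.registerTempTable("people_tmp") // visible only inside this context
schemaRDD.saveAsTable("people_saved")     // persisted through the Hive metastore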


On September 17, 2014 at 21:47:50, Addanki, Santosh Kumar 
(santosh.kumar.adda...@sap.com) wrote:

Hi,

We built Spark 1.1.0 with Maven using:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive clean package

And the Thrift Server has been configured to use the Hive Meta Store.

When a SchemaRDD is registered as a table, where does the metadata of this table
get stored? Can it be stored in the configured Hive metastore?

Also, if the Thrift server is not configured to use the Hive metastore, it uses
its own default (probably Derby) metastore. So would the table metadata be
stored in that metastore?

Best Regards
Santosh

Re: problem with HiveContext inside Actor

2014-09-17 Thread Du Li
Thanks for your replies.

Michael: No. I only create one HiveContext in the code.

Hao: Yes. I subclass HiveContext and define my own function to create a database, 
and then subclass an akka Actor to call that function in response to an abstract 
message. Per your suggestion, I called 
println(sessionState.getConf.getAllProperties), which printed tons of properties; 
however, the same NullPointerException was still thrown.

As mentioned, the weird thing is that everything worked fine if I simply called 
actor.hiveContext.createDB() directly. But it throws the null pointer exception 
from Driver.java if I do “actor ! CreateSomeDB”, which seems to me to be just the 
same thing, because the actor does nothing but call createDB().

Du



From: Michael Armbrust mich...@databricks.com
Date: Wednesday, September 17, 2014 at 7:40 PM
To: Cheng, Hao hao.ch...@intel.com
Cc: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org
Subject: Re: problem with HiveContext inside Actor

- dev

Is it possible that you are constructing more than one HiveContext in a single 
JVM?  Due to global state in Hive code this is not allowed.

Michael

On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao hao.ch...@intel.com wrote:
Hi, Du
I am not sure what you mean by “triggers the HiveContext to create a database”; do 
you create a subclass of HiveContext? Just be sure you call 
“HiveContext.sessionState” eagerly, since it will set the proper “hiveconf” 
into the SessionState; otherwise the HiveDriver will always get a null value 
when retrieving the HiveConf.

Cheng Hao

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, September 18, 2014 7:51 AM
To: user@spark.apache.org; d...@spark.apache.org
Subject: problem with HiveContext inside Actor

Hi,

Wondering if anybody has had a similar experience or any suggestions here.

I have an akka Actor that processes database requests sent as high-level messages. 
Inside this Actor, it creates a HiveContext object that does the actual db 
work. The main thread creates the needed SparkContext and passes it in to the 
Actor to create the HiveContext.

When a message is sent to the Actor, it is processed properly except that, when 
the message triggers the HiveContext to create a database, it throws a 
NullPointerException in hive.ql.Driver.java which suggests that its conf 
variable is not initialized.

Ironically, it works fine if my main thread directly calls actor.hiveContext to 
create the database. The spark version is 1.1.0.

Thanks,
Du



Python version of kmeans

2014-09-17 Thread MEETHU MATHEW
Hi all,

I need the kmeans code written against PySpark for some testing purposes.
Can somebody tell me the difference between these two files?

 spark-1.0.1/examples/src/main/python/kmeans.py   and

 spark-1.0.1/python/pyspark/mllib/clustering.py


Thanks & Regards, 
Meethu M

SQL shell for Spark SQL?

2014-09-17 Thread David Rosenstrauch
Is there a shell available for Spark SQL, similar to the way the Shark 
or Hive shells work?


From my reading up on Spark SQL, it seems like one can execute SQL 
queries in the Spark shell, but only from within code in a programming 
language such as Scala.  There does not seem to be any way to directly 
issue SQL (or HQL) queries from the shell, send files of queries to the 
shell (e.g., using the -f flag), etc.
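
For example, issuing a query from inside the Scala shell looks like this
(a minimal sketch; it assumes a table named people has already been
registered in the context):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val teenagers = sqlContext.sql(
  "SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.collect().foreach(println)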


Does this functionality exist somewhere and I'm just missing it?

Thanks,

DR

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org