Re: CPU RAM
Ganglia does give you cluster-wide and per-machine utilization of resources, but I don't think it gives you per-Spark-job figures. If you want to build something from scratch, you can proceed like this:

1. Log in to the machine
2. Get the PIDs
3. For network IO per process, you can have a look at http://nethogs.sourceforge.net/
4. You can make use of the information in /proc/[pid]/stat and /proc/stat to estimate CPU usage

Similarly, you can get any metric of a process once you have its PID.

Thanks
Best Regards

On Wed, Sep 17, 2014 at 8:59 AM, VJ Shalish vjshal...@gmail.com wrote: Sorry for the confusion, team. My requirement is to measure the CPU utilisation, RAM usage, network IO and other metrics of a SPARK JOB using a Java program. Please help on the same.

On Tue, Sep 16, 2014 at 11:23 PM, Amit kumarami...@gmail.com wrote: Not particularly related to Spark, but you can check out the SIGAR API. It lets you get CPU, memory, network, filesystem and process-based metrics. Amit

On Sep 16, 2014, at 20:14, VJ Shalish vjshal...@gmail.com wrote: Hi, I need to get the CPU utilisation, RAM usage, network IO and other metrics using a Java program. Can anyone help me on this? Thanks, Shalish.
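A minimal sketch of the /proc approach in step 4 (written in Scala, so it runs on the JVM like the Java program asked about; it assumes a Linux /proc filesystem, and the PID is a hypothetical placeholder):

    import scala.io.Source

    // total jiffies across all CPUs: first line of /proc/stat is "cpu  user nice system idle ..."
    def totalJiffies(): Long =
      Source.fromFile("/proc/stat").getLines().next().split("\\s+").drop(1).map(_.toLong).sum

    // jiffies consumed by one process: utime + stime (fields 14 and 15 of /proc/[pid]/stat)
    def processJiffies(pid: Int): Long = {
      val stat = Source.fromFile("/proc/" + pid + "/stat").mkString
      // the comm field can contain spaces, so split after the closing ')'
      val fields = stat.substring(stat.lastIndexOf(')') + 2).split(" ")
      fields(11).toLong + fields(12).toLong  // utime + stime
    }

    val pid = 12345  // hypothetical PID of a Spark executor process
    val (t1, p1) = (totalJiffies(), processJiffies(pid))
    Thread.sleep(1000)  // sample interval is illustrative
    val (t2, p2) = (totalJiffies(), processJiffies(pid))
    println("approx CPU%: " + 100.0 * (p2 - p1) / (t2 - t1))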
Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD
PipelinedRDD is an RDD generated by a Python mapper/reducer; for example, rdd.map(func) will return a PipelinedRDD. PipelinedRDD is a subclass of RDD, so it should have all the APIs which RDD has:

>>> sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count()
10

I'm wondering how you managed to trigger this error?

Davies

On Tue, Sep 16, 2014 at 10:03 PM, edmond_huo huoxiang5...@gmail.com wrote: Hi, I am a freshman with Spark. I tried to run a job like the wordcount example in Python, but when I tried to get the top 10 popular words in the file, I got the message: AttributeError: 'PipelinedRDD' object has no attribute 'sortByKey'. So my question is: what is the difference between PipelinedRDD and RDD? And if I want to sort the data in a PipelinedRDD, how can I do it? Thanks
Re: collect on hadoopFile RDD returns wrong results
It also appears in streaming HDFS fileStream.
Re: collect on hadoopFile RDD returns wrong results
Can you dump out a small piece of data while doing rdd.collect and rdd.foreach(println)?

Thanks
Best Regards

On Wed, Sep 17, 2014 at 12:26 PM, vasiliy zadonsk...@gmail.com wrote: It also appears in streaming HDFS fileStream.
Re: permission denied on local dir
Yes, that is how it is supposed to work. Apps run as yarn and do not generally expect to depend on local file state that is created externally. This directory should be owned by yarn though, right? Your error does not show permission denied. It looks like you are unable to list a yarn dir as your user, but that's expected. What are you expecting to do?

On Sep 17, 2014 6:23 AM, style95 style9...@gmail.com wrote: I am running Spark on a shared YARN cluster. My user ID is "online", but I found that when I run my Spark application, local directories are created by the "yarn" user ID. So I am unable to delete local directories, and finally the application failed. Please refer to my log below:

14/09/16 21:59:02 ERROR DiskBlockManager: Exception while deleting local spark dir: /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7
java.io.IOException: Failed to list files for dir: /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7/3a
at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:580)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:592)
at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:593)
at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:592)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:592)
at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:163)
at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:160)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:160)
at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:153)
at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:151)
at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:151)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:151)

I am unable to access /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7; e.g.

ls /hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7

does not work and permission denied occurred. I am using spark-1.0.0 and yarn 2.4.0. Thanks in advance.
Re: collect on hadoopFile RDD returns wrong results
full code example:

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("ErrorExample").setMaster("local[8]")
    .set("spark.serializer", classOf[KryoSerializer].getName)
  val sc = new SparkContext(conf)

  val rdd = sc.hadoopFile(
    "hdfs://./user.avro",
    classOf[org.apache.avro.mapred.AvroInputFormat[User]],
    classOf[org.apache.avro.mapred.AvroWrapper[User]],
    classOf[org.apache.hadoop.io.NullWritable],
    1)

  val usersRDD = rdd.map({ case (u, _) => u.datum() })
  usersRDD.foreach(println)
  println("-")
  val collected = usersRDD.collect()
  collected.foreach(println)
}

output (without info logging etc.):

{"id": 1, "name": "a"}
{"id": 2, "name": "b"}
{"id": 3, "name": "c"}
{"id": 4, "name": "d"}
{"id": 5, "name": "e"}
{"id": 6, "name": "f"}
-
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
{"id": 6, "name": "f"}
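For what it's worth, this symptom (collect() returning N copies of the last record while foreach prints each record correctly) is characteristic of Hadoop record reuse: the RecordReader behind hadoopFile reuses a single AvroWrapper/Writable instance for every record, so the collected array ends up holding many references to one object. A sketch of the usual fix, copying each datum out of the reused wrapper before collecting (this assumes User is an Avro-generated specific record, whose class provides a copying newBuilder):

    // copy each record out of Hadoop's reused wrapper before caching/collecting
    val usersRDD = rdd.map { case (u, _) => User.newBuilder(u.datum()).build() }
    usersRDD.collect().foreach(println)  // now six distinct users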
Re: Configuring Spark for heterogenous hardware
I'm supposing that there's no good solution to having heterogeneous hardware in a cluster. What are the prospects of having something like this in the future? Am I missing an architectural detail that precludes this possibility?

Thanks,
Victor

On Fri, Sep 12, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com wrote: Ping...

On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com wrote: So I have a bunch of hardware with different core and memory setups. Is there a way to do one of the following:

1. Express a ratio of cores to memory to retain. The Spark worker config would represent all of the cores and all of the memory usable for any application, and the application would take a fraction that sustains the ratio. Say I have 4 cores and 20G of RAM. I'd like the worker to take 4/20G and the executor to take 5G for each of the 4 cores, thus maxing both out. If there were only 16G with the same ratio requirement, it would take only 3 cores and 12G in a single executor and leave the rest.

2. Have the executor take whole-number ratios of what it needs. Say it is configured for 2/8G and the worker has 4/20G. So we can give the executor 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of the two parameters.

Either way would allow me to get my heterogeneous hardware all participating in the work of my Spark cluster, presumably without endangering Spark's assumption of homogeneous execution environments in the dimensions of memory and cores. If there's any way to do this, please enlighten me.
Re: Configuring Spark for heterogenous hardware
I thought I answered this ... you can easily accomplish this with YARN by just telling YARN how much memory / CPU each machine has. This can be configured in groups, too, rather than per machine. I don't think you actually want differently-sized executors, so you don't need ratios. But you can have differently-sized containers, which can fit different numbers of executors as appropriate.

On Wed, Sep 17, 2014 at 8:35 AM, Victor Tso-Guillen v...@paxata.com wrote: I'm supposing that there's no good solution to having heterogeneous hardware in a cluster. What are the prospects of having something like this in the future? Am I missing an architectural detail that precludes this possibility? ...
Re: YARN mode not available error
Caused by: java.lang.ClassNotFoundException: org.apache.spark.scheduler.cluster.YarnClientClusterScheduler

It sounds like you perhaps deployed a custom build of Spark that did not include YARN support? You need -Pyarn in your build.

On Wed, Sep 17, 2014 at 4:47 AM, Barrington barrington.he...@me.com wrote: Hi, I am running Spark in cluster mode with Hadoop YARN as the underlying cluster manager. I get this error when trying to initialize the SparkContext:

Exception in thread "main" org.apache.spark.SparkException: YARN mode not available ?
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:1586)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:310)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:86)
at LascoScript$.main(LascoScript.scala:24)
at LascoScript.main(LascoScript.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.scheduler.cluster.YarnClientClusterScheduler
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:1580)

My build.sbt file looks like this:

name := "LascoScript"

version := "1.0"

scalaVersion := "2.10.4"

val excludeJBossNetty = ExclusionRule(organization = "org.jboss.netty")
val excludeMortbayJetty = ExclusionRule(organization = "org.eclipse.jetty", artifact = "jetty-server")
val excludeAsm = ExclusionRule(organization = "org.ow2.asm")
val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
val excludeSLF4J = ExclusionRule(organization = "org.slf4j")
val excludeOldAsm = ExclusionRule(organization = "asm")
val excludeServletApi = ExclusionRule(organization = "javax.servlet", artifact = "servlet-api")

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" excludeAll(excludeServletApi, excludeMortbayJetty)

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.5.1" excludeAll(excludeJBossNetty, excludeMortbayJetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm, excludeServletApi)

libraryDependencies += "org.mortbay.jetty" % "servlet-api" % "3.0.20100224"

libraryDependencies += "org.eclipse.jetty" % "jetty-server" % "8.1.16.v20140903"

unmanagedJars in Compile ++= {
  val base = baseDirectory.value
  val baseDirectories = (base / "lib") +++ (base)
  val customJars = (baseDirectories ** "*.jar")
  customJars.classpath
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

How can I fix this issue?

- Barrington
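For reference, a sketch of how the Maven build of Spark 1.x enables YARN support with the yarn profile (the Hadoop version shown matches the hadoop-client dependency above; adjust it to your cluster):

    mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.1 -DskipTests clean package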
Re: Configuring Spark for heterogenous hardware
Hmm, interesting. I'm using standalone mode, but I could consider YARN. I'll have to simmer on that one. Thanks as always, Sean!

On Wed, Sep 17, 2014 at 12:40 AM, Sean Owen so...@cloudera.com wrote: I thought I answered this ... you can easily accomplish this with YARN by just telling YARN how much memory / CPU each machine has. ...
Re: Questions about Spark speculation
Hi Nicolas,

I've had suspicions about speculation causing problems on my cluster, but I don't have any hard evidence of it yet. I'm also interested in why it's turned off by default.

On Tue, Sep 16, 2014 at 3:01 PM, Nicolas Mai nicolas@gmail.com wrote: Hi, guys. My current project is using Spark 0.9.1, and after increasing the level of parallelism and the number of partitions in our RDDs, stages and tasks seem to complete much faster. However, it also seems that our cluster becomes more unstable after some time:
- stalled stages still showing under active stages in the Spark app web dashboard
- incomplete stages showing under completed stages
- stages with failures

I was thinking about reducing/tuning the level of parallelism, but I was also considering using spark.speculation, which is currently turned off but seems promising. Questions about speculation:
- Just wondering, why is it turned off by default?
- Are there any risks to using speculation?
- Is it possible that a speculative task straggles and triggers another new speculative task to finish the job... and so on (some kind of loop until there are no more executors available)?
- What configuration do you usually use for spark.speculation (interval, quantile, multiplier)? I guess it depends on the project, but it may give some ideas about how to use it properly.

Thank you! :)
Nicolas
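For reference, the knobs in question are plain job configuration. A minimal sketch (Scala; the property names are from the Spark configuration docs, and the values shown are the documented defaults, not recommendations):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.speculation", "true")           // off by default
      .set("spark.speculation.interval", "100")   // ms between checks for stragglers
      .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish before checking
      .set("spark.speculation.multiplier", "1.5") // how many times slower than the median counts as a straggler
    val sc = new org.apache.spark.SparkContext(conf)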
Do I Need to Set Checkpoint Interval for Every DStream?
Hi,

I'm using Spark Streaming 1.0. I create a DStream with KafkaUtils and apply some operations on it. There's a reduceByWindow operation at the end, so I supposed the checkpoint interval would automatically be set to more than 10 seconds. But what I see is that it still checkpoints every 2 seconds (my batch interval), and in the log I see:

[2014-09-17 16:43:25,096] INFO Checkpoint interval automatically set to 12000 ms (org.apache.spark.streaming.dstream.ReducedWindowedDStream)
[2014-09-17 16:43:25,105] INFO Checkpoint interval = null (org.apache.spark.streaming.kafka.KafkaInputDStream)
[2014-09-17 16:43:25,107] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.MappedDStream)
[2014-09-17 16:43:25,108] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.MappedDStream)
[2014-09-17 16:43:25,108] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.FilteredDStream)
[2014-09-17 16:43:25,109] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.FlatMappedDStream)
[2014-09-17 16:43:25,110] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.FlatMappedDStream)
[2014-09-17 16:43:25,110] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.ShuffledDStream)
[2014-09-17 16:43:25,111] INFO Checkpoint interval = 12000 ms (org.apache.spark.streaming.dstream.ReducedWindowedDStream)
[2014-09-17 16:43:25,111] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.ForEachDStream)

So does it mean I have to set the checkpoint interval for all the DStreams?

Thanks.
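For what it's worth, a checkpoint interval can also be set explicitly per DStream rather than relying on the automatic value. A minimal sketch against the Scala streaming API (the Kafka quorum, group and topic strings are placeholders, and the windows mirror the 2s batch in the log above):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("checkpoint-interval-example")
    val ssc = new StreamingContext(conf, Seconds(2))  // 2s batches, as in the log

    val kafkaStream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("my-topic" -> 1))
    val counts = kafkaStream.map(_ => 1L).reduceByWindow(_ + _, Seconds(20), Seconds(2))
    counts.checkpoint(Seconds(12))  // explicit interval for this DStream; must be a multiple of the slide duration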
About the Spark Scala API Excption
Hi all,

Regarding the Scala API here: http://spark.apache.org/docs/latest/api/scala/#package - I found that it doesn't specify the exceptions of each function or class. How can I know whether a function throws an exception or not, without reading the source code?
Change RDDs using map()
Hi,

I want to make the following change to an RDD (create a new RDD from an existing one to reflect some transformation): in an RDD of key-value pairs, I want to get the keys for which the values are 1. How do I do this using map()?

Thank You
Re: Change RDDs using map()
You don't. That's what filter, or the partial-function version of collect, is for:

val transformedRDD = yourRDD.collect { case (k, v) if v == 1 => k }

On Wed, Sep 17, 2014 at 3:24 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I want to make the following change to an RDD (create a new RDD from an existing one to reflect some transformation): in an RDD of key-value pairs, I want to get the keys for which the values are 1. How do I do this using map()? Thank You
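And a sketch of the filter route mentioned above, for completeness (it assumes the values are Ints, and in Spark 1.x needs import org.apache.spark.SparkContext._ for the pair-RDD implicits):

    // keep the pairs whose value is 1, then project out the keys
    val keysRDD = yourRDD.filter { case (_, v) => v == 1 }.keys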
Re: CPU RAM
Hi,

I need the same through Java. Doesn't the Spark API support this?

On Wed, Sep 17, 2014 at 2:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Ganglia does give you cluster-wide and per-machine utilization of resources, but I don't think it gives you per-Spark-job figures. If you want to build something from scratch, you can proceed like this: 1. Log in to the machine 2. Get the PIDs 3. For network IO per process, have a look at http://nethogs.sourceforge.net/ 4. Use the information in /proc/[pid]/stat and /proc/stat to estimate CPU usage. ...
pyspark on yarn - lost executor
Hi,

I am executing pyspark on yarn. I have successfully executed the initial dataset, but now I have grown it 10 times larger. During execution I keep getting this error:

14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated

Tasks fail and are resubmitted again:

14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64
14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91
14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86
14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92
14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73
14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84

QUESTION: how to debug / tune the problem? What can cause such behavior?

I have a 5-machine cluster with 32 GB RAM each. Dataset - 3G. Command for execution:

/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/inpuut.csv /output/cad_model_500_2

Where can I find a description of the parameters?

--num-executors 12 --driver-memory 4g --executor-memory 2g

What parameters should be used for tuning?

Thanks
Oleg.
Short Circuit Local Reads
Cloudera had a blog post about this in August 2013: http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/

Has anyone been using this in production? Curious as to whether it made a significant difference from a Spark perspective.
Number of partitions when saving (pyspark)
Hi everyone,

Is it possible to fix the number of tasks related to a saveAsTextFile in Pyspark? I am loading several files from HDFS, fixing the number of partitions to X (let's say 40, for instance). Then some transformations, like joins and filters, are carried out. The weird thing here is that the number of tasks involved in these transformations is 80, i.e. double the fixed number of partitions. However, when the saveAsTextFile action is carried out, there are only 4 tasks to do this (and I have not been able to increase that number). My problem here is that those 4 tasks make the used memory increase rapidly and take too long to finish.

I am launching my process from Windows to a cluster in Ubuntu, with 13 computers (4 cores each) with 32 GB of memory, and using pyspark 1.0.2.

Any clue with this?

Thanks in advance
Re: pyspark on yarn - lost executor
How many partitions do you have in your input rdd? Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey?

On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am executing pyspark on yarn. I have successfully executed the initial dataset, but now I have grown it 10 times larger. During execution I keep getting this error: 14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated ...
Adjacency List representation in Spark
Hello,

We are building an adjacency list to represent a graph. Vertices, edges and weights for it have been extracted from HDFS files by a Spark job. Further, we expect the size of the adjacency list (hash map) to grow beyond 20 GB. How can we represent this as an RDD, so that it will be distributed in nature? Basically we are trying to fit a HashMap (adjacency list) into a Spark RDD. Is there any way other than GraphX?

Thanks and Regards,
Harsha
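One common way to keep the structure distributed is to make the vertex ID the RDD key and the neighbor list the value, instead of a single driver-side HashMap. A toy sketch (the IDs and weights are illustrative, and in Spark 1.x the pair-RDD operations need import org.apache.spark.SparkContext._):

    // edge triples (src, (dst, weight)) as extracted from the HDFS files
    val edges = sc.parallelize(Seq(
      (1L, (2L, 0.5)), (1L, (3L, 1.0)), (2L, (3L, 2.0))))
    // adjacency list: one entry per vertex, partitioned across the cluster
    val adjacency = edges.groupByKey()          // RDD[(Long, Iterable[(Long, Double)])]
    adjacency.cache()                           // reuse across traversal steps
    val neighborsOfOne = adjacency.lookup(1L)   // a hash-map get becomes an RDD lookup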
list of documents sentiment analysis - problem with defining proper approach with Spark
Hi,

For the last few days I have been working on an exercise where I want to understand the sentiment of a set of articles. As input I have an XML file with articles and the AFINN-111.txt file defining the sentiment of a few hundred words. What I am able to do without any problem is load the data and put it into structures (classes for articles, tuples of (word, sentiment-value) for sentiments). Then what I think I need to do (from the logical point of view) is:

for each article:
  articleWords = split the body by " "
  join the two lists (articleWords and sentimentWords) together
  calculate the sentiment for the article by summing up the sentiments of all words that it includes
  dump the (article id, sentiment) pair into a flat file

And this is where I am stuck :) I tried multiple combinations of map/reduceByKey, which either didn't make much sense (like getting the sentiment for all articles combined) or resulted in errors that a function cannot be serialized. Today I even tried to implement this with a brute-force approach doing articles.foreach(calculateSentiment), where calculateSentiment looks like below:

val words = sc.parallelize(post.body.split(" "))  // split body by " "
val wordPairs = words.map(w => (w, 1)).reduceByKey(_ + _, 1)  // tuples of (word, #occurrences in article)
val joinedValues = wordPairs.join(sentiments_)  // join

But somehow I had a feeling this is not the best idea, and I think I was right, since the job has been running for about an hour (and I have only a few hundred GBs to process). So the question is: what am I doing wrong? Any hints or suggestions for direction are really appreciated!

Thank you,
Leszek
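One way to restructure this so everything stays in a single distributed job: the nested sc.parallelize inside foreach is the likely culprit, since RDD operations cannot be launched from inside another RDD operation. A sketch, assuming articles is an RDD of objects with id and body fields and sentiments is an RDD[(String, Int)] built from AFINN-111.txt (the names and output path are illustrative):

    // explode each article into (word, articleId) pairs
    val wordToArticle = articles.flatMap(a => a.body.split(" ").map(w => (w, a.id)))
    // join against the sentiment lexicon, keeping (articleId, wordScore)
    val scored = wordToArticle.join(sentiments).map { case (_, (id, score)) => (id, score) }
    // sum per article and write out (articleId, sentiment) lines
    scored.reduceByKey(_ + _).map { case (id, s) => id + "\t" + s }.saveAsTextFile("/out/sentiment")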
Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD
Hi Davies,

Thank you for your answer. This is my code; I think it is very similar to the word count example in Spark:

lines = sc.textFile(sys.argv[2])
sie = lines.map(lambda l: (l.strip().split(',')[4], 1)).reduceByKey(lambda a, b: a + b)
sort_sie = sie.sortByKey(False)

Thanks again.
Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD
Hi Davies,

When I run your code in pyspark, I still get the same error:

>>> sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'PipelinedRDD' object has no attribute 'sortByKey'

Is it a matter of the Spark version? I am using spark-0.7.3.

Thanks.
Spark and disk usage.
Hello everyone.

The problem is that Spark writes data to disk very heavily, even if the application has a lot of free memory (about 3.8g). I've noticed that a folder with a name like spark-local-20140917165839-f58c contains a lot of other folders with files like shuffle_446_0_1. The total size of files in the dir spark-local-20140917165839-f58c can reach 1.1g. Sometimes its size decreases (are there only temp files in that folder?), so the total amount of data written to disk is greater than 1.1g.

The question is: what kind of data does Spark store there, and can I make Spark not write it to disk and just keep it in memory if there is enough free RAM?

I run my job locally with Spark 1.0.1:

./bin/spark-submit --driver-memory 12g --master local[3] --properties-file conf/spark-defaults.conf --class my.company.Main /path/to/jar/myJob.jar

spark-defaults.conf:

spark.shuffle.spill false
spark.reducer.maxMbInFlight 1024
spark.shuffle.file.buffer.kb 2048
spark.storage.memoryFraction 0.7

The situation with disk usage is common to many jobs. I have also used ALS from MLlib and seen similar things. I have had no success playing with the Spark configuration, and I hope someone can help me :)
Re: pyspark on yarn - lost executor
Sure, I'll post to the mailing list.

groupByKey(self, numPartitions=None): Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD into numPartitions partitions.

So instead of using the default I'll provide numPartitions, but what is the best practice for calculating the number of partitions? And how is the number of partitions related to my original problem?

Thanks
Oleg.

On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Look at the API for textFile and groupByKey. Please don't take threads off list. Other people have the same questions. Eric Friedman

On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Can you please explain how to configure partitions? Thanks Oleg

On Wednesday, September 17, 2014, Eric Friedman eric.d.fried...@gmail.com wrote: Yeah, you need to increase partitions. You only have one on your text file. On groupByKey you're getting the pyspark default, which is too low.

On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets oruchov...@gmail.com wrote: This is a very good question :-). Here is my code:

sc = SparkContext(appName="CAD")
lines = sc.textFile(sys.argv[1], 1)
result = lines.map(doSplit).groupByKey().mapValues(lambda vc: my_custom_function(vc))
result.saveAsTextFile(sys.argv[2])

Should I configure partitioning manually? Where should I configure it? Where can I read about partitioning best practices? Thanks Oleg.

On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman eric.d.fried...@gmail.com wrote: How many partitions do you have in your input rdd? Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey? ...
Re: Spark and disk usage.
Hi,

The files you mentioned are temporary files written by Spark during shuffling. ALS will write a LOT of those files, as it is a shuffle-heavy algorithm. Those files will be deleted after your program completes, as Spark keeps them around in case a fault occurs. Having those files ready allows Spark to continue from the stage where the shuffle left off, instead of starting from the very beginning. Long story short, it's to your benefit that Spark writes those files to disk. If you don't want Spark writing to disk, you can specify a checkpoint directory in HDFS, where Spark will write the current status instead and will clean up files from disk.

Best,
Burak

----- Original Message -----
From: Макар Красноперов connector@gmail.com
To: user@spark.apache.org
Sent: Wednesday, September 17, 2014 7:37:49 AM
Subject: Spark and disk usage.

Hello everyone. The problem is that Spark writes data to disk very heavily, even if the application has a lot of free memory (about 3.8g). ...
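A minimal sketch of what that looks like in code (the HDFS path is illustrative):

    // assuming sc is an existing SparkContext
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // hypothetical directory
    val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
    rdd.checkpoint()  // marks the RDD; the checkpoint is written on the next action
    rdd.count()       // triggers the job, materializes the checkpoint and truncates the lineage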
Re: permission denied on local dir
However, in my application there is no logic to access local files, so I assume that Spark is internally using the local file system to cache RDDs. As per the log, it looks like the error occurred in Spark's internal logic rather than in my business logic. It is trying to delete local directories, and they look like directories for the cache:

/hadoop02/hadoop/yarn/local/usercache/online/appcache/application_1410795082830_3994/spark-local-20140916215842-6fe7

Is there any way not to use the cache or local directories, or a way to access the directories created by the yarn user as "online", my Spark user?

Thanks
Regards
Dongkyoung.
Re: Stable spark streaming app
Hmm, no response to this thread! Adding to it: please share experiences of building an enterprise-grade product based on Spark Streaming. I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve the debuggability of Spark.

----- Original Message -----
From: Tim Smith secs...@gmail.com
To: spark users user@spark.apache.org
Sent: Friday, September 12, 2014 10:09:53 AM
Subject: Stable spark streaming app

Hi, Anyone have a stable streaming app running in production? Can you share some overview of the app and setup, like number of nodes, events per second, broad stream processing workflow, config highlights, etc?

Thanks,
Tim
GroupBy Key and then sort values with the group
Hi Group,

I am quite fresh in the Spark world. There is a particular use case that I just cannot understand how to accomplish in Spark. I am using Cloudera CDH5/YARN/Java 7. I have a dataset with the following characteristics - a JavaPairRDD that represents the following -

Key = {int ID}
Value = {date effectiveFrom, float value}

Let's say that the data I have is the following -

Partition - 1
[K=1, V={09-17-2014, 2.8}]
[K=1, V={09-11-2014, 3.9}]
[K=3, V={09-18-2014, 5.0}]
[K=3, V={09-10-2014, 7.4}]

Partition - 2
[K=2, V={09-13-2014, 2.5}]
[K=4, V={09-07-2014, 6.2}]
[K=2, V={09-12-2014, 1.8}]
[K=4, V={09-22-2014, 2.9}]

Grouping by key gives me the following RDD

Partition - 1
[K=1, V=Iterable({09-17-2014, 2.8}, {09-11-2014, 3.9})]
[K=3, V=Iterable({09-18-2014, 5.0}, {09-10-2014, 7.4})]

Partition - 2
[K=2, V=Iterable({09-13-2014, 2.5}, {09-12-2014, 1.8})]
[K=4, V=Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]

Now I would like to sort by the values, and the result should look like this -

Partition - 1
[K=1, V=Iterable({09-11-2014, 3.9}, {09-17-2014, 2.8})]
[K=3, V=Iterable({09-10-2014, 7.4}, {09-18-2014, 5.0})]

Partition - 2
[K=2, V=Iterable({09-12-2014, 1.8}, {09-13-2014, 2.5})]
[K=4, V=Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]

What is the best way to do this in Spark? If so desired, I can even move the "effectiveFrom" (the field that I want to sort on) into the key field. A code snippet or some pointers on how to solve this would be very helpful.

Regards,
Abraham
Re: pyspark on yarn - lost executor
Maybe the Python worker uses too much memory during groupByKey(); groupByKey() with a larger numPartitions can help. Also, can you upgrade your cluster to 1.1? It can spill the data to disk if memory cannot hold all the data during groupByKey(). And if there is a hot key with dozens of millions of values, the PR [1] can help; it actually helped someone with a large dataset (3T).

Davies

[1] https://github.com/apache/spark/pull/1977

On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Sure, I'll post to the mailing list. ... So instead of using the default I'll provide numPartitions, but what is the best practice for calculating the number of partitions? And how is the number of partitions related to my original problem? Thanks Oleg.
Re: Number of partitions when saving (pyspark)
On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra luispelay...@gmail.com wrote: Hi everyone, Is it possible to fix the number of tasks related to a saveAsTextFile in Pyspark? ...

saveAsTextFile() is a map-side operation, so its number of partitions is determined by the previous RDD. In Spark 1.0.2, groupByKey() or reduceByKey() will take the number of CPUs on the driver (locally) as the default number of partitions, so it's 4. You need to change it to 40 or 80 in this case.

BTW, in Spark 1.1, groupByKey() and reduceByKey() will use the number of partitions of the previous RDD as the default value.

Davies
Re: GroupBy Key and then sort values with the group
You just need to call mapValues() to change your Iterable of things into a sorted Iterable of things for each key-value pair. In the function you write, it's no different from any other Java program. I imagine you'll need to copy the input Iterable into an ArrayList (unfortunately), sort it with whatever Comparator you want, and return the result.

On Wed, Sep 17, 2014 at 4:37 PM, abraham.ja...@thomsonreuters.com wrote: Hi Group, I am quite fresh in the Spark world. There is a particular use case that I just cannot understand how to accomplish in Spark. I am using Cloudera CDH5/YARN/Java 7. ... What is the best way to do this in Spark? If so desired, I can even move the "effectiveFrom" (the field that I want to sort on) into the key field. A code snippet or some pointers on how to solve this would be very helpful. Regards, Abraham
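A compact sketch of that shape (in Scala for brevity; with the Java API the mapValues function would copy the Iterable into an ArrayList and call Collections.sort with a Comparator on the date field, exactly as described above — the Obs class and the yyyymmdd encoding are illustrative):

    case class Obs(effectiveFrom: Long, value: Float)  // date as a sortable yyyymmdd number
    // toy stand-in for the JavaPairRDD in the question
    val pairs = sc.parallelize(Seq(
      (1, Obs(20140917L, 2.8f)), (1, Obs(20140911L, 3.9f)),
      (3, Obs(20140918L, 5.0f)), (3, Obs(20140910L, 7.4f))))
    // group, then sort each group's values; toList copies the Iterable before sorting
    val sortedWithinGroups = pairs.groupByKey().mapValues(_.toList.sortBy(_.effectiveFrom))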
RE: GroupBy Key and then sort values with the group
Thanks Sean,

Makes total sense. I guess I was so caught up with RDDs and all the wonderful transformations they can do that I did not think about plain old Java Collections.sort(list, comparator).

Thanks,
Abraham

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, September 17, 2014 9:37 AM
To: Jacob, Abraham (FinancialRisk)
Cc: user@spark.apache.org
Subject: Re: GroupBy Key and then sort values with the group

You just need to call mapValues() to change your Iterable of things into a sorted Iterable of things for each key-value pair. ...
Re: Adjacency List representation in Spark
Hi Harsha, You could look through the GraphX source to see the approach taken there for ideas for your own implementation. I'd recommend starting at https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala#L385 to see the storage technique. Why do you want to avoid using GraphX? Good luck! Andrew

On Wed, Sep 17, 2014 at 6:43 AM, Harsha HN 99harsha.h@gmail.com wrote: Hello, We are building an adjacency list to represent a graph. Vertexes, edges and weights for the same have been extracted from HDFS files by a Spark job. Further, we expect the size of the adjacency list (hash map) could grow beyond 20 GB. How can we represent this in an RDD, so that it will be distributed in nature? Basically we are trying to fit a HashMap (adjacency list) into a Spark RDD. Is there any way other than GraphX? Thanks and Regards, Harsha
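For what it's worth, a minimal sketch of holding the adjacency list in a pair RDD instead of a single driver-side HashMap (edge data is illustrative; an existing SparkContext sc is assumed):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// (src, (dst, weight)) triples as extracted from the HDFS files
val edges: RDD[(Long, (Long, Double))] = sc.parallelize(Seq(
  (1L, (2L, 0.5)), (1L, (3L, 1.0)), (2L, (3L, 2.0))))

// one entry per vertex: the 20+ GB "hash map" becomes a distributed RDD
val adjacency = edges.groupByKey(new HashPartitioner(64))

With the explicit partitioner, each vertex's neighbor list lives on a single node, and a lookup such as adjacency.lookup(1L) only scans one partition.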
Re: Spark and disk usage.
Hi Burak, Most discussions of checkpointing in the docs are related to Spark Streaming. Are you talking about sparkContext.setCheckpointDir()? What effect does that have? https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

On Wed, Sep 17, 2014 at 7:44 AM, Burak Yavuz bya...@stanford.edu wrote:

Hi, The files you mentioned are temporary files written by Spark during shuffling. ALS will write a LOT of those files as it is a shuffle-heavy algorithm. Those files will be deleted after your program completes, as Spark looks for those files in case a fault occurs. Having those files ready allows Spark to continue from the stage the shuffle left off, instead of starting from the very beginning. Long story short, it's to your benefit that Spark writes those files to disk. If you don't want Spark writing to disk, you can specify a checkpoint directory in HDFS, where Spark will write the current status instead and will clean up files from disk. Best, Burak

----- Original Message -----
From: Макар Красноперов connector@gmail.com
To: user@spark.apache.org
Sent: Wednesday, September 17, 2014 7:37:49 AM
Subject: Spark and disk usage.

Hello everyone. The problem is that Spark writes data to disk very heavily, even if the application has a lot of free memory (about 3.8g). I've noticed that a folder with a name like spark-local-20140917165839-f58c contains a lot of other folders with files like shuffle_446_0_1. The total size of files in the dir spark-local-20140917165839-f58c can reach 1.1g. Sometimes its size decreases (are there only temp files in that folder?), so the total amount of data written to disk is greater than 1.1g. The question is: what kind of data does Spark store there, and can I make Spark not write it to disk and just keep it in memory if there is enough free RAM? I run my job locally with Spark 1.0.1:

./bin/spark-submit --driver-memory 12g --master local[3] --properties-file conf/spark-defaults.conf --class my.company.Main /path/to/jar/myJob.jar

spark-defaults.conf:
spark.shuffle.spill false
spark.reducer.maxMbInFlight 1024
spark.shuffle.file.buffer.kb 2048
spark.storage.memoryFraction 0.7

The situation with disk usage is common to many jobs. I have also used ALS from MLlib and saw similar things. I have had no success playing with the Spark configuration and I hope someone can help me :)
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and disk usage.
Hi Andrew, Yes, I'm referring to sparkContext.setCheckpointDir(). It has the same effect as in Spark Streaming. For example, in an algorithm like ALS, the RDDs go through many transformations and the lineage of the RDD starts to grow drastically, just like the lineage of DStreams does in Spark Streaming. You may observe StackOverflowErrors in ALS if you set the number of iterations to be very high. If you set the checkpointing directory, however, the intermediate state of the RDDs will be saved in HDFS, and the lineage will pick up from there. You won't need to keep the shuffle data before the checkpointed state, so those files can be safely removed (and will be removed automatically). However, checkpoint must be called explicitly, as in https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L291; just setting the directory will not be enough. Best, Burak
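A minimal sketch of the pattern Burak describes; the checkpoint directory, data, and every-third-iteration cadence are illustrative assumptions, not the actual ALS internals:

// assumes an existing SparkContext sc with access to HDFS
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // hypothetical path

var state = sc.parallelize(1 to 1000000).map(i => (i % 100, i.toDouble))
for (iter <- 1 to 30) {
  state = state.mapValues(_ * 0.9) // stand-in for one iteration's transformations
  if (iter % 3 == 0) {
    state.checkpoint() // marks the RDD: lineage is truncated once it is materialized...
    state.count()      // ...so an action is needed, as Burak notes for ALS.scala#L291
  }
}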
Re: Spark and disk usage.
Thanks for the info! Are there performance impacts with writing to HDFS instead of local disk? I'm assuming that's why ALS checkpoints every third iteration instead of every iteration. Also I can imagine that checkpointing should be done every N shuffles instead of every N operations (counting maps), since only the shuffle leaves data on disk. Do you have any suggestions on this? We should write up some guidance on the use of checkpointing in the programming guide https://spark.apache.org/docs/latest/programming-guide.html - I can help with this Andrew
Re: Short Circuit Local Reads
I'm pretty sure it does help, though I don't have any numbers for it. In any case, Spark will automatically benefit from this if you link it to a version of HDFS that contains the feature. Matei

On September 17, 2014 at 5:15:47 AM, Gary Malouf (malouf.g...@gmail.com) wrote: Cloudera had a blog post about this in August 2013: http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/ Has anyone been using this in production? Curious as to whether it made a significant difference from a Spark perspective.
How to ship cython library to workers?
I have a library written in Cython and C. I'm wondering if it can be shipped to the workers, which don't have Cython installed. Maybe create an egg package from this library? How?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-ship-cython-library-to-workers-tp14467.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
how to group within the messages at a vertex?
Sorry if this is in the docs someplace and I'm missing it. I'm trying to implement label propagation in GraphX. The core step of that algorithm is: for each vertex, find the most frequent label among its neighbors and set its label to that. (I think) I see how to get the input from all the neighbors, but I don't see how to group/reduce those to find the most frequent label.

var G2 = G.mapVertices((id, attr) => id)
val perSrcCount: VertexRDD[(Long, Long)] = G2.mapReduceTriplets[(Long, Long)](
  edge => Iterator((edge.dstAttr, (edge.srcAttr, 1))),
  (a, b) => ((a._1), (a._2 + b._2)) // this line seems broken
)

It seems on the broken line above that I don't want to reduce all the values to a scalar, as this code does, but rather group them first and then reduce them. Can I do all that within mapReduceTriplets? If not, how do I build something that I can then further reduce? Thanks in advance.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-group-within-the-messages-at-a-vertex-tp14468.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming - batchDuration for streaming
Hi esteemed Sparkers, I have some doubts about the StreamingContext batchDuration parameter. I have already read the excellent explanation by Tathagata Das about the difference between batch and window duration, but a few points still confuse me. For example: without any window, Spark Streaming effectively applies an implicit window of batchDuration to its actions. So:

1. What is a simple strategy for choosing the correct batchDuration value? My use case is a Flume spooling-directory source (new text files) feeding a Spark Streaming analytic app.

And one more question:

2. What is the reason for batching a stream at all? I have used BigInsights Streams, and there is no such batching strategy there, as the stream is a continuous flow. You run Streams with one of many source connectors to catch the data input flow (stream); if you need to, you can work with data on a window frame, but in any case your Streams app continuously listens to the stream/flow.

Perhaps I don't understand some important and powerful characteristic of the Spark Streaming architecture. Thanks in advance.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-batchDuration-for-streaming-tp14469.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to group within the messages at a vertex?
At 2014-09-17 11:39:19 -0700, spr s...@yarcdata.com wrote: I'm trying to implement label propagation in GraphX. The core step of that algorithm is - for each vertex, find the most frequent label among its neighbors and set its label to that. [...] It seems on the broken line above, I don't want to reduce all the values to a scalar, as this code does, but rather group them first and then reduce them. Can I do that all within mapReduceTriples? If not, how do I build something that I can then further reduce? Label propagation is actually already implemented in GraphX [1]. The way it handles the most frequent label reduce operation is to aggregate a histogram, implemented as a map from label to frequency, and then take the most frequent element from the map at the end. Something to watch out for is that this can create large aggregation messages for high-degree vertices. Ankur [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
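A sketch of that histogram aggregation against the Spark 1.x GraphX API, loosely modeled on the linked LabelPropagation code (graph is an assumed pre-existing Graph; the real implementation also sends messages in both directions):

import org.apache.spark.graphx._

val lpGraph = graph.mapVertices { case (vid, _) => vid } // start: each vertex labeled with its own id
val histograms: VertexRDD[Map[VertexId, Long]] =
  lpGraph.mapReduceTriplets[Map[VertexId, Long]](
    edge => Iterator((edge.dstId, Map(edge.srcAttr -> 1L))), // message: neighbor's label with count 1
    (a, b) => (a.keySet ++ b.keySet)                         // reduce: merge two label histograms
      .map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap
  )
val mostFrequent = histograms.mapValues((h: Map[VertexId, Long]) => h.maxBy(_._2)._1)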
Re: Spark and disk usage.
Yes, writing to HDFS is more expensive, but I feel it is still a small price to pay when compared to having a Disk Space Full error three hours in and having to start from scratch. The main goal of checkpointing is to truncate the lineage; clearing up shuffle writes comes as a bonus to checkpointing, it is not the main goal. The subtlety here is that .checkpoint() is just like .cache(): until you call an action, nothing happens. Therefore, if you're going to do 1000 maps in a row and you don't want to checkpoint in the meantime until a shuffle happens, you will still get a StackOverflowError, because the lineage is too long. I went through some of the code for checkpointing. As far as I can tell, it materializes the data in HDFS and resets all its dependencies, so you start a fresh lineage. My understanding is that checkpointing should still be done every N operations to reset the lineage; however, an action must be performed before the lineage grows too long. I believe it would be nice to write up checkpointing in the programming guide. The reason it's not there yet, I believe, is that most applications don't grow such a long lineage, except in Spark Streaming and some MLlib algorithms. If you can help with the guide, I think it would be a nice feature to have! Burak
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: OutOfMemoryError with basic kmeans
Not sure if you resolved this, but I had a similar issue and resolved it. In my case, the problem was that the ids of my items were of type Long and could be very large (even though there are only a small number of distinct ids... maybe a few hundred of them). KMeans will create a dense vector for the cluster centers, so it's important that the dimensionality not be huge. I had to map my ids to a smaller space and it worked fine. The mapping was something like...

1001223412 -> 1
1006591779 -> 2
1011232423 -> 3
...
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-with-basic-kmeans-tp1651p14472.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
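A small sketch of that remapping (ids are the ones from the example above; it assumes the distinct ids are few enough to collect to the driver and an existing SparkContext sc):

val rawIds = sc.parallelize(Seq(1001223412L, 1006591779L, 1011232423L))
// build a compact 0-based index over the few distinct ids
val idToIndex: Map[Long, Int] = rawIds.distinct().collect().sorted.zipWithIndex.toMap
val bIdToIndex = sc.broadcast(idToIndex) // broadcast it for use inside closures
// e.g. bIdToIndex.value(1006591779L) == 1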
How to run kmeans after pca?
I would like to reduce the dimensionality of my data before running kmeans. The problem I'm having is that both RowMatrix.computePrincipalComponents() and RowMatrix.computeSVD() return a DenseMatrix whereas KMeans.train() requires an RDD[Vector]. Does MLlib provide a way to do this conversion? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-kmeans-after-pca-tp14473.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
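One way, at least with the RowMatrix API: multiply the matrix by the principal components. RowMatrix.multiply(Matrix) returns another distributed RowMatrix, and its rows field is exactly the RDD[Vector] that KMeans.train() wants. A minimal sketch (data and parameters are illustrative; an existing SparkContext sc is assumed):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0, 4.0),
  Vectors.dense(2.0, 1.0, 4.0, 3.0),
  Vectors.dense(4.0, 3.0, 2.0, 1.0)))
val mat = new RowMatrix(data)
val pc = mat.computePrincipalComponents(2) // local n x 2 Matrix
val projected = mat.multiply(pc).rows      // RDD[Vector] in the reduced space
val model = KMeans.train(projected, 2, 20) // k = 2, 20 iterations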
Re: partitioned groupBy
Patrick, If I understand this correctly, I won't be able to do this in the closure provided to mapPartitions() because that's going to be stateless, in the sense that a hash map that I create within the closure would only be useful for one call of MapPartitionsRDD.compute(). I guess I would need to override mapPartitions() directly within my RDD. Right?

On Tue, Sep 16, 2014 at 4:57 PM, Patrick Wendell pwend...@gmail.com wrote: If each partition can fit in memory, you can do this using mapPartitions and then building an inverse mapping within each partition. You'd need to construct a hash map within each partition yourself.

On Tue, Sep 16, 2014 at 4:27 PM, Akshat Aranya aara...@gmail.com wrote: I have a use case where my RDD is set up such:

Partition 0:
K1 -> [V1, V2]
K2 -> [V2]
Partition 1:
K3 -> [V1]
K4 -> [V3]

I want to invert this RDD, but only within a partition, so that the operation does not require a shuffle. It doesn't matter if the partitions of the inverted RDD have non unique keys across the partitions, for example:

Partition 0:
V1 -> [K1]
V2 -> [K1, K2]
Partition 1:
V1 -> [K3]
V3 -> [K4]

Is there a way to do only a per-partition groupBy, instead of shuffling the entire data?
Re: Stable spark streaming app
I don't have anything in production yet but I now at least have a stable (running for more than 24 hours) streaming app. Earlier, the app would crash for all sorts of reasons. Caveats/setup:
- Spark 1.0.0 (I have no input flow control, unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32 cores and 48G max container size for each node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:

- The executor JVMs did not have any GC options set, by default. This might be more of a CDH issue. I noticed that the Yarn container and other Spark ancillary tasks had GC options set at launch, but there were none for the executors. So I played with different GC options and this worked best:
SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails"
I tried G1GC but for some reason it just didn't work. I am not a Java programmer or expert, so my conclusion is purely trial and error based. The GC logs, with these flags, go to the stdout file in the Yarn container logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the driver node and Yarn will respect these. On CDH/CM specifically, even though you don't run Spark as a service (since you are using Yarn for RM), you can go to "Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and set SPARK_JAVA_OPTS there.

- Set these two params: spark.yarn.executor.memoryOverhead and spark.yarn.driver.memoryOverhead. Earlier, my app would get killed because the executors running the kafka receivers would get killed by Yarn for over-utilization of memory. Now, these are my memory settings (I will paste the entire app launch params later in the email):
--driver-memory 2G \
--executor-memory 16G \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
Your total executor JVM will consume executor-memory minus spark.yarn.executor.memoryOverhead, so you should see each executor JVM consuming no more than 12G, in this case.

Here is how I launch my app:
run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class myAwesomeApp \
--master yarn myawesomeapp.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
--driver-memory 2G \
--executor-memory 16G \
--executor-cores 16 \
--num-executors 10 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 64 \
--spark.akka.frameSize 500 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
--spark.yarn.executor.memoryOverhead 4096 \
--spark.yarn.driver.memoryOverhead 1024 \
--spark.shuffle.consolidateFiles true \
--spark.default.parallelism 528 \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log &
echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not scientifically measure the impact of each but I think they helped:
- Made all my classes and objects serializable and then used Kryo (as you see above)
- I map one receive task for each kafka partition
- Instead of doing a union on all the incoming streams and then repartition(), I now repartition() each incoming stream and process them separately. I believe this reduces shuffle.
- Reduced the number of repartitions. I was doing 128 after doing a union on all incoming dStreams. I now repartition each of the five streams separately (in a loop) to 24.
- For each RDD, I set the storage level to MEMORY_AND_DISK_SER
- Process data per partition instead of per RDD:
dataout.foreachRDD( rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create a new Producer, I make sure I close it, else I got a ton of "too many open files" errors :)
- Use immutable objects as far as possible. If I use mutable objects within a method/class then I turn them into immutable ones before passing on to another class/method.
- For logging, create a LogService object that I then use for other object/class declarations. Once instantiated, I can make logInfo calls from within other Objects/Methods/Classes and the output goes to the stderr file in the Yarn container logs. Good for debugging stream processing logic.

Currently, my processing delay is lower than my dStream time window so all is good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener threw an exception
These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
Best I understand and have been told, this does not affect data integrity but may cause unnecessary recomputes. Hope this helps, Tim

On Wed, Sep 17, 2014
Re: Stable spark streaming app
Thanks Tim for the detailed email, it will help me a lot. My setup:
. 3-node CDH 5.1 cluster with 16G memory.
. Majority of the code in Scala, some parts in Java (was using Cascading earlier).
. Inputs are a bunch of textFileStream directories.
. Every batch output goes to Parquet files, and to HBase. Had to set export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar to fix an issue during class loading. For some reason 'spark.driver.extraClassPath' did not work.
. I started doing the union pretty early in the flow, but it did not work because not all of my classes are serializable. Each DStream works in isolation, but if I union them early I get into serialization issues.
. Took a while to find the log directory, /run/spark/work.
. Deploying with --master spark://mymachine:7077.
. Modified '/etc/spark/conf.cloudera.spark/log4j.properties' for logging.
. Currently I cannot process a 1G file with this configuration. I have tried various things but could not succeed yet.
RE: Change RDDs using map()
If you want the result as an RDD of (key, 1):

new_rdd = rdd.filter(x => x._2 == 1)

If you want the result as an RDD of keys (since you know the values are 1), then:

new_rdd = rdd.filter(x => x._2 == 1).map(x => x._1)

x._1 and x._2 are the Scala way to access the key and value from a key/value pair.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Change-RDDs-using-map-tp14436p14481.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Dealing with Time Series Data
What are you trying to do? Generate time series from your data in HDFS, or do some transformation and/or aggregation on your time-series data in HDFS?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dealing-with-Time-Series-Data-tp14275p14482.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Size exceeds Integer.MAX_VALUE in BlockFetcherIterator
Hi, We are running an aggregation on a huge data set (a few billion rows). While running the task we got the following error (see below). Any ideas? Running Spark 1.1.0 on the CDH distribution.

...
14/09/17 13:33:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0). 2083 bytes result sent to driver
14/09/17 13:33:30 INFO CoarseGrainedExecutorBackend: Got assigned task 1
14/09/17 13:33:30 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
14/09/17 13:33:30 INFO TorrentBroadcast: Started reading broadcast variable 2
14/09/17 13:33:30 INFO MemoryStore: ensureFreeSpace(1428) called with curMem=163719, maxMem=34451478282
14/09/17 13:33:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1428.0 B, free 32.1 GB)
14/09/17 13:33:30 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/09/17 13:33:30 INFO TorrentBroadcast: Reading broadcast variable 2 took 0.027374294 s
14/09/17 13:33:30 INFO MemoryStore: ensureFreeSpace(2336) called with curMem=165147, maxMem=34451478282
14/09/17 13:33:30 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.3 KB, free 32.1 GB)
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkdri...@sas-model1.pv.sv.nextag.com:56631/user/MapOutputTracker#794212052]
14/09/17 13:33:30 INFO MapOutputTrackerWorker: Got the output locations
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
14/09/17 13:33:30 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 8 ms
14/09/17 13:33:30 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Error occurred while fetching local blocks
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
    at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:120)
    at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:358)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:208)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:205)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:205)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:240)
    at org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:583)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:77)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:41)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/09/17 13:33:30 INFO CoarseGrainedExecutorBackend: Got assigned task 2
14/09/17 13:33:30 INFO Executor: Running task 0.0 in stage 1.1 (TID 2)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-in-BlockFetcherIterator-tp14483.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Stable spark streaming app
Nice write-up... very helpful!
Re: partitioned groupBy
If you'd like to re-use the resulting inverted map, you can persist the result:

x = myRdd.mapPartitions(create inverted map).persist()

Your function would create the reverse map and then return an iterator over the keys in that map.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
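A concrete sketch of that per-partition inversion (sample data and types are illustrative; it assumes each partition's inverse map fits in memory and an existing SparkContext sc):

val rdd = sc.parallelize(Seq(
  ("K1", Seq("V1", "V2")), ("K2", Seq("V2")),
  ("K3", Seq("V1")), ("K4", Seq("V3"))), 2)

val inverted = rdd.mapPartitions({ iter =>
  val m = scala.collection.mutable.Map.empty[String, List[String]]
  for ((k, vs) <- iter; v <- vs) m(v) = k :: m.getOrElse(v, Nil) // build the inverse map for this partition
  m.iterator // emit (value, keys) pairs; no shuffle is involved
}, preservesPartitioning = true).persist()

Because the map is rebuilt inside compute() on every evaluation, the persist() is what makes the inversion reusable without redoing the work, which addresses the statelessness concern above.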
Re: Size exceeds Integer.MAX_VALUE in BlockFetcherIterator
Hi, Could you try repartitioning the data by .repartition(# of cores on machine), or, while reading the data, supplying the number of minimum partitions, as in sc.textFile(path, # of cores on machine)? It may be that the whole data set is stored in one block. If it is billions of rows, then the indexing probably will not work, giving the Size exceeds Integer.MAX_VALUE error. Best, Burak
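In code, the two options Burak suggests look roughly like this (the path and the partition count of 512 are illustrative assumptions):

// supply a minimum number of partitions while reading
val rows = sc.textFile("hdfs:///data/huge_table", 512)
// or explicitly re-split an existing RDD before the heavy aggregation
val spread = rows.repartition(512)

More, smaller partitions keep every block and shuffle fetch well under the 2 GB (Integer.MAX_VALUE) limit that the memory-mapping in the stack trace above runs into.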
Re: Spark Streaming - batchDuration for streaming
Here's the official Spark documentation about batch size/interval: http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-size

Spark is batch-oriented processing. As you mentioned, a stream is a continuous flow, and core Spark cannot handle it. Spark Streaming bridges the gap between the continuous flow and batch-oriented processing: it generates an RDD from the continuous data flow/stream every batch interval, and then Spark can process them as normal RDDs.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-batchDuration-for-streaming-tp14469p14487.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
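A minimal sketch of that bridge (Spark 1.x streaming API; the application name, 10-second interval, and input directory are illustrative assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("BatchDurationDemo")
val ssc = new StreamingContext(conf, Seconds(10))  // batchDuration: one RDD is generated every 10 s
val lines = ssc.textFileStream("hdfs:///incoming") // picks up new files, similar to the Flume spooling case
lines.count().print()                              // runs as a normal Spark job on each batch
ssc.start()
ssc.awaitTermination()

Choosing batchDuration then comes down to the tuning rule in the linked docs: the total processing time of each batch must stay below the batch interval, so start with something like 5-10 seconds and shrink it while the scheduling delay stays near zero.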
SPARK BENCHMARK TEST
I am trying to benchmark spark in a hadoop cluster. I need to design a sample spark job to test the CPU utilization, RAM usage, Input throughput, Output throughput and Duration of execution in the cluster. I need to test the state of the cluster for :- A spark job which uses high CPU A spark job which uses high RAM A spark job which uses high Input throughput A spark job which uses high Output throughput A spark job which takes long time. These have to be tested individually and a combination of these scenarios would also be used. Please help me in understanding the factors of a Spark Job which would contribute to CPU utilization, RAM usage, Input throughput, Output throughput, Duration of execution in the cluster. So that I can design spark jobs that could be used for testing. Thanks Shalish.
Re: Stable spark streaming app
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote: Thanks Tim, this is super helpful! Question about jars and spark-submit: why do you provide myawesomeapp.jar as the program jar but then include other jars via the --jars argument? Have you tried building one uber jar with all dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am a Scala/sbt noob :) How do I create the uber jar? My .sbt file says:

name := "My Awesome App"

version := "1.025"

scalaVersion := "2.10.4"

resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run sbt package to generate myawesomeapp.jar.

Also, have you ever seen any issues with Spark caching your app jar between runs even if it changes? Not that I can tell, but then maybe because I use Yarn I might be shielded from some jar distribution bugs in Spark?
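On the uber-jar question: sbt package only packages your own classes, so one common route at the time was the sbt-assembly plugin. A minimal sketch, with the plugin version and scopes as assumptions rather than a tested recipe:

// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt: mark the Spark runtime as "provided" so the (large) Spark
// jars are not bundled; the cluster supplies them at run time
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

Running sbt assembly then produces target/scala-2.10/<name>-assembly-<version>.jar, which can replace both the app jar and the --jars list on spark-submit; the Kafka-related dependencies should stay unscoped so they do get bundled. Duplicate files across jars may require a mergeStrategy override in build.sbt.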
spark-1.1.0-bin-hadoop2.4 java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
Hi, I am new to Spark. I am trying to write a simple Java program that processes tweets that were collected and stored in a file. I figured the simplest thing to do would be to convert the JSON string into a Java map. When I submit my jar file I keep getting the following error: java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass

For the life of me I can not figure out what the problem is. I am using the maven shade plugin and checked to see that the missing class is in my uber jar. Any suggestions would be greatly appreciated. Andy

P.s. I should mention that I am running everything on my local machine.

<properties>
    <jackson.version>1.8.8</jackson.version> <!-- 1.9.13 -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>${jackson.version}</version>
    </dependency>
</dependencies>

I am using shade to build an uber jar:

$ jar tvf target/myUber.jar
580 Wed Sep 17 16:17:36 PDT 2014 org/codehaus/jackson/annotate/JsonClass.class

14/09/17 16:13:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
    at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
    at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
    at org.codehaus.jackson.map.deser.BasicDeserializerFactory.createMapDeserializer(BasicDeserializerFactory.java:337)
    at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:377)
    at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
    at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
    at org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
    at org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
    at org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
    at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
    at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1609)
    at com.santacruzintegration.spark.JSONHelper.toJsonMap(JSONHelper.java:40)

public class JSONHelper {
    public static final ObjectMapper mapper = new ObjectMapper();
    static {
        mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // mapper.configure(Feature.USE_ANNOTATIONS, false);
    }

    public static Map<String, String> toJsonMap(String json) {
        System.err.println("AEDWIP toJsonMap: " + json);
        try {
            TypeReference<HashMap<String, String>> typeRef =
                new TypeReference<HashMap<String, String>>() {};
            // return mapper.readValue(json, new TypeReference<HashMap<String, String>>() {});
            return mapper.readValue(json, typeRef);
            // return mapper.convertValue(json, typeRef);
            // HashMap<String, String> ret = new ObjectMapper().readValue(json, HashMap.class);
        } catch (Exception e) {
            throw new IOError(e);
        }
    }
}
problem with HiveContext inside Actor
Hi, I wonder if anybody has had a similar experience or has any suggestions here. I have an Akka Actor that processes database requests as high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes it in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly, except that when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java, which suggests that its conf variable is not initialized. Ironically, it works fine if my main thread directly calls actor.hiveContext to create the database. The Spark version is 1.1.0. Thanks, Du
Re: permission denied on local dir
To be more clear: that YARN cluster is managed by another team, which means I do not have permission to change the system. If required, I can request changes from them, but as of now I only have permission to manage my Spark application. So if there is any way to solve this problem by changing configuration in the Spark application, that would be best. However, if configuration at the YARN layer is required, I will need to ask the managing team.

Thanks Regards Dongkyoung.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Permission-denied-on-local-dir-tp14422p14492.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: LZO support in Spark 1.0.0 - nothing seems to work
It works for me:

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar

I hope you are adding this to the code:

val conf = sc.hadoopConfiguration
conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")

Vipul

On Sep 17, 2014, at 5:40 PM, rogthefrog ro...@amino.com wrote:

I have a HDFS cluster managed with CDH Manager. Version is CDH 5.1 with the matching GPLEXTRAS parcel. LZO works with Hive and Pig, but I can't make it work with Spark 1.0.0. I've tried:

* Setting this:
HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS -Djava.library.path=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native/"

* Setting this in spark-env.sh. I tried with and without export. I tried in CDH Manager and manually on the host.
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native/

* Setting this in /etc/spark/conf/spark-defaults.conf:
spark.executor.extraLibraryPath /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native
spark.spark.executor.extraClassPath /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar

* Adding this in CDH manager:
export LD_LIBRARY_PATH=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native

* Hardcoding -Djava.library.path=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native in the Spark command
* Symlinking the gpl compression binaries into /opt/cloudera/parcels/CDH/lib/hadoop/lib/native
* Symlinking the gpl compression binaries into /usr/lib

And nothing worked. When I run pyspark I get this:
14/09/17 20:38:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

and when I try to run a simple job on an LZO file in HDFS I get this:

distFile.count()
14/09/17 13:51:54 ERROR GPLNativeCodeLoader: Could not load native gpl library
java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
    at java.lang.Runtime.loadLibrary0(Runtime.java:849)
    at java.lang.System.loadLibrary(System.java:1088)
    at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:32)
    at com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:71)

Can anybody help please? Many thanks.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LZO-support-in-Spark-1-0-0-nothing-seems-to-work-tp14494.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Huge matrix
Hi Reza, In similarColumns, it seems that with cosine similarity I also need other measures such as intersection, jaccard, and so on... Right now I modified the code to generate jaccard, but I had to run it twice due to the design of RowMatrix / CoordinateMatrix... I feel we should modify RowMatrix and CoordinateMatrix to be templated on the value type... Are you considering this in your design? Thanks. Deb On Tue, Sep 9, 2014 at 9:45 AM, Reza Zadeh r...@databricks.com wrote: Better to do it in a PR of your own, it's not sufficiently related to dimsum On Tue, Sep 9, 2014 at 7:03 AM, Debasish Das debasish.da...@gmail.com wrote: Cool...can I add loadRowMatrix in your PR? Thanks. Deb On Tue, Sep 9, 2014 at 1:14 AM, Reza Zadeh r...@databricks.com wrote: Hi Deb, Did you mean to message me instead of Xiangrui? For tall-skinny (TS) matrices, dimsum with PositiveInfinity and computeGramian have the same cost, so you can do either one. For dense matrices with, say, 1M columns this won't be computationally feasible and you'll want to start sampling with dimsum. It would be helpful to have a loadRowMatrix function, I would use it. Best, Reza On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das debasish.da...@gmail.com wrote: Hi Xiangrui, For tall skinny matrices, if I can pass a similarity measure to computeGramian, I could re-use the SVD's computeGramian for similarity computation as well... Do you recommend using this approach for tall skinny matrices, or just using dimsum's routines? Right now RowMatrix does not have a loadRowMatrix function like the one available for LabeledPoint... should I add one? I want to export the matrix out from my stable code and then test dimsum... Thanks. Deb On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh r...@databricks.com wrote: I will add dice, overlap, and jaccard similarity in a future PR, probably still for 1.2 On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das debasish.da...@gmail.com wrote: Awesome...Let me try it out... Any plans of putting other similarity measures in in the future (jaccard is something that will be useful)? I guess it makes sense to add some similarity measures to mllib... On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh r...@databricks.com wrote: Yes you're right, calling dimsum with gamma as PositiveInfinity turns it into the usual brute force algorithm for cosine similarity, there is no sampling. This is by design. On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das debasish.da...@gmail.com wrote: I looked at the code: similarColumns(Double.posInf) is generating the brute force... Basically, will dimsum with gamma as PositiveInfinity produce the exact same result as doing Cartesian products of RDD[(product, vector)] and computing similarities, or will there be some approximation? Sorry, I have not read your paper yet. Will read it over the weekend. On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh r...@databricks.com wrote: For 60M x 10K, brute force and dimsum thresholding should be fine. For 60M x 10M, brute force probably won't work depending on the cluster's power, and dimsum thresholding should work with an appropriate threshold. Dimensionality reduction should help, and how effective it is will depend on your application and domain; it's worth trying if the direct computation doesn't work. You can also try running KMeans clustering (perhaps after dimensionality reduction) if your goal is to find batches of similar points instead of all pairs above a threshold.
On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das debasish.da...@gmail.com wrote: Also, for tall and wide (rows ~60M, columns ~10M), I am considering running a matrix factorization to reduce the dimension to, say, ~60M x 50 and then running all-pairs similarity... Did you also try similar ideas and see positive results? On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das debasish.da...@gmail.com wrote: Ok... just to make sure: I have RowMatrix[SparseVector] where rows are ~60M and columns are ~10M, with around a billion data points... I have another version that's around 60M x ~10K... I guess for the second one both all-pairs and dimsum will run fine... But for tall and wide, what do you suggest? Can dimsum handle it? I might need jaccard as well... can I plug that into the PR? On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh r...@databricks.com wrote: You might want to wait until Wednesday, since the interface in that PR will be changing before then, probably over the weekend, so that you don't have to redo your code. Your call if you need it sooner. Reza On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das debasish.da...@gmail.com wrote: Ohh cool... all-pairs brute force is also part of this PR? Let me pull it in and test on our dataset... Thanks. Deb On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh r...@databricks.com wrote: Hi Deb, We are adding all-pairs and thresholded all-pairs via dimsum in this PR:
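A minimal sketch of the API this thread discusses, using the similarColumns name from the PR under review; treat the exact name and signature as an assumption (the method ultimately shipped on RowMatrix as columnSimilarities):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.sparse(4, Seq((0, 1.0), (2, 1.0))),
  Vectors.sparse(4, Seq((1, 1.0), (3, 1.0))),
  Vectors.sparse(4, Seq((0, 1.0), (3, 1.0)))))
val mat = new RowMatrix(rows)

// gamma = PositiveInfinity: no sampling, exact brute-force cosine similarity
val exact = mat.similarColumns(Double.PositiveInfinity)

// a finite gamma turns on DIMSUM sampling; larger gamma = more accurate, more work
val approx = mat.similarColumns(10.0)

exact.entries.take(5).foreach(println)  // CoordinateMatrix of (i, j, similarity)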
RE: problem with HiveContext inside Actor
Hi, Du I am not sure what you mean by "triggers the HiveContext to create a database"; did you create a subclass of HiveContext? Just be sure you call HiveContext.sessionState eagerly, since it sets the proper hiveconf into the SessionState; otherwise the HiveDriver will always get a null value when retrieving the HiveConf. Cheng Hao From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, September 18, 2014 7:51 AM To: user@spark.apache.org; d...@spark.apache.org Subject: problem with HiveContext inside Actor Hi, Wonder if anybody has had similar experience or any suggestion here. I have an akka Actor that processes database requests in high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes it in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly except that, when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java which suggests that its conf variable is not initialized. Ironically, it works fine if my main thread directly calls actor.hiveContext to create the database. The spark version is 1.1.0. Thanks, Du
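A minimal sketch of this eager-initialization advice, assuming Spark 1.1's HiveContext; the subclass name, the CreateDb message, and the actor wiring are hypothetical, and sessionState is reached through a subclass since it is not public:

import akka.actor.Actor
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

case class CreateDb(name: String)  // hypothetical message type

class EagerHiveContext(sc: SparkContext) extends HiveContext(sc) {
  // touch sessionState at construction time so the HiveConf is installed
  // before any actor message triggers a query on another thread
  sessionState
  def createDb(name: String): Unit = sql(s"CREATE DATABASE IF NOT EXISTS $name")
}

class DbActor(hc: EagerHiveContext) extends Actor {
  def receive = {
    case CreateDb(name) => hc.createDb(name)
  }
}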
Re: Cannot run unit test.
When I run `sbt test-only SparkTest` or `sbt test-only SparkTest1`, the tests pass. But when I run `sbt test` to run both SparkTest and SparkTest1, they fail. If I merge all the cases into one file, the tests pass. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-run-unit-test-tp14459p14506.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
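A common cause when suites pass individually but fail under a combined `sbt test` run is two suites each holding a SparkContext in the same JVM; that diagnosis is only an assumption here, but a sketch of the conventional guard looks like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class SparkTest extends FunSuite with BeforeAndAfterAll {
  @transient private var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SparkTest"))
  }

  override def afterAll() {
    sc.stop()  // release the context so the next suite can create its own
    System.clearProperty("spark.driver.port")  // clear leftover state between suites
  }

  test("count works") {
    assert(sc.parallelize(1 to 3).count() === 3)
  }
}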
MLLib: LIBSVM issue
Hi All, We have a fairly large amount of sparse data. I was following these instructions in the manual:

Sparse data: It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format: label index1:value1 index2:value2 ...

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

I believe that I have formatted my data as per the required LIBSVM format. Here is a snippet of that:

1 122:1 1693:1 1771:1 1974:1 2334:1 2378:1 2562:1
1 118:1 1389:1 1413:1 1454:1 1780:1 2562:1 5051:1 5417:1 5548:1 5798:1 5862:1
0 150:1 214:1 468:1 1013:1 1078:1 1092:1 1117:1 1489:1 1546:1 1630:1 1635:1 1827:1 2024:1 2215:1 2478:1 2761:1 5985:1 6115:1 6218:1
0 251:1 5578:1

However, when I use MLUtils.loadLibSVMFile(sc, "path-to-data-file") I get the following error messages in my spark-shell. Can someone please point me in the right direction?

java.lang.NumberFormatException: For input string: 150:1214:1 468:11013:11078:11092:11117:11489:1 1546:11630:11635:11827:12024:12215:1 2478:12761:15985:16115:16218:1
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
at java.lang.Double.parseDouble(Double.java:540)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
Re: problem with HiveContext inside Actor
- dev

Is it possible that you are constructing more than one HiveContext in a single JVM? Due to global state in Hive code this is not allowed.

Michael
Move Spark configuration from SPARK_CLASSPATH to spark-default.conf, HiveContext went wrong with Class com.hadoop.compression.lzo.LzoCodec not found
Hi there, My production environment is AWS EMR with Hadoop 2.4.0 and Spark 1.0.2. I moved the Spark configuration from SPARK_CLASSPATH to spark-default.conf, and then the HiveContext went wrong. I also found this WARN info: "WARN DataNucleus.General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/home/hadoop/.versions/spark-1.0.2-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar is already registered, and you are trying to register an identical plugin located at URL file:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.1.jar." But I do not know where the duplicate registration comes from.

Content of SPARK_CLASSPATH:

export SPARK_MASTER_IP=10.187.25.107
export SCALA_HOME=/home/hadoop/.versions/scala-2.10.3
export SPARK_LOCAL_DIRS=/mnt/spark/
export SPARK_CLASSPATH=/usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
export SPARK_DAEMON_JAVA_OPTS=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Contents of spark-default.conf:

spark.master spark://10.187.25.107:7077
spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.local.dir /mnt/spark/
spark.executor.memory 10g
spark.executor.extraLibraryPath /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
# spark.executor.extraClassPath /usr/share/aws/emr/emr-fs/lib/*:/usr/share/aws/emr/lib/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar

The error log:

14/09/18 02:28:45 INFO parse.ParseDriver: Parsing command: show tables
14/09/18 02:28:45 INFO parse.ParseDriver: Parse Completed
14/09/18 02:28:45 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/09/18 02:28:45 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/09/18 02:28:45 INFO analysis.Analyzer: Max iterations (2) reached for batch Check Analysis
14/09/18 02:28:45 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/09/18 02:28:45 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/09/18 02:28:45 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated.
Instead, use mapreduce.input.fileinputformat.input.dir.recursive
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=Driver.run>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=TimeToSubmit>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=compile>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=parse>
14/09/18 02:28:45 INFO parse.ParseDriver: Parsing command: show tables
14/09/18 02:28:45 INFO parse.ParseDriver: Parse Completed
14/09/18 02:28:45 INFO ql.Driver: </PERFLOG method=parse start=1411007325561 end=1411007325561 duration=0>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=semanticAnalyze>
14/09/18 02:28:45 INFO ql.Driver: Semantic Analysis Completed
14/09/18 02:28:45 INFO ql.Driver: </PERFLOG method=semanticAnalyze start=1411007325561 end=1411007325611 duration=50>
14/09/18 02:28:45 INFO exec.ListSinkOperator: Initializing Self 0 OP
14/09/18 02:28:45 INFO exec.ListSinkOperator: Operator 0 OP initialized
14/09/18 02:28:45 INFO exec.ListSinkOperator: Initialization Done 0 OP
14/09/18 02:28:45 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
14/09/18 02:28:45 INFO ql.Driver: </PERFLOG method=compile start=1411007325538 end=1411007325677 duration=139>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=Driver.execute>
14/09/18 02:28:45 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/09/18 02:28:45 INFO ql.Driver: Starting command: show tables
14/09/18 02:28:45 INFO ql.Driver: </PERFLOG method=TimeToSubmit start=1411007325538 end=1411007325692 duration=154>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=runTasks>
14/09/18 02:28:45 INFO ql.Driver: <PERFLOG method=task.DDL.Stage-0>
14/09/18 02:28:45 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
14/09/18 02:28:45 INFO metastore.ObjectStore: ObjectStore, initialize called
14/09/18 02:28:45 WARN DataNucleus.General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/home/hadoop/.versions/spark-1.0.2-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar is already registered, and you are trying to register an identical plugin located at URL file:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.1.jar.
14/09/18 02:28:45 WARN DataNucleus.General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple
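A sketch of the two settings at issue, not a verified fix: in the posted spark-default.conf the spark.executor.extraClassPath line is commented out, so the executors never see hadoop-lzo.jar, and spark.executor.extraLibraryPath points at the jar itself rather than at a native-library directory. The native path below is a guess and needs to match wherever libgplcompression.so actually lives on the EMR hosts:

spark.executor.extraClassPath /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar
spark.executor.extraLibraryPath /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/native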
Re: MLLib: LIBSVM issue
Hi, The spacing between the inputs should be a single space, not a tab. I feel like your inputs have tabs between them instead of a single space. Therefore the parser cannot parse the input. Best, Burak

- Original Message - From: Sameer Tilak ssti...@live.com To: user@spark.apache.org Sent: Wednesday, September 17, 2014 7:25:10 PM Subject: MLLib: LIBSVM issue
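If tabs are indeed the culprit, one workaround (a sketch under that assumption; the paths are hypothetical) is to normalize the whitespace before handing the file to loadLibSVMFile:

import org.apache.spark.mllib.util.MLUtils

// collapse any run of whitespace (tabs included) down to single spaces
val cleaned = sc.textFile("hdfs:///user/me/sparse_raw.txt")
  .map(_.trim.replaceAll("\\s+", " "))
cleaned.saveAsTextFile("hdfs:///user/me/sparse_clean")

// the cleaned output now parses as standard LIBSVM
val examples = MLUtils.loadLibSVMFile(sc, "hdfs:///user/me/sparse_clean")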
SchemaRDD and RegisterAsTable
Hi, We built Spark 1.1.0 with Maven using mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive clean package, and the Thrift server has been configured to use the Hive metastore. When a SchemaRDD is registered as a table, where does the metadata of this table get stored? Can it be stored in the configured Hive metastore? Also, if the Thrift server is not configured to use the Hive metastore, it uses its own default (probably Derby) metastore. Would the table metainfo then be stored in that metastore? Best Regards Santosh
Re: SchemaRDD and RegisterAsTable
The registered table is stored within the Spark context itself. To make the table available for the Thrift server to access, you can save the table into the Hive context so that the Thrift server process can see it. If you are using Derby as your metastore, the Thrift server should be accessing it as well, since you would want both to use the same Hive configuration (i.e., hive-site.xml). You may want to migrate your metastore to MySQL or Postgres, as either will handle concurrency better than Derby. HTH! Denny
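A minimal sketch of the distinction described above, assuming Spark 1.1's HiveContext (where registerAsTable was renamed registerTempTable); the table names and data are made up:

import org.apache.spark.sql.hive.HiveContext

case class Person(name: String, age: Int)
val hiveCtx = new HiveContext(sc)
import hiveCtx._  // brings the implicit RDD-to-SchemaRDD conversion into scope

val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25)))

// temp table: metadata lives only inside this context, invisible to the Thrift server
people.registerTempTable("people_tmp")

// persisted table: written through the configured metastore, so the Thrift server sees it
hiveCtx.sql("CREATE TABLE people_persisted AS SELECT * FROM people_tmp")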
Re: problem with HiveContext inside Actor
Thanks for your reply. Michael: No, I only create one HiveContext in the code. Hao: Yes, I subclass HiveContext and define my own function to create the database, and then subclass akka Actor to call that function in response to an abstract message. Following your suggestion, I called println(sessionState.getConf.getAllProperties), which printed tons of properties; however, the same NullPointerException was still thrown. As mentioned, the weird thing is that everything works fine if I simply call actor.hiveContext.createDB() directly, but it throws the null pointer exception from Driver.java if I do actor ! CreateSomeDB, which seems to me to be the same thing because the actor does nothing but call createDB(). Du From: Michael Armbrust mich...@databricks.com Date: Wednesday, September 17, 2014 at 7:40 PM To: Cheng, Hao hao.ch...@intel.com Cc: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor
Python version of kmeans
Hi all, I need the kmeans code written against PySpark for some testing purposes. Can somebody tell me the difference between these two files? spark-1.0.1/examples/src/main/python/kmeans.py and spark-1.0.1/python/pyspark/mllib/clustering.py Thanks & Regards, Meethu M
SQL shell for Spark SQL?
Is there a shell available for Spark SQL, similar to the way the Shark or Hive shells work? From my reading up on Spark SQL, it seems like one can execute SQL queries in the Spark shell, but only from within code in a programming language such as Scala. There does not seem to be any way to directly issue SQL (or HQL) queries from the shell, send files of queries to the shell (i.e., using the -f flag), etc. Does this functionality exist somewhere and I'm just missing it? Thanks, DR