Re: scalac crash when compiling DataTypeConversions.scala
Hey Ryan, I've found that filing issues with the Scala/Typesafe JIRA is pretty helpful if the issue can be fully reproduced, and sometimes even helpful if it can't. You can file bugs here: https://issues.scala-lang.org/secure/Dashboard.jspa The Spark SQL code in particular is typically the source of these, as we use fancier Scala features there. In a pinch it is also possible to recompile and test code without building SQL if you just run tests for the specific module (e.g. streaming). In sbt this sort of just works: sbt/sbt streaming/test-only a.b.c* In Maven it's clunkier, but if you do a mvn install first then (I think) you can test sub-modules independently: mvn test -pl streaming ... - Patrick On Wed, Oct 22, 2014 at 10:00 PM, Ryan Williams ryan.blake.willi...@gmail.com wrote: I started building Spark / running Spark tests this weekend, and on maybe 5-10 occasions I have run into a compiler crash while compiling DataTypeConversions.scala. Here is a full gist of an innocuous test command (mvn test -Dsuites='*KafkaStreamSuite') exhibiting this behavior. The problem starts on L512 and there's a final stack trace at the bottom. mvn clean or ./sbt/sbt clean fixes it (I believe I've observed the issue while compiling with each tool), but those are annoying/time-consuming to run, obviously, and it's happening pretty frequently for me when doing only small numbers of incremental compiles punctuated by e.g. checking out different git commits. Have other people seen this? This post on this list is basically the same error, but in TestSQLContext.scala, and this SO post claims to hit it when trying to build in IntelliJ. It seems likely to be a bug in scalac; would finding a consistent repro case and filing it somewhere be useful?
Re: About Memory usage in the Spark UI
It shows the amount of memory used to store RDD blocks, which are created when you run .cache()/.persist() on an RDD. On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang hw...@qilinsoft.com wrote: Hi, please take a look at the attached screen-shot. I wonder what the Memory Used column means. I give 2GB memory to the driver process and 12GB memory to the executor process. Thank you!
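For what it's worth, a minimal spark-shell sketch of the kind of call that populates that column (the input path is made up):

val lines = sc.textFile("hdfs:///some/path") // hypothetical input
val cached = lines.cache()                   // marks the RDD for MEMORY_ONLY storage
cached.count()                               // an action materializes the blocks; the UI then reports their size under Memory Used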
Re: Spark Hive Snappy Error
Hi, I removed export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar and it works, THANK YOU!! Regards Arthur On 23 Oct, 2014, at 1:00 pm, Shao, Saisai saisai.s...@intel.com wrote: Seems you just added the snappy library into your classpath: export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar But Spark itself depends on snappy-0.2.jar. Is there any possibility that this problem is caused by a different version of snappy? Thanks Jerry From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com] Sent: Thursday, October 23, 2014 11:32 AM To: Shao, Saisai Cc: arthur.hk.c...@gmail.com; user Subject: Re: Spark Hive Snappy Error Hi, Please find the attached file. my spark-default.xml

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
#
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.executor.memory 2048m
spark.shuffle.spill.compress false
spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec

my spark-env.sh

#!/usr/bin/env bash
export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop}
export SPARK_WORKER_DIR=/edh/hadoop_data/spark_work/
export SPARK_LOG_DIR=/edh/hadoop_logs/spark
export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
export SPARK_CLASSPATH=$SPARK_HOME/lib_managed/jars/mysql-connector-java-5.1.31-bin.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HBASE_HOME/lib/*:$HIVE_HOME/csv-serde-1.1.2-0.11.0-all.jar:
export SPARK_WORKER_MEMORY=2g
export HADOOP_HEAPSIZE=2000
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=m35:2181,m33:2181,m37:2181"
export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"

ll $HADOOP_HOME/lib/native/Linux-amd64-64
-rw-rw-r--. 1 tester tester 50523 Aug 27 14:12 hadoop-auth-2.4.1.jar
-rw-rw-r--. 1 tester tester 1062640 Aug 27 12:19 libhadoop.a
-rw-rw-r--. 1 tester tester 1487564 Aug 27 11:14 libhadooppipes.a
lrwxrwxrwx. 1 tester tester 24 Aug 27 07:08 libhadoopsnappy.so -> libhadoopsnappy.so.0.0.1
lrwxrwxrwx. 1 tester tester 24 Aug 27 07:08 libhadoopsnappy.so.0 -> libhadoopsnappy.so.0.0.1
-rwxr-xr-x. 1 tester tester 54961 Aug 27 07:08 libhadoopsnappy.so.0.0.1
-rwxrwxr-x. 1 tester tester 630328 Aug 27 12:19 libhadoop.so
-rwxrwxr-x. 1 tester tester 630328 Aug 27 12:19 libhadoop.so.1.0.0
-rw-rw-r--. 1 tester tester 582472 Aug 27 11:14 libhadooputils.a
-rw-rw-r--. 1 tester tester 298626 Aug 27 11:14 libhdfs.a
-rwxrwxr-x. 1 tester tester 200370 Aug 27 11:14 libhdfs.so
-rwxrwxr-x. 1 tester tester 200370 Aug 27 11:14 libhdfs.so.0.0.0
lrwxrwxrwx. 1 tester tester 55 Aug 27 07:08 libjvm.so -> /usr/lib/jvm/jdk1.6.0_45/jre/lib/amd64/server/libjvm.so
lrwxrwxrwx. 1 tester tester 25 Aug 27 07:08 libprotobuf-lite.so -> libprotobuf-lite.so.8.0.0
lrwxrwxrwx. 1 tester tester 25 Aug 27 07:08 libprotobuf-lite.so.8 -> libprotobuf-lite.so.8.0.0
-rwxr-xr-x. 1 tester tester 964689 Aug 27 07:08 libprotobuf-lite.so.8.0.0
lrwxrwxrwx. 1 tester tester 20 Aug 27 07:08 libprotobuf.so -> libprotobuf.so.8.0.0
lrwxrwxrwx. 1 tester tester 20 Aug 27 07:08 libprotobuf.so.8 -> libprotobuf.so.8.0.0
-rwxr-xr-x. 1 tester tester 8300050 Aug 27 07:08 libprotobuf.so.8.0.0
lrwxrwxrwx. 1 tester tester 18 Aug 27 07:08 libprotoc.so -> libprotoc.so.8.0.0
lrwxrwxrwx. 1 tester tester 18 Aug 27 07:08 libprotoc.so.8 -> libprotoc.so.8.0.0
-rwxr-xr-x. 1 tester tester 9935810 Aug 27 07:08 libprotoc.so.8.0.0
-rw-r--r--. 1 tester tester 233554 Aug 27 15:19 libsnappy.a
lrwxrwxrwx. 1 tester tester 23 Aug 27 11:32 libsnappy.so -> /usr/lib64/libsnappy.so
lrwxrwxrwx. 1 tester tester 23 Aug 27 11:33 libsnappy.so.1 -> /usr/lib64/libsnappy.so
-rwxr-xr-x. 1 tester tester 147726 Aug 27 07:08 libsnappy.so.1.2.0
drwxr-xr-x. 2 tester tester 4096 Aug 27 07:08 pkgconfig

Regards Arthur On 23 Oct, 2014, at 10:57 am, Shao, Saisai saisai.s...@intel.com wrote: Hi Arthur, I think your problem might be different from what SPARK-3958 (https://issues.apache.org/jira/browse/SPARK-3958) mentioned; it seems your problem is more likely a library link problem. Would you mind checking your Spark runtime to see if snappy.so is loaded or not (through lsof -p)? I guess your problem is more likely a library-not-found problem. Thanks
Is Spark streaming suitable for our architecture?
Hi, I'm evaluating Spark Streaming to see if it fits to scale our current architecture. We are currently downloading and processing 6M documents per day from online and social media. We have a different workflow for each type of document, but some of the steps are keyword extraction, language detection, clustering, classification, indexing, etc. We are using Gearman to dispatch the jobs to workers and we have some queues on a database. Everything is in near real time. I'm wondering if we could integrate Spark Streaming into the current workflow and if it's feasible. One of our main discussions is whether we have to go to a fully distributed architecture or to a semi-distributed one. I mean, distribute everything or process some steps on the same machine (crawling, keyword extraction, language detection, indexing). We don't know which one scales better; each one has pros and cons. Now we have a semi-distributed one, as we had network problems given the amount of data we were moving around. So now, all documents crawled on server X are later dispatched through Gearman to the same server. What we dispatch on Gearman is only the document id, and the document data remains on the crawling server in Memcached, so the network traffic is kept to a minimum. Is it feasible to remove all database queues and Gearman and move to Spark Streaming? We are evaluating adding Kafka to the system too. Is anyone using Spark Streaming for a system like ours? Should we worry about the network traffic, or is it something Spark can manage without problems? Every document is around 50KB (roughly 300GB a day). If we wanted to isolate some steps to be processed on the same machine(s) (or give them priority), is that something we could do with Spark? Any help or comment will be appreciated. And if someone has had a similar problem and has knowledge about the architecture approach, it will be more than welcome. Thanks
Dynamically loaded Spark-stream consumer
I have a use case where I need to continuously ingest data from a Kafka stream. However, apart from ingestion (to HBase), I also need to compute some metrics (e.g. avg for the last minute, etc.). The problem is that it's very likely I'll continuously add more metrics, and I don't want to restart my Spark program from time to time. Is there a mechanism by which Spark Streaming can load and plug in code at runtime without restarting? Any solutions or suggestions? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
How to access objects declared and initialized outside the call() method of JavaRDD
Hey All, I am unable to access objects declared and initialized outside the call() method of a JavaRDD operation. In the below code snippet, the call() method makes a fetch call to C* but since javaSparkContext is defined outside the call method's scope, the compiler gives a compilation error.

stringRdd.foreach(new VoidFunction<String>() {
  @Override
  public void call(String str) throws Exception {
    JavaRDD<String> vals = javaFunctions(javaSparkContext)
        .cassandraTable(schema, table, String.class)
        .select("val");
  }
});

In other languages I have used closures to do this, but I have not been able to achieve the same here. Can someone suggest how to achieve this in the current code context? --Unilocal
RE: About Memory usage in the Spark UI
Patrick, thanks for the response. May I ask a few more questions? I'm running a Spark Streaming application which receives data from a socket and does some transformations. The event injection rate is so high that the processing duration exceeds the batch interval, so I see the Could not compute split, block input-0-1414049609200 not found issue as discussed by others in this post: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237 If my understanding is correct, Spark lacks storage in this case because of event pile-up, so it needs to delete some splits in order to free memory. However, even in this case, I still see a very small number (like 3MB) in the Memory Used column, while the total memory seems to be quite big (like 6GB). So I think the number shown in this column may have problems. How does Spark calculate the total memory based on the allocated JVM heap size? I guess it's related to the spark.storage.memoryFraction configuration, but I want to know the details. And why does the driver also use memory to store RDD blocks? Thanks again for the answer! From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: 2014年10月23日 14:00 To: Haopu Wang Cc: user Subject: Re: About Memory usage in the Spark UI ...
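For reference, a rough sketch of the arithmetic Spark 1.x uses for the storage capacity shown in the UI (0.6 is the spark.storage.memoryFraction default; the 0.9 safety factor is my assumption based on the spark.storage.safetyFraction default, so verify against your version):

// storage capacity advertised by an executor with a 12GB heap
val heapBytes = 12L * 1024 * 1024 * 1024
val memoryFraction = 0.6 // spark.storage.memoryFraction (default)
val safetyFraction = 0.9 // spark.storage.safetyFraction (assumed default)
val storageBytes = (heapBytes * memoryFraction * safetyFraction).toLong
// about 6.5GB usable for RDD blocks, in the ballpark of the ~6GB total in the UI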
Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic
The Kafka stream has 10 topics and the data rate is quite high (~100K/s per topic). Which configuration do you recommend?
- 1 Spark app consuming all Kafka topics
- 10 separate Spark apps, each consuming one topic
Assuming they have the same resource pool. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
NoClassDefFoundError on ThreadFactoryBuilder in Intellij
After checking out master/head, the following error occurs when attempting to run any test in IntelliJ: Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/util/concurrent/ThreadFactoryBuilder at org.apache.spark.util.Utils$.<init>(Utils.scala:648) There appears to be a related issue/JIRA: https://issues.apache.org/jira/browse/SPARK-3217 But the conditions described do not apply in my case: Did you by any chance do one of the following:
- forget to clean after pulling that change
- mix sbt and mvn built artifacts in the same build
- set SPARK_PREPEND_CLASSES
For reference here is the full stacktrace:

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/util/concurrent/ThreadFactoryBuilder
 at org.apache.spark.util.Utils$.<init>(Utils.scala:648)
 at org.apache.spark.util.Utils$.<clinit>(Utils.scala)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:179)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:119)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:134)
 at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:62)
 at org.apache.spark.sql.hbase.JavaHBaseSQLContext$.main(JavaHBaseSQLContext.scala:45)
 at org.apache.spark.sql.hbase.JavaHBaseSQLContext.main(JavaHBaseSQLContext.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.ThreadFactoryBuilder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 13 more
Exception in thread "delete Spark temp dirs" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.util.Utils$
 at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:173)
Re: How to access objects declared and initialized outside the call() method of JavaRDD
In Java, javaSparkContext would have to be declared final in order for it to be accessed inside an inner class like this. But even then it would not work, as the context is not serializable. You should rewrite this so you are not attempting to use the SparkContext inside an RDD operation. On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell universal.localh...@gmail.com wrote: ...
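For the archive, a rough Scala sketch of the rewrite Sean suggests, using the connector's serializable CassandraConnector inside foreachPartition instead of the SparkContext (the keyspace/table names and the query are placeholders, not the poster's actual schema):

import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(sc.getConf) // serializable, unlike the context
stringRdd.foreachPartition { partition =>
  connector.withSessionDo { session => // one session per partition
    partition.foreach { str =>
      val rs = session.execute("SELECT val FROM ks.table WHERE key = ?", str)
      // ... consume rs ...
    }
  }
}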
Re: Solving linear equations
The 0 vector is a trivial solution. Is the data big, such that it can't be computed on one machine? If so, I assume this system is over-determined. You can use a decomposition to find a least-squares solution, but the SVD is overkill, and in any event distributed decompositions don't exist in the project. You can solve it as a linear regression, as Mr Das says. If it's small enough to fit locally you should just use a matrix library to solve Ax = b with the QR decomposition or something, with Breeze or Commons Math or Octave or R. Lots of options if it's smallish. On Thu, Oct 23, 2014 at 12:15 AM, Martin Enzinger martin.enzin...@gmail.com wrote: Hi, I'm wondering how to use MLlib for solving equation systems following this pattern:
2*x1 + x2 + 3*x3 + ... + xn = 0
x1 + 0*x2 + 3*x3 + ... + xn = 0
...
0*x1 + x2 + 0*x3 + ... + xn = 0
I definitely still have some reading to do to really understand the direct solving techniques, but at the current state of knowledge SVD could help me with this, right? Can you point me to an example or a tutorial? best regards
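To make the local route concrete, a small Breeze sketch (the coefficients are made up; as I understand it, Breeze's \ is LAPACK-backed and falls back to least squares for non-square systems):

import breeze.linalg.{DenseMatrix, DenseVector}

val A = DenseMatrix(
  (2.0, 1.0, 3.0),
  (1.0, 0.0, 3.0),
  (0.0, 1.0, 0.0))
val b = DenseVector(1.0, 2.0, 3.0)
val x = A \ b // solves Ax = b locally
println(x)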
Re: Is Spark streaming suitable for our architecture?
Hi Albert, I have a couple of questions:
- You mentioned near real-time. What exactly is your SLA for processing each document?
- Which crawler are you using, and are you looking to bring Hadoop into your overall workflow? You might want to read up on how network traffic is minimized/managed on a Hadoop cluster, as you had run into network issues with your current architecture.
Thanks! On Thu, Oct 23, 2014 at 12:07 AM, Albert Vila albert.v...@augure.com wrote: ...
SparkSQL and columnar data
Hi guys, another question: what’s the approach to working with column-oriented data, i.e. data with more than 1000 columns? Using Parquet for this should be fine, but how well does Spark SQL handle a large number of columns? Is there a limit? Should we use standard Spark instead? Thanks for any insights, - Marius
Re: Multitenancy in Spark - within/across spark context
Upvote for the multitenancy requirement. I'm also building a data analytics platform and there'll be multiple users running queries and computations simultaneously. One of the pain points is control of resource size. Users don't really know how many nodes they need; they always use as much as possible... The result is lots of wasted resource in our YARN cluster. A way to 1) allow multiple Spark contexts to share the same resources or 2) add dynamic resource management for YARN mode is very much wanted. Jianshi On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: That's not something you might want to do usually. In general, a SparkContext maps to a user application My question was basically this. In this page in the official doc, under the Scheduling within an application section, it talks about multiuser and fair sharing within an app. How does multiuser within an application work (how do users connect to an app and run their stuff)? When would I want to use this? I see. The way I read that page is that Spark supports all those scheduling options; but Spark doesn't give you the means to actually be able to submit jobs from different users to a running SparkContext hosted on a different process. For that, you'll need something like the job server that I referenced before, or write your own framework for supporting that. Personally, I'd use the information on that page when dealing with concurrent jobs in the same SparkContext, but still restricted to the same user. I'd avoid trying to create any application where a single SparkContext is trying to be shared by multiple users in any way. As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. I basically wanted to find out if there were any gotchas related to preemption on Spark. Things like, say half of an application's executors got preempted while doing reduceByKey: will the application progress with the remaining resources/fair share? Jobs should still make progress as long as at least one executor is available. The gotcha would be the one I mentioned, where Spark will fail your job after x executor failures, which might be a common occurrence when preemption is enabled. That being said, it's a configurable option, so you can set x to a very large value and your job should keep on chugging along. The options you'd want to take a look at are: spark.task.maxFailures and spark.yarn.max.executor.failures -- Marcelo -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
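For anyone hitting this, a hedged sketch of the two settings Marcelo names (the values are arbitrary; check the defaults for your Spark/YARN versions):

import org.apache.spark.SparkConf

// tolerate more task/executor failures when YARN preemption is enabled
val conf = new SparkConf()
  .set("spark.task.maxFailures", "100")           // default is 4
  .set("spark.yarn.max.executor.failures", "200") // default is tied to executor count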
Spark Cassandra Connector proper usage
I'm looking to use Spark for some ETL, which will mostly consist of update statements (a column is a set that'll be appended to, so a simple insert is likely not going to work). As such, it seems like issuing CQL queries to import the data is the best option. Using the Spark Cassandra Connector, I see I can do this: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra Now I don't want to open a session and close it for every row in the source (am I right in not wanting this? Usually, I have one session for the entire process, and keep using that in normal apps). However, it says that the connector is serializable, but the session is obviously not. So, wrapping the whole import inside a single withSessionDo seems like it'll cause problems. I was thinking of using something like this:

class CassandraStorage(conf: SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store(t: Thingy): Unit = {
    // session.execute cql goes here
  }
}

Is this a good approach? Do I need to worry about closing the session? Where / how best would I do that? Any pointers are appreciated. Thanks, Ashic.
Re: Is Spark streaming suitable for our architecture?
Hi Jayant, On 23 October 2014 11:14, Jayant Shekhar jay...@cloudera.com wrote: Hi Albert, Have a couple of questions: - You mentioned near real-time. What exactly is your SLA for processing each document? As low as possible :). Right now it's between 30s and 5m, but I would like to have something stable around 1-2m if possible, taking into account that the system should be able to scale to 50M-100M documents. - Which crawler are you using and are you looking to bring in Hadoop into your overall workflow? You might want to read up on how network traffic is minimized/managed on the Hadoop cluster - as you had run into network issues with your current architecture. Everything is developed by us. The network issues were not related to the crawler itself; they were related to the documents we were moving around the system to be processed at each workflow stage. And yes, we are currently researching whether we can introduce Spark Streaming to be able to scale and execute all workflow stages, and use HDFS/Cassandra to store the data. Should we use the DStream persist function (if we treat every document as an RDD) in order to reuse the same data, or is it better to create new DStreams? At each step we add additional data to the document; for example in language extraction, we begin with a document without a language and we output the document with a new language field. Thanks Thanks! On Thu, Oct 23, 2014 at 12:07 AM, Albert Vila albert.v...@augure.com wrote: ...
what's the best way to initialize an executor?
I have some code that I need executed only once per executor in my Spark application. My current approach is to do something like the following:

scala> xmlKeyPair.foreachPartition(i => XPathProcessor.init(ats, Namespaces/NamespaceContext))

So, if I understand correctly, XPathProcessor.init will be called once per partition. Since I have 48 partitions for this RDD and 2 million documents, that seems acceptable. The downside is that I will likely have fewer executors than 48 (each executor will handle more than one partition), so XPathProcessor.init would be called more than once per executor. I have code in place to make sure this is not an issue. But I was wondering if there is a better way to accomplish something like this. Thanks. Darin.
RE: Spark Cassandra Connector proper usage
Hi Gerard, Thanks for the response. Here's the scenario: The target Cassandra schema looks like this:

create table foo (
  id text primary key,
  bar int,
  things set<text>
)

The source in question is a SQL Server source providing the necessary data. The source goes over the same id multiple times, adding things to the things set each time. With inserts, it'll replace things with a new set of one element, instead of appending that item. As such, the query update foo set things = things + ? where id=? solves the problem. If I had to stick with saveToCassandra, I'd have to read in the existing row for each row, and then write it back. Since this is happening in parallel on multiple machines, that would likely cause discrepancies where a node will read and update to older values. Hence my question about session management in order to issue custom update queries. Thanks, Ashic. Date: Thu, 23 Oct 2014 14:27:47 +0200 Subject: Re: Spark Cassandra Connector proper usage From: gerard.m...@gmail.com To: as...@live.com Ashic, With the Spark-Cassandra connector you would typically create an RDD from the source table, update what you need, filter out what you don't update, and write it back to Cassandra. Kr, Gerard On Oct 23, 2014 1:21 PM, Ashic Mahtab as...@live.com wrote: ...
Aggregation Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Hi, I got a $TreeNodeException; a few questions: Q1) How should I do aggregation in Spark? Can I use aggregation directly in SQL? Q2) Or should I use SQL to load the data to form an RDD, then use Scala to do the aggregation? Regards Arthur

My SQL (good one, without aggregation):

sqlContext.sql("SELECT L_RETURNFLAG FROM LINEITEM WHERE L_SHIPDATE<='1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println);
[A]
[N]
[N]
[R]

My SQL (problem SQL, with aggregation):

sqlContext.sql("SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS SUM_QTY, SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT )) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT ) * ( 1 + L_TAX )) AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) AS AVG_PRICE, AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER FROM LINEITEM WHERE L_SHIPDATE<='1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println);

14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
 Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
  Aggregate false, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS sum_disc_price#183,SUM(PartialSum#219) AS sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / CAST(SUM(PartialCount#221L), DoubleType)) AS avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / CAST(SUM(PartialCount#223L), DoubleType)) AS avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / CAST(SUM(PartialCount#225L), DoubleType)) AS avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
   Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
    Aggregate true, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) AS PartialSum#220,COUNT(l_extendedprice#194) AS PartialCount#223L,SUM(l_extendedprice#194) AS PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
     Project [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
      Filter (l_shipdate#197 <= 1998-09-02)
       HiveTableScan [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193], (MetastoreRelation boc_12, lineitem, None), None
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
at $iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC.<init>(<console>:22)
at $iwC.<init>(<console>:24)
at <init>(<console>:26)
at .<init>(<console>:30)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at
Re: Spark Streaming Applications
What is the application about? I couldn't find any proper description regarding the purpose of killrweather (I mean, other than just integrating Spark with Cassandra). Do you know if the slides of that tutorial are available somewhere? Thanks! On Wed, Oct 22, 2014 at 6:58 PM, Sameer Farooqui same...@databricks.com wrote: Hi Saiph, Patrick McFadin and Helena Edelson from DataStax taught a tutorial at NYC Strata last week where they created a prototype Spark Streaming + Kafka application for time series data. You can see the code here: https://github.com/killrweather/killrweather On Tue, Oct 21, 2014 at 4:33 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have been trying to find a fairly complex application that makes use of the Spark Streaming framework. I checked public github repos but the examples I found were too simple, only comprising simple operations like counters and sums. On the Spark Summit website, I could find very interesting projects, however no source code was available. Where can I find non-trivial Spark Streaming application code? Is it that difficult? Thanks.
Re: Aggregation Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Hello Arthur, You can do aggregations in SQL. How did you create LINEITEM? Thanks, Yin On Thu, Oct 23, 2014 at 8:54 AM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: ...
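For the archive, a minimal sketch of an aggregation that runs directly in SQL, assuming LINEITEM is registered as in the original post (the column subset is illustrative):

val rows = sqlContext.sql(
  "SELECT L_RETURNFLAG, SUM(L_QUANTITY) AS SUM_QTY, COUNT(*) AS COUNT_ORDER " +
  "FROM LINEITEM GROUP BY L_RETURNFLAG")
rows.collect().foreach(println)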
Re: Spark Cassandra Connector proper usage
Hi Ashic, At the moment I see two options: 1) You could use the CassandraConnector object to execute your specialized query. The recommended pattern is to do that within a rdd.foreachPartition(...) in order to amortize the DB connection setup over the number of elements in one partition. Something like this:

val sparkContext = ???
val cassandraConnector = CassandraConnector(conf)
val dataRdd = ??? // I assume this is the source of data
val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))
rddThingById.foreachPartition { partition =>
  cassandraConnector.withSessionDo { session =>
    // bind order: the first ? is the things value, the second is the id
    partition.foreach(record =>
      session.execute("update foo set things = things + ? where id=?", record.thing, record.id))
  }
}

2) You could change your data model slightly in order to avoid the update operation. Actually, the Cassandra representation of a set is nothing more than a column -> timestamp mapping, where the column name is an element of the set. So Set(a, b, c) = Column(a) -> ts, Column(b) -> ts, Column(c) -> ts. So, if you desugarize your data model, you could use something like:

create table foo (
  id text,
  bar int,
  things text,
  ts timestamp,
  primary key ((id, bar), things)
)

And your Spark process would be reduced to:

val sparkContext = ???
val dataRdd = ??? // I assume this is the source of data
dataRdd.map(elem => transformToIdBarThingByTimeStamp(elem)).saveToCassandra("ks", "foo", Columns("id", "bar", "thing", "ts"))

Hope this helps. -kr, Gerard. On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab as...@live.com wrote: ...
RE: Spark Cassandra Connector proper usage
Hi Gerard, I've gone with option 1 and it seems to be working well. Option 2 is also quite interesting. Thanks for your help with this. Regards, Ashic. From: gerard.m...@gmail.com Date: Thu, 23 Oct 2014 17:07:56 +0200 Subject: Re: Spark Cassandra Connector proper usage To: as...@live.com CC: user@spark.apache.org ...
Re: what's the best way to initialize an executor?
It sounds like your code already does its initialization at most once per JVM, and that's about as good as it gets. Each partition asks for init in a thread-safe way and the first request succeeds. On Thu, Oct 23, 2014 at 1:41 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: ...
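A minimal sketch of that guard, for anyone searching the archive later (XPathProcessor stands in for the poster's own class; the init body is assumed):

object XPathProcessor {
  @volatile private var initialized = false

  // called from every partition; only the first call in a given executor JVM
  // performs the expensive setup, later calls are cheap no-ops
  def init(ats: String, namespaceContext: String): Unit = synchronized {
    if (!initialized) {
      // ... one-time, per-JVM setup goes here ...
      initialized = true
    }
  }
}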
Re: How to set hadoop native library path in spark-1.1
Hi, Try the --driver-library-path option of spark-submit, e.g.: /opt/spark/bin/spark-submit --driver-library-path /opt/hadoop/lib/native (...) Regards, Christophe. On 21/10/2014 20:44, Pradeep Ch wrote: Hi all, Can anyone tell me how to set the native library path in Spark? Right now I am setting it using the SPARK_LIBRARY_PATH environment variable in spark-env.sh, but still no success. I am still seeing this in spark-shell: NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Thanks, Pradeep
Re: Multitenancy in Spark - within/across spark context
You may want to take a look at https://issues.apache.org/jira/browse/SPARK-3174. On Thu, Oct 23, 2014 at 2:56 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: ... -- Marcelo
Re: Setting only master heap
Yeah, as Sameer commented, there is unfortunately not an equivalent `SPARK_MASTER_MEMORY` that you can set. You can work around this by starting the master and the slaves separately with different settings of SPARK_DAEMON_MEMORY each time. AFAIK there haven't been any major changes in the standalone master in 1.1.0, so I don't see an immediate explanation for what you're observing. In general the Spark master doesn't use that much memory, and even if there are many applications it will discard the old ones appropriately, so unless you have a ton (like thousands) of concurrently running applications connecting to it, there's little likelihood of it OOMing. At least that's my understanding. -Andrew 2014-10-22 15:51 GMT-07:00 Sameer Farooqui same...@databricks.com: Hi Keith, It would be helpful if you could post the error message. Are you running Spark in Standalone mode or with YARN? In general, the Spark Master is only used for scheduling and it should be fine with the default setting of 512 MB RAM. Is it actually the Spark Driver's memory that you intended to change? *++ If in Standalone mode ++* You're right that SPARK_DAEMON_MEMORY sets the memory to allocate to the Spark Master, Worker and even HistoryServer daemons together. SPARK_WORKER_MEMORY is slightly confusing. In Standalone mode, it is the amount of memory that a worker advertises as available for drivers to launch executors. The sum of the memory used by executors spawned from a worker cannot exceed SPARK_WORKER_MEMORY. Unfortunately, I'm not aware of a way to set the memory for the Master and Worker individually, other than launching them manually. You can also try setting the config differently in each machine's spark-env.sh file. *++ If in YARN mode ++* In YARN, there is no setting for SPARK_DAEMON_MEMORY; therefore it appears only in the Standalone documentation. Remember that in YARN mode there is no Spark Worker; instead the YARN NodeManagers launch the Executors. And in YARN, there is no need to run a Spark Master JVM (since the YARN ResourceManager takes care of the scheduling). So, with YARN, use SPARK_EXECUTOR_MEMORY to set the Executor's memory and SPARK_DRIVER_MEMORY to set the Driver's memory. Just an FYI - for compatibility's sake, even in YARN mode there is a setting for SPARK_WORKER_MEMORY, but it has been deprecated. If you do set it, it just does the same thing as setting SPARK_EXECUTOR_MEMORY would have done. - Sameer On Wed, Oct 22, 2014 at 1:46 PM, Keith Simmons ke...@pulse.io wrote: We've been getting some OOMs from the Spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine. Is there any setting which *only* increases the master heap size? Keith
Re: Shuffle issues in the current master
To add to Aaron's response, `spark.shuffle.consolidateFiles` only applies to hash-based shuffle, so you shouldn't have to set it for sort-based shuffle. And yes, since you changed neither `spark.shuffle.compress` nor `spark.shuffle.spill.compress` you can't possibly have run into what #2890 fixes. I'm assuming you're running master? Was it before or after this commit: https://github.com/apache/spark/commit/6b79bfb42580b6bd4c4cd99fb521534a94150693 ? -Andrew 2014-10-22 16:37 GMT-07:00 Aaron Davidson ilike...@gmail.com: You may be running into this issue: https://issues.apache.org/jira/browse/SPARK-4019 You could check by having 2000 or fewer reduce partitions. On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai dbt...@dbtsai.com wrote: PS, sorry for spamming the mailing list. Based on my knowledge, both spark.shuffle.spill.compress and spark.shuffle.compress default to true, so in theory, we should not run into this issue if we don't change any settings. Is there any other bug we might be running into? Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 1:37 PM, DB Tsai dbt...@dbtsai.com wrote: Or can it be solved by setting both of the following settings to true for now? spark.shuffle.spill.compress true spark.shuffle.compress true Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 1:34 PM, DB Tsai dbt...@dbtsai.com wrote: It seems that this issue should be addressed by https://github.com/apache/spark/pull/2890 ? Am I right? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 11:54 AM, DB Tsai dbt...@dbtsai.com wrote: Hi all, With SPARK-3948, the exception in Snappy PARSING_ERROR is gone, but I have another exception now. I have no clue about what's going on; has anyone run into a similar issue? Thanks. This is the configuration I use:
spark.rdd.compress true spark.shuffle.consolidateFiles true spark.shuffle.manager SORT spark.akka.frameSize 128 spark.akka.timeout 600 spark.core.connection.ack.wait.timeout 600 spark.core.connection.auth.wait.timeout 300 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325) java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794) java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801) java.io.ObjectInputStream.init(ObjectInputStream.java:299) org.apache.spark.serializer.JavaDeserializationStream$$anon$1.init(JavaSerializer.scala:57) org.apache.spark.serializer.JavaDeserializationStream.init(JavaSerializer.scala:57) org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95) org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:351) org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196) org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196) org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243) org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89) org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:56) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
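A quick way to test Aaron's suggestion is to pass the partition count explicitly to the shuffle operation; this is only a sketch, where rdd and keyOf stand in for your actual data and key extractor, and 2000 is the threshold from SPARK-4019:

// Cap the number of reduce partitions at 2000 to see whether the
// failure described in SPARK-4019 goes away.
val pairs = rdd.map(x => (keyOf(x), 1L))      // keyOf is a hypothetical key extractor
val reduced = pairs.reduceByKey(_ + _, 2000)  // explicit reducer count instead of the default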
Re: How to access objects declared and initialized outside the call() method of JavaRDD
Bang on, Sean. Before sending the issue mail, I was able to remove the compilation error by making it final but then got Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext (as you mentioned). Now regarding your suggestion of changing the business logic: 1. *Is the current approach possible if I write the code in Scala?* I think probably not, but wanted to check with you. 2. Brief steps of what the code is doing: 1. Get raw sessions data from the datastore (C*) 2. Process the raw sessions data 3. Iterate over the processed data (derived from #2), fetch the previously aggregated data from the store for those row keys, and add the values from this batch to the previous batch values 4. Save back the updated values *This github gist https://gist.github.com/rssvihla/6577359860858ccb0b33 might explain more, and it does a similar thing in Scala.* I am trying to achieve a similar thing in Java using Spark Batch with C* as the datastore. I have attached the Java code file to provide some code details. (In case I was not able to explain the problem, the code will be handy.) The reason why I am fetching only selective data (that I will update later) is that Cassandra doesn't provide range queries, so I thought fetching the complete data might be expensive. It will be great if you can share your thoughts. On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote: In Java, javaSparkContext would have to be declared final in order for it to be accessed inside an inner class like this. But this would still not work as the context is not serializable. You should rewrite this so you are not attempting to use the Spark context inside an RDD. On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell universal.localh...@gmail.com wrote: Hey All, I am unable to access objects declared and initialized outside the call() method of JavaRDD. In the below code snippet, call() makes a fetch call to C*, but since javaSparkContext is defined outside the call() method scope, the compiler gives a compilation error:

stringRdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String str) throws Exception {
        JavaRDD<String> vals = javaFunctions(javaSparkContext)
                .cassandraTable(schema, table, String.class)
                .select("val");
    }
});

In other languages I have used closures to do this but am not able to achieve the same here. Can someone suggest how to achieve this in the current code context? --Unilocal -- --Unilocal PocAppNew.java Description: Binary data - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to access objects declared and initialized outside the call() method of JavaRDD
+1 to Sean. Is it possible to rewrite your code to not use the SparkContext inside the RDD? Or why does javaFunctions() need the SparkContext? On Thu, Oct 23, 2014 at 10:53 AM, Localhost shell universal.localh...@gmail.com wrote: Bang on, Sean. Before sending the issue mail, I was able to remove the compilation error by making it final but then got Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext (as you mentioned). Now regarding your suggestion of changing the business logic: 1. *Is the current approach possible if I write the code in Scala?* I think probably not, but wanted to check with you. 2. Brief steps of what the code is doing: 1. Get raw sessions data from the datastore (C*) 2. Process the raw sessions data 3. Iterate over the processed data (derived from #2), fetch the previously aggregated data from the store for those row keys, and add the values from this batch to the previous batch values 4. Save back the updated values *This github gist https://gist.github.com/rssvihla/6577359860858ccb0b33 might explain more, and it does a similar thing in Scala.* I am trying to achieve a similar thing in Java using Spark Batch with C* as the datastore. I have attached the Java code file to provide some code details. (In case I was not able to explain the problem, the code will be handy.) The reason why I am fetching only selective data (that I will update later) is that Cassandra doesn't provide range queries, so I thought fetching the complete data might be expensive. It will be great if you can share your thoughts. On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote: In Java, javaSparkContext would have to be declared final in order for it to be accessed inside an inner class like this. But this would still not work as the context is not serializable. You should rewrite this so you are not attempting to use the Spark context inside an RDD. On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell universal.localh...@gmail.com wrote: Hey All, I am unable to access objects declared and initialized outside the call() method of JavaRDD. In the below code snippet, call() makes a fetch call to C*, but since javaSparkContext is defined outside the call() method scope, the compiler gives a compilation error:

stringRdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String str) throws Exception {
        JavaRDD<String> vals = javaFunctions(javaSparkContext)
                .cassandraTable(schema, table, String.class)
                .select("val");
    }
});

In other languages I have used closures to do this but am not able to achieve the same here. Can someone suggest how to achieve this in the current code context? --Unilocal -- --Unilocal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Aggregation Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Hi, My steps to create LINEITEM: $HADOOP_HOME/bin/hadoop fs -mkdir /tpch/lineitem $HADOOP_HOME/bin/hadoop fs -copyFromLocal lineitem.tbl /tpch/lineitem/ Create external table lineitem (L_ORDERKEY INT, L_PARTKEY INT, L_SUPPKEY INT, L_LINENUMBER INT, L_QUANTITY DOUBLE, L_EXTENDEDPRICE DOUBLE, L_DISCOUNT DOUBLE, L_TAX DOUBLE, L_RETURNFLAG STRING, L_LINESTATUS STRING, L_SHIPDATE STRING, L_COMMITDATE STRING, L_RECEIPTDATE STRING, L_SHIPINSTRUCT STRING, L_SHIPMODE STRING, L_COMMENT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/lineitem'; Regards Arthur On 23 Oct, 2014, at 9:36 pm, Yin Huai huaiyin@gmail.com wrote: Hello Arthur, You can do aggregations in SQL. How did you create LINEITEM? Thanks, Yin On Thu, Oct 23, 2014 at 8:54 AM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, I got a $TreeNodeException, and have a few questions: Q1) How should I do aggregation in Spark? Can I use aggregation directly in SQL? Q2) Or should I use SQL to load the data to form an RDD and then use Scala to do the aggregation? Regards Arthur My SQL (good one, without aggregation): sqlContext.sql("SELECT L_RETURNFLAG FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println); [A] [N] [N] [R] My SQL (problem SQL, with aggregation): sqlContext.sql("SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS SUM_QTY, SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT )) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * ( 1 - L_DISCOUNT ) * ( 1 + L_TAX )) AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) AS AVG_PRICE, AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println); 14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree: Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200) Aggregate false, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS sum_disc_price#183,SUM(PartialSum#219) AS sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / CAST(SUM(PartialCount#221L), DoubleType)) AS avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / CAST(SUM(PartialCount#223L), DoubleType)) AS avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / CAST(SUM(PartialCount#225L), DoubleType)) AS avg_disc#187,SUM(PartialCount#226L) AS count_order#188L] Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200) Aggregate true, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) AS PartialSum#220,COUNT(l_extendedprice#194) AS PartialCount#223L,SUM(l_extendedprice#194) AS PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216] Project [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
Filter (l_shipdate#197 <= 1998-09-02) HiveTableScan [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193], (MetastoreRelation boc_12, lineitem, None), None at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at $iwC$$iwC$$iwC$$iwC.init(console:15) at $iwC$$iwC$$iwC.init(console:20) at $iwC$$iwC.init(console:22) at $iwC.init(console:24) at init(console:26) at .init(console:30) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at
Re: How to access objects declared and initialized outside the call() method of JavaRDD
Hey Jayant, In my previous mail, I mentioned a github gist *https://gist.github.com/rssvihla/6577359860858ccb0b33* which does something very similar to what I want to do, but it uses Scala for Spark. Hence my question (reiterating from the previous mail): *Is the current approach possible if I write the code in Scala?* Why does javaFunctions() need the SparkContext? Because for each row in the RDD, I am making a get call to the datastore 'cassandra'. The reason why I am fetching only selective data (that I will update later) is that Cassandra doesn't provide range queries, so I thought fetching the complete data might be expensive. On Thu, Oct 23, 2014 at 11:22 AM, Jayant Shekhar jay...@cloudera.com wrote: +1 to Sean. Is it possible to rewrite your code to not use the SparkContext inside the RDD? Or why does javaFunctions() need the SparkContext? On Thu, Oct 23, 2014 at 10:53 AM, Localhost shell universal.localh...@gmail.com wrote: Bang on, Sean. Before sending the issue mail, I was able to remove the compilation error by making it final but then got Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext (as you mentioned). Now regarding your suggestion of changing the business logic: 1. *Is the current approach possible if I write the code in Scala?* I think probably not, but wanted to check with you. 2. Brief steps of what the code is doing: 1. Get raw sessions data from the datastore (C*) 2. Process the raw sessions data 3. Iterate over the processed data (derived from #2), fetch the previously aggregated data from the store for those row keys, and add the values from this batch to the previous batch values 4. Save back the updated values *This github gist https://gist.github.com/rssvihla/6577359860858ccb0b33 might explain more, and it does a similar thing in Scala.* I am trying to achieve a similar thing in Java using Spark Batch with C* as the datastore. I have attached the Java code file to provide some code details. (In case I was not able to explain the problem, the code will be handy.) The reason why I am fetching only selective data (that I will update later) is that Cassandra doesn't provide range queries, so I thought fetching the complete data might be expensive. It will be great if you can share your thoughts. On Thu, Oct 23, 2014 at 1:48 AM, Sean Owen so...@cloudera.com wrote: In Java, javaSparkContext would have to be declared final in order for it to be accessed inside an inner class like this. But this would still not work as the context is not serializable. You should rewrite this so you are not attempting to use the Spark context inside an RDD. On Thu, Oct 23, 2014 at 8:46 AM, Localhost shell universal.localh...@gmail.com wrote: Hey All, I am unable to access objects declared and initialized outside the call() method of JavaRDD. In the below code snippet, call() makes a fetch call to C*, but since javaSparkContext is defined outside the call() method scope, the compiler gives a compilation error:

stringRdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String str) throws Exception {
        JavaRDD<String> vals = javaFunctions(javaSparkContext)
                .cassandraTable(schema, table, String.class)
                .select("val");
    }
});

In other languages I have used closures to do this but am not able to achieve the same here. Can someone suggest how to achieve this in the current code context?
--Unilocal -- --Unilocal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- --Unilocal
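On the Scala question: the same restriction applies in Scala, since a SparkContext can never be serialized into a task. A minimal sketch of the rewrite Sean suggests, in Scala, opening a plain Cassandra session per partition instead of using the context inside the RDD; it assumes the DataStax Java driver (2.x) is on the classpath, stringRdd is an RDD[String] of keys, and the contact point, keyspace, table and column names are all hypothetical:

import com.datastax.driver.core.Cluster

stringRdd.foreachPartition { keys =>
  // One driver session per partition; nothing from the driver program
  // (in particular no SparkContext) is captured in the closure.
  val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
  val session = cluster.connect("schema")
  val stmt = session.prepare("SELECT val FROM table WHERE key = ?")
  keys.foreach { key =>
    val row = session.execute(stmt.bind(key)).one()
    val previous = if (row == null) "" else row.getString("val")
    // combine `previous` with this batch's value and write it back here
  }
  session.close()
  cluster.close()
}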
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
Hey Gerard, This is a very good question! *TL;DR:* The performance should be the same, except in the case of shuffle-based operations where the number of reducers is not explicitly specified. Let me answer in more detail by dividing the set of DStream operations into four categories. *1. Map-like operations (map, flatMap, filter, etc.) that do not involve any shuffling of data:* Performance should be virtually the same in both cases. Either way, in each batch, the operations on the batch's RDD are first set up on the driver, and then the actions on the RDD are executed. There are very, very minor differences in the two cases of early foreachRDD and late foreachRDD (e.g., cleaning up of function closures, etc.) but those should make almost no difference in the performance. *2. Operations involving shuffle:* Here there is a subtle difference in the two cases if the number of partitions is not specified. The default number of partitions used with dstream.reduceByKey() and with dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play around with the number of reducers to see what performs better. But if the number of reducers is explicitly specified and is the same in both cases, then the performance should be similar. Note that this difference in the defaults is not guaranteed to stay this way; it could change in future implementations. *3. Aggregation-like operations (count, reduce):* Here there is another subtle execution difference between - dstream.count(), which produces a DStream of single-element RDDs, the element being the count, and - dstream.foreachRDD(_.count()), which returns the count directly. In the first case, some random worker node is chosen for the reduce; in the other, the driver is chosen for the reduce. There should not be a significant performance difference. *4. Other operations*, including window ops and stateful ops (updateStateByKey), are obviously not part of the discussion as they cannot be (easily) done through early foreachRDD. Hope this helps! TD PS: Sorry for not noticing this question earlier. On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas gerard.m...@gmail.com wrote: PS: Just to clarify my statement: Unlike the feared RDD operations on the driver, it's my understanding that these DStream ops on the driver are merely creating an execution plan for each RDD. With feared RDD operations on the driver I meant to contrast an RDD action like rdd.collect, which would pull all the RDD data to the driver, with dstream.foreachRDD(rdd => rdd.op), for which the documentation says 'it runs on the driver', yet all that looks to be running on the driver is the scheduling of 'op' on that RDD, just like happens for all other RDD operations (thanks to Sean for the clarification). So, not to move focus away from the original question: In Spark Streaming, would it be better to do foreachRDD early in a pipeline or instead do as many DStream transformations as possible before going into the foreachRDD call? Between these two pieces of code, from a performance perspective, what would be preferred and why: - Early foreachRDD:

dstream.foreachRDD { rdd =>
  val records = rdd.map(elem => record(elem))
  targets.foreach(target =>
    records.filter(record => isTarget(target, record)).writeToCassandra(target, table))
}

- As many DStream transformations as possible before foreachRDD:

val recordStream = dstream.map(elem => record(elem))
targets.foreach { target =>
  recordStream.filter(record => isTarget(target, record))
    .foreachRDD(_.writeToCassandra(target, table))
}

? kr, Gerard.
On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Matt, Unlike the feared RDD operations on the driver, it's my understanding that these DStream ops on the driver are merely creating an execution plan for each RDD. My question still remains: Is it better to foreachRDD early in the process, or to do as many DStream transformations as possible before going into the foreachRDD call? Maybe this will require some empirical testing specific to each implementation? -kr, Gerard. On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell matt.narr...@gmail.com wrote: http://spark.apache.org/docs/latest/streaming-programming-guide.html foreachRDD is executed on the driver.... mn On Oct 20, 2014, at 3:07 AM, Gerard Maas gerard.m...@gmail.com wrote: Pinging TD -- I'm sure you know :-) -kr, Gerard. On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, We have been implementing several Spark Streaming jobs that are basically processing data and inserting it into Cassandra, sorting it among different keyspaces. We've been following the pattern:

dstream.foreachRDD { rdd =>
  val records = rdd.map(elem => record(elem))
  targets.foreach(target =>
    records.filter(record => isTarget(target, record)).writeToCassandra(target, table))
}

I've been wondering whether there would be a performance difference in transforming the dstream
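A small sketch of TD's point 2: if the reducer count is pinned explicitly, the early and late forms should shuffle the same way. Here dstream and keyOf are placeholders, and the import brings in the pair-DStream operations in Spark 1.x:

import org.apache.spark.streaming.StreamingContext._

val numReducers = 64  // arbitrary; what matters is that both forms use the same value

// Late foreachRDD: partition count given to the DStream operation
val late = dstream.map(elem => (keyOf(elem), 1L)).reduceByKey(_ + _, numReducers)

// Early foreachRDD: the same partition count given to the RDD operation
dstream.foreachRDD { rdd =>
  rdd.map(elem => (keyOf(elem), 1L)).reduceByKey(_ + _, numReducers)
}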
Re: Spark Streaming Applications
Cc'ing Helena for more information on this. TD On Thu, Oct 23, 2014 at 6:30 AM, Saiph Kappa saiph.ka...@gmail.com wrote: What is the application about? I couldn't find any proper description regarding the purpose of killrweather (I mean, other than just integrating Spark with Cassandra). Do you know if the slides of that tutorial are available somewhere? Thanks! On Wed, Oct 22, 2014 at 6:58 PM, Sameer Farooqui same...@databricks.com wrote: Hi Saiph, Patrick McFadin and Helena Edelson from DataStax taught a tutorial at NYC Strata last week where they created a prototype Spark Streaming + Kafka application for time series data. You can see the code here: https://github.com/killrweather/killrweather On Tue, Oct 21, 2014 at 4:33 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I have been trying to find a fairly complex application that makes use of the Spark Streaming framework. I checked public github repos but the examples I found were too simple, only comprising simple operations like counters and sums. On the Spark summit website, I could find very interesting projects, however no source code was available. Where can I find non-trivial spark streaming application code? Is it that difficult? Thanks.
JavaHiveContext class not found error. Help!!
I am trying to run the below Hive query on YARN. I am using Cloudera 5.1. What can I do to make this work?

SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID;

Below is the stack trace: Exception in thread Thread-4 java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:187) Caused by: *java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/api/java/JavaHiveContext* at HiveContextExample.main(HiveContextExample.java:57) ... 5 more Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.api.java.JavaHiveContext at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more Here is the code invoking it:

SparkConf conf = new SparkConf().setAppName("PartitionData");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaHiveContext hiveContext = new JavaHiveContext(ctx);
String sql = "SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID";
JavaSchemaRDD partitionedRDD = hiveContext.sql(sql);

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavaHiveContext-class-not-found-error-Help-tp17149.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to access objects declared and initialized outside the call() method of JavaRDD
What I have been doing is building a JavaSparkContext the first time it is needed and keeping it as a ThreadLocal - all my code uses SparkUtilities.getCurrentContext(). On a slave machine you build a new context and don't have to serialize it. The code is in a large project at https://code.google.com/p/distributed-tools/ - a work in progress, but the Spark aficionados on this list will say if the approach is kosher.

public class SparkUtilities implements Serializable {
    private transient static ThreadLocal<JavaSparkContext> threadContext;
    private static String appName = "Anonymous";

    public static String getAppName() {
        return appName;
    }

    public static void setAppName(final String pAppName) {
        appName = pAppName;
    }

    /**
     * Create a JavaSparkContext for the thread if none exists.
     */
    public static synchronized JavaSparkContext getCurrentContext() {
        if (threadContext == null)
            threadContext = new ThreadLocal<JavaSparkContext>();
        JavaSparkContext ret = threadContext.get();
        if (ret != null)
            return ret;
        SparkConf sparkConf = new SparkConf().setAppName(getAppName());
        // Here do operations you would do to initialize a context
        ret = new JavaSparkContext(sparkConf);
        threadContext.set(ret);
        return ret;
    }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-tp17094p17150.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JavaHiveContext class not found error. Help!!
Hello there, This is more of a question for the cdh-users list, but in any case... In CDH 5.1 we skipped packaging of the Hive module in SparkSQL. That has been fixed in CDH 5.2, so if it's possible for you I'd recommend upgrading. On Thu, Oct 23, 2014 at 2:53 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to run the below Hive query on Yarn. I am using Cloudera 5.1. What can I do to make this work? /SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID;/ Below is the stack trace: Exception in thread Thread-4 java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:187) Caused by: *java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/api/java/JavaHiveContext* at HiveContextExample.main(HiveContextExample.java:57) ... 5 more Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.api.java.JavaHiveContext at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 6 more Here is the code invoking it: /SparkConf conf = new SparkConf().setAppName(PartitionData); JavaSparkContext ctx = new JavaSparkContext(conf); JavaHiveContext hiveContext = new JavaHiveContext(ctx); String sql = SELECT * FROM table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID; JavaSchemaRDD partitionedRDD = hiveContext.sql(sql);/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavaHiveContext-class-not-found-error-Help-tp17149.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
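If upgrading to CDH 5.2 isn't an option right away, one possible workaround - a sketch only, not verified against CDH packaging - is to build the matching Apache Spark release with Hive support yourself and put that assembly on the classpath; in Spark 1.1 the Hive module is enabled with the hive profile:

mvn -Phive -DskipTests clean package

The assembly produced under assembly/target should then include org.apache.spark.sql.hive.api.java.JavaHiveContext.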
spark is running extremely slow with larger data set, like 2G
my spark version is 1.1.0, pre-built with hadoop 1.x
my code is implemented in python, trying to convert a graph data set from an edge list to an adjacency list
spark is running in standalone mode
It runs well with a small data set like soc-LiveJournal1, about 1G. Then I ran it on the 25G twitter graph, and one task won't finish in hours. I tried the input both from NFS and HDFS http://apache-spark-user-list.1001560.n3.nabble.com/file/n17152/48.png What could be the problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-tp17152.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark is running extremely slow with larger data set, like 2G
my code is here:

from pyspark import SparkConf, SparkContext

def Undirect(edge):
    vector = edge.strip().split('\t')
    if(vector[0].isdigit()):
        return [(vector[0], vector[1])]
    return []

conf = SparkConf()
conf.setMaster("spark://compute-0-14:7077")
conf.setAppName("adjacencylist")
conf.set("spark.executor.memory", "1g")
sc = SparkContext(conf = conf)

file = sc.textFile("file:///home/xzhang/data/soc-LiveJournal1.txt")
records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b: a + "\t" + b)
#print(records.count())
#records = records.sortByKey()
records = records.map(lambda line: line[0] + "\t" + line[1])
records.saveAsTextFile("file:///home/xzhang/data/result")

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-tp17152p17153.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.1.0 and Hive 0.12.0 Compatibility Issue
Hi, My Spark is 1.1.0 and Hive is 0.12. I tried to run the same query in both Hive 0.12.0 and Spark 1.1.0; HiveQL works while SparkSQL fails.

hive> select l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority from customer c join orders o on c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = o.o_orderkey where o_orderdate < '1995-03-15' and l_shipdate > '1995-03-15' group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10;

Ended Job = job_1414067367860_0011 MapReduce Jobs Launched: Job 0: Map: 1 Reduce: 1 Cumulative CPU: 2.0 sec HDFS Read: 261 HDFS Write: 96 SUCCESS Job 1: Map: 1 Reduce: 1 Cumulative CPU: 0.88 sec HDFS Read: 458 HDFS Write: 0 SUCCESS Total MapReduce CPU Time Spent: 2 seconds 880 msec OK Time taken: 38.771 seconds

scala> sqlContext.sql("select l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority from customer c join orders o on c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = o.o_orderkey where o_orderdate < '1995-03-15' and l_shipdate > '1995-03-15' group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10").collect().foreach(println);

org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 5.0 failed 4 times, most recent failure: Lost task 14.3 in stage 5.0 (TID 568, m34): java.lang.ClassCastException: java.lang.String cannot be cast to scala.math.BigDecimal scala.math.Numeric$BigDecimalIsFractional$.minus(Numeric.scala:182) org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64) org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64) org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114) org.apache.spark.sql.catalyst.expressions.Subtract.eval(arithmetic.scala:64) org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108) org.apache.spark.sql.catalyst.expressions.Multiply.eval(arithmetic.scala:70) org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:47) org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108) org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58) org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69) org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433) org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167) org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at
Re: unable to make a custom class as a key in a pairrdd
Hi Jao, I don't really know why this doesn't work, but I have two hints. You don't need to override hashCode and equals; the case modifier is doing that for you. Writing case class PersonID(id: String) would be enough to get the class you want, I think. If I change the type of the id param to Int it works for me, but I don't know why: case class PersonID(id: Int) Looks like strange behavior to me. Have a try. Good luck, Niklas On 23.10.2014 21:52, Jaonary Rabarisoa wrote: Hi all, I have the following case class that I want to use as a key in a key-value RDD. I defined the equals and hashCode methods but it's not working. What am I doing wrong?

case class PersonID(id: String) {
  override def hashCode = id.hashCode
  override def equals(other: Any) = other match {
    case that: PersonID => this.id == that.id && this.getClass == that.getClass
    case _ => false
  }
}

val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))

p.groupByKey.collect foreach println

(PersonID(1),CompactBuffer(5))
(PersonID(1),CompactBuffer(6))
(PersonID(1),CompactBuffer(7))
(PersonID(1),CompactBuffer(8, 9))
(PersonID(1),CompactBuffer(1))
(PersonID(1),CompactBuffer(2))
(PersonID(1),CompactBuffer(3))
(PersonID(1),CompactBuffer(4))

Best, Jao
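For reference, a minimal sketch of Niklas's first hint: with a plain case class, equals and hashCode are generated for you, so this should produce a single group (and if it still splits as above, the problem lies somewhere other than equals/hashCode):

import org.apache.spark.SparkContext._  // pair-RDD implicits in Spark 1.x

case class PersonID(id: String)  // no manual equals/hashCode needed

val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))
p.groupByKey().collect().foreach(println)
// should print one line: (PersonID(1),CompactBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9))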
Re: Exceptions not caught?
Can you show the stack trace? Also, how do you catch exceptions? Did you specify TProtocolException? Cheers On Thu, Oct 23, 2014 at 3:40 PM, ankits ankitso...@gmail.com wrote: Hi, I'm running a spark job and encountering an exception related to thrift. I wanted to know where this is being thrown, but the stack trace is completely useless. So I started adding try catches, to the point where my whole main method that does everything is surrounded with a try catch. Even then, nothing is being caught. I still see this message though: 2014-10-23 15:39:50,845 ERROR [] Exception in task 1.0 in stage 1.0 (TID 1) java.io.IOException: org.apache.thrift.protocol.TProtocolException: . What is going on? Why isn't the exception just being handled by the try-catch? (BTW this is in Scala) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Exceptions not caught?
I am simply catching all exceptions (like case e: Throwable => println("caught: " + e)). Here is the stack trace: 2014-10-23 15:51:10,766 ERROR [] Exception in task 1.0 in stage 1.0 (TID 1) java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, env_type:PROD,) at com.A.thrift.Y.writeObject(Y.java:8489) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, env_type:PROD, ...) at com.A.thrift.Y.validate(Y:8428) at com.A.thrift.Y$YStandardScheme.write(Y.java:9359) at com.A.thrift.Y$FlatAdserverEventStandardScheme.write(Y.java:8509) at com.A.thrift.Y.write(Y.java:7646) at com.A.thrift.Y.writeObject(Y.java:8487) ... 27 more 2014-10-23 15:51:10,766 11234 ERROR [] Exception in task 0.0 in stage 1.0 (TID 0) java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, ...)
at com.A.thrift.Y.writeObject(Y.java:8489) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) at
Re: Spark 1.1.0 and Hive 0.12.0 Compatibility Issue
Can you show the DDL for the table? It looks like the SerDe might be saying it will produce a decimal type but is actually producing a string. On Thu, Oct 23, 2014 at 3:17 PM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, My Spark is 1.1.0 and Hive is 0.12. I tried to run the same query in both Hive 0.12.0 and Spark 1.1.0; HiveQL works while SparkSQL fails. hive> select l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority from customer c join orders o on c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = o.o_orderkey where o_orderdate < '1995-03-15' and l_shipdate > '1995-03-15' group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10; Ended Job = job_1414067367860_0011 MapReduce Jobs Launched: Job 0: Map: 1 Reduce: 1 Cumulative CPU: 2.0 sec HDFS Read: 261 HDFS Write: 96 SUCCESS Job 1: Map: 1 Reduce: 1 Cumulative CPU: 0.88 sec HDFS Read: 458 HDFS Write: 0 SUCCESS Total MapReduce CPU Time Spent: 2 seconds 880 msec OK Time taken: 38.771 seconds scala> sqlContext.sql("select l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority from customer c join orders o on c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = o.o_orderkey where o_orderdate < '1995-03-15' and l_shipdate > '1995-03-15' group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10").collect().foreach(println); org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 5.0 failed 4 times, most recent failure: Lost task 14.3 in stage 5.0 (TID 568, m34): java.lang.ClassCastException: java.lang.String cannot be cast to scala.math.BigDecimal scala.math.Numeric$BigDecimalIsFractional$.minus(Numeric.scala:182) org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64) org.apache.spark.sql.catalyst.expressions.Subtract$$anonfun$eval$3.apply(arithmetic.scala:64) org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114) org.apache.spark.sql.catalyst.expressions.Subtract.eval(arithmetic.scala:64) org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108) org.apache.spark.sql.catalyst.expressions.Multiply.eval(arithmetic.scala:70) org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:47) org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108) org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58) org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69) org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433) org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167) org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
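If the SerDe is indeed handing those columns back as strings, one hedged workaround while the DDL question is sorted out is to cast explicitly in the query; a sketch against the same TPC-H columns (not a fix for the underlying table definition):

val fixed = sqlContext.sql("""
  SELECT l_orderkey,
         SUM(CAST(l_extendedprice AS DOUBLE) * (1 - CAST(l_discount AS DOUBLE))) AS revenue
  FROM lineitem
  GROUP BY l_orderkey
  ORDER BY revenue DESC
  LIMIT 10""")
fixed.collect().foreach(println)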
Re: Exceptions not caught?
Also everything is running locally on my box, driver and workers. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157p17160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Exceptions not caught?
bq. Required field 'X' is unset! Struct:Y

Can you check your class Y and fix the above? Cheers

On Thu, Oct 23, 2014 at 3:55 PM, ankits ankitso...@gmail.com wrote: I am simply catching all exceptions (like case e: Throwable => println("caught: " + e)). Here is the stack trace:

2014-10-23 15:51:10,766 ERROR [] Exception in task 1.0 in stage 1.0 (TID 1)
java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, env_type:PROD,)
  at com.A.thrift.Y.writeObject(Y.java:8489)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
  at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
  at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:54)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, env_type:PROD, ...)
  at com.A.thrift.Y.validate(Y:8428)
  at com.A.thrift.Y$YStandardScheme.write(Y.java:9359)
  at com.A.thrift.Y$FlatAdserverEventStandardScheme.write(Y.java:8509)
  at com.A.thrift.Y.write(Y.java:7646)
  at com.A.thrift.Y.writeObject(Y.java:8487)
  ... 27 more
2014-10-23 15:51:10,766 11234 ERROR [] Exception in task 0.0 in stage 1.0 (TID 0)
java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, ...)
  at com.A.thrift.Y.writeObject(Y.java:8489)
  [same stack trace as for TID 1; truncated in the original]
Re: Exceptions not caught?
Can you check your class Y and fix the above? I can, but this is about catching the exception should it be thrown by any class in the Spark job. Why is the exception not being caught?
Re: Exceptions not caught?
On Thu, Oct 23, 2014 at 3:40 PM, ankits ankitso...@gmail.com wrote: 2014-10-23 15:39:50,845 ERROR [] Exception in task 1.0 in stage 1.0 (TID 1) java.io.IOException: org.apache.thrift.protocol.TProtocolException: This looks like an exception that's happening on an executor and just being reported in the driver's logs, so there's nothing to catch in the driver, which might explain why you're not catching anything. -- Marcelo
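For what it's worth, a minimal driver-side sketch of Marcelo's point (rdd and transform are placeholders, not code from this thread): the TProtocolException is thrown per task attempt on the executors, so a try/catch in the driver only ever sees the final SparkException once the whole job is aborted, not the original exception on each attempt:

  try {
    // the serialization failure happens on the executors, inside the shuffle write
    rdd.map(y => transform(y)).count()
  } catch {
    case e: org.apache.spark.SparkException =>
      // driver-side view: "Job aborted due to stage failure ..." after task retries;
      // the executor-side stack trace only appears in the executor logs / message text
      println("caught in driver: " + e.getMessage)
  }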
Re: About Memory usage in the Spark UI
The memory usage of blocks of data received through Spark Streaming is not reflected in the Spark UI. It only shows the memory usage due to cached RDDs. I didn't find a JIRA for this, so I opened a new one. https://issues.apache.org/jira/browse/SPARK-4072 TD

On Thu, Oct 23, 2014 at 12:47 AM, Haopu Wang hw...@qilinsoft.com wrote: Patrick, thanks for the response. May I ask more questions? I'm running a Spark Streaming application which receives data from a socket and does some transformations. The event injection rate is too high, so the processing duration is larger than the batch interval. So I see the Could not compute split, block input-0-1414049609200 not found issue as discussed by others in this post: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237 If my understanding is correct, Spark lacks storage in this case because of event pile-up, so it needs to delete some splits in order to free memory. However, even in this case, I still see a very small number (like 3MB) in the Memory Used column, where the total memory seems to be quite big (like 6GB). So I think the number shown in this column may be misleading. How does Spark calculate the total memory based on the allocated JVM heap size? I guess it's related to the spark.storage.memoryFraction configuration, but I want to know the details. And why does the driver also use memory to store RDD blocks? Thanks again for the answer! [The rest of the message quotes Patrick's earlier reply.]
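On the memoryFraction question, a back-of-the-envelope sketch of how the 1.x storage pool is sized (assuming the defaults spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9; this is an approximation, not the exact BlockManager code):

  // approximate storage capacity for a 12 GB executor heap (Scala sketch)
  val maxHeap = 12L * 1024 * 1024 * 1024        // -Xmx12g
  val storage = (maxHeap * 0.6 * 0.9).toLong    // roughly 6.5 GB

which is in the same ballpark as the ~6 GB total shown in the UI. As for the driver showing a storage line: the driver also runs a BlockManager (used for things like broadcast blocks) even though it doesn't run tasks.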
Spark using HDFS data [newb]
Hi, I would like to verify or correct my understanding of Spark at the conceptual level. As I understand it, ignoring streaming mode for a minute, Spark takes some input data (which can be an HDFS file), lets your code transform the data, and ultimately dispatches some computation over the data across its cluster, thus being a distributed computing platform. It probably/typically splits the dataset for the computation across the cluster machines, so that each machine participating in the Spark cluster performs the computation on its subset of the data. Is that the case? In case I nailed it, how then does it handle a distributed HDFS file? Does it pull all of the file to/through one Spark server and partition it from there across its cluster, or does it partition the HDFS file across the cluster without such a bottleneck, somehow intelligently letting each Spark server pull some of the data from HDFS? Or does it all rely on Spark being installed on each HDFS server, just using the HDFS file chunks local to that server, without transporting any input HDFS data at all? Many thanks! Matan
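For orientation (a sketch, with a placeholder path): Spark takes the locality-aware route. sc.textFile asks the HDFS NameNode for block locations, creates roughly one partition per block/split, and the scheduler prefers to run each task on a node that holds that block, so no single machine funnels the whole file.

  // Scala sketch; the path is a placeholder
  val logs = sc.textFile("hdfs://namenode:8020/data/access.log")
  println(logs.partitions.length)   // roughly one partition per HDFS block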
Problem packing spark-assembly jar
Hi folks, I'm trying to deploy the latest from the master branch and having some trouble with the assembly jar. In the spark-1.1 official distribution (I use the cdh version), I see the following jars, where spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar contains a ton of stuff:

datanucleus-api-jdo-3.2.1.jar
datanucleus-core-3.2.2.jar
datanucleus-rdbms-3.2.1.jar
spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
spark-examples-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
spark-hive-thriftserver_2.10-1.1.0.jar
spark-hive_2.10-1.1.0.jar
spark-sql_2.10-1.1.0.jar

I tried to create a similar distribution off of master, running

mvn -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests -Pbigtop-dist package

and

./make-distribution.sh -Pbigtop-dist -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests

but in either case the spark-assembly jar I get is nearly empty:

spark_official/dist/lib$ jar -tvf spark-assembly-1.2.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.0.jar
META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/spark/
org/apache/spark/unused/
org/apache/spark/unused/UnusedStubClass.class
META-INF/maven/
META-INF/maven/org.spark-project.spark/
META-INF/maven/org.spark-project.spark/unused/
META-INF/maven/org.spark-project.spark/unused/pom.xml
META-INF/maven/org.spark-project.spark/unused/pom.properties
META-INF/NOTICE

Any advice on how to get spark-core and the rest packaged into the assembly jar -- I'd like to have fewer things to copy around.
RE: About Memory usage in the Spark UI
TD, thanks for the clarification. From the UI, it looks like the driver also allocates memory to store blocks. What's the purpose of that, given that the driver doesn't run tasks? [The rest of the message quotes the exchange above.]
Re: unable to make a custom class as a key in a pairrdd
Are you doing this in the REPL? Then there is a bug filed for this; I just can't recall the bug ID at the moment. Prashant Sharma

On Fri, Oct 24, 2014 at 4:07 AM, Niklas Wilcke 1wil...@informatik.uni-hamburg.de wrote: Hi Jao, I don't really know why this doesn't work, but I have two hints. You don't need to override hashCode and equals; the case modifier does that for you. Writing case class PersonID(id: String) would be enough to get the class you want, I think. If I change the type of the id param to Int it works for me, but I don't know why: case class PersonID(id: Int) Looks like strange behavior to me. Have a try. Good luck, Niklas

On 23.10.2014 21:52, Jaonary Rabarisoa wrote: Hi all, I have the following case class that I want to use as a key in a key-value RDD. I defined the equals and hashCode methods but it's not working. What am I doing wrong?

case class PersonID(id: String) {
  override def hashCode = id.hashCode
  override def equals(other: Any) = other match {
    case that: PersonID => this.id == that.id && this.getClass == that.getClass
    case _ => false
  }
}

val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))
p.groupByKey.collect foreach println

(PersonID(1),CompactBuffer(5))
(PersonID(1),CompactBuffer(6))
(PersonID(1),CompactBuffer(7))
(PersonID(1),CompactBuffer(8, 9))
(PersonID(1),CompactBuffer(1))
(PersonID(1),CompactBuffer(2))
(PersonID(1),CompactBuffer(3))
(PersonID(1),CompactBuffer(4))

Best, Jao
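If it is the REPL issue Prashant mentions, a workaround sketch (this assumes the REPL-defined class is the cause, which the thread doesn't confirm): key by the raw value instead of the shell-defined case class, and reattach the wrapper after the shuffle:

  val p = sc.parallelize((1 until 10).map(x => ("1", x)))   // key by the underlying id
  p.groupByKey()
   .map { case (id, vs) => (PersonID(id), vs) }             // rebuild PersonID per group
   .collect()
   .foreach(println)

Alternatively, compiling PersonID into a jar on the application classpath, instead of defining it in the shell, sidesteps the REPL-generated wrapper classes entirely.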
Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.
Hi all, (Waking up an old thread just for future reference.) We had a very similar issue just a couple of days ago: executing a Spark driver on the same host where the Mesos master runs succeeds, but executing it on our remote dev station hangs and then fails after Mesos reports that the Spark driver disconnected immediately. In our case, just setting the LIBPROCESS_IP variable as described below resolved the issue. https://github.com/airbnb/chronos/issues/193
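For concreteness, a sketch of that fix (the address is a placeholder; it should be an address of the dev box that the Mesos master can reach back to):

  # in the shell that launches the driver
  export LIBPROCESS_IP=10.0.0.42     # placeholder: routable address of this machine
  spark-submit --master mesos://mesos-master:5050 ...

Without it, libprocess may advertise an address (e.g. a container-internal one) that the master cannot connect back to, which would match the immediate-disconnect symptom.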
Re: Spark 1.0.0 on yarn cluster problem
Hi, I am facing the same problem. My spark-env.sh has the entries below, yet I see the YARN container with only 1G and YARN only spawns two workers.

SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=3G
SPARK_EXECUTOR_INSTANCES=5

Please let me know if you are able to resolve this issue. Thank you
large benchmark sets for MLlib experiments
Hi MLlib users, In August when I gave a talk at Databricks, Xiangrui mentioned the need for large public data sets for the development of MLlib. At the moment, many people use problems from the libsvm data sets for experiments; the file size of the larger ones (e.g., kddb) is about 20-30G. To fulfill the need, we have provided a much larger data set, splice_site, at the libsvm data sets page (http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/). The training file is around 600G, while the test file is 300G. The full set of the same problem (3TB) was used in A Reliable Effective Terascale Linear Learning System by Agarwal et al. The original set is from COFFIN: A Computational Framework for Linear SVMs by Sonnenburg and Franc. Please note that this problem is highly unbalanced, so accuracy is NOT a suitable criterion; you may use auPRC (area under the precision-recall curve) instead. We thank the data providers, Olivier Chapelle for providing a script for data generation, and my students for their help. We will keep adding more large sets in the future. Enjoy, Chih-Jen
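As a side note for MLlib users (not part of Chih-Jen's announcement): a sketch of computing auPRC with MLlib's evaluation API, where the path and the model value are placeholders:

  import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
  import org.apache.spark.mllib.util.MLUtils

  val test = MLUtils.loadLibSVMFile(sc, "hdfs:///data/splice_site.t")        // placeholder path
  val scoreAndLabels = test.map(p => (model.predict(p.features), p.label))   // model: a trained classifier
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  println("auPRC = " + metrics.areaUnderPR())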
Memory requirement of using Spark
Hello, I am new to Spark. I have a basic question about the memory requirements of using Spark. I need to do a join between multiple data sets. The join is not a straightforward join; the logic is more like: first join T1 on column A with T2, then for all the records that couldn't find a match in that join, join T1 on column B with T2, then join on C, and so on. I was using Hive, but it requires multiple scans of T1, which turns out to be slow. It seems like if I load T1 and T2 in memory using Spark, I could improve the performance. However, T1 and T2 together are around 800G. Does that mean I need to have 800G of memory (I don't have that amount of memory)? Or could Spark do something like streaming, and if so, would performance suffer as a result? Thanks JT
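One thing worth noting on the memory question: Spark does not require a dataset to fit in RAM. A cached RDD can be told to spill blocks that don't fit to local disk, at some performance cost. A sketch (the path is a placeholder):

  import org.apache.spark.storage.StorageLevel

  val t1 = sc.textFile("hdfs:///warehouse/t1")       // placeholder path
    .persist(StorageLevel.MEMORY_AND_DISK)           // blocks that don't fit spill to disk

Memory then acts as an optimization rather than a hard requirement, though an 800G working set will still be heavily disk-bound on a small cluster.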
Re: Spark 1.0.0 on yarn cluster problem
Did you `export` the environment variables? Also, are you running in client mode or cluster mode? If it still doesn't work, you can try to set these through the spark-submit command-line flags --num-executors, --executor-cores, and --executor-memory. 2014-10-23 19:25 GMT-07:00 firemonk9 dhiraj.peech...@gmail.com: [quotes the message above]
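A sketch of that suggestion, matching the spark-env.sh values above (the class name and jar are placeholders):

  spark-submit --master yarn-cluster \
    --num-executors 5 \
    --executor-cores 1 \
    --executor-memory 3g \
    --class com.example.Main app.jar   # placeholders

Note that SPARK_EXECUTOR_* style environment settings generally have to be exported so they are visible to the process that launches the application, which is why the first question above matters.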
Re: Multitenancy in Spark - within/across spark context
Ashwin, I would say the strategies in general are: 1) Have each user submit a separate Spark app (each with its own SparkContext), with its own resource settings, and share data through HDFS or something like Tachyon for speed. 2) Share a single SparkContext amongst multiple users, using the fair scheduler. This is sort of like having a Hadoop resource pool. It has some obvious HA/SPOF issues, namely that if the context dies then every user using it is also dead. Also, sharing RDDs in cached memory has the same resiliency problems, namely that if any executor dies then Spark must recompute / rebuild the RDD (it tries to only rebuild the missing part, but sometimes it must rebuild everything). Job server can help with 1 or 2, 2 in particular. If you have any questions about job server, feel free to ask at the spark-jobserver google group. I am the maintainer. -Evan

On Thu, Oct 23, 2014 at 1:06 PM, Marcelo Vanzin van...@cloudera.com wrote: You may want to take a look at https://issues.apache.org/jira/browse/SPARK-3174. On Thu, Oct 23, 2014 at 2:56 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Upvote for the multitenancy requirement. I'm also building a data analytics platform and there'll be multiple users running queries and computations simultaneously. One of the pain points is control of resource size. Users don't really know how many nodes they need; they always use as much as possible... The result is lots of wasted resources in our YARN cluster. A way to 1) allow multiple Spark contexts to share the same resources or 2) add dynamic resource management for YARN mode is very much wanted. Jianshi

On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: That's not something you might want to do usually. In general, a SparkContext maps to a user application. My question was basically this. In this page in the official doc, under the Scheduling within an application section, it talks about multiuser and fair sharing within an app. How does multiuser within an application work (how do users connect to an app and run their stuff)? When would I want to use this? I see. The way I read that page is that Spark supports all those scheduling options; but Spark doesn't give you the means to actually be able to submit jobs from different users to a running SparkContext hosted on a different process. For that, you'll need something like the job server that I referenced before, or write your own framework for supporting that. Personally, I'd use the information on that page when dealing with concurrent jobs in the same SparkContext, but still restricted to the same user. I'd avoid trying to create any application where a single SparkContext is trying to be shared by multiple users in any way. As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. I basically wanted to find out if there were any gotchas related to preemption on Spark. Things like, say, half of an application's executors got preempted while doing reduceByKey; will the application progress with the remaining resources / fair share? Jobs should still make progress as long as at least one executor is available. The gotcha would be the one I mentioned, where Spark will fail your job after x executors failed, which might be a common occurrence when preemption is enabled.
That being said, it's a configurable option, so you can set x to a very large value and your job should keep on chugging along. The options you'd want to take a look at are: spark.task.maxFailures and spark.yarn.max.executor.failures -- Marcelo

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/

-- Marcelo
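A sketch of wiring those two options up for a preemption-heavy YARN queue (the values are illustrative only, not recommendations):

  spark-submit --master yarn-cluster \
    --conf spark.task.maxFailures=100 \
    --conf spark.yarn.max.executor.failures=200 \
    ...

The right values depend on how aggressively the YARN scheduler preempts containers in your cluster.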