Re: Microsoft SQL jdbc support from spark sql
At this time, the JDBC data source is not extensible, so it cannot support SQL Server. There were some thoughts - credit to Cheng Lian for this - about making the JDBC data source extensible for third-party support, possibly via Slick. On Mon, Apr 6, 2015 at 10:41 PM bipin bipin@gmail.com wrote: Hi, I am trying to pull data from an MS SQL Server. I have tried using spark.sql.jdbc:

CREATE TEMPORARY TABLE c
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:sqlserver://10.1.0.12:1433\;databaseName=dbname\;",
  dbtable "Customer"
);

But it shows java.sql.SQLException: No suitable driver found for jdbc:sqlserver. I have JDBC drivers for MS SQL but I am not sure how to use them. I provided the jars to the SQL shell and then tried the following:

CREATE TEMPORARY TABLE c
USING com.microsoft.sqlserver.jdbc.SQLServerDriver
OPTIONS (
  url "jdbc:sqlserver://10.1.0.12:1433\;databaseName=dbname\;",
  dbtable "Customer"
);

But this gives ERROR CliDriver: scala.MatchError: SQLServerDriver:4 (of class com.microsoft.sqlserver.jdbc.SQLServerDriver). Can anyone tell me the proper way to connect to an MS SQL Server? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
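For reference, a minimal sketch of the general shape of the Spark 1.3 JDBC data source call with an explicit driver class, written against a hypothetical PostgreSQL database and assuming an existing sqlContext (as the reply above notes, the built-in source did not support SQL Server at this time, so the same options with a sqlserver URL will not work):

// Scala sketch; URL and table name are placeholders, not the poster's server
val options = Map(
  "url"     -> "jdbc:postgresql://10.1.0.12:5432/dbname",
  "driver"  -> "org.postgresql.Driver",
  "dbtable" -> "Customer")
val customers = sqlContext.load("jdbc", options)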
Regarding benefits of using more than one cpu for a task in spark
Hi, In Spark there are two settings regarding the number of cores: one at the task level, spark.task.cpus, and another which drives the number of cores per executor, spark.executor.cores. Apart from using more than one core for a task which has to call some external API etc., is there any other use case / benefit of assigning more than one core to a task? As per the code, I can only see this being used during scheduling etc.; RDD partitions etc. remain untouched by this setting. Does this mean that the coder needs to write the application logic to actually take advantage of this setting? (Which again makes me think over this setting.) Comments please. Thanks, Twinkle
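A minimal sketch of the two settings being discussed (path and values are placeholders): spark.task.cpus only changes scheduling, i.e. how many cores are reserved per task, so the task body has to create its own parallelism (threads, a multi-threaded native library, an external call, and so on).

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("task-cpus-sketch")
  .set("spark.executor.cores", "4") // cores available to each executor
  .set("spark.task.cpus", "2")      // scheduler reserves 2 per task, so at most 2 concurrent tasks per executor
val sc = new SparkContext(conf)

val lengths = sc.textFile("hdfs:///some/input").mapPartitions { iter =>
  iter.map(_.length) // placeholder single-threaded work; a real use case would fan out to 2 threads here
}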
Re: Spark Application Stages and DAG
My Spark streaming application processes the data received in each interval. In the Spark Stages UI, all the stages point to a single line of code, *windowDStream.foreachRDD*, only (not the actions inside the DStream). Following is the information from the Spark Stages UI page (columns: Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Input | Output | Shuffle Read | Shuffle Write):

2 | foreachRDD at Parser.scala:58 | 06-04-2015 16:21 | 19 min | 3125/3125 (43 failed) | 154.4 MB | 23.9 MB
1 | foreachRDD at Parser.scala:58 | 06-04-2015 16:19 | 2.3 min | 3125/3125 | 149.7 MB
0 | foreachRDD at Parser.scala:58 | 06-04-2015 16:16 | 3.0 min | 3125/3125 | 149.7 MB

Following is the code snippet at Parser.scala:58:

val windowDStream = ssc.fileStream[LongWritable, Text, CustomInputFormat](args(0), (x: Path) => true, false)
windowDStream.foreachRDD { IncomingFiles =>
  println("Interval data processing " + Calendar.getInstance().getTime());
  if (IncomingFiles.count() == 0) {
    println("No files received in this interval")
  } else {
    println(IncomingFiles.count() + " files received in this interval");
    // convert each xml text to RDD[Elem]
    val inputRDD = IncomingFiles.map(eachXML => {
      MyXML.loadString(eachXML._2.toString().trim().replaceFirst("^([\\W]+)", ""))
    });
    // Create a schema RDD for querying the data
    val MySchemaRDD = inputRDD.map(x => {
      Bd((x \\ "Oied" \\ "oeuo").text, List("placeholder1", "placeholder2", "placeholder3"))
      // Bd is a case class: case class Bd(oeuo: String, mi: List[String])
    })
    // Save the file for debugging
    MySchemaRDD.saveAsTextFile("/home/spark/output/result.txt")
    // Spark SQL processing starts from here
    MySchemaRDD.registerTempTable("MySchemaTable")
    // TODO: processing with Spark SQL
    MySchemaRDD.printSchema()
    println("end of processing");
  }
}

Spark UI details for Stage 2: http://pastebin.com/c2QYeSJj I have tested this with 150 MB of input data, with all the Spark memory options at their defaults and executor memory of 512.0 MB. Is it possible to see the stages information within the *windowDStream* operation (i.e. which action inside the DStream processing)? During Stage 2 the executor restarted many times due to OutOfMemoryError; is this expected behavior? (Please find the Stage 2 details above.) Regards Vijay On 3 April 2015 at 13:21, Tathagata Das t...@databricks.com wrote: What he meant is to look it up in the Spark UI, specifically in the Stage tab, to see what is taking so long. And yes, a code snippet helps us debug. TD On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to open the Stage's page which is taking time, and see how long it is spending on GC etc. Also it would be good to post that Stage and its previous transformation's code snippet to help us understand it better. Thanks Best Regards On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri vijay.innam...@gmail.com wrote: When I run the Spark application (streaming) in local mode I can see the execution progress as below: [Stage 0: (1817 + 1) / 3125] [Stage 2:=== (740 + 1) / 3125] One of the stages is taking a long time to execute. How do I find the transformations/actions associated with a particular stage? Is there any way to find the execution DAG of a Spark application? Regards Vijay
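One workaround, a hedged sketch built on the snippet above (windowDStream and MyXML are the names from that snippet, nothing else is assumed): give each per-batch job a description and name the intermediate RDDs, so the UI shows something more specific than "foreachRDD at Parser.scala:58".

windowDStream.foreachRDD { incomingFiles =>
  incomingFiles.sparkContext.setJobDescription("parse XML batch") // shows in the Jobs/Stages UI
  val inputRDD = incomingFiles
    .map(eachXML => MyXML.loadString(eachXML._2.toString().trim()))
    .setName("parsed XML") // name appears wherever this RDD is shown in the UI
  // ... rest of the per-batch processing as in the snippet above ...
}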
Re: task not serialize
Let's say I follow the approach below and I get an RDD pair with a huge size, which cannot fit into one machine: what happens if I run foreach on this RDD? On 7 April 2015 at 04:25, Jeetendra Gangele gangele...@gmail.com wrote: On 7 April 2015 at 04:03, Dean Wampler deanwamp...@gmail.com wrote: On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot. That means Spark does not support nested RDDs? If I pass the JavaSparkContext that also won't work; I mean passing the SparkContext is not possible since it's not serializable. That's right. RDDs don't nest and SparkContexts aren't serializable. I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd and I need to return the potential matches for each record from HBase. So for each field of VendorRecord I have to do the following: 1. query HBase to get the list of potential records in an RDD; 2. run logistic regression on the RDD returned from step 1 and each element of the passed matchRdd. If I understand you correctly, each VendorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? Yes, that's correct, each VendorRecord corresponds to 0 to N matches. If so, you could use the RDD flatMap method, which takes a function that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help. I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? A join will give too big a result: the RDD of query results returns around 1 for each record and I have 2 million to process, so it will be huge to hold this. 2 m * 1 is a big number. For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code, in the foreach I am getting a task not serializable exception:

@SuppressWarnings("serial")
public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException {
  log.info("Company matcher started");
  // final JavaSparkContext jsc = getSparkContext();
  matchRdd.foreachAsync(new VoidFunction<VendorRecord>() {
    @Override
    public void call(VendorRecord t) throws Exception {
      if (t != null) {
        try {
          CompanyMatcherHelper.UpdateMatchedRecord(jsc, t);
        } catch (Exception e) {
          log.error("ERROR while running Matcher for company " + t.getCompanyId(), e);
        }
      }
    }
  });
}

-
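A hedged Scala sketch of the flatMap shape Dean describes, assuming a Scala RDD[VendorRecord] rather than the JavaRDD above; createConnection and queryCandidates are hypothetical helpers, not real APIs. The point is that nothing non-serializable (SparkContext, logger) is captured by the closure, and one HBase connection is opened per partition instead of per record.

val candidatePairs = matchRdd.mapPartitions { records =>
  val conn = createConnection() // hypothetical: built on the executor, never serialized
  records.flatMap { record =>
    queryCandidates(conn, record) // hypothetical: 0-to-N HBase candidates for this VendorRecord
      .map(candidate => (record, candidate))
  }
  // in real code, close conn via a wrapper iterator or use a connection pool
}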
Re: Microsoft SQL jdbc support from spark sql
Thanks for the information. Hopefully this will happen in near future. For now my best bet would be to export data and import it in spark sql. On 7 April 2015 at 11:28, Denny Lee denny.g@gmail.com wrote: At this time, the JDBC Data source is not extensible so it cannot support SQL Server. There was some thoughts - credit to Cheng Lian for this - about making the JDBC data source extensible for third party support possibly via slick. On Mon, Apr 6, 2015 at 10:41 PM bipin bipin@gmail.com wrote: Hi, I am trying to pull data from ms-sql server. I have tried using the spark.sql.jdbc CREATE TEMPORARY TABLE c USING org.apache.spark.sql.jdbc OPTIONS ( url jdbc:sqlserver://10.1.0.12:1433\;databaseName=dbname\;, dbtable Customer ); But it shows java.sql.SQLException: No suitable driver found for jdbc:sqlserver I have jdbc drivers for mssql but i am not sure how to use them I provide the jars to the sql shell and then tried the following: CREATE TEMPORARY TABLE c USING com.microsoft.sqlserver.jdbc.SQLServerDriver OPTIONS ( url jdbc:sqlserver://10.1.0.12:1433\;databaseName=dbname\;, dbtable Customer ); But this gives ERROR CliDriver: scala.MatchError: SQLServerDriver:4 (of class com.microsoft.sqlserver.jdbc.SQLServerDriver) Can anyone tell what is the proper way to connect to ms-sql server. Thanks -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark- sql-tp22399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark 1.2.0 with Play/Activator
Thanks for the information Andy. I will go through the versions mentioned in Dependencies.scala to identify the compatibility. Regards, Manish From: andy petrella [mailto:andy.petre...@gmail.com] Sent: Tuesday, April 07, 2015 11:04 AM To: Manish Gupta 8; user@spark.apache.org Subject: Re: Spark 1.2.0 with Play/Activator Hello Manish, you can take a look at the spark-notebook build; it's a bit tricky to get rid of some clashes, but at least you can refer to this build to get ideas. LSS, I have stripped out akka from the Play deps. ref: https://github.com/andypetrella/spark-notebook/blob/master/build.sbt https://github.com/andypetrella/spark-notebook/blob/master/project/Dependencies.scala https://github.com/andypetrella/spark-notebook/blob/master/project/Shared.scala HTH, cheers andy On Tue, Apr 7, 2015 at 07:26, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, We are trying to build a Play framework based web application integrated with Apache Spark. We are running Apache Spark 1.2.0 CDH 5.3.0, but struggling with akka version conflicts (errors like java.lang.NoSuchMethodError in akka). We have tried Play 2.2.6 as well as Activator 1.3.2. If anyone has successfully integrated Spark 1.2.0 with Play/Activator, please share the version we should use and the akka dependencies we should add in Build.sbt. Thanks, Manish
RE: Spark 1.2.0 with Play/Activator
If I try to build spark-notebook with spark.version=1.2.0-cdh5.3.0, sbt throws these warnings before failing to compile:

:: org.apache.spark#spark-yarn_2.10;1.2.0-cdh5.3.0: not found
:: org.apache.spark#spark-repl_2.10;1.2.0-cdh5.3.0: not found

Any suggestions? Thanks From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Tuesday, April 07, 2015 12:04 PM To: andy petrella; user@spark.apache.org Subject: RE: Spark 1.2.0 with Play/Activator Thanks for the information Andy. I will go through the versions mentioned in Dependencies.scala to identify the compatibility. Regards, Manish From: andy petrella [mailto:andy.petre...@gmail.com] Sent: Tuesday, April 07, 2015 11:04 AM To: Manish Gupta 8; user@spark.apache.org Subject: Re: Spark 1.2.0 with Play/Activator Hello Manish, you can take a look at the spark-notebook build; it's a bit tricky to get rid of some clashes, but at least you can refer to this build to get ideas. LSS, I have stripped out akka from the Play deps. ref: https://github.com/andypetrella/spark-notebook/blob/master/build.sbt https://github.com/andypetrella/spark-notebook/blob/master/project/Dependencies.scala https://github.com/andypetrella/spark-notebook/blob/master/project/Shared.scala HTH, cheers andy On Tue, Apr 7, 2015 at 07:26, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, We are trying to build a Play framework based web application integrated with Apache Spark. We are running Apache Spark 1.2.0 CDH 5.3.0, but struggling with akka version conflicts (errors like java.lang.NoSuchMethodError in akka). We have tried Play 2.2.6 as well as Activator 1.3.2. If anyone has successfully integrated Spark 1.2.0 with Play/Activator, please share the version we should use and the akka dependencies we should add in Build.sbt. Thanks, Manish
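One thing worth checking (an assumption on my part, not a confirmed fix for this build): CDH-versioned artifacts are published in Cloudera's repository rather than Maven Central, so the build usually needs an extra resolver in build.sbt before *-cdh versions can resolve at all:

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"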
Re: Spark 1.2.0 with Play/Activator
Mmmh, you want it running a spark 1.2 with hadoop 2.5.0-cdh5.3.2 right? If I'm not wrong you might have to launch it like so: ``` sbt -Dspark.version=1.2.0 -Dhadoop.version=2.5.0-cdh5.3.2 ``` Or you can download it from http://spark-notebook.io if you want. HTH andy On Tue, Apr 7, 2015 at 9:06 AM Manish Gupta 8 mgupt...@sapient.com wrote: If I try to build spark-notebook with spark.version=1.2.0-cdh5.3.0, sbt throw these warnings before failing to compile: :: org.apache.spark#spark-yarn_2.10;1.2.0-cdh5.3.0: not found :: org.apache.spark#spark-repl_2.10;1.2.0-cdh5.3.0: not found Any suggestions? Thanks *From:* Manish Gupta 8 [mailto:mgupt...@sapient.com] *Sent:* Tuesday, April 07, 2015 12:04 PM *To:* andy petrella; user@spark.apache.org *Subject:* RE: Spark 1.2.0 with Play/Activator Thanks for the information Andy. I will go through the versions mentioned in Dependencies.scala to identify the compatibility. Regards, Manish *From:* andy petrella [mailto:andy.petre...@gmail.com andy.petre...@gmail.com] *Sent:* Tuesday, April 07, 2015 11:04 AM *To:* Manish Gupta 8; user@spark.apache.org *Subject:* Re: Spark 1.2.0 with Play/Activator Hello Manish, you can take a look at the spark-notebook build, it's a bit tricky to get rid of some clashes but at least you can refer to this build to have ideas. LSS, I have stripped out akka from play deps. ref: https://github.com/andypetrella/spark-notebook/blob/master/build.sbt https://github.com/andypetrella/spark-notebook/blob/master/project/Dependencies.scala https://github.com/andypetrella/spark-notebook/blob/master/project/Shared.scala HTH, cheers andy Le mar 7 avr. 2015 07:26, Manish Gupta 8 mgupt...@sapient.com a écrit : Hi, We are trying to build a Play framework based web application integrated with Apache Spark. We are running *Apache Spark 1.2.0 CDH 5.3.0*. But struggling with akka version conflicts (errors like java.lang.NoSuchMethodError in akka). We have tried Play 2.2.6 as well as Activator 1.3.2. If anyone has successfully integrated Spark 1.2.0 with Play/Activator, please share the version we should use and akka dependencies we should add in Build.sbt. Thanks, Manish
Re: Strategy regarding maximum number of executor's failure for log running jobs/ spark streaming jobs
Hi, One of the rational behind killing the app can be to avoid skewness in data. I have created this issue (https://issues.apache.org/jira/browse/SPARK-6735) to provide options for disabling this behaviour, as well as making the number of executor's failure to be relative with respect to a window duration. I will upload the PR shortly. Thanks, Twinkle On Tue, Apr 7, 2015 at 2:02 AM, Sandy Ryza sandy.r...@cloudera.com wrote: What's the advantage of killing an application for lack of resources? I think the rationale behind killing an app based on executor failures is that, if we see a lot of them in a short span of time, it means there's probably something going wrong in the app or on the cluster. On Wed, Apr 1, 2015 at 7:08 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Thanks Sandy. Another way to look at this is that would we like to have our long running application to die? So let's say, we create a window of around 10 batches, and we are using incremental kind of operations inside our application, as restart here is a relatively more costlier, so should it be the maximum number of executor failure's kind of criteria to fail the application or should we have some parameters around minimum number of executor's availability for some x time? So, if the application is not able to have minimum n number of executors within x period of time, then we should fail the application. Adding time factor here, will allow some window for spark to get more executors allocated if some of them fails. Thoughts please. Thanks, Twinkle On Wed, Apr 1, 2015 at 10:19 PM, Sandy Ryza sandy.r...@cloudera.com wrote: That's a good question, Twinkle. One solution could be to allow a maximum number of failures within any given time span. E.g. a max failures per hour property. -Sandy On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, In spark over YARN, there is a property spark.yarn.max.executor.failures which controls the maximum number of executor's failure an application will survive. If number of executor's failures ( due to any reason like OOM or machine failure etc ), exceeds this value then applications quits. For small duration spark job, this looks fine, but for the long running jobs as this does not take into account the duration, this can lead to same treatment for two different scenarios ( mentioned below) : 1. executors failing with in 5 mins. 2. executors failing sparsely, but at some point even a single executor failure ( which application could have survived ) can make the application quit. Sending it to the community to listen what kind of behaviour / strategy they think will be suitable for long running spark jobs or spark streaming jobs. Thanks and Regards, Twinkle
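Until something like SPARK-6735 lands, the main knob discussed in this thread is the absolute failure count; a long-running job can at least raise it explicitly. A minimal sketch (the value is arbitrary, and the default in the YARN application master is, as far as I know, on the order of twice the number of executors):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.yarn.max.executor.failures", "100")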
[DAGSchedule][OutputCommitCoordinator] OutputCommitCoordinator.authorizedCommittersByStage Map Out Of Memory
Hi all: I am using spark streaming (1.3.1) as a long-running service and it runs out of memory after running for 7 days. I found that the field *authorizedCommittersByStage* in the *OutputCommitCoordinator* class causes the OOM. authorizedCommittersByStage is a map; the key is StageId and the value is Map[PartitionId, TaskAttemptId]. The OutputCommitCoordinator class has a method stageEnd which removes the stageId from authorizedCommittersByStage. *But the method stageEnd is never called by the DAGScheduler.* This causes the stage info in authorizedCommittersByStage to never be cleaned up, which causes the OOM. It happens in my spark streaming program (1.3.1); I am not sure if it will appear in other spark components and other spark versions. JIRA link: https://issues.apache.org/jira/browse/SPARK-6737
Can not get executor's Log from Spark's History Server
Hi, Experts I run my Spark Cluster on Yarn. I used to get executors' Logs from Spark's History Server. But after I started my Hadoop jobhistory server and made configuration to aggregate logs of hadoop jobs to a HDFS directory, I found that I could not get spark's executors' Logs any more. Is there any solution so that I could get logs of my spark jobs from Spark History Server and get logs of my map-reduce jobs from Hadoop History Server? Many Thanks! Following is the configuration I made in Hadoop yarn-site.xml:

yarn.log-aggregation-enable=true
yarn.nodemanager.remote-app-log-dir=/mr-history/agg-logs
yarn.log-aggregation.retain-seconds=259200
yarn.log-aggregation.retain-check-interval-seconds=-1
Re: Processing Large Images in Spark?
On 6 Apr 2015, at 23:05, Patrick Young patrick.mckendree.yo...@gmail.com wrote: does anyone have any thoughts on storing a really large raster in HDFS? Seems like if I just dump the image into HDFS as is, it'll get stored in blocks all across the system and when I go to read it, there will be a ton of network traffic from all the blocks to the reading node! It gets split into blocks scattered (at the default 3x replication): 1 on the current host, 2 elsewhere. I'd recommend you look at Russell Perry's (HP Labs) 2009 paper, High Speed Raster Image Streaming For Digital Presses Using the Hadoop File System, which was about using HDFS/MapReduce to render images, rather than analyse them. Similar to things like tile generation of google/open-street/apple maps: http://www.hpl.hp.com/techreports/2009/HPL-2009-345.pdf Russ modified the HDFS client so that rather than having it pick a block for the app to read from, the app got to make the decision itself. Then code running on the server, hooked straight up to the line-rate printing press, fetched data from different racks so as to get max bandwidth out of each host and out of each rack switch; 4 Gb/s overall. I don't think that patch was ever contributed back, or at least ever got in.
Re: Spark streaming with Kafka- couldnt find KafkaUtils
Or you could build an uber jar (you could google that): https://eradiating.wordpress.com/2015/02/15/getting-spark-streaming-on-kafka-to-work/ --- Original Message --- From: Akhil Das ak...@sigmoidanalytics.com Sent: April 4, 2015 11:52 PM To: Priya Ch learnings.chitt...@gmail.com Cc: user@spark.apache.org, dev d...@spark.apache.org Subject: Re: Spark streaming with Kafka- couldnt find KafkaUtils How are you submitting the application? Use a standard build tool like maven or sbt to build your project; it will download all the dependency jars. When you submit your application with spark-submit, use the --jars option to add those jars which are causing the ClassNotFoundException. If you are running as a standalone application without using spark-submit, then while creating the SparkContext, use sc.addJar() to add those dependency jars. For Kafka streaming, when you use sbt, these are the jars that are required:

sc.addJar("/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
sc.addJar("/root/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
sc.addJar("/root/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")
sc.addJar("/root/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")

Thanks Best Regards On Sun, Apr 5, 2015 at 12:00 PM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, I configured a Kafka cluster on a single node and I have a streaming application which reads data from a kafka topic using KafkaUtils. When I execute the code in local mode from the IDE, the application runs fine. But when I submit the same to the spark cluster in standalone mode, I end up with the following exception: java.lang.ClassNotFoundException: org/apache/spark/streaming/kafka/KafkaUtils. I am using the spark-1.2.1 version. When I checked the source files of streaming, the source files related to kafka are missing. Are these not included in the spark-1.3.0 and spark-1.2.1 versions? Do I have to manually include them? Regards, Padma Ch - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
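A sketch of the sbt side for the uber-jar route; the versions should match the installed Spark (1.2.1 here to match the thread):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.1" // not in the Spark assembly, must ship with the app
)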
[GraphX] aggregateMessages with active set
Hello, The old GraphX API mapReduceTriplets has an optional parameter activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] that limits the input of sendMessage. However, the new API aggregateMessages does not seem to have this option; why is it no longer offered? Alcaid
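One way to emulate the old active-set behaviour with aggregateMessages, sketched as a workaround rather than an answer from this thread: keep an "active" flag in the vertex attribute, only send from active vertices, and restrict the shipped fields with TripletFields. This assumes a graph whose vertex attribute is a (payload, activeFlag) pair.

import org.apache.spark.graphx._

def countFromActive(graph: Graph[(String, Boolean), Int]): VertexRDD[Int] =
  graph.aggregateMessages[Int](
    ctx => if (ctx.srcAttr._2) ctx.sendToDst(1), // inactive sources send nothing
    _ + _,
    TripletFields.Src // only source attributes are shipped to the edges
  )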
Re: Processing Large Images in Spark?
Heya, You might be interesting at looking at GeoTrellis They use RDDs of Tiles to process big images like Landsat ones can be (specially 8). However, I see you have only 1G per file, so I guess you only care of a single band? Or is it a reboxed pic? Note: I think the GeoTrellis image format is still single band, although it's highly optimized for distributed geoprocessing my2¢ andy On Tue, Apr 7, 2015 at 12:06 AM Patrick Young patrick.mckendree.yo...@gmail.com wrote: Hi all, I'm new to Spark and wondering if it's appropriate to use for some image processing tasks on pretty sizable (~1 GB) images. Here is an example use case. Amazon recently put the entire Landsat8 archive in S3: http://aws.amazon.com/public-data-sets/landsat/ I have a bunch of GDAL based (a C library for geospatial raster I/O) Python scripts that take a collection of Landsat images and mash them into a single mosaic. This works great for little mosaics, but if I wanted to do the entire world, I need more horse power! The scripts do the following: 1. Copy the selected rasters down from S3 to the local file system 2. Read each image into memory as numpy arrays (a big 3D array), do some image processing using various Python libs, and write the result out to the local file system 3. Blast all the processed imagery back to S3, and hooks up MapServer for viewing Step 2 takes a long time; this is what I'd like to leverage Spark for. Each image, if you stack all the bands together, can be ~1 GB in size. So here are a couple of questions: 1. If I have a large image/array, what's a good way of getting it into an RDD? I've seen some stuff about folks tiling up imagery into little chunks and storing it in HBase. I imagine I would want an image chunk in each partition of the RDD. If I wanted to apply something like a gaussian filter I'd need each chunk to to overlap a bit. 2. In a similar vain, does anyone have any thoughts on storing a really large raster in HDFS? Seems like if I just dump the image into HDFS as it, it'll get stored in blocks all across the system and when I go to read it, there will be a ton of network traffic from all the blocks to the reading node! 3. How is the numpy's ndarray support in Spark? For instance, if I do a map on my theoretical chunked image RDD, can I easily realize the image chunk as a numpy array inside the function? Most of the Python algorithms I use take in and return a numpy array. I saw some discussion in the past on image processing: These threads talk about processing lots of little images, but this isn't really my situation as I've got one very large image: http://apache-spark-user-list.1001560.n3.nabble.com/Better-way-to-process-large-image-data-set-td14533.html http://apache-spark-user-list.1001560.n3.nabble.com/Processing-audio-video-images-td6752.html Further, I'd like to have the imagery in HDFS rather than on the file system to avoid I/O bottlenecks if possible! Thanks for any ideas and advice! -Patrick
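A rough sketch of the "RDD of tiles" idea mentioned above (nothing GeoTrellis-specific): read each scene as raw bytes and cut it into chunks keyed by (scenePath, tileId). decodeAndTile is a hypothetical function; overlapping tile borders for e.g. a Gaussian filter would be produced there.

import org.apache.spark.SparkContext

def tile(sc: SparkContext, path: String,
         decodeAndTile: (String, Array[Byte]) => Seq[((String, Int), Array[Byte])]) =
  sc.binaryFiles(path).flatMap { case (scenePath, stream) =>
    decodeAndTile(scenePath, stream.toArray()) // one whole file per record; tiles come back keyed
  }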
'Java heap space' error occured when query 4G data file from HDFS
In my dev-test env I have 3 virtual machines; every machine has 12G memory and 8 CPU cores. Here are spark-defaults.conf and spark-env.sh; maybe some config is not right. I run this command: *spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py* and the exception below is raised.

*spark-defaults.conf*
spark.master spark://cloud1:7077
spark.default.parallelism 100
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 5g
spark.driver.maxResultSize 6g
spark.kryoserializer.buffer.mb 256
spark.kryoserializer.buffer.max.mb 512
spark.executor.memory 4g
spark.rdd.compress true
spark.storage.memoryFraction 0
spark.akka.frameSize 50
spark.shuffle.compress true
spark.shuffle.spill.compress false
spark.local.dir /home/hadoop/tmp

*spark-env.sh*
export SCALA=/home/hadoop/softsetup/scala
export JAVA_HOME=/home/hadoop/softsetup/jdk1.7.0_71
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=4g
export HADOOP_CONF_DIR=/opt/cloud/hadoop/etc/hadoop
export SPARK_EXECUTOR_MEMORY=4g
export SPARK_DRIVER_MEMORY=4g

*Exception:*
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO TaskSetManager: Starting task 31.0 in stage 1.0 (TID 31, cloud3, NODE_LOCAL, 1296 bytes)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB)
15/04/07 18:11:03 INFO TaskSetManager: Starting task 30.0 in stage 1.0 (TID 32, cloud2, NODE_LOCAL, 1296 bytes)
15/04/07 18:11:03 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread task-result-getter-0 java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB)
15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on
Issue with pyspark 1.3.0, sql package and rows
Hi all, I've already opened a bug on Jira some days ago [1], but I'm starting to think this is not the correct way to go since I haven't got any news about it yet. Let me try to explain it briefly: with pyspark, trying to cogroup two input files with different schemas leads (nondeterministically) to some wrong behaviour: the object coming from the first input will have the fields of the second one (or vice versa); the important fact is that the data in the row is actually correct, what's wrong is the content of the __FIELDS__ on the rows. Attached to the issue I posted a small snippet to reproduce the issue (which is a gist [2]). Does this happen to others as well? Is it a known issue? Am I doing anything wrong? Thank you all, [1]: https://issues.apache.org/jira/browse/SPARK-6677 [2]: https://gist.github.com/armisael/e08bb4567d0a11efe2db -- Dott. Stefano Parmesan Backend Web Developer and Data Lover ~ SpazioDati s.r.l. Via Adriano Olivetti, 13 – 4th floor Le Albere district – 38122 Trento – Italy
scala.MatchError: class org.apache.avro.Schema (of class java.lang.Class)
Using spark(1.2) streaming to read avro schema based topics flowing in kafka and then using spark sql context to register data as temp table. Avro maven plugin(1.7.7 version) generates the java bean class for the avro file but includes a field named SCHEMA$ of type org.apache.avro.Schema which is not supported in the JavaSQLContext class[Method : applySchema]. How to auto generate java bean class for the avro file and over come the above mentioned problem. Thanks. - Thanks, Yamini -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scala-MatchError-class-org-apache-avro-Schema-of-class-java-lang-Class-tp22402.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Difference between textFile Vs hadoopFile (textInoutFormat) on HDFS data
Hi, Is there any difference between textFile and hadoopFile (TextInputFormat) when the data is present in HDFS? Is there any performance gain that can be observed? Puneet Kumar Ojha Data Architect | PubMatic http://www.pubmatic.com/
Pipelines for controlling workflow
Hi, I am building a pipeline and I've read most that I can find on the topic (spark.ml library and the AMPcamp version of pipelines: http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html). I do not have structured data as in the case of the new Spark.ml library which uses SchemaRDD/DataFrames so the second alternative seems the most convenient to me. I'm writing in Scala. The problem I have is that I want to build a pipeline that can be able to be branched in (at least) two ways; 1. One of my steps outputs an Either-type (where the output is either an object containing statistics to why this step/data failed or contain the expected output). So I would like to branch the pipeline to either skip the rest of the pipeline and continue in a reporting-step (write a report with the help of the statistics object) or that the pipeline is continued to the next step in the pipeline. In the generic case this could of course be two independent pipelines (like a first pipeline-node that takes multiple datatypes and passes the input to the correct pipeline in the following step). 2. The other way I would like to branch the pipeline is to send the same data to multiple new pipeline-nodes. These nodes are not dependent on each other so they should just branch of. In the generic case this could be two new pipelines themselves. Has anyone tried this or have a nice idea of how this could be performed? I like the simplicity in the AMPcamp-pipeline which relies on type-safety, but I'm confused about how to create a branching pipeline using only type-declarations. Thanks, Staffan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pipelines-for-controlling-workflow-tp22403.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
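A minimal sketch of the branching idea described above (the names Stats, Report, branch, etc. are placeholders, not an existing library): a pipeline node is just a function, and a branch node pattern-matches on Either so failures flow to a reporting step while successes continue down the pipeline.

case class Stats(reason: String)
case class Report(lines: Seq[String])

val parse: String => Either[Stats, Int] =
  s => try Right(s.trim.toInt)
       catch { case _: NumberFormatException => Left(Stats(s"not a number: $s")) }

val report: Stats => Report = stats => Report(Seq(stats.reason))
val continue: Int => Int = _ * 2

// the branching combinator itself: route Left to one node, Right to another
def branch[A, B, C, D](onFailure: A => C, onSuccess: B => D): Either[A, B] => Either[C, D] = {
  case Left(a)  => Left(onFailure(a))
  case Right(b) => Right(onSuccess(b))
}

val pipeline: String => Either[Report, Int] = parse andThen branch(report, continue)
// pipeline("42") == Right(84); pipeline("oops") == Left(Report(Seq("not a number: oops")))

The second kind of branching (sending the same data to several independent nodes) can be done the same way by applying several functions to one input and collecting the results in a tuple.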
Re: Difference between textFile Vs hadoopFile (textInoutFormat) on HDFS data
There is no difference - textFile calls hadoopFile with a TextInputFormat, and maps each value to a String. — Sent from Mailbox On Tue, Apr 7, 2015 at 1:46 PM, Puneet Kumar Ojha puneet.ku...@pubmatic.com wrote: Hi , Is there any difference between Difference between textFile Vs hadoopFile (textInoutFormat) when data is present in HDFS? Will there be any performance gain that can be observed? Puneet Kumar Ojha Data Architect | PubMatichttp://www.pubmatic.com/
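An illustration of the statement above, assuming an existing SparkContext sc and a placeholder path: the two reads are equivalent, since textFile simply wraps hadoopFile with TextInputFormat and keeps only the value converted to a String.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val viaTextFile = sc.textFile("hdfs:///data/input")
val viaHadoopFile = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")
  .map { case (_, value) => value.toString } // textFile does this mapping for you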
Does spark utilize the sorted order of hbase keys, when using hbase as data source
Hello, guys! I am a newbie to Spark and would appreciate any advice or help. Here is the detailed question: http://stackoverflow.com/questions/29493472/does-spark-utilize-the-sorted-order-of-hbase-keys-when-using-hbase-as-data-sour Regards, Yury
Re: Does spark utilize the sorted order of hbase keys, when using hbase as data source
There are 500 million distinct users... 2015-04-07 17:45 GMT+03:00 Ted Yu yuzhih...@gmail.com: How many distinct users are stored in HBase ? TableInputFormat produces splits where the number of splits matches the number of regions in a table. You can write your own InputFormat which splits according to user id. FYI On Tue, Apr 7, 2015 at 7:36 AM, Юра rvaniy@gmail.com wrote: Hello, guys! I am a newbie to Spark and would appreciate any advice or help. Here is the detailed question: http://stackoverflow.com/questions/29493472/does-spark-utilize-the-sorted-order-of-hbase-keys-when-using-hbase-as-data-sour Regards, Yury
Re: task not serialize
Foreach() runs in parallel across the cluster, like map, flatMap, etc. You'll only run into problems if you call collect(), which brings the entire RDD into memory in the driver program. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 7, 2015 at 3:50 AM, Jeetendra Gangele gangele...@gmail.com wrote: Let's say I follow the approach below and I get an RDD pair with a huge size, which cannot fit into one machine: what happens if I run foreach on this RDD? On 7 April 2015 at 04:25, Jeetendra Gangele gangele...@gmail.com wrote: On 7 April 2015 at 04:03, Dean Wampler deanwamp...@gmail.com wrote: On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot. That means Spark does not support nested RDDs? If I pass the JavaSparkContext that also won't work; I mean passing the SparkContext is not possible since it's not serializable. That's right. RDDs don't nest and SparkContexts aren't serializable. I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd and I need to return the potential matches for each record from HBase. So for each field of VendorRecord I have to do the following: 1. query HBase to get the list of potential records in an RDD; 2. run logistic regression on the RDD returned from step 1 and each element of the passed matchRdd. If I understand you correctly, each VendorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? Yes, that's correct, each VendorRecord corresponds to 0 to N matches. If so, you could use the RDD flatMap method, which takes a function that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help. I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? A join will give too big a result: the RDD of query results returns around 1 for each record and I have 2 million to process, so it will be huge to hold this. 2 m * 1 is a big number. For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code, in the foreach I am getting a task not serializable exception:

@SuppressWarnings("serial")
public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException {
  log.info("Company matcher started");
  // final JavaSparkContext jsc = getSparkContext();
  matchRdd.foreachAsync(new VoidFunction<VendorRecord>() {
    @Override
    public void call(VendorRecord t) throws Exception {
      if (t != null) {
        try {
          CompanyMatcherHelper.UpdateMatchedRecord(jsc, t);
        } catch (Exception e) {
          log.error("ERROR while running Matcher for company " + t.getCompanyId(), e);
        }
      }
    }
  });
}

-
Advice using Spark SQL and Thrift JDBC Server
Hello, First of all, thank you to everyone working on Spark. I've only been using it for a few weeks now but so far I'm really enjoying it. You saved me from a big, scary elephant! :-) I was wondering if anyone might be able to offer some advice about working with the Thrift JDBC server? I'm trying to enable members of my team to connect and run some basic SQL queries on a Spark cluster using their favourite JDBC tools. Following the docs [1], I've managed to get something simple up and running but I'd really appreciate it if someone can validate my understanding here, as the docs don't go deeply into the details. Here are a few questions I've not been able to find answers to myself: 1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? 2) Am I therefore right in thinking that SQL queries sent to the thrift server are still executed on the Spark cluster, using Spark SQL, and Hive plays no active part in computation of results? 3) What SQL flavour is actually supported by the Thrift Server? Is it Spark SQL, Hive, or both? I've confused, because I've seen it accepting Hive CREATE TABLE syntax, but Spark SQL seems to work too? 4) When I run SQL queries using the Scala or Python shells, Spark seems to figure out the schema by itself from my Parquet files very well, if I use createTempTable on the DataFrame. It seems when running the thrift server, I need to create a Hive table definition first? Is that the case, or did I miss something? If it is, is there some sensible way to automate this? Many thanks! James [1] https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server
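For question 4, a hedged sketch using the Spark 1.3 data source DDL (the path and table name are placeholders): registering an existing Parquet directory as a metastore-backed table means a Thrift server that shares the same Hive metastore can query it, whereas registerTempTable is only visible inside the context that created it.

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql(
  """CREATE TABLE events
    |USING org.apache.spark.sql.parquet
    |OPTIONS (path 'hdfs:///warehouse/events.parquet')""".stripMargin)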
Re: Does spark utilize the sorted order of hbase keys, when using hbase as data source
How many distinct users are stored in HBase ? TableInputFormat produces splits where number of splits matches the number of regions in a table. You can write your own InputFormat which splits according to user id. FYI On Tue, Apr 7, 2015 at 7:36 AM, Юра rvaniy@gmail.com wrote: Hello, guys! I am a newbie to Spark and would appreciate any advice or help. Here is the detailed question: http://stackoverflow.com/questions/29493472/does-spark-utilize-the-sorted-order-of-hbase-keys-when-using-hbase-as-data-sour Regards, Yury
Re: Microsoft SQL jdbc support from spark sql
That's correct: at this time MS SQL Server is not supported through the JDBC data source. In my environment, we've been using Hadoop streaming to extract data from multiple SQL Servers, pushing the data into HDFS, creating the Hive tables and/or converting them into Parquet, and then Spark can access them directly. Due to my heavy use of SQL Server, I've been thinking about seeing if I can help with the extension of the JDBC data source so it can be supported - but alas, I haven't found the time yet ;) On Tue, Apr 7, 2015 at 6:52 AM ARose ashley.r...@telarix.com wrote: I am having the same issue with my java application.

String url = "jdbc:sqlserver://" + host + ":1433;DatabaseName=" + database + ";integratedSecurity=true";
String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("driver", driver);
options.put("url", url);
options.put("dbtable", tbTableName);
DataFrame jdbcDF = sqlContext.load("jdbc", options);
jdbcDF.printSchema();
jdbcDF.show();

It prints the schema of the DataFrame just fine, but as soon as it tries to evaluate it for the show() call, I get a ClassNotFoundException for the driver. But the driver is definitely included as a dependency, so is MS SQL Server just not supported? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399p22404.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Microsoft SQL jdbc support from spark sql
I am having the same issue with my java application.

String url = "jdbc:sqlserver://" + host + ":1433;DatabaseName=" + database + ";integratedSecurity=true";
String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("driver", driver);
options.put("url", url);
options.put("dbtable", tbTableName);
DataFrame jdbcDF = sqlContext.load("jdbc", options);
jdbcDF.printSchema();
jdbcDF.show();

It prints the schema of the DataFrame just fine, but as soon as it tries to evaluate it for the show() call, I get a ClassNotFoundException for the driver. But the driver is definitely included as a dependency, so is MS SQL Server just not supported? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399p22404.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: --driver-memory parameter doesn't work for spark-submmit on yarn?
Sorry for reply late. I bypass this by set _JAVA_OPTIONS. And the ps aux | grep spark hadoop 14442 0.6 0.2 34334552 128560 pts/0 Sl+ 14:37 0:01 /usr/java/latest/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --driver-memory=5G --executor-memory=10G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/Engine-2.0-jar-with-dependencies.jar hadoop 14544 158 13.4 37206420 8472272 pts/0 Sl+ 14:37 4:21 /usr/java/latest/bin/java -cp /home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar::/home/hadoop/spark/conf:/home/hadoop/spark/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/home/hadoop/spark/lib/datanucleus-core-3.2.10.jar:/home/hadoop/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/hadoop/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/hadoop/conf:/home/hadoop/conf -XX:MaxPermSize=128m -Dspark.driver.log.level=INFO -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --driver-memory=5G --executor-memory=10G --master yarn-client --class com.*executor.FinancialEngineExecutor /home/hadoop/lib/MiddlewareEngine-2.0-jar-with-dependencies.jar Above already have set _JAVA_OPTIONS=-Xmx30g, but looks like it doesn't show in the commandline. I guess SparkSubmit will read _JAVA_OPTIONS, but I just think this should be overwritten by the commandline params. Not sure what happen here, have no time to dig it out. But if you want me to provide more information. I will be happy to do that. Regards, Shuai -Original Message- From: Bozeman, Christopher [mailto:bozem...@amazon.com] Sent: Wednesday, April 01, 2015 4:59 PM To: Shuai Zheng; 'Sean Owen' Cc: 'Akhil Das'; user@spark.apache.org Subject: RE: --driver-memory parameter doesn't work for spark-submmit on yarn? Shuai, What did ps aux | grep spark-submit reveal? When you compare using _JAVA_OPTIONS and without using it, where do you see the difference? Thanks Christopher -Original Message- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, April 01, 2015 11:12 AM To: 'Sean Owen' Cc: 'Akhil Das'; user@spark.apache.org Subject: RE: --driver-memory parameter doesn't work for spark-submmit on yarn? Nice. But when my case shows that even I use Yarn-Client, I have same issue. I do verify it several times. And I am running 1.3.0 on EMR (use the version dispatch by installSpark script from AWS). I agree _JAVA_OPTIONS is not a right solution, but I will use it until 1.4.0 out :) Regards, Shuai -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, April 01, 2015 10:51 AM To: Shuai Zheng Cc: Akhil Das; user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn? I feel like I recognize that problem, and it's almost the inverse of https://issues.apache.org/jira/browse/SPARK-3884 which I was looking at today. The spark-class script didn't seem to handle all the ways that driver memory can be set. I think this is also something fixed by the new launcher library in 1.4.0. _JAVA_OPTIONS is not a good solution since it's global. On Wed, Apr 1, 2015 at 3:21 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Akhil, Thanks a lot! After set export _JAVA_OPTIONS=-Xmx5g, the OutOfMemory exception disappeared. But this make me confused, so the driver-memory options doesn’t work for spark-submit to YARN (I haven’t check other clusters), is it a bug? 
Regards, Shuai From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, April 01, 2015 2:40 AM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: --driver-memory parameter doesn't work for spark-submmit on yarn? Once you submit the job do a ps aux | grep spark-submit and see how much is the heap space allocated to the process (the -Xmx params), if you are seeing a lower value you could try increasing it yourself with: export _JAVA_OPTIONS=-Xmx5g Thanks Best Regards On Wed, Apr 1, 2015 at 1:57 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, Below is the my shell script: /home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties My driver will load some resources and then broadcast to all executors. That resources is only 600MB in ser format, but I always has out of memory exception, it looks like the driver doesn’t allocate right memory to my driver. Exception in thread main java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newArray(Native Method) at java.lang.reflect.Array.newInstance(Array.java:70) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670) at
RE: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer
I have found the issue, but I think it is a bug. If I change my class to:

public class ModelSessionBuilder implements Serializable {
  ...
  private Properties[] propertiesList;
  private static final long serialVersionUID = -8139500301736028670L;
}

the broadcast value has no issue. But in my original form, if I broadcast it as an array of my custom subclass of Properties, after the broadcast the propertiesList array will be an array of empty PropertiesUtils objects (empty, not NULL). I am not sure why this happens (the code works without any problem with the default Java serializer). So I think this is a bug, but I am not sure whether it is a bug of Spark or a bug of Kryo. Regards, Shuai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, April 06, 2015 5:34 PM To: user@spark.apache.org Subject: Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer Hi All, I have tested my code without problem on EMR yarn (spark 1.3.0) with the default serializer (java). But when I switch to org.apache.spark.serializer.KryoSerializer, the broadcast value doesn't give me the right result (it actually returns empty custom class instances on the inner object). Basically I broadcast a builder object, which carries an array of PropertiesUtils objects. The code should not have any logical issue because it works with the default java serializer. But when I turn on the org.apache.spark.serializer.KryoSerializer, it looks like the array doesn't initialize: propertiesList will have the right size, but every element in the array is just an empty PropertiesUtils. Do I miss anything when I use this KryoSerializer? I just put in the two lines below; do I need to implement some special code to enable KryoSerializer? I searched but can't find any place that mentions it.

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class[]{ModelSessionBuilder.class, Constants.class, PropertiesUtils.class, ModelSession.class});

public class ModelSessionBuilder implements Serializable {
  ...
  private PropertiesUtils[] propertiesList;
  private static final long serialVersionUID = -8139500301736028670L;
}

public class PropertiesUtils extends Properties {
  private static final long serialVersionUID = -3684043338580885551L;

  public PropertiesUtils(Properties prop) {
    super(prop);
  }

  public PropertiesUtils() {
    // TODO Auto-generated constructor stub
  }
}

Regards, Shuai
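A hedged workaround sketch (not confirmed as the root cause): java.util.Properties subclasses keep their data in the Hashtable parent, which field-oriented Kryo serialization can miss, so one option is to fall back to Java serialization for just that class through a registrator. PropertiesUtils is the class from this thread; the package name below is hypothetical.

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

class PropertiesFriendlyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // serialize this one class with Java serialization while keeping Kryo for everything else
    kryo.register(classOf[PropertiesUtils], new JavaSerializer())
  }
}
// sparkConf.set("spark.kryo.registrator", "com.example.PropertiesFriendlyRegistrator")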
Incremently load big RDD file into Memory
val locations = filelines.map(line => line.split("\t")).map(t => (t(5).toLong, (t(2).toDouble, t(3).toDouble))).distinct().collect()

val cartesienProduct = locations.cartesian(locations).map(t =>
  Edge(t._1._1, t._2._1, distanceAmongPoints(t._1._2._1, t._1._2._2, t._2._2._1, t._2._2._2)))

The code executes perfectly fine up till here, but when I try to use cartesienProduct it gets stuck, i.e.

val count = cartesienProduct.count()

Any help to do this efficiently will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incremently-load-big-RDD-file-into-Memory-tp22410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does spark utilize the sorted order of hbase keys, when using hbase as data source
Then splitting according to user ids is out of the question :-) On Tue, Apr 7, 2015 at 8:12 AM, Юра rvaniy@gmail.com wrote: There are 500 million distinct users... 2015-04-07 17:45 GMT+03:00 Ted Yu yuzhih...@gmail.com: How many distinct users are stored in HBase ? TableInputFormat produces splits where the number of splits matches the number of regions in a table. You can write your own InputFormat which splits according to user id. FYI On Tue, Apr 7, 2015 at 7:36 AM, Юра rvaniy@gmail.com wrote: Hello, guys! I am a newbie to Spark and would appreciate any advice or help. Here is the detailed question: http://stackoverflow.com/questions/29493472/does-spark-utilize-the-sorted-order-of-hbase-keys-when-using-hbase-as-data-sour Regards, Yury
Re: A problem with Spark 1.3 artifacts
BTW, just out of curiosity, I checked both the 1.3.0 release assembly and the spark-core_2.10 artifact downloaded from http://mvnrepository.com/, and neither contain any references to anything under org.eclipse (all referenced jetty classes are the shaded ones under org.spark-project.jetty). On Mon, Apr 6, 2015 at 10:30 PM, Josh Rosen rosenvi...@gmail.com wrote: My hunch is that this behavior was introduced by a patch to start shading Jetty in Spark 1.3: https://issues.apache.org/jira/browse/SPARK-3996. Note that Spark's MetricsSystem class is marked as private[spark] and thus isn't intended to be interacted with directly by users. It's not super likely that this API would break, but it's excluded from our MiMa checks and thus is liable to change in incompatible ways across releases. If you add these Jetty classes as a compile-only dependency but don't add them to the runtime classpath, do you get runtime errors? If the metrics system is usable at runtime and we only have errors when attempting to compile user code against non-public APIs, then I'm not sure that this is a high-priority issue to fix. If the metrics system doesn't work at runtime, on the other hand, then that's definitely a bug that should be fixed. If you'd like to continue debugging this issue, I think we should move this discussion over to JIRA so it's easier to track and reference. Hope this helps, Josh On Thu, Apr 2, 2015 at 7:34 AM, Jacek Lewandowski jacek.lewandow...@datastax.com wrote: A very simple example which works well with Spark 1.2, and fails to compile with Spark 1.3: build.sbt: name := "untitled" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" Test.scala: package org.apache.spark.metrics import org.apache.spark.SparkEnv class Test { SparkEnv.get.metricsSystem.report() } Produces: Error:scalac: bad symbolic reference. A signature in MetricsSystem.class refers to term eclipse in package org which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling MetricsSystem.class. Error:scalac: bad symbolic reference. A signature in MetricsSystem.class refers to term jetty in value org.eclipse which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling MetricsSystem.class. This looks like something wrong with shading jetty. MetricsSystem references MetricsServlet which references some classes from Jetty, in the original package instead of the shaded one. I'm not sure, but adding the following dependencies solves the problem: libraryDependencies += "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031" libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "8.1.14.v20131031" Is it intended or is it a bug? Thanks ! Jacek -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
The differentce between SparkSql/DataFram join and Rdd join
Hi, We have 2 hive tables and want to join one with the other. Initially, we ran a sql request on HiveContext. But it did not work. It was blocked at 30/600 tasks. Then we tried to load the tables into two DataFrames, and we encountered the same problem. Finally, it works with RDD.join. What we have done is basically transforming the 2 tables into 2 pair RDDs, then calling a join operation. It works great in about 500 s. However, a workaround is just a workaround, since we have to transform hive tables into RDDs. This is really annoying. Just wondering whether the underlying code of DF/SQL's join operation is the same as the RDD's, knowing that there is a syntax analysis layer for DF/SQL, while the RDD join is straightforward on two pair RDDs. SQL request: -- select v1.receipt_id, v1.sku, v1.amount, v1.qty, v2.discount from table1 as v1 left join table2 as v2 on v1.receipt_id = v2.receipt_id where v1.sku != '' DataFrame: - val rdd1 = ss.hiveContext.table("table1") val rdd1Filt = rdd1.filter(rdd1.col("sku") !== "") val rdd2 = ss.hiveContext.table("table2") val rddJoin = rdd1Filt.join(rdd2, rdd1Filt("receipt_id") === rdd2("receipt_id")) rddJoin.saveAsTable("testJoinTable", SaveMode.Overwrite) The RDD workaround in this case is a bit cumbersome; in short, we just created 2 RDDs, joined them, and then applied a new schema on the result RDD. This approach works, at least all tasks were finished, while the DF/SQL approach doesn't. Any idea ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-differentce-between-SparkSql-DataFram-join-and-Rdd-join-tp22407.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
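For reference, the RDD-level workaround described above might look roughly like the following sketch (column positions and types are assumptions for illustration, not taken from the real tables):

// Turn both Hive tables into pair RDDs keyed by receipt_id, then join them.
val t1 = ss.hiveContext.table("table1").rdd
  .filter(r => r.getString(1) != "")                        // assumed: sku at position 1
  .map(r => (r.getString(0), (r.getString(1), r.getDouble(2), r.getLong(3))))
val t2 = ss.hiveContext.table("table2").rdd
  .map(r => (r.getString(0), r.getDouble(1)))               // assumed: (receipt_id, discount)
val joined = t1.leftOuterJoin(t2)                           // plain pair-RDD join, as in the workaround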
Re: task not serialize
I am thinking of following the approach below (in my class, HBase also returns the same object which I will get in the RDD). 1. First run the flatMapToPair: JavaPairRDD<VendorRecord, Iterable<VendorRecord>> pairvendorData = matchRdd.flatMapToPair( new PairFlatMapFunction<VendorRecord, VendorRecord, VendorRecord>(){ @Override public Iterable<Tuple2<VendorRecord, VendorRecord>> call( VendorRecord t) throws Exception { List<Tuple2<VendorRecord, VendorRecord>> pairs = new LinkedList<Tuple2<VendorRecord, VendorRecord>>(); MatcherKeys matchkeys = CompanyMatcherHelper.getBlockinkeys(t); List<VendorRecord> Matchedrecords = ckdao.getMatchingRecordsWithscan(matchkeys); for(int i = 0; i < Matchedrecords.size(); i++){ pairs.add( new Tuple2<VendorRecord, VendorRecord>(t, Matchedrecords.get(i))); } return pairs; } } ).groupByKey(200); Question: will it store the returned RDD on one node, or does it only materialize when I run the second step? In groupByKey, if I increase the partition number, will it increase the performance? 2. Then apply mapPartitions on this RDD and do logistic regression here. My issue is that my logistic regression function takes On 7 April 2015 at 18:38, Dean Wampler deanwamp...@gmail.com wrote: Foreach() runs in parallel across the cluster, like map, flatMap, etc. You'll only run into problems if you call collect(), which brings the entire RDD into memory in the driver program. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 7, 2015 at 3:50 AM, Jeetendra Gangele gangele...@gmail.com wrote: Let's say I follow the approach below and I get a pair RDD with huge size, which cannot fit into one machine... what about running foreach on this RDD? On 7 April 2015 at 04:25, Jeetendra Gangele gangele...@gmail.com wrote: On 7 April 2015 at 04:03, Dean Wampler deanwamp...@gmail.com wrote: On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot. That means Spark does not support nested RDDs? If I pass the JavaSparkContext that also won't work. I mean passing the SparkContext is not possible since it's not serializable. That's right. RDDs don't nest and SparkContexts aren't serializable. I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd and I need to return the potential matches for this record from HBase. So for each field of VendorRecord I have to do the following: 1. query HBase to get the list of potential records in an RDD 2. run logistic regression on the RDD returned from step 1 and each element of the passed matchRdd. If I understand you correctly, each VendorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? Yes, that's correct, each VendorRecord corresponds to 0 to N matches. If so, you could use the RDD flatMap method, which takes a function that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help. I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? A join will give too big a result. The RDD of query results is returning around 1 for each record and I have 2 million to process, so it will be huge to have this.
2 m*1 big number For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code, in foreach I am getting a task not serializable exception: @SuppressWarnings("serial") public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException{ log.info("Company matcher started"); //final JavaSparkContext jsc = getSparkContext();
Re: RDD collect hangs on large input data
Zsolt - what version of Java are you running? On Mon, Mar 30, 2015 at 7:12 AM, Zsolt Tóth toth.zsolt@gmail.com wrote: Thanks for your answer! I don't call .collect because I want to trigger the execution. I call it because I need the rdd on the driver. This is not a huge RDD and it's not larger than the one returned with 50GB input data. The end of the stack trace: The two IP's are the two worker nodes, I think they can't connect to the driver after they finished their part of the collect(). 15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver 15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=405753, maxMem=4883742720 15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values in memory (estimated size 200.0 B, free 4.5 GB) 15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with curMem=405953, maxMem=4883742720 15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values in memory (estimated size 80.0 B, free 4.5 GB) 15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_867 15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 (TID 1740). 1440 bytes result sent to driver 15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_868 15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 (TID 1741). 1422 bytes result sent to driver 15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:42026 java.io.IOException: Connection timed out at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) 15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:41703 java.io.IOException: Connection timed out at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) 15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49021 java.io.IOException: Connection timed out at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at
FlatMapPair run for longer time
Hi All, I am running the code below and it runs for a very long time, where the input to flatMapToPair is about 50K records, and I am calling HBase 50K times with just a range scan query, which should not take much time. Can anybody guide me on what is wrong here? JavaPairRDD<VendorRecord, Iterable<VendorRecord>> pairvendorData = matchRdd.flatMapToPair( new PairFlatMapFunction<VendorRecord, VendorRecord, VendorRecord>(){ @Override public Iterable<Tuple2<VendorRecord, VendorRecord>> call( VendorRecord t) throws Exception { List<Tuple2<VendorRecord, VendorRecord>> pairs = new LinkedList<Tuple2<VendorRecord, VendorRecord>>(); MatcherKeys matchkeys = CompanyMatcherHelper.getBlockinkeys(t); List<VendorRecord> Matchedrecords = ckdao.getMatchingRecordsWithscan(matchkeys); for(int i = 0; i < Matchedrecords.size(); i++){ pairs.add( new Tuple2<VendorRecord, VendorRecord>(t, Matchedrecords.get(i))); } return pairs; } } ).groupByKey(200).persist(StorageLevel.DISK_ONLY_2());
Re: Using DIMSUM with ids
I have a version that works well for Netflix data but now I am validating on internal datasets... this code will work on matrix factors and sparse matrices that have rows >= 100 * columns. If columns are much smaller than rows then the column-based flow works well... basically we need both flows... I have not thought about random sampling yet but LSH will work well... the metric is the key here and so every optimization needs to be validated wrt the raw flow.. On Apr 6, 2015 10:15 AM, Reza Zadeh r...@databricks.com wrote: Right now dimsum is meant to be used for tall and skinny matrices, and so columnSimilarities() returns similar columns, not rows. We are working on adding an efficient row similarity as well, tracked by this JIRA: https://issues.apache.org/jira/browse/SPARK-4823 Reza On Mon, Apr 6, 2015 at 6:08 AM, James alcaid1...@gmail.com wrote: The example below illustrates how to use the DIMSUM algorithm to calculate the similarity between each two rows and output row pairs with cosine similarity that is not less than a threshold. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala But what if I hope to hold an id for each row, which means the input file is: id1 vector1 id2 vector2 id3 vector3 ... And we hope to output id1 id2 sim(id1, id2) id1 id3 sim(id1, id3) ... Alcaid
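Until row similarity lands, one way to keep ids around (a sketch only, assuming the items of interest are laid out as the columns of a tall-and-skinny RowMatrix, which is what columnSimilarities() expects) is to keep a positional id array and map the MatrixEntry coordinates back to ids afterwards:

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// ids(i) is the original id of column i; building this mapping is up to the caller.
val ids: Array[String] = Array("id1", "id2", "id3")        // hypothetical
val mat: RowMatrix = ???                                    // tall-and-skinny, items as columns

val sims = mat.columnSimilarities(0.1)                      // DIMSUM with a sampling threshold
val simsWithIds = sims.entries.map { e =>
  (ids(e.i.toInt), ids(e.j.toInt), e.value)                 // (id_i, id_j, cosine similarity)
}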
RE: Incremently load big RDD file into Memory
cartesian is an expensive operation. If you have 'M' records in locations, then locations.cartesian(locations) will generate an MxM result. If locations is a big RDD, it is hard to do locations.cartesian(locations) efficiently. Yong Date: Tue, 7 Apr 2015 10:04:12 -0700 From: mas.ha...@gmail.com To: user@spark.apache.org Subject: Incremently load big RDD file into Memory val locations = filelines.map(line => line.split("\t")).map(t => (t(5).toLong, (t(2).toDouble, t(3).toDouble))).distinct().collect() val cartesienProduct = locations.cartesian(locations).map(t => Edge(t._1._1, t._2._1, distanceAmongPoints(t._1._2._1, t._1._2._2, t._2._2._1, t._2._2._2))) The code executes perfectly fine up till here, but when I try to use cartesienProduct it gets stuck, i.e. val count = cartesienProduct.count() Any help to do this efficiently will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incremently-load-big-RDD-file-into-Memory-tp22410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
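Since the distinct locations were already collect()ed to the driver in the snippet above, one alternative worth considering (a sketch only; it still materializes M x M pairs, but avoids the RDD cartesian shuffle) is to broadcast the collected array and expand the pairs with flatMap. distanceAmongPoints and Edge are assumed to be the function and GraphX class from the original post:

// locations: Array[(Long, (Double, Double))] collected on the driver
val locBroadcast = sc.broadcast(locations)

val edges = sc.parallelize(locations).flatMap { case (srcId, (srcLat, srcLon)) =>
  locBroadcast.value.map { case (dstId, (dstLat, dstLon)) =>
    Edge(srcId, dstId, distanceAmongPoints(srcLat, srcLon, dstLat, dstLon))
  }
}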
Re: FlatMapPair run for longer time
It's hard for us to diagnose your performance problems, because we don't have your environment, and fixing one will simply reveal the next one to be fixed. So, I suggest you use the following strategy to figure out what takes the most time and hence what you might try to optimize. Try replacing expressions that might be expensive with stubs. Your calls to HBase, for example. What happens if you replace the call with fake, hard-coded data? Does performance improve dramatically? If so, then optimize how you query HBase. If it makes no significant difference, try something else. Also try looking at the Spark source code to understand what happens under the hood. At this point, your best bet is to develop your intuition about the performance overhead of various constructs in real-world scenarios and how Spark distributes computation. Then you'll find it easier to know what to optimize. You'll want to understand what happens in flatMap, filter, join, groupBy, reduce, etc. Don't forget this guide, too: https://spark.apache.org/docs/latest/tuning.html. The Learning Spark book from O'Reilly is also really helpful. I also recommend that you switch to Java 8 and lambdas, or go all the way to Scala, so all that noisy code shrinks down to simpler expressions. You'll be surprised how helpful that is for comprehending your code and reasoning about it. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 7, 2015 at 12:54 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, I am running the code below and it runs for a very long time, where the input to flatMapToPair is about 50K records, and I am calling HBase 50K times with just a range scan query, which should not take much time. Can anybody guide me on what is wrong here? JavaPairRDD<VendorRecord, Iterable<VendorRecord>> pairvendorData = matchRdd.flatMapToPair( new PairFlatMapFunction<VendorRecord, VendorRecord, VendorRecord>(){ @Override public Iterable<Tuple2<VendorRecord, VendorRecord>> call( VendorRecord t) throws Exception { List<Tuple2<VendorRecord, VendorRecord>> pairs = new LinkedList<Tuple2<VendorRecord, VendorRecord>>(); MatcherKeys matchkeys = CompanyMatcherHelper.getBlockinkeys(t); List<VendorRecord> Matchedrecords = ckdao.getMatchingRecordsWithscan(matchkeys); for(int i = 0; i < Matchedrecords.size(); i++){ pairs.add( new Tuple2<VendorRecord, VendorRecord>(t, Matchedrecords.get(i))); } return pairs; } } ).groupByKey(200).persist(StorageLevel.DISK_ONLY_2());
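As a concrete illustration of the stubbing idea above (a hypothetical Scala sketch of the same pipeline, purely for measuring where the time goes):

// Pair every record with fake matches instead of calling the HBase scan
// (ckdao.getMatchingRecordsWithscan). If this version is suddenly fast,
// the per-record HBase round-trips are what need optimizing.
val paired = matchRdd.flatMap { record =>
  val fakeMatches = Seq(record, record)    // hard-coded stand-in for the HBase result
  fakeMatches.map(m => (record, m))
}.groupByKey(200)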
Re: The differentce between SparkSql/DataFram join and Rdd join
The joins here are totally different implementations, but it is worrisome that you are seeing the SQL join hanging. Can you provide more information about the hang? jstack of the driver and a worker that is processing a task would be very useful. On Tue, Apr 7, 2015 at 8:33 AM, Hao Ren inv...@gmail.com wrote: Hi, We have 2 hive tables and want to join one with the other. Initially, we ran a sql request on HiveContext. But it did not work. It was blocked on 30/600 tasks. Then we tried to load tables into two DataFrames, we have encountered the same problem. Finally, it works with RDD.join. What we have done is basically transforming 2 tables into 2 pair RDDs, then calling a join operation. It works great in about 500 s. However, workaround is just a workaround, since we have to transform hive tables into RDD. This is really annoying. Just wondering whether the underlying code of DF/SQL's join operation is the same as rdd's, knowing that there is a syntax analysis layer for DF/SQL, while RDD's join is straightforward on two pair RDDs. SQL request: -- select v1.receipt_id, v1.sku, v1.amount, v1.qty, v2.discount from table1 as v1 left join table2 as v2 on v1.receipt_id = v2.receipt_id where v1.sku != DataFrame: - val rdd1 = ss.hiveContext.table(table1) val rdd1Filt = rdd1.filter(rdd1.col(sku) !== ) val rdd2 = ss.hiveContext.table(table2) val rddJoin = rdd1Filt.join(rdd2, rdd1Filt(receipt_id) === rdd2(receipt_id)) rddJoin.saveAsTable(testJoinTable, SaveMode.Overwrite) RDD workaround in this case is a bit cumbersome, for short, we just created 2 RDDs, join, and then apply a new schema on the result RDD. This approach works, at least all tasks were finished, while the DF/SQL approach don't. Any idea ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-differentce-between-SparkSql-DataFram-join-and-Rdd-join-tp22407.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Advice using Spark SQL and Thrift JDBC Server
1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs, UDTs, UDAFs, etc. The one exception here is Hive DDL operations (CREATE TABLE, etc). These are passed directly to Hive code and executed there. The Spark SQL DDL is sufficiently different that we always try to parse that first, and fall back to Hive when it does not parse. One possibly confusing point here is that you can persist Spark SQL tables into the Hive metastore, but this is not the same as a Hive table. We only use the metastore as a repo for metadata, but are not using its format for the information in this case (as we have datasources that hive does not understand, including things like schema auto discovery). HiveQL DDL, run by Hive but can be read by Spark SQL: CREATE TABLE t (x INT) STORED AS PARQUET Spark SQL DDL, run by Spark SQL, stored in metastore, cannot be read by hive: CREATE TABLE t USING parquet (path '/path/to/data') 2) Am I therefore right in thinking that SQL queries sent to the thrift server are still executed on the Spark cluster, using Spark SQL, and Hive plays no active part in computation of results? Correct. 3) What SQL flavour is actually supported by the Thrift Server? Is it Spark SQL, Hive, or both? I'm confused, because I've seen it accepting Hive CREATE TABLE syntax, but Spark SQL seems to work too? HiveQL++ (with Spark SQL DDL). You can make it use our simple SQL parser by `SET spark.sql.dialect=sql`, but honestly you probably don't want to do this. The included SQL parser is mostly there for people who have dependency conflicts with Hive. 4) When I run SQL queries using the Scala or Python shells, Spark seems to figure out the schema by itself from my Parquet files very well, if I use createTempTable on the DataFrame. It seems when running the thrift server, I need to create a Hive table definition first? Is that the case, or did I miss something? If it is, is there some sensible way to automate this? Temporary tables are only visible to the SQLContext that creates them. If you want it to be visible to the server, you need to either start the thrift server with the same context your program is using (see HiveThriftServer2.createWithContext) or make a metastore table. This can be done using Spark SQL DDL: CREATE TABLE t USING parquet (path '/path/to/data') Michael
Array[T].distinct doesn't work inside RDD
Hi, I have a question about Array[T].distinct on a customized class T. My data is like RDD[(String, Array[T])] in which T is a class written by myself. There are some duplicates in each Array[T] so I want to remove them. I override the equals() method in T and use val dataNoDuplicates = dataDuplicates.map{ case(id, arr) => (id, arr.distinct) } to remove duplicates inside the RDD. However this doesn't work, since I did some further tests by using val dataNoDuplicates = dataDuplicates.map{ case(id, arr) => val uniqArr = arr.distinct if(uniqArr.length > 1) println(uniqArr.head == uniqArr.last) (id, uniqArr) } And from the worker stdout I could see that it always returns TRUE results. I then tried removing duplicates by using Array[T].toSet instead of Array[T].distinct and it is working! Could anybody explain why Array[T].toSet and Array[T].distinct behave differently here? And why is Array[T].distinct not working? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Array-T-distinct-doesn-t-work-inside-RDD-tp22412.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
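One thing worth checking (a guess, since the definition of T is not shown in the post): both distinct and toSet ultimately rely on equals together with hashCode, so an equals override that is not paired with a consistent hashCode, or an equals(other: T) overload that does not actually override equals(other: Any), can produce surprising results. A minimal sketch of a class that behaves consistently:

class Point(val x: Double, val y: Double) extends Serializable {
  // Override equals(Any); an equals(Point) overload would not be used by ==.
  override def equals(other: Any): Boolean = other match {
    case p: Point => p.x == x && p.y == y
    case _        => false
  }
  // Keep hashCode consistent with equals.
  override def hashCode: Int = (x, y).hashCode()
}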
Re: scala.MatchError: class org.apache.avro.Schema (of class java.lang.Class)
For more details on my question http://apache-spark-user-list.1001560.n3.nabble.com/How-to-generate-Java-bean-class-for-avro-files-using-spark-avro-project-tp22413.html Thanks, Yamini On Tue, Apr 7, 2015 at 2:23 PM, Yamini Maddirala yamini.m...@gmail.com wrote: Hi Michael, Yes, I did try spark-avro 0.2.0 databricks project. I am using CHD5.3 which is based on spark 1.2. Hence I'm bound to use spark-avro 0.2.0 instead of the latest. I'm not sure how spark-avro project can help me in this scenario. 1. I have JavaDStream of type avro generic record :JavaDStreamGenericRecord [This is the data being read from kafka topics] 2. I'm able to get JavaSchemaRDD using the avro file like this final JavaSchemaRDD schemaRDD2 = AvroUtils.avroFile(sqlContext, /xyz-Project/trunk/src/main/resources/xyz.avro); 3. I don't know how I can apply schema in step 2 to data in step 1. I chose to do something like this JavaSchemaRDD schemaRDD = sqlContext.applySchema(genericRecordJavaRDD, xyz.class); Used avro maven plugin to generate xyz class in Java. But this is not good because avro maven plugin creates a field SCHEMA which is not supported in applySchema method. Please let me know how to deal with this. Appreciate your help Thanks, Yamini On Tue, Apr 7, 2015 at 1:57 PM, Michael Armbrust mich...@databricks.com wrote: Have you looked at spark-avro? https://github.com/databricks/spark-avro On Tue, Apr 7, 2015 at 3:57 AM, Yamini yamini.m...@gmail.com wrote: Using spark(1.2) streaming to read avro schema based topics flowing in kafka and then using spark sql context to register data as temp table. Avro maven plugin(1.7.7 version) generates the java bean class for the avro file but includes a field named SCHEMA$ of type org.apache.avro.Schema which is not supported in the JavaSQLContext class[Method : applySchema]. How to auto generate java bean class for the avro file and over come the above mentioned problem. Thanks. - Thanks, Yamini -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scala-MatchError-class-org-apache-avro-Schema-of-class-java-lang-Class-tp22402.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [GraphX] aggregateMessages with active set
We thought it would be better to simplify the interface, since the active set is a performance optimization but the result is identical to calling subgraph before aggregateMessages. The active set option is still there in the package-private method aggregateMessagesWithActiveSet. You can actually access it publicly via GraphImpl, though the API isn't guaranteed to be stable: graph.asInstanceOf[GraphImpl[VD,ED]].aggregateMessagesWithActiveSet(...) Ankur On Tue, Apr 7, 2015 at 2:56 AM, James alcaid1...@gmail.com wrote: Hello, The old GraphX API mapReduceTriplets has an optional parameter activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] that limits the input of sendMessage. However, in the new API aggregateMessages I could not find this option; why is it not offered any more? Alcaid - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
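For the public-API route described above (same result as the old active-set option, just without the package-private call), a small sketch; the graph's attribute types and the isActive predicate are placeholders:

import org.apache.spark.graphx._

val graph: Graph[Double, Int] = ???                        // hypothetical graph
def isActive(id: VertexId, attr: Double): Boolean = ???    // hypothetical membership test

// Restrict to the active vertices first, then aggregate as usual.
val restricted = graph.subgraph(vpred = isActive)
val msgs: VertexRDD[Int] = restricted.aggregateMessages[Int](
  ctx => ctx.sendToDst(1),   // sendMsg
  _ + _                      // mergeMsg
)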
Re: scala.MatchError: class org.apache.avro.Schema (of class java.lang.Class)
Have you looked at spark-avro? https://github.com/databricks/spark-avro On Tue, Apr 7, 2015 at 3:57 AM, Yamini yamini.m...@gmail.com wrote: Using spark(1.2) streaming to read avro schema based topics flowing in kafka and then using spark sql context to register data as temp table. Avro maven plugin(1.7.7 version) generates the java bean class for the avro file but includes a field named SCHEMA$ of type org.apache.avro.Schema which is not supported in the JavaSQLContext class[Method : applySchema]. How to auto generate java bean class for the avro file and over come the above mentioned problem. Thanks. - Thanks, Yamini -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scala-MatchError-class-org-apache-avro-Schema-of-class-java-lang-Class-tp22402.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can not get executor's Log from Spark's History Server
The Spark history server does not have the ability to serve executor logs currently. You need to use the yarn logs command for that. On Tue, Apr 7, 2015 at 2:51 AM, donhoff_h 165612...@qq.com wrote: Hi, Experts I run my Spark Cluster on Yarn. I used to get executors' Logs from Spark's History Server. But after I started my Hadoop jobhistory server and made configuration to aggregate logs of hadoop jobs to a HDFS directory, I found that I could not get spark's executors' Logs any more. Is there any solution so that I could get logs of my spark jobs from Spark History Server and get logs of my map-reduce jobs from Hadoop History Server? Many Thanks! Following is the configuration I made in Hadoop yarn-site.xml yarn.log-aggregation-enable=true yarn.nodemanager.remote-app-log-dir=/mr-history/agg-logs yarn.log-aggregation.retain-seconds=259200 yarn.log-aggregation.retain-check-interval-seconds=-1 -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: scala.MatchError: class org.apache.avro.Schema (of class java.lang.Class)
Hi Michael, Yes, I did try the spark-avro 0.2.0 databricks project. I am using CDH 5.3 which is based on spark 1.2. Hence I'm bound to use spark-avro 0.2.0 instead of the latest. I'm not sure how the spark-avro project can help me in this scenario. 1. I have a JavaDStream of Avro generic records: JavaDStream<GenericRecord> [This is the data being read from kafka topics] 2. I'm able to get a JavaSchemaRDD using the avro file like this: final JavaSchemaRDD schemaRDD2 = AvroUtils.avroFile(sqlContext, "/xyz-Project/trunk/src/main/resources/xyz.avro"); 3. I don't know how I can apply the schema in step 2 to the data in step 1. I chose to do something like this: JavaSchemaRDD schemaRDD = sqlContext.applySchema(genericRecordJavaRDD, xyz.class); I used the avro maven plugin to generate the xyz class in Java. But this is not good because the avro maven plugin creates a field SCHEMA$ which is not supported in the applySchema method. Please let me know how to deal with this. Appreciate your help Thanks, Yamini On Tue, Apr 7, 2015 at 1:57 PM, Michael Armbrust mich...@databricks.com wrote: Have you looked at spark-avro? https://github.com/databricks/spark-avro On Tue, Apr 7, 2015 at 3:57 AM, Yamini yamini.m...@gmail.com wrote: Using spark(1.2) streaming to read avro schema based topics flowing in kafka and then using the spark sql context to register the data as a temp table. The Avro maven plugin (version 1.7.7) generates the java bean class for the avro file but includes a field named SCHEMA$ of type org.apache.avro.Schema which is not supported in the JavaSQLContext class [Method: applySchema]. How can I auto-generate a java bean class for the avro file and overcome the above-mentioned problem? Thanks. - Thanks, Yamini -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scala-MatchError-class-org-apache-avro-Schema-of-class-java-lang-Class-tp22402.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
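One possible way around the bean-class problem (a sketch only, written in Scala; the field names and types are hypothetical and would have to mirror the real Avro schema, genericRecordStream stands in for the stream from step 1, and the imports shown match the Spark 1.2 layout, which moved to org.apache.spark.sql.types in 1.3) is to skip the generated bean entirely: convert each GenericRecord to a Row by hand and call applySchema with a manually built StructType:

import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql._   // StructType, StructField, StringType, IntegerType, Row in Spark 1.2

// Hypothetical schema mirroring the Avro definition.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

genericRecordStream.foreachRDD { rdd =>
  // Convert each GenericRecord to a Row field by field (avro Utf8 -> String, etc).
  val rows = rdd.map(rec => Row(
    Option(rec.get("name")).map(_.toString).orNull,
    rec.get("age")))
  val schemaRDD = sqlContext.applySchema(rows, schema)
  schemaRDD.registerTempTable("xyz")
}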
How to get SparkSql results on a webpage on real time
Hi, I have written a Scala object which can run queries on the messages which I am receiving from Kafka. Now I have to show the results on some webpage or dashboard which can auto-refresh with new results. Any pointers on how I can do that? Thanks, Mukund
How to generate Java bean class for avro files using spark avro project
Is there a way to generate a Java bean for a given avro schema file in spark 1.2 using the spark-avro project 0.2.0 for the following use case? 1. Topics from kafka are read and stored in the form of avro generic records: JavaDStream<GenericRecord> 2. Using the spark-avro project I am able to get the schema in the following way: JavaSchemaRDD schemaRDD2 = AvroUtils.avroFile(sqlContext, "PathTofile.avro") 3. For each record in the above mentioned JavaDStream, I need to apply the schema retrieved in step 2. I chose to do this: JavaSchemaRDD schemaRDD = sqlContext.applySchema(genericRecordJavaRDD, PathTofile.class) To generate the Java bean class (PathTofile.class) I chose to use the avro maven plugin. But the java bean generated by the plugin includes a field named SCHEMA$ which is not supported by the applySchema method mentioned above. Please let me know if there is a better solution for this. - Thanks, Yamini -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-generate-Java-bean-class-for-avro-files-using-spark-avro-project-tp22413.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: From DataFrame to LabeledPoint
Solved! Thanks for your help. I converted null values to the Double value 0.0. On 06/04/2015 19:25, Joseph Bradley jos...@databricks.com wrote: I'd make sure you're selecting the correct columns. If not that, then your input data might be corrupt. CCing user to keep it on the user list. On Mon, Apr 6, 2015 at 6:53 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi!, I had tried your solution, and I saw that the first row is null. Is this important? Can I work with null rows? Some rows have some columns with null values. This is the first row of the Dataframe: scala> dataDF.take(1) res11: Array[org.apache.spark.sql.Row] = Array([null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]) This is the RDD[LabeledPoint] created: scala> data.take(1) 15/04/06 15:46:31 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 243, 10.101.5.194): java.lang.NullPointerException Thanks for all. Sergio J. 2015-04-03 20:14 GMT+02:00 Joseph Bradley jos...@databricks.com: I'd recommend going through each step, taking 1 RDD element (myDataFrame.take(1)), and examining it to see where this issue is happening. On Fri, Apr 3, 2015 at 9:44 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: This solution is really good. But I was working with feature.toString.toDouble because the feature is of type Any. Now, when I try to work with the LabeledPoint created I have a NullPointerException =/ On 02/04/2015 21:23, Joseph Bradley jos...@databricks.com wrote: Peter's suggestion sounds good, but watch out for the match case since I believe you'll have to match on: case (Row(feature1, feature2, ...), Row(label)) => On Thu, Apr 2, 2015 at 7:57 AM, Peter Rudenko petro.rude...@gmail.com wrote: Hi, try the following code: val labeledPoints: RDD[LabeledPoint] = features.zip(labels).map{ case Row(feature1, feature2, ..., label) => LabeledPoint(label, Vectors.dense(feature1, feature2, ...)) } Thanks, Peter Rudenko On 2015-04-02 17:17, drarse wrote: Hello!, I have had a question for days. I am working with DataFrame and with Spark SQL. I imported a jsonFile: val df = sqlContext.jsonFile("file.json") In this json I have the label and the features. I selected them: val features = df.select("feature1", "feature2", "feature3", ...); val labels = df.select("cassification") But now I don't know how to create a LabeledPoint for RandomForest. I tried some solutions without success. Can you help me? Thanks for all! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/From-DataFrame-to-LabeledPoint-tp22354.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
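Putting the pieces of this thread together, the conversion with the null-to-0.0 handling might look roughly like this (a sketch; the column names are placeholders and the DataFrame API shown is the Spark 1.3 flavour):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Map nulls to 0.0, as described in the resolution above.
def toDouble(v: Any): Double = if (v == null) 0.0 else v.toString.toDouble

val data = df.select("classification", "feature1", "feature2", "feature3").rdd.map { row =>
  LabeledPoint(
    toDouble(row(0)),
    Vectors.dense(toDouble(row(1)), toDouble(row(2)), toDouble(row(3))))
}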
Re: A problem with Spark 1.3 artifacts
Maybe you have some sbt-built 1.3 version in your ~/.ivy2/ directory that's masking the maven one? That's the only explanation I can come up with... On Tue, Apr 7, 2015 at 12:22 PM, Jacek Lewandowski jacek.lewandow...@datastax.com wrote: So weird, as I said - I created a new empty project where Spark core was the only dependency... -- Marcelo
Re: Advice using Spark SQL and Thrift JDBC Server
That should totally work. The other option would be to run a persistent metastore that multiple contexts can talk to and periodically run a job that creates missing tables. The trade-off here would be more complexity, but less downtime due to the server restarting. On Tue, Apr 7, 2015 at 12:34 PM, James Aley james.a...@swiftkey.com wrote: Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables and registers each as a temp table in some context, before calling HiveThriftServer2.createWithContext as you suggest? This would mean that to add new content, all I need to is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation of Hive table definitions, if it will work? Thanks again, James. On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.com wrote: 1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs UDTs UDAFs, etc. The one exception here is Hive DDL operations (CREATE TABLE, etc). These are passed directly to Hive code and executed there. The Spark SQL DDL is sufficiently different that we always try to parse that first, and fall back to Hive when it does not parse. One possibly confusing point here, is that you can persist Spark SQL tables into the Hive metastore, but this is not the same as a Hive table. We are only use the metastore as a repo for metadata, but are not using their format for the information in this case (as we have datasources that hive does not understand, including things like schema auto discovery). HiveQL DDL, run by Hive but can be read by Spark SQL: CREATE TABLE t (x INT) SORTED AS PARQUET Spark SQL DDL, run by Spark SQL, stored in metastore, cannot be read by hive: CREATE TABLE t USING parquet (path '/path/to/data') 2) Am I therefore right in thinking that SQL queries sent to the thrift server are still executed on the Spark cluster, using Spark SQL, and Hive plays no active part in computation of results? Correct. 3) What SQL flavour is actually supported by the Thrift Server? Is it Spark SQL, Hive, or both? I've confused, because I've seen it accepting Hive CREATE TABLE syntax, but Spark SQL seems to work too? HiveQL++ (with Spark SQL DDL). You can make it use our simple SQL parser by `SET spark.sql.dialect=sql`, but honestly you probably don't want to do this. The included SQL parser is mostly there for people who have dependency conflicts with Hive. 4) When I run SQL queries using the Scala or Python shells, Spark seems to figure out the schema by itself from my Parquet files very well, if I use createTempTable on the DataFrame. It seems when running the thrift server, I need to create a Hive table definition first? Is that the case, or did I miss something? 
If it is, is there some sensible way to automate this? Temporary tables are only visible to the SQLContext that creates them. If you want it to be visible to the server, you need to either start the thrift server with the same context your program is using (see HiveThriftServer2.createWithContext) or make a metastore table. This can be done using Spark SQL DDL: CREATE TABLE t USING parquet (path '/path/to/data') Michael
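A rough sketch of such a wrapper app (assuming the HiveThriftServer2.startWithContext entry point, which appears to be the released name of the method referred to above, and a hypothetical discoverTables helper that walks the Parquet directory structure):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object ParquetThriftServer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parquet-thrift-server"))
    val hiveContext = new HiveContext(sc)

    // Register every discovered Parquet directory as a temp table in this context.
    for ((name, path) <- discoverTables(args(0))) {
      hiveContext.parquetFile(path).registerTempTable(name)
    }

    // Serve the temp tables registered above over JDBC/ODBC.
    HiveThriftServer2.startWithContext(hiveContext)
  }

  // Hypothetical helper: returns (tableName, parquetPath) pairs.
  def discoverTables(root: String): Seq[(String, String)] = ???
}

Restarting this app (or re-running the discovery loop against a fresh context) would pick up newly added tables, as discussed above.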
set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?
Hi All, I am a bit confused about spark.storage.memoryFraction. This is used to set the area for RDD usage; does this mean only cached and persisted RDDs? So if my program has no cached RDD at all (meaning I have no .cache() or .persist() call on any RDD), can I set spark.storage.memoryFraction to a very small number or even zero? I am writing a program which consumes a lot of memory (broadcast values, runtime, etc). But I have no cached RDD, so should I just turn spark.storage.memoryFraction down to 0 (which would help me to improve performance)? And I have another issue with broadcast: when I try to get a broadcast value, it throws an out of memory error; which part of memory should I allocate more of (if I can't increase my overall memory size)? java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) Regards, Shuai
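For reference, the setting goes through SparkConf as below (a sketch only). One hedged observation from the stack trace: the broadcast block is being unrolled through the BlockManager's MemoryStore, which lives in the storage region, so dropping spark.storage.memoryFraction all the way to 0 may itself starve the space used to materialize broadcast values; a small non-zero fraction is a more conservative experiment:

import org.apache.spark.SparkConf

// Sketch: shrink (but do not zero out) the storage region when nothing is cached.
val conf = new SparkConf()
  .setAppName("no-cache-job")
  .set("spark.storage.memoryFraction", "0.1")   // default is 0.6 in Spark 1.x
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")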
Re: A problem with Spark 1.3 artifacts
So weird, as I said - I created a new empty project where Spark core was the only dependency... [image: datastax_logo.png] http://www.datastax.com/ JACEK LEWANDOWSKI DSE Software Engineer | +48.609.810.774 | jacek.lewandow...@datastax.com [image: linkedin.png] https://www.linkedin.com/company/datastax [image: facebook.png] https://www.facebook.com/datastax [image: twitter.png] https://twitter.com/datastax [image: g+.png] https://plus.google.com/+Datastax/about http://feeds.feedburner.com/datastax https://github.com/datastax/ DataStax is the fastest, most scalable distributed database technology, delivering Apache Cassandra to the world’s most innovative enterprises. Datastax is built to be agile, always-on, and predictably scalable to any size. With more than 500 customers in 45 countries, DataStax is the database technology and transactional backbone of choice for the worlds most innovative companies such as Netflix, Adobe, Intuit, and eBay. On Tue, Apr 7, 2015 at 7:08 PM, Marcelo Vanzin van...@cloudera.com wrote: BTW, just out of curiosity, I checked both the 1.3.0 release assembly and the spark-core_2.10 artifact downloaded from http://mvnrepository.com/, and neither contain any references to anything under org.eclipse (all referenced jetty classes are the shaded ones under org.spark-project.jetty). On Mon, Apr 6, 2015 at 10:30 PM, Josh Rosen rosenvi...@gmail.com wrote: My hunch is that this behavior was introduced by a patch to start shading Jetty in Spark 1.3: https://issues.apache.org/jira/browse/SPARK-3996. Note that Spark's MetricsSystem class is marked as private[spark] and thus isn't intended to be interacted with directly by users. It's not super likely that this API would break, but it's excluded from our MiMa checks and thus is liable to change in incompatible ways across releases. If you add these Jetty classes as a compile-only dependency but don't add them to the runtime classpath, do you get runtime errors? If the metrics system is usable at runtime and we only have errors when attempting to compile user code against non-public APIs, then I'm not sure that this is a high-priority issue to fix since. If the metrics system doesn't work at runtime, on the other hand, then that's definitely a bug that should be fixed. If you'd like to continue debugging this issue, I think we should move this discussion over to JIRA so it's easier to track and reference. Hope this helps, Josh On Thu, Apr 2, 2015 at 7:34 AM, Jacek Lewandowski jacek.lewandow...@datastax.com wrote: A very simple example which works well with Spark 1.2, and fail to compile with Spark 1.3: build.sbt: name := untitled version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.3.0 Test.scala: package org.apache.spark.metrics import org.apache.spark.SparkEnv class Test { SparkEnv.get.metricsSystem.report() } Produces: Error:scalac: bad symbolic reference. A signature in MetricsSystem.class refers to term eclipse in package org which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling MetricsSystem.class. Error:scalac: bad symbolic reference. A signature in MetricsSystem.class refers to term jetty in value org.eclipse which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling MetricsSystem.class. This looks like something wrong with shading jetty. 
MetricsSystem references MetricsServlet which references some classes from Jetty, in the original package instead of shaded one. I'm not sure, but adding the following dependencies solves the problem: libraryDependencies += org.eclipse.jetty % jetty-server % 8.1.14.v20131031 libraryDependencies += org.eclipse.jetty % jetty-servlet % 8.1.14.v20131031 Is it intended or is it a bug? Thanks ! Jacek -- Marcelo
Re: Advice using Spark SQL and Thrift JDBC Server
Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables and registers each as a temp table in some context, before calling HiveThriftServer2.createWithContext as you suggest? This would mean that to add new content, all I need to is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation of Hive table definitions, if it will work? Thanks again, James. On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.com wrote: 1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs UDTs UDAFs, etc. The one exception here is Hive DDL operations (CREATE TABLE, etc). These are passed directly to Hive code and executed there. The Spark SQL DDL is sufficiently different that we always try to parse that first, and fall back to Hive when it does not parse. One possibly confusing point here, is that you can persist Spark SQL tables into the Hive metastore, but this is not the same as a Hive table. We are only use the metastore as a repo for metadata, but are not using their format for the information in this case (as we have datasources that hive does not understand, including things like schema auto discovery). HiveQL DDL, run by Hive but can be read by Spark SQL: CREATE TABLE t (x INT) SORTED AS PARQUET Spark SQL DDL, run by Spark SQL, stored in metastore, cannot be read by hive: CREATE TABLE t USING parquet (path '/path/to/data') 2) Am I therefore right in thinking that SQL queries sent to the thrift server are still executed on the Spark cluster, using Spark SQL, and Hive plays no active part in computation of results? Correct. 3) What SQL flavour is actually supported by the Thrift Server? Is it Spark SQL, Hive, or both? I've confused, because I've seen it accepting Hive CREATE TABLE syntax, but Spark SQL seems to work too? HiveQL++ (with Spark SQL DDL). You can make it use our simple SQL parser by `SET spark.sql.dialect=sql`, but honestly you probably don't want to do this. The included SQL parser is mostly there for people who have dependency conflicts with Hive. 4) When I run SQL queries using the Scala or Python shells, Spark seems to figure out the schema by itself from my Parquet files very well, if I use createTempTable on the DataFrame. It seems when running the thrift server, I need to create a Hive table definition first? Is that the case, or did I miss something? If it is, is there some sensible way to automate this? Temporary tables are only visible to the SQLContext that creates them. If you want it to be visible to the server, you need to either start the thrift server with the same context your program is using (see HiveThriftServer2.createWithContext) or make a metastore table. 
This can be done using Spark SQL DDL: CREATE TABLE t USING parquet (path '/path/to/data') Michael
broken link on Spark Programming Guide
in the current Programming Guide: https://spark.apache.org/docs/1.3.0/programming-guide.html#actions under Actions, the Python link goes to: https://spark.apache.org/docs/1.3.0/api/python/pyspark.rdd.RDD-class.html which is 404 which I think should be: https://spark.apache.org/docs/1.3.0/api/python/index.html#org.apache.spark.rdd.RDD Thanks - Jonathan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/broken-link-on-Spark-Programming-Guide-tp22414.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: broken link on Spark Programming Guide
For the last link, you might have meant: https://spark.apache.org/docs/1.3.0/api/python/pyspark.html#pyspark.RDD Cheers On Tue, Apr 7, 2015 at 1:32 PM, jonathangreenleaf jonathangreenl...@gmail.com wrote: in the current Programming Guide: https://spark.apache.org/docs/1.3.0/programming-guide.html#actions under Actions, the Python link goes to: https://spark.apache.org/docs/1.3.0/api/python/pyspark.rdd.RDD-class.html which is 404 which I think should be: https://spark.apache.org/docs/1.3.0/api/python/index.html#org.apache.spark.rdd.RDD Thanks - Jonathan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/broken-link-on-Spark-Programming-Guide-tp22414.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: 'Java heap space' error occured when query 4G data file from HDFS
Any help, please? Help me get the configuration right. 李铖 lidali...@gmail.com wrote on Tuesday, April 7, 2015: In my dev-test env I have 3 virtual machines; each machine has 12G memory and 8 cpu cores. Here are spark-defaults.conf and spark-env.sh. Maybe some config is not right. I run this command: spark-submit --master yarn-client --driver-memory 7g --executor-memory 6g /home/hadoop/spark/main.py and the exception below is raised. spark-defaults.conf spark.master spark://cloud1:7077 spark.default.parallelism 100 spark.eventLog.enabled true spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.memory 5g spark.driver.maxResultSize 6g spark.kryoserializer.buffer.mb 256 spark.kryoserializer.buffer.max.mb 512 spark.executor.memory 4g spark.rdd.compress true spark.storage.memoryFraction 0 spark.akka.frameSize 50 spark.shuffle.compress true spark.shuffle.spill.compress false spark.local.dir /home/hadoop/tmp spark-env.sh export SCALA=/home/hadoop/softsetup/scala export JAVA_HOME=/home/hadoop/softsetup/jdk1.7.0_71 export SPARK_WORKER_CORES=1 export SPARK_WORKER_MEMORY=4g export HADOOP_CONF_DIR=/opt/cloud/hadoop/etc/hadoop export SPARK_EXECUTOR_MEMORY=4g export SPARK_DRIVER_MEMORY=4g Exception: 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB) 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_28 on disk on cloud3:38109 (size: 162.7 MB) 15/04/07 18:11:03 INFO TaskSetManager: Starting task 31.0 in stage 1.0 (TID 31, cloud3, NODE_LOCAL, 1296 bytes) 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB) 15/04/07 18:11:03 INFO BlockManagerInfo: Added taskresult_29 on disk on cloud2:49451 (size: 163.7 MB) 15/04/07 18:11:03 INFO TaskSetManager: Starting task 30.0 in stage 1.0 (TID 32, cloud2, NODE_LOCAL, 1296 bytes) 15/04/07 18:11:03 ERROR Utils: Uncaught exception in thread task-result-getter-0 java.lang.OutOfMemoryError: Java heap space at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Exception in thread task-result-getter-0 java.lang.OutOfMemoryError: Java heap space at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:61) at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:73) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at
parquet partition discovery
I was unable to get this feature to work in 1.3.0. I tried building off master and it still wasn't working for me. So I dug into the code, and I'm not sure how parsePartition() was ever working. The while loop which walks up the parent directories in the path always terminates after a single iteration. I made a minor change and the partition discovery appears to work now. Specifically, I changed
var chopped = path
while (!finished) {
  val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
  maybeColumn.foreach(columns += _)
  chopped = chopped.getParent
  finished = maybeColumn.isEmpty || chopped.getParent == null
}
to
var chopped = path
while (chopped != null) {
  val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
  maybeColumn.foreach(columns += _)
  chopped = chopped.getParent
}
Because the leaf nodes are always named data.parquet, this loop was terminating immediately after the first iteration. The only other thought I had is that the loop may have been intended to walk up the path until it stopped finding partition directories. In this case, the loop would work fine as is, but chopped should be initialized to path.getParent rather than path. I'm completely new to Spark so it's possible that I misunderstood the intent here completely, but if not then I'm happy to open an issue and submit a pull request for whichever approach is the correct one.
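To make the setting concrete, here is a small sketch of the kind of partitioned layout that parsePartition() is meant to discover; the table path and partition column names are hypothetical, not taken from this report:

// Hypothetical layout on HDFS:
//   /data/events/year=2015/month=04/data.parquet
//   /data/events/year=2015/month=05/data.parquet
val df = sqlContext.parquetFile("/data/events")
df.printSchema()
// With partition discovery working, year and month should appear in the schema
// as partition columns alongside the columns stored in the Parquet files.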
RE: 'Java heap space' error occurred when querying a 4G data file from HDFS
It is hard to guess why the OOM happens without knowing your application's logic and the data size. Without knowing that, I can only guess based on some common experiences:
1) Increase spark.default.parallelism
2) Increase your executor memory; maybe 6g is just not enough
3) Your environment is somewhat unbalanced between CPU cores and available memory (8 cores vs 12G); each core should have about 3G for Spark
4) If you cache RDDs, use MEMORY_ONLY_SER instead of MEMORY_ONLY
5) Since you have many cores compared with your available memory, lower the cores per executor by setting -Dspark.deploy.defaultCores=. When you do not have enough memory, reducing the concurrency of your executors lowers the memory requirement, at the cost of running more slowly.
Yong
Re: 'Java heap space' error occurred when querying a 4G data file from HDFS
李铖: w.r.t. #5 in Yong's list above, you can use --executor-cores when invoking spark-submit. Cheers
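For illustration only, here is how those suggestions map onto a spark-submit invocation; the flag values below are made up for the example and are not recommendations from the thread:

spark-submit --master yarn-client \
  --driver-memory 7g \
  --executor-memory 8g \
  --executor-cores 2 \
  --conf spark.default.parallelism=200 \
  /home/hadoop/spark/main.py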
Re: Array[T].distinct doesn't work inside RDD
Hi Sean, I didn't override hashCode. But the problem is that Array[T].toSet works while Array[T].distinct doesn't. If it is because I didn't override hashCode, then toSet shouldn't work either, right? I also tried using Array[T].distinct outside the RDD, and there it works fine, returning the same result as Array[T].toSet. Thanks! Anny On Tue, Apr 7, 2015 at 2:31 PM, Sean Owen so...@cloudera.com wrote: Did you override hashCode too? On Apr 7, 2015 2:39 PM, anny9699 anny9...@gmail.com wrote: Hi, I have a question about Array[T].distinct on a customized class T. My data is like RDD[(String, Array[T])] in which T is a class I wrote myself. There are some duplicates in each Array[T] so I want to remove them. I override the equals() method in T and use
val dataNoDuplicates = dataDuplicates.map { case (id, arr) => (id, arr.distinct) }
to remove duplicates inside the RDD. However this doesn't work, since I did some further tests by using
val dataNoDuplicates = dataDuplicates.map { case (id, arr) =>
  val uniqArr = arr.distinct
  if (uniqArr.length > 1) println(uniqArr.head == uniqArr.last)
  (id, uniqArr)
}
And from the worker stdout I could see that it always prints TRUE. I then tried removing duplicates by using Array[T].toSet instead of Array[T].distinct and it is working! Could anybody explain why Array[T].toSet and Array[T].distinct behave differently here? And why is Array[T].distinct not working? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Array-T-distinct-doesn-t-work-inside-RDD-tp22412.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to use Joda Time with Spark SQL?
I've been using Joda Time in all my Spark jobs (by using the nscala-time package) and have not run into any issues until I started trying to use Spark SQL. When I try to convert a case class that has a com.github.nscala_time.time.Imports.DateTime object in it, an exception is thrown with a MatchError. My assumption is that this is because the basic types of Spark SQL are java.sql.Timestamp and java.sql.Date, and therefore Spark doesn't know what to do with the DateTime value. How can I get around this? I would prefer not to have to change my code to make the values be Timestamps, but I'm concerned that might be the only way. Would something like implicit conversions work here? It seems that even if I specify the schema manually then I would still have the issue, since you have to specify the column type, which has to be of type org.apache.spark.sql.types.DataType -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Joda-Time-with-Spark-SQL-tp22415.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
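One possible workaround, offered as a guess rather than something confirmed in this thread, is to convert the Joda DateTime to java.sql.Timestamp only at the boundary where the DataFrame is created, keeping the rest of the code on Joda; the case class and RDD below are hypothetical:

import java.sql.Timestamp
import com.github.nscala_time.time.Imports.DateTime

case class EventRow(id: String, ts: Timestamp)   // hypothetical row class with Spark SQL friendly types

// events: RDD[(String, DateTime)] produced elsewhere in the application
val rows = events.map { case (id, dt) => EventRow(id, new Timestamp(dt.getMillis)) }
val df = sqlContext.createDataFrame(rows)   // or rows.toDF() with sqlContext.implicits._ imported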
Unable to run spark examples on cloudera
Hi There, We’ve just started to try out Spark at Bitly. We are running Spark 1.2.1 on Cloudera (CDH-5.3.0) with Hadoop 2.5.0 and are running into issues even just trying to run the Python examples. It's just being run in standalone mode, I believe. $ ./bin/spark-submit --driver-memory 2g examples/src/main/python/pi.py errors out with: 15/04/07 22:06:07 INFO DAGScheduler: Job 0 failed: reduce at /app/bitly/local/spark/examples/src/main/python/pi.py:38, took 14.660785 s Traceback (most recent call last): File /app/bitly/local/spark/examples/src/main/python/pi.py, line 38, in module count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add) File /bitly/local/spark/python/pyspark/rdd.py, line 715, in reduce vals = self.mapPartitions(func).collect() File /bitly/local/spark/python/pyspark/rdd.py, line 676, in collect bytesInJava = self._jrdd.collect().iterator() File /bitly/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /bitly/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o24.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 7, hadoop13.b.del.bitly.net): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:170) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) ...
10 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any ideas? Let me know if you need any more information. Thanks! Georgi -- Georgi Knox Application Engineer, Bitly http://bit.ly/bitly_website Twitter http://bit.ly/bitly_twitter | Facebook http://bit.ly/bitly_facebook | Github https://github.com/bitly @GeorgiCodes https://twitter.com/Georgicodes
Re: ML consumption time based on data volume - same cluster
This could be empirically verified in spark-perf: https://github.com/databricks/spark-perf. Theoretically, it would be 2x for k-means and logistic regression, because computation is doubled but communication cost remains the same. -Xiangrui On Tue, Apr 7, 2015 at 7:15 AM, Vasyl Harasymiv vasyl.harasy...@gmail.com wrote: Hi Spark Community, Imagine you have a stable computing cluster (e.g. 5 nodes) with Hadoop that does not run anything that your Spark jobs. Now imagine you run simple machine learning on the data (e.g. 100MB): K-means - 5 min Logistic regression - 5 min Now imagine that the volume of your data has doubled 2x to 200MB and it is still distributed around those available 5 nodes. Now, how much more time would this computation take now ? I presume more than 2x e.g. K-Means 25 min, and logistic regression 20 min? Just want to have an understanding how data growth would impact computational peformance for ML (any model in your experience is fine). Since my gut feeling if data increases 2x, the computation on the same cluster would increase 2x. Thank you! Vasyl - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Job submission API
The following might be helpful. http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/What-dependencies-to-submit-Spark-jobs-programmatically-not-via/td-p/24721 http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ On 7 April 2015 at 16:32, michal.klo...@gmail.com michal.klo...@gmail.com wrote: A SparkContext can submit jobs remotely. The spark-submit options in general can be populated into a SparkConf and passed in when you create a SparkContext. We personally have not had too much success with yarn-client remote submission, but standalone cluster mode was easy to get going. M On Apr 7, 2015, at 7:01 PM, Prashant Kommireddi prash1...@gmail.com wrote: Hello folks, Newbie here! Just had a quick question - is there a job submission API such as the one with hadoop https://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/Job.html#submit() to submit Spark jobs to a Yarn cluster? I see in example that bin/spark-submit is what's out there, but couldn't find any APIs around it. Thanks, Prashant -- Regards vybs
Specifying Spark property from command line?
Hi, Is it possible to specify a Spark property like spark.local.dir from the command line when running an application using spark-submit? Thanks, arun
Job submission API
Hello folks, Newbie here! Just had a quick question - is there a job submission API such as the one with hadoop https://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/Job.html#submit() to submit Spark jobs to a Yarn cluster? I see in example that bin/spark-submit is what's out there, but couldn't find any APIs around it. Thanks, Prashant
Re: Job submission API
A SparkContext can submit jobs remotely. The spark-submit options in general can be populated into a SparkConf and passed in when you create a SparkContext. We personally have not had too much success with yarn-client remote submission, but standalone cluster mode was easy to get going. M On Apr 7, 2015, at 7:01 PM, Prashant Kommireddi prash1...@gmail.com wrote: Hello folks, Newbie here! Just had a quick question - is there a job submission API such as the one with hadoop https://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/Job.html#submit() to submit Spark jobs to a Yarn cluster? I see in example that bin/spark-submit is what's out there, but couldn't find any APIs around it. Thanks, Prashant
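To make that concrete, here is a minimal sketch of driving a job from your own program rather than bin/spark-submit; the master URL, app name, memory setting, and input path are placeholders, and this is an illustration of the approach, not a tested recipe:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master-host:7077")   // standalone cluster master (placeholder)
  .setAppName("ProgrammaticSubmitExample")
  .set("spark.executor.memory", "4g")      // any option you would pass to spark-submit can be set here
val sc = new SparkContext(conf)
// Actions run from this driver program are scheduled on the remote cluster
val count = sc.textFile("hdfs:///some/input").count()
println(s"count = $count")
sc.stop()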
Re: ML consumption time based on data volume - same cluster
Thank you Xiangrui, Indeed, however, if the computation involves taking matrix, even locally, like random forest, if data increases 2x, even local computation time should increase 2x. But I will test it with the Spark Perf and let you know! On Tue, Apr 7, 2015 at 4:50 PM, Xiangrui Meng men...@gmail.com wrote: This could be empirically verified in spark-perf: https://github.com/databricks/spark-perf. Theoretically, it would be 2x for k-means and logistic regression, because computation is doubled but communication cost remains the same. -Xiangrui On Tue, Apr 7, 2015 at 7:15 AM, Vasyl Harasymiv vasyl.harasy...@gmail.com wrote: Hi Spark Community, Imagine you have a stable computing cluster (e.g. 5 nodes) with Hadoop that does not run anything that your Spark jobs. Now imagine you run simple machine learning on the data (e.g. 100MB): K-means - 5 min Logistic regression - 5 min Now imagine that the volume of your data has doubled 2x to 200MB and it is still distributed around those available 5 nodes. Now, how much more time would this computation take now ? I presume more than 2x e.g. K-Means 25 min, and logistic regression 20 min? Just want to have an understanding how data growth would impact computational peformance for ML (any model in your experience is fine). Since my gut feeling if data increases 2x, the computation on the same cluster would increase 2x. Thank you! Vasyl
Re: Job submission API
Hello, If you are looking for the command to submit the following command works: spark-submit --class SampleTest --master yarn-cluster --num-executors 4 --executor-cores 2 /home/priya/Spark/Func1/target/scala-2.10/simple-project_2.10-1.0.jar On Tue, Apr 7, 2015 at 6:36 PM, Veena Basavaraj vybs.apa...@gmail.com wrote: The following might be helpful. http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/What-dependencies-to-submit-Spark-jobs-programmatically-not-via/td-p/24721 http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ On 7 April 2015 at 16:32, michal.klo...@gmail.com michal.klo...@gmail.com wrote: A SparkContext can submit jobs remotely. The spark-submit options in general can be populated into a SparkConf and passed in when you create a SparkContext. We personally have not had too much success with yarn-client remote submission, but standalone cluster mode was easy to get going. M On Apr 7, 2015, at 7:01 PM, Prashant Kommireddi prash1...@gmail.com wrote: Hello folks, Newbie here! Just had a quick question - is there a job submission API such as the one with hadoop https://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/Job.html#submit() to submit Spark jobs to a Yarn cluster? I see in example that bin/spark-submit is what's out there, but couldn't find any APIs around it. Thanks, Prashant -- Regards vybs -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112
Re: DataFrame groupBy MapType
Thanks Michael. Will submit a ticket. Justin On Mon, Apr 6, 2015 at 1:53 PM, Michael Armbrust mich...@databricks.com wrote: I'll add that I don't think there is a convenient way to do this in the Column API ATM, but would welcome a JIRA for adding it :) On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust mich...@databricks.com wrote: In HiveQL, you should be able to express this as: SELECT ... FROM table GROUP BY m['SomeKey'] On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a case class like this: case class A( m: Map[Long, Long], ... ) and constructed a DataFrame from Seq[A]. I would like to perform a groupBy on A.m(SomeKey). I can implement a UDF, create a new Column then invoke a groupBy on the new Column. But is it the idiomatic way of doing such operation? Can't find much info about operating MapType on Column in the doc. Thanks ahead! Justin
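For what it's worth, a sketch of the UDF approach mentioned above; the column names, the map key, and the default value are hypothetical:

import org.apache.spark.sql.functions.udf

// df has schema (m: Map[Long, Long], ...), as in case class A above
val valueForKey = udf((m: Map[Long, Long]) => m.getOrElse(42L, 0L))   // 42L stands in for "SomeKey"
val withKey = df.withColumn("someKey", valueForKey(df("m")))
withKey.groupBy("someKey").count().show()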
Error when running Spark on Windows 8.1
Hi, We are trying to run a Spark application using spark-submit on Windows 8.1. The application runs successfully to completion on MacOS 10.10 and on Ubuntu Linux. On Windows, we get the following error messages (see below). It appears that Spark is trying to delete some temporary directory that it creates. How do we solve this problem? Thanks, arun 5/04/07 10:55:14 ERROR Utils: Exception while deleting Spark temp dir: C:\Users\JOSHMC~1\AppData\Local\Temp\spark-339bf2d9-8b89-46e9-b5c1-404caf9d3cd7\userFiles-62976ef7-ab56-41c0-a35b-793c7dca31c7 java.io.IOException: Failed to delete: C:\Users\JOSHMC~1\AppData\Local\Temp\spark-339bf2d9-8b89-46e9-b5c1-404caf9d3cd7\userFiles-62976ef7-ab56-41c0-a35b-793c7dca31c7 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:932) at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:181) at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:179) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:179) at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177) at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617) at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)
Re: Specifying Spark property from command line?
I just figured this out from the documentation: --conf spark.local.dir=C:\Temp On Tue, Apr 7, 2015 at 5:00 PM, Arun Lists lists.a...@gmail.com wrote: Hi, Is it possible to specify a Spark property like spark.local.dir from the command line when running an application using spark-submit? Thanks, arun
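For reference, the same mechanism works for any Spark property, and multiple --conf flags can be combined in one invocation; the application class and jar below are placeholders:

spark-submit --conf spark.local.dir=C:\Temp --conf spark.eventLog.enabled=true --class MyApp myapp.jar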
Drools in Spark
Hello, Just want to check if anyone has tried Drools with Spark? Please let me know. Are there any alternate rule engines that work well with Spark? Thanks Sathish
Expected behavior for DataFrame.unionAll
Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with: 1. case class A(a: Int, b: String)
scala> adf.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)
2. case class B(a: String, c: Int)
scala> bdf.printSchema()
root
 |-- a: string (nullable = true)
 |-- c: integer (nullable = false)
Then I unioned these two DataFrames with the unionAll function, and I get the following schema. It is kind of a mixture of A and B.
scala> val udf = adf.unionAll(bdf)
scala> udf.printSchema()
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)
The unionAll documentation says it behaves like the SQL UNION ALL function. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin
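Not from the thread, but one way to avoid the surprising merged schema is to project both sides onto an explicitly shared schema before the union, since unionAll matches columns by position; the target column names and casts here are only for illustration:

import org.apache.spark.sql.types.StringType

val left  = adf.select(adf("a").cast(StringType).as("key"), adf("b").as("value"))
val right = bdf.select(bdf("a").as("key"), bdf("c").cast(StringType).as("value"))
val combined = left.unionAll(right)   // both sides now have (key: string, value: string)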
Re: Array[T].distinct doesn't work inside RDD
I suppose it depends a lot on the implementations. In general, distinct and toSet work when hashCode and equals are defined correctly. When that isn't the case, the result isn't defined; it might happen to work in some cases. This could well explain why you see different results. Why not implement hashCode() to see if that's the solution? Certainly, in general, you must do this for correctness. On Tue, Apr 7, 2015 at 5:41 PM, Anny Chen anny9...@gmail.com wrote: Hi Sean, I didn't override hashCode. But the problem is that Array[T].toSet works while Array[T].distinct doesn't. If it is because I didn't override hashCode, then toSet shouldn't work either, right? I also tried using Array[T].distinct outside the RDD, and there it works fine, returning the same result as Array[T].toSet. Thanks! Anny
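A minimal sketch of what that looks like; the class and its fields are made up, and the particular hash formula is just one conventional choice:

class T(val id: Int, val name: String) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case that: T => that.id == id && that.name == name
    case _ => false
  }
  // distinct and other hash-based operations require hashCode to be consistent with equals
  override def hashCode: Int = 41 * id + name.hashCode
}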
Timeout errors from Akka in Spark 1.2.1
I have a standalone and local Spark streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing Timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) ... 17 more There was a similar query posted here http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html but did not find any resolution to that issue. Thanks in advance, NB
Re: Spark TeraSort source request
+1. I would love to have the code for this as well. Pramod On Fri, Apr 3, 2015 at 12:47 PM, Tom thubregt...@gmail.com wrote: Hi all, As we all know, Spark has set the record for sorting data, as published on: https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html. Here at our group, we would love to verify these results, and compare machines using this benchmark. We've spent quite some time trying to find the TeraSort source code that was used, but cannot find it anywhere. We did find two candidates: A version posted by Reynold [1], the poster of the message above. This version is stuck at // TODO: Add partition-local (external) sorting using TeraSortRecordOrdering, only generating data. Here, Ewan noticed that it didn't appear to be similar to Hadoop TeraSort. [2] After this he created a version on his own [3]. With this version, we noticed problems with TeraValidate with datasets above ~10G (as mentioned by others at [4]). When examining the raw input and output files, it actually appears that the input data is sorted and the output data unsorted in both cases. Because of this, we believe we did not yet find the actual source code that was used. I've searched the Spark User forum archives and have seen requests from other people, indicating a demand, but did not succeed in finding the actual source code. My question: Could you guys please make the source code of the used TeraSort program, preferably with settings, available? If not, what are the reasons that this seems to be withheld? Thanks for any help, Tom Hubregtsen [1] https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87 [2] http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E [3] https://github.com/ehiggs/spark-terasort [4] http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: broken link on Spark Programming Guide
I fixed this a while ago in master. It should go out with the next release and next push of the site. On Tue, Apr 7, 2015 at 4:32 PM, jonathangreenleaf jonathangreenl...@gmail.com wrote: in the current Programming Guide: https://spark.apache.org/docs/1.3.0/programming-guide.html#actions under Actions, the Python link goes to: https://spark.apache.org/docs/1.3.0/api/python/pyspark.rdd.RDD-class.html which is 404 which I think should be: https://spark.apache.org/docs/1.3.0/api/python/index.html#org.apache.spark.rdd.RDD Thanks - Jonathan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/broken-link-on-Spark-Programming-Guide-tp22414.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HiveThriftServer2
Hi - I want to create an instance of HiveThriftServer2 in my Scala application, so I imported the following line: import org.apache.spark.sql.hive.thriftserver._ However, when I compile the code, I get the following error: object thriftserver is not a member of package org.apache.spark.sql.hive I tried to include the following in build.sbt, but it looks like it is not published: org.apache.spark %% spark-hive-thriftserver % 1.3.0, What library dependency do I need to include in my build.sbt to use the ThriftServer2 object? Thanks, Mohammed
Re: broken link on Spark Programming Guide
Awesome. thank you! On Apr 7, 2015 8:55 PM, Sean Owen so...@cloudera.com wrote: I fixed this a while ago in master. It should go out with the next release and next push of the site. On Tue, Apr 7, 2015 at 4:32 PM, jonathangreenleaf jonathangreenl...@gmail.com wrote: in the current Programming Guide: https://spark.apache.org/docs/1.3.0/programming-guide.html#actions under Actions, the Python link goes to: https://spark.apache.org/docs/1.3.0/api/python/pyspark.rdd.RDD-class.html which is 404 which I think should be: https://spark.apache.org/docs/1.3.0/api/python/index.html#org.apache.spark.rdd.RDD Thanks - Jonathan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/broken-link-on-Spark-Programming-Guide-tp22414.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]
Hello Everyone, I am trying to implement this example (Spark Streaming with Twitter). https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala I am able to do: hashTags.print() to get a live stream of filtered hashtags, but I get these warnings, not sure if they're related to the error: WARN BlockManager: Block input-0-1428450594600 replicated to only 0 peer(s) instead of 1 peers then when I try to print out topCounts60 or topCounts10, I get this error when building: /home/ec2-user/sparkApps/TwitterApp/src/main/scala/TwitterPopularTags.scala:35: error: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)] [INFO] val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) = (count, topic)}.transform(_.sortByKey(false)) Thank you for the help! Best, Su - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
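For reference, this is how the windowed count is typically wired up; the extra imports shown are what bring the pair-DStream and pair-RDD operations (including reduceByKeyAndWindow and sortByKey) into scope on older Spark releases, and it is only a guess that a missing import is what the build is complaining about:

import org.apache.spark.SparkContext._                 // pair-RDD implicits on pre-1.3 releases
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits on pre-1.3 releases

val topCounts60 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))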
Re: Spark + Kinesis
Hey y'all, While I haven't been able to get Spark + Kinesis integration working, I pivoted to plan B: I now push data to S3 where I set up a DStream to monitor an S3 bucket with textFileStream, and that works great. I <3 Spark! Best, Vadim On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis? Best, Vadim On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python where I followed the example http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help. Thanks, Vadim --- Time: 142825409 ms --- 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s) 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms *** 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms) On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an uber jar. Here's what I added to build.sbt:
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}
Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following: 15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms) Any idea on what might be going on?
Thanks, Vadim Here's my consumer code (adapted from the WordCount example): *private object MyConsumer extends Logging { def main(args: Array[String]) {/* Check that all required args were passed in. */ if (args.length 2) { System.err.println( |Usage: KinesisWordCount stream-name endpoint-url |stream-name is the name of the Kinesis stream |endpoint-url is the endpoint of the Kinesis service | (e.g. https://kinesis.us-east-1.amazonaws.com https://kinesis.us-east-1.amazonaws.com).stripMargin) System.exit(1)}/* Populate the appropriate variables from the given args */val Array(streamName, endpointUrl) = args/* Determine the number of shards from the stream */val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) kinesisClient.setEndpoint(endpointUrl)val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() .size()System.out.println(Num shards: + numShards)/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */val numStreams = numShards/* Setup the and SparkConfig and StreamingContext *//* Spark Streaming
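For completeness, a minimal sketch of the S3-monitoring approach described at the top of this thread; the bucket name and batch interval are made up, and it assumes S3 credentials are already configured for Hadoop:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("S3DirectoryMonitor")
val ssc = new StreamingContext(sparkConf, Seconds(60))
// Each batch picks up new text files that appear under this prefix after the stream starts
val lines = ssc.textFileStream("s3n://my-bucket/incoming/")
lines.print()
ssc.start()
ssc.awaitTermination()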
DataFrame degraded performance after DataFrame.cache
Hello, I have a parquet file of around 55M rows (~1G on disk). Performing a simple grouping operation is pretty efficient (I get results within 10 seconds). However, after calling DataFrame.cache, I observe a significant performance degradation: the same operation now takes 3+ minutes. My hunch is that the DataFrame cannot leverage its columnar format after persisting in memory, but I cannot find anything in the doc mentioning this. Did I miss anything? Thanks! Justin
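For context, the access pattern being described looks roughly like this; the file path and column name are placeholders:

val df = sqlContext.parquetFile("/data/events.parquet")   // ~55M rows in the report
df.groupBy("someColumn").count().show()   // fast: scans the columnar Parquet data directly
df.cache()
df.groupBy("someColumn").count().show()   // reported to take minutes once the data is cached in memory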
Re: DataFrame degraded performance after DataFrame.cache
Hi Justin, Does the schema of your data have any decimal, array, map, or struct type? Thanks, Yin On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a parquet file of around 55M rows (~ 1G on disk). Performing simple grouping operation is pretty efficient (I get results within 10 seconds). However, after called DataFrame.cache, I observe a significant performance degrade, the same operation now takes 3+ minutes. My hunch is that DataFrame cannot leverage its columnar format after persisting in memory. But cannot find anywhere from the doc mentioning this. Did I miss anything? Thanks! Justin
Re: DataFrame degraded performance after DataFrame.cache
The schema has a StructType. Justin On Tue, Apr 7, 2015 at 6:58 PM, Yin Huai yh...@databricks.com wrote: Hi Justin, Does the schema of your data have any decimal, array, map, or struct type? Thanks, Yin On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a parquet file of around 55M rows (~ 1G on disk). Performing simple grouping operation is pretty efficient (I get results within 10 seconds). However, after called DataFrame.cache, I observe a significant performance degrade, the same operation now takes 3+ minutes. My hunch is that DataFrame cannot leverage its columnar format after persisting in memory. But cannot find anywhere from the doc mentioning this. Did I miss anything? Thanks! Justin
Cannot change the memory of workers
Hi guys, Currently I am running a Spark program on Amazon EC2. Each worker has around (less than, but close to) 2 GB of memory. By default, I can see each worker is allocated 976 MB of memory, as the table below from the Spark web UI shows. I know this value is (total memory minus 1 GB), but I want more than 1 GB on each of my workers.

Address    State    Cores        Memory
           ALIVE    1 (0 Used)   976.0 MB (0.0 B Used)

Based on the instructions on the Spark website, I set export SPARK_WORKER_MEMORY=1g in spark-env.sh, but it doesn't work. BTW, I can set SPARK_EXECUTOR_MEMORY=1g and it works. Can anyone help me? Is there a requirement that one worker must maintain 1 GB of memory for itself aside from the memory for Spark? Thanks, Jia
Caching and Actions
I understand that RDDs are not created until an action is called. Is it a correct conclusion that it doesn't matter if .cache is used anywhere in the program if I only have one action that is called only once? Related to this question, consider this situation: val d1 = data.map((x,y,z) = (x,y)) val d2 = data.map((x,y,z) = (y,x)) I'm wondering if Spark is optimizing the execution in a way that the mappers for d1 and d2 are running in parallel and the data RDD is traversed only once. If that is not the case, would it make a difference to cache the data RDD, like this: data.cache() val d1 = data.map((x,y,z) = (x,y)) val d2 = data.map((x,y,z) = (y,x)) Furthermore, consider: val d3 = d2.map((x,y) = (y,x)) d2 and d3 are equivalent. What implementation should be preferred? Thx. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-and-Actions-tp22418.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Unable to specify multiple directories as input
Hello, I have two HDFS directories each containing multiple avro files. I want to specify these two directories as input. In Hadoop world, one can specify list of comma separated directories. In Spark that does not work. Logs 15/04/07 21:10:11 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 15/04/07 21:10:11 INFO spark.SparkContext: Created broadcast 2 from sequenceFile at DataUtil.scala:120 15/04/07 21:10:11 ERROR yarn.ApplicationMaster: User class threw exception: Input path does not exist: hdfs://namenode_host_name:8020/user/dvasthimal/epdatasets_small/exptsession/2015/04/06,/user/dvasthimal/epdatasets_small/exptsession/2015/04/07 org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://namenode_host_name:8020/user/dvasthimal/epdatasets_small/exptsession/2015/04/06,/user/dvasthimal/epdatasets_small/exptsession/2015/04/07 at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:320) at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:263) Input Code: sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path) Here path is: /user/dvasthimal/epdatasets_small/exptsession/2015/04/06,/user/dvasthimal/epdatasets_small/exptsession/2015/04/07 -- Deepak
Re: Unable to specify multiple directories as input
Spark Version 1.3 Command: ./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-company-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-company/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-company/share/hadoop/hdfs/hadoop-hdfs-2.4.1-company-2.jar --num-executors 100 --driver-memory 4g --driver-java-options -XX:MaxPermSize=4G --executor-memory 8g --executor-cores 1 --queue hdmi-express --class com. company.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem On Wed, Apr 8, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello, I have two HDFS directories each containing multiple avro files. I want to specify these two directories as input. In Hadoop world, one can specify list of comma separated directories. In Spark that does not work. Logs 15/04/07 21:10:11 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 15/04/07 21:10:11 INFO spark.SparkContext: Created broadcast 2 from sequenceFile at DataUtil.scala:120 15/04/07 21:10:11 ERROR yarn.ApplicationMaster: User class threw exception: Input path does not exist: hdfs://namenode_host_name:8020/user/dvasthimal/epdatasets_small/exptsession/2015/04/06,/user/dvasthimal/epdatasets_small/exptsession/2015/04/07 org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://namenode_host_name:8020/user/dvasthimal/epdatasets_small/exptsession/2015/04/06,/user/dvasthimal/epdatasets_small/exptsession/2015/04/07 at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:320) at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:263) Input Code: sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path) Here path is: /user/dvasthimal/epdatasets_small/exptsession/2015/04/06,/user/dvasthimal/epdatasets_small/exptsession/2015/04/07 -- Deepak -- Deepak
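One workaround, which is an assumption on my part rather than something confirmed in this thread, is to load each directory as its own RDD and union them, using the same Avro types and imports as the original code:

val paths = Seq(
  "/user/dvasthimal/epdatasets_small/exptsession/2015/04/06",
  "/user/dvasthimal/epdatasets_small/exptsession/2015/04/07")
// Build one RDD per directory, then combine them into a single RDD
val rdd = sc.union(paths.map(p =>
  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](p)))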
Re: DataFrame degraded performance after DataFrame.cache
Thanks for the explanation Yin. Justin On Tue, Apr 7, 2015 at 7:36 PM, Yin Huai yh...@databricks.com wrote: I think the slowness is caused by the way that we serialize/deserialize the value of a complex type. I have opened https://issues.apache.org/jira/browse/SPARK-6759 to track the improvement. On Tue, Apr 7, 2015 at 6:59 PM, Justin Yip yipjus...@prediction.io wrote: The schema has a StructType. Justin On Tue, Apr 7, 2015 at 6:58 PM, Yin Huai yh...@databricks.com wrote: Hi Justin, Does the schema of your data have any decimal, array, map, or struct type? Thanks, Yin On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a parquet file of around 55M rows (~ 1G on disk). Performing simple grouping operation is pretty efficient (I get results within 10 seconds). However, after called DataFrame.cache, I observe a significant performance degrade, the same operation now takes 3+ minutes. My hunch is that DataFrame cannot leverage its columnar format after persisting in memory. But cannot find anywhere from the doc mentioning this. Did I miss anything? Thanks! Justin