Implementing FIRST_VALUE, LEAD, LAG in Spark
Hello, To convert existing MapReduce jobs to Spark, I need to implement window functions such as FIRST_VALUE, LEAD, LAG and so on. For example, the FIRST_VALUE function. Source (1st column is the key):

A, A1
A, A2
A, A3
B, B1
B, B2
C, C1

and the result should be:

A, A1, A1
A, A2, A1
A, A3, A1
B, B1, B1
B, B2, B1
C, C1, C1

You can see that the first value in a group is repeated in each row. My current Spark/Scala code:

def firstValue(b: Iterable[String]): List[(String, String)] = {
  val c = scala.collection.mutable.MutableList[(String, String)]()
  var f = ""
  b.foreach(d => { if (f.isEmpty()) f = d; c += d -> f })
  c.toList
}

val data = sc.parallelize(List(("A", "A1"), ("A", "A2"), ("A", "A3"), ("B", "B1"), ("B", "B2"), ("C", "C1")))
data.groupByKey().map { case (a, b) => (a, firstValue(b)) }.collect

So I create a new list after groupByKey. Is this the right approach in Spark? Are there other options? Please point me to any drawbacks. Thanks, Dmitry
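A possible simplification, sketched below as a rough alternative rather than a definitive answer: pair every value in a group with the group's head, which avoids the mutable list and the var. Like the original code, it assumes each group fits in memory after groupByKey and that values arrive in the desired order within a group (groupByKey itself does not guarantee ordering):

// Hedged sketch (spark-shell style, so `sc` is assumed; Spark 1.x pair-RDD implicits).
import org.apache.spark.SparkContext._

val data = sc.parallelize(List(
  ("A", "A1"), ("A", "A2"), ("A", "A3"),
  ("B", "B1"), ("B", "B2"), ("C", "C1")))

val withFirst = data.groupByKey().flatMapValues { values =>
  val first = values.head            // FIRST_VALUE of the group
  values.map(v => (v, first))        // (value, first value) for every row
}

withFirst.collect().foreach(println) // e.g. (A,(A1,A1)), (A,(A2,A1)), (A,(A3,A1)), ...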
Interact with streams in a non-blocking way
Hi all, I am trying to come up with a workflow where I can query streams asynchronously. The problem I have is that the ssc.awaitTermination() line blocks the whole thread, so it is not clear to me whether it is possible to get hold of objects from streams once they are started. Any suggestion on what is the best way to implement this? Thanks
Re: Interact with streams in a non-blocking way
You call awaitTermination() in the main thread, and indeed it blocks there forever. From there Spark Streaming takes over and invokes the operations you set up. Your operations have access to the data, of course. That's the model; you don't make external threads that reach into Spark Streaming's objects, but you can easily create operations that take whatever actions you want and invoke them in Streaming. On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote: Hi all, I am trying to come up with a workflow where I can query streams asynchronously. The problem I have is that the ssc.awaitTermination() line blocks the whole thread, so it is not clear to me whether it is possible to get hold of objects from streams once they are started. Any suggestion on what is the best way to implement this? Thanks
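For illustration, a minimal sketch of what "operations that take whatever actions you want" can look like; the socket source and port are made up, and a StreamingContext is assumed to be built from an existing SparkContext:

// Hedged sketch: a foreachRDD operation does arbitrary work per batch;
// awaitTermination() only parks the main thread while these operations run.
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)   // illustrative source

lines.foreachRDD { rdd =>
  val n = rdd.count()                   // runs inside Spark Streaming's scheduler
  println(s"records in this batch: $n") // or push the result to a queue, DB, etc.
}

ssc.start()
ssc.awaitTermination()                  // main thread blocks here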
Re: Can spark job server be used to visualize streaming data?
Frankly, there is no good/standard way to visualize streaming data. So far I have found HBase to be a good intermediate store for data from streams, and I visualize it with a Play-based framework and d3.js. Regards Mayur On Fri Feb 13 2015 at 4:22:58 PM Kevin (Sangwoo) Kim kevin...@apache.org wrote: I'm not very sure for CDH 5.3, but now Zeppelin works for Spark 1.2 as spark-repl has been published in Spark 1.2.1 Please try again! On Fri Feb 13 2015 at 3:55:19 PM Su She suhsheka...@gmail.com wrote: Thanks Kevin for the link, I have had issues trying to install Zeppelin as I believe it is not yet supported for CDH 5.3 and Spark 1.2. Please correct me if I am mistaken. On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org wrote: Apache Zeppelin also has a scheduler, so you can reload your chart periodically. Check it out: http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito silvio.fior...@granturing.com wrote: One method I’ve used is to publish each batch to a message bus or queue with a custom UI listening on the other end, displaying the results in d3.js or some other app. As far as I’m aware there isn’t a tool that will directly take a DStream. Spark Notebook seems to have some support for updating graphs periodically. I haven’t used it myself yet so not sure how well it works. See here: https://github.com/andypetrella/spark-notebook From: Su She Date: Thursday, February 12, 2015 at 1:55 AM To: Felix C Cc: Kelvin Chu, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hello Felix, I am already streaming in very simple data using Kafka (few messages / second, each record only has 3 columns... really simple, but looking to scale once I connect everything). I am processing it in Spark Streaming and am currently writing word counts to HDFS. So the part where I am confused is... Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data -> Spark Word Count -> *How do I visualize?* Is there a viz tool that I can set up to visualize JavaPairDStreams? Or do I have to write to HBase/HDFS first? Thanks! On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote: What kind of data do you have? Kafka is a popular source to use with Spark Streaming. But Spark Streaming also supports reading from a file. It's called a basic source: https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:23 AM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Thank you Felix and Kelvin. I think I'll def be using the k-means tools in MLlib. It seems the best way to stream data is by storing it in HBase and then using an API in my viz to extract the data? Does anyone have any thoughts on this? Thanks! On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote: Check out https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html In there are links to how that is done. --- Original Message --- From: Kelvin Chu 2dot7kel...@gmail.com Sent: February 10, 2015 12:48 PM To: Su She suhsheka...@gmail.com Cc: user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hi Su, Out of the box, no. But I know people integrate it with Spark Streaming to do real-time visualization.
It will take some work though. Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data...not just historical data? Thank you! -Suh
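To make the "write each batch somewhere a visualization app can read" pattern concrete, here is a rough sketch; the socket source, paths, and batch interval are illustrative stand-ins for the Kafka word-count pipeline described above, and HBase or a message queue would follow the same foreachRDD shape:

// Hedged sketch: write every batch of word counts to a time-stamped location
// that a separate UI (d3.js, Play app, notebook, ...) can poll.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair-DStream ops in Spark 1.2

val ssc = new StreamingContext(sc, Seconds(10))
val wordCounts = ssc.socketTextStream("localhost", 9999)   // stand-in for the Kafka stream
  .flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD { (rdd, time) =>
  if (rdd.take(1).nonEmpty) {
    rdd.saveAsTextFile(s"hdfs:///viz/wordcounts/batch-${time.milliseconds}")  // illustrative path
  }
}

ssc.start()
ssc.awaitTermination()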
SparkSQL and star schema
Hi, is SparkSQL + Parquet suitable for replicating a star schema? Paolo Platter AgileLab CTO
Re: An interesting and serious problem I encountered
A number of comments: 310GB is probably too large for an executor. You probably want many smaller executors per machine. But this is not your problem. You didn't say where the OutOfMemoryError occurred. Executor or driver? Tuple2 is a Scala type, and a general type. It is appropriate for general pairs. You're asking about optimizing for a primitive array, yes, but of course Spark handles other types. I don't quite understand your test result. An array doesn't change size because it's referred to in a Tuple2. You are still dealing with a primitive array. There is no general answer to your question. Usually you have to consider the overhead of Java references, which does matter significantly, but there is no constant multiplier of course. It's up to you, if it matters, to implement more efficient data structures. Here, however, you're using just about the most efficient representation of an array of integers. I think you have plenty of memory in general, so the question is what was throwing the memory error? I'd also confirm that the configuration your executors actually used is what you expect, to rule out config problems. On Fri, Feb 13, 2015 at 6:26 AM, Landmark fangyixiang...@gmail.com wrote: Hi folks, My Spark cluster has 8 machines, each of which has 377GB physical memory, and thus the total maximum memory that can be used for Spark is more than 2400+GB. In my program, I have to deal with 1 billion (key, value) pairs, where the key is an integer and the value is an integer array with 43 elements. Therefore, the memory cost of this raw dataset is [(1+43) * 10^9 * 4] / (1024 * 1024 * 1024) = 164GB. Since I have to use this dataset repeatedly, I have to cache it in memory. Some key parameter settings are: spark.storage.fraction=0.6 spark.driver.memory=30GB spark.executor.memory=310GB. But it failed on running a simple countByKey() and the error message is java.lang.OutOfMemoryError: Java heap space. Does this mean a Spark cluster of 2400+GB memory cannot keep 164GB raw data in memory? The code of my program is as follows:

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf());
  val rdd = sc.parallelize(0 until 1000000000, 25600).map(i => (i, new Array[Int](43))).cache();
  println("The number of keys is " + rdd.countByKey());
  // some other operations following here ...
}

To figure out the issue, I evaluated the memory cost of the key-value pairs and computed their memory cost using SizeOf.jar. The code is as follows:

val arr = new Array[Int](43);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr)));
val tuple = (1, arr.clone);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(tuple)));

The output is: 192.0b 992.0b *Hard to believe, but it is true!! This result means that, to store a key-value pair, Tuple2 needs more than 5+ times the memory of the simplest method with an array. Even though it may take 5+ times the memory, its size is less than 1000GB, which is still much less than the total memory size of my cluster, i.e., 2400+GB. I really do not understand why this happened.* BTW, if the number of pairs is 1 million, it works well. If the arr contains only 1 integer, then to store a pair Tuple2 needs around 10 times the memory. So I have some questions: 1. Why does Spark choose such a poor data structure, Tuple2, for key-value pairs? Is there any better data structure for storing (key, value) pairs with less memory cost? 2. Given a dataset of size M, in general how many times that amount of memory does Spark need to handle it?
Best, Landmark
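One knob that is not mentioned in the reply above but directly targets per-record JVM object overhead is serialized caching; a rough sketch under the assumption of the same data shape as the post. Note also that countByKey builds a driver-side Map of every distinct key, which with 10^9 distinct keys is a lot of driver memory on its own, so the sketch uses count() instead:

// Hedged sketch: cache the pairs as serialized bytes instead of Java objects,
// trading some CPU for a much smaller and more predictable memory footprint.
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(0 until 1000000000, 25600)
  .map(i => (i, new Array[Int](43)))
  .persist(StorageLevel.MEMORY_ONLY_SER)

// count() forces materialization without collecting a billion-entry Map
println("The number of records is " + rdd.count())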
Spark Distributed Join
Hello, I have the following scenario and was wondering if I can use Spark to address it. I want to query two different data stores (say, ElasticSearch and MySQL) and then merge the two result sets based on a join key between the two. Is it appropriate to use Spark to do this join, if the intermediate data sets are large? (This is a No-ETL scenario) I was thinking of two possibilities - 1) Send the intermediate data sets to Spark through a stream and get Spark to do the join. The complexity here is that there would be multiple concurrent streams to deal with. If I don't use streams, there would be intermediate disk writes and data transfer to the Spark master. 2) Don't use Spark and do the same with some in-memory distributed engine like MemSQL or Redis. What's the experts' view on this? Regards, Ashish
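For the Spark-based variant, a rough sketch of the shape this could take; the MySQL side uses core Spark's JdbcRDD, while the Elasticsearch side is left as a placeholder RDD (in practice it would come from the elasticsearch-hadoop / elasticsearch-spark connector). All connection strings, table, and column names are made up for illustration:

// Hedged sketch: pull keyed records from each store into an RDD, then join.
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{JdbcRDD, RDD}

val mysqlRows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/db", "user", "pass"),
  "SELECT id, name FROM customers WHERE id >= ? AND id <= ?",
  1L, 1000000L, 20,                                  // key range and partition count
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2)))

// placeholder for the Elasticsearch result set, keyed the same way
val esRows: RDD[(Long, String)] = sc.parallelize(Seq.empty[(Long, String)])

val joined = mysqlRows.join(esRows)                  // shuffles both sides by key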
Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]
It is a wrapper whose API is logically the same, but whose method signatures make more sense in Java. You can call the Scala API in Java without too much trouble, but it gets messy when you have to manually grapple with ClassTag from Java, for example. There is not an implicit conversion, since it is used from Java, which doesn't have implicits. On Fri, Feb 13, 2015 at 5:57 AM, Vladimir Protsenko protsenk...@gmail.com wrote: Thanks for the reply. I solved the problem by importing org.apache.spark.SparkContext._, per Imran Rashid's suggestion. For the sake of interest, is JavaPairRDD intended for use from Java? What is the purpose of this class? Is my RDD implicitly converted to it in some circumstances?
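For reference, a minimal sketch of the fix discussed in this thread: in Spark 1.x the pair-RDD methods (saveAsHadoopFile among them) come in through implicits on SparkContext, so the import is what makes them visible on an RDD of pairs. The output path and classes below are illustrative:

// Hedged sketch (Spark 1.x): the import pulls in PairRDDFunctions implicitly.
import org.apache.spark.SparkContext._
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat

val pairs = sc.parallelize(Seq(("a", "1"), ("b", "2")))
pairs.saveAsHadoopFile("/tmp/pairs-out",
  classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])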
Re: Can spark job server be used to visualize streaming data?
I'm not very sure for CDH 5.3, but now Zeppelin works for Spark 1.2 as spark-repl has been published in Spark 1.2.1 Please try again! On Fri Feb 13 2015 at 3:55:19 PM Su She suhsheka...@gmail.com wrote: Thanks Kevin for the link, I have had issues trying to install Zeppelin as I believe it is not yet supported for CDH 5.3 and Spark 1.2. Please correct me if I am mistaken. On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org wrote: Apache Zeppelin also has a scheduler, so you can reload your chart periodically. Check it out: http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito silvio.fior...@granturing.com wrote: One method I’ve used is to publish each batch to a message bus or queue with a custom UI listening on the other end, displaying the results in d3.js or some other app. As far as I’m aware there isn’t a tool that will directly take a DStream. Spark Notebook seems to have some support for updating graphs periodically. I haven’t used it myself yet so not sure how well it works. See here: https://github.com/andypetrella/spark-notebook From: Su She Date: Thursday, February 12, 2015 at 1:55 AM To: Felix C Cc: Kelvin Chu, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hello Felix, I am already streaming in very simple data using Kafka (few messages / second, each record only has 3 columns... really simple, but looking to scale once I connect everything). I am processing it in Spark Streaming and am currently writing word counts to HDFS. So the part where I am confused is... Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data -> Spark Word Count -> *How do I visualize?* Is there a viz tool that I can set up to visualize JavaPairDStreams? Or do I have to write to HBase/HDFS first? Thanks! On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote: What kind of data do you have? Kafka is a popular source to use with Spark Streaming. But Spark Streaming also supports reading from a file. It's called a basic source: https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:23 AM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Thank you Felix and Kelvin. I think I'll def be using the k-means tools in MLlib. It seems the best way to stream data is by storing it in HBase and then using an API in my viz to extract the data? Does anyone have any thoughts on this? Thanks! On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote: Check out https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html In there are links to how that is done. --- Original Message --- From: Kelvin Chu 2dot7kel...@gmail.com Sent: February 10, 2015 12:48 PM To: Su She suhsheka...@gmail.com Cc: user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hi Su, Out of the box, no. But I know people integrate it with Spark Streaming to do real-time visualization. It will take some work though.
Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data...not just historical data? Thank you! -Suh
Tuning number of partitions per CPU
Hello, In the Spark programming guide (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a recommendation: Typically you want 2-4 partitions for each CPU in your cluster. We have a Spark Master and two Spark workers, each with 18 cores and 18 GB of RAM. In our application we use JdbcRDD to load data from a DB and then cache it. We load entities from a single table; we now have 76 million entities (entity size in memory is about 160 bytes). We call count() during application startup to force entity loading. Here are our measurements for the count() operation (cores x partitions = time):

36x36 = 6.5 min
36x72 = 7.7 min
36x108 = 9.4 min

So despite the recommendation, the most efficient setup is one partition per core. What is the reason for the above recommendation? Java 8, Apache Spark 1.1.0
Re: Tuning number of partitions per CPU
18 cores or 36? It probably doesn't matter. For this case, where you have some overhead per partition of setting up the DB connection, it may indeed not help to chop up the data more finely than your total parallelism. Although that would imply quite an overhead. Are you doing any other expensive initialization per partition in your code? You might check some other basic things, like: are you bottlenecked on the DB (probably not), and are there task stragglers drawing out the completion time. On Fri, Feb 13, 2015 at 11:06 AM, Igor Petrov igorpetrov...@gmail.com wrote: Hello, In the Spark programming guide (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a recommendation: Typically you want 2-4 partitions for each CPU in your cluster. We have a Spark Master and two Spark workers, each with 18 cores and 18 GB of RAM. In our application we use JdbcRDD to load data from a DB and then cache it. We load entities from a single table; we now have 76 million entities (entity size in memory is about 160 bytes). We call count() during application startup to force entity loading. Here are our measurements for the count() operation (cores x partitions = time): 36x36 = 6.5 min 36x72 = 7.7 min 36x108 = 9.4 min So despite the recommendation, the most efficient setup is one partition per core. What is the reason for the above recommendation? Java 8, Apache Spark 1.1.0
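A rough sketch of how one might apply the reply above; the connection string, table, and key range are illustrative. The point is simply to keep the number of JDBC partitions (one connection set-up each) at the total parallelism for the load, then repartition afterwards only if later CPU-bound stages benefit:

// Hedged sketch: 36 partitions -> 36 connections for the load itself.
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val totalCores = 36

val entities = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/db", "user", "pass"),
  "SELECT id, payload FROM entities WHERE id >= ? AND id <= ?",
  1L, 76000000L, totalCores,
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2))).cache()

println(entities.count())              // forces the load, as in the original post

// only if later CPU-bound stages need more parallelism:
val wider = entities.repartition(totalCores * 3)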
Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark
Got it. Thanks Reynold and Andrew! Jianshi On Thu, Feb 12, 2015 at 12:25 AM, Andrew Or and...@databricks.com wrote: Hi Jianshi, For YARN, there may be an issue with how a recent patch changes the accessibility of the shuffle files by the external shuffle service: https://issues.apache.org/jira/browse/SPARK-5655. It is likely that you will hit this with 1.2.1, actually. For this reason I would have to recommend that you use 1.2.2 when it is released, but for now you should use 1.2.0 for this specific use case. -Andrew 2015-02-10 23:38 GMT-08:00 Reynold Xin r...@databricks.com: I think we made the binary protocol compatible across all versions, so you should be fine with using any one of them. 1.2.1 is probably the best since it is the most recent stable release. On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I need to use branch-1.2 and sometimes master builds of Spark for my project. However the officially supported Spark version by our Hadoop admin is only 1.2.0. So, my question is which version/build of spark-yarn-shuffle.jar should I use that works for all four versions? (1.2.0, 1.2.1, 1.2.2, 1.3.0) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Shuffle on joining two RDDs
Does that mean partitioning does not work in Python? Or does this only affect joining? On 2015-02-12 19:27, Davies Liu wrote: The feature works as expected in Scala/Java, but is not implemented in Python. On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com wrote: I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote: ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I do. My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though, you need to look deep into the lineage (example at the end). Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a little difference in the Scala & Python APIs that I missed here. For partitionBy in Scala, you actually need to specify the partitioner, but not in Python. However I thought it would work like groupByKey, which does just take an int. Here's a code example in Scala -- not sure what is available from Python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true

val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

//unfortunately, just looking at the immediate dependencies of joined & badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

//so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

//full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

//full dependencies of the bad join -- notice the ShuffleDependency!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)

On Thu, Feb 12, 2015 at 10:05 AM, Karlson ksonsp...@siberie.de wrote: Hi Imran, thanks for your quick reply. Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD =
Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]
Thank you for the clarification, Sean. 2015-02-13 14:16 GMT+04:00 Sean Owen so...@cloudera.com: It is a wrapper whose API is logically the same, but whose method signatures make more sense in Java. You can call the Scala API in Java without too much trouble, but it gets messy when you have to manually grapple with ClassTag from Java, for example. There is not an implicit conversion, since it is used from Java, which doesn't have implicits. On Fri, Feb 13, 2015 at 5:57 AM, Vladimir Protsenko protsenk...@gmail.com wrote: Thanks for the reply. I solved the problem by importing org.apache.spark.SparkContext._, per Imran Rashid's suggestion. For the sake of interest, is JavaPairRDD intended for use from Java? What is the purpose of this class? Is my RDD implicitly converted to it in some circumstances?
Re: Interact with streams in a non-blocking way
Thanks for the reply. I am trying to set up a streaming-as-a-service approach, using the framework that is used for spark-jobserver. For that I would need to handle asynchronous operations that are initiated from outside of the stream. Do you think it is not possible? On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote: You call awaitTermination() in the main thread, and indeed it blocks there forever. From there Spark Streaming takes over and invokes the operations you set up. Your operations have access to the data, of course. That's the model; you don't make external threads that reach into Spark Streaming's objects, but you can easily create operations that take whatever actions you want and invoke them in Streaming. On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote: Hi all, I am trying to come up with a workflow where I can query streams asynchronously. The problem I have is that the ssc.awaitTermination() line blocks the whole thread, so it is not clear to me whether it is possible to get hold of objects from streams once they are started. Any suggestion on what is the best way to implement this? Thanks
Re: Interact with streams in a non-blocking way
Sure it's possible, but you would use Streaming to update some shared state, and create another service that accesses that shared state too. On Fri, Feb 13, 2015 at 11:57 AM, Tamas Jambor jambo...@gmail.com wrote: Thanks for the reply. I am trying to set up a streaming-as-a-service approach, using the framework that is used for spark-jobserver. For that I would need to handle asynchronous operations that are initiated from outside of the stream. Do you think it is not possible? On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote: You call awaitTermination() in the main thread, and indeed it blocks there forever. From there Spark Streaming takes over and invokes the operations you set up. Your operations have access to the data, of course. That's the model; you don't make external threads that reach into Spark Streaming's objects, but you can easily create operations that take whatever actions you want and invoke them in Streaming. On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote: Hi all, I am trying to come up with a workflow where I can query streams asynchronously. The problem I have is that the ssc.awaitTermination() line blocks the whole thread, so it is not clear to me whether it is possible to get hold of objects from streams once they are started. Any suggestion on what is the best way to implement this? Thanks
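A minimal sketch of the "shared state" idea, assuming a simple word-count stream; every name here (source, port, the SharedResults holder) is illustrative, and collect() is only reasonable if each batch's result is small enough to hold on the driver:

// Hedged sketch: the streaming job overwrites a thread-safe reference each
// batch; a separate service thread (e.g. a jobserver HTTP endpoint) reads it.
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair-DStream ops in Spark 1.2

object SharedResults {
  val latest = new AtomicReference[Map[String, Long]](Map.empty)
}

val ssc = new StreamingContext(sc, Seconds(10))
val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" ")).map(w => (w, 1L)).reduceByKey(_ + _)

counts.foreachRDD { rdd =>
  SharedResults.latest.set(rdd.collect().toMap)   // runs on the driver each batch
}

ssc.start()
// any other thread can now call SharedResults.latest.get() at any time
ssc.awaitTermination()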
Re: Shuffle on joining two RDDs
In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines

vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))

to

vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))

? As I understand it, this would preserve the original partitioning. On 2015-02-13 12:43, Karlson wrote: Does that mean partitioning does not work in Python? Or does this only affect joining? On 2015-02-12 19:27, Davies Liu wrote: The feature works as expected in Scala/Java, but is not implemented in Python. On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com wrote: I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote: ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I do. My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though, you need to look deep into the lineage (example at the end). Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a little difference in the Scala & Python APIs that I missed here. For partitionBy in Scala, you actually need to specify the partitioner, but not in Python. However I thought it would work like groupByKey, which does just take an int. Here's a code example in Scala -- not sure what is available from Python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true

val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

//unfortunately, just looking at the immediate dependencies of joined & badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

//so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

//full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

//full dependencies of the bad join -- notice the ShuffleDependency!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at
how to get SchemaRDD SQL exceptions i.e. table not found exception
Hi, can someone explain how to trap SQL exceptions for a query executed with a SchemaRDD, for example when a table is not found? Thanks in advance.
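A rough sketch of one way to do this; depending on the Spark version, the analysis error for a missing table may only surface when the query is actually executed, so both the sql() call and the first action are wrapped here, and the query itself is illustrative:

// Hedged sketch: trap whatever exception the missing table produces.
import scala.util.{Failure, Success, Try}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val result = Try {
  val schemaRdd = sqlContext.sql("SELECT * FROM not_a_real_table")
  schemaRdd.collect()
}

result match {
  case Success(rows) => println("got " + rows.length + " rows")
  case Failure(e)    => println("query failed: " + e.getMessage)   // e.g. table not found
}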
RE: Tuning number of partitions per CPU
Use the configuration below if you are using version 1.2:

SET spark.shuffle.consolidateFiles=true;
SET spark.rdd.compress=true;
SET spark.default.parallelism=1000;
SET spark.deploy.defaultCores=54;

Thanks Puneet. -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 13, 2015 4:46 PM To: Igor Petrov Cc: user@spark.apache.org Subject: Re: Tuning number of partitions per CPU 18 cores or 36? It probably doesn't matter. For this case, where you have some overhead per partition of setting up the DB connection, it may indeed not help to chop up the data more finely than your total parallelism. Although that would imply quite an overhead. Are you doing any other expensive initialization per partition in your code? You might check some other basic things, like: are you bottlenecked on the DB (probably not), and are there task stragglers drawing out the completion time. On Fri, Feb 13, 2015 at 11:06 AM, Igor Petrov igorpetrov...@gmail.com wrote: Hello, In the Spark programming guide (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a recommendation: Typically you want 2-4 partitions for each CPU in your cluster. We have a Spark Master and two Spark workers, each with 18 cores and 18 GB of RAM. In our application we use JdbcRDD to load data from a DB and then cache it. We load entities from a single table; we now have 76 million entities (entity size in memory is about 160 bytes). We call count() during application startup to force entity loading. Here are our measurements for the count() operation (cores x partitions = time): 36x36 = 6.5 min 36x72 = 7.7 min 36x108 = 9.4 min So despite the recommendation, the most efficient setup is one partition per core. What is the reason for the above recommendation? Java 8, Apache Spark 1.1.0
Columnar-Oriented RDDs
Hi all, I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as row-oriented objects, if I understand correctly. I'd like to leverage some of the advantages of a columnar memory format. Shark used to, and Spark SQL does, use a columnar storage format based on primitive arrays for each column. I'd be interested to know more about this approach and how I could build my own custom column-oriented RDD which I can use outside of Spark SQL. Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere? I know Evan Chan in a presentation made mention of building a custom RDD of column-oriented blocks of data. Cheers, ~N
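For illustration, a rough sketch of the "RDD of column-oriented blocks" idea mentioned above, done entirely with plain RDDs and no Spark SQL internals; the field names and the row schema are made up:

// Hedged sketch: one ColumnBlock per partition, one primitive array per column.
import org.apache.spark.SparkContext._

case class ColumnBlock(ids: Array[Long], prices: Array[Double], qtys: Array[Int])

// a row-oriented RDD to start from (illustrative data)
val rows = sc.parallelize((1 to 1000000).map(i => (i.toLong, i * 0.5, i % 10)), 8)

val columnar = rows.mapPartitions { it =>
  val rowArr = it.toArray
  Iterator.single(ColumnBlock(
    rowArr.map(_._1), rowArr.map(_._2), rowArr.map(_._3)))
}.cache()

// a column-wise aggregate only touches the array it needs
val totalPrice = columnar.map(_.prices.sum).sum()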
Re: running spark project using java -cp command
You can also export the variable SPARK_PRINT_LAUNCH_COMMAND before launching a spark-submit command to display the java command that will be launched, e.g.:

export SPARK_PRINT_LAUNCH_COMMAND=1
/opt/spark/bin/spark-submit --master yarn --deploy-mode cluster --class kelkoo.SparkAppTemplate --jars hdfs://prod-cluster/user/preaudc/jars/apps/joda-convert-1.6.jar,hdfs://prod-cluster/user/preaudc/jars/apps/joda-time-2.3.jar,hdfs://prod-cluster/user/preaudc/jars/apps/logReader-1.0.22.jar --driver-memory 512M --driver-library-path /opt/hadoop/lib/native --driver-class-path /usr/share/java/mysql-connector-java.jar --executor-memory 1G --executor-cores 1 --queue spark-batch --num-executors 2 hdfs://prod-cluster/user/preaudc/jars/apps/logProcessing-1.0.10.jar --log_dir /user/kookel/logs --country fr a b c

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: /usr/lib/jvm/java-openjdk/bin/java -cp :/usr/share/java/mysql-connector-java.jar:/opt/spark/conf:/opt/spark/lib/spark-assembly-hadoop.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/etc/hadoop:/etc/hadoop -XX:MaxPermSize=128m -Djava.library.path=/opt/hadoop/lib/native -Xms512M -Xmx512M org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --class kelkoo.SparkAppTemplate --jars hdfs://prod-cluster/user/preaudc/jars/apps/joda-convert-1.6.jar,hdfs://prod-cluster/user/preaudc/jars/apps/joda-time-2.3.jar,hdfs://prod-cluster/user/preaudc/jars/apps/logReader-1.0.22.jar --driver-memory 512M --driver-library-path /opt/hadoop/lib/native --driver-class-path /usr/share/java/mysql-connector-java.jar --executor-memory 1G --executor-cores 1 --queue spark-batch --num-executors 2 hdfs://prod-cluster/user/preaudc/jars/apps/logProcessing-1.0.10.jar --log_dir /user/kookel/logs --country fr a b c
(...)

Christophe. On 10/02/2015 07:26, Akhil Das wrote: Yes, like this:

/usr/lib/jvm/java-7-openjdk-i386/bin/java -cp ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell

It launches spark-shell. Thanks Best Regards On Tue, Feb 10, 2015 at 11:36 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: hi experts! Is there any way to run a spark application using the java -cp command? thanks
Documentation error in MLlib - Clustering?
Hello, I was trying the streaming k-means clustering example in the official documentation at: http://spark.apache.org/docs/1.2.0/mllib-clustering.html But I got a type error when I tried to compile the code:

[error] found : org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
[error] required: org.apache.spark.streaming.dstream.DStream[(?, org.apache.spark.mllib.linalg.Vector)]
[error] model.predictOnValues(testData).print()
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

And it seems like the solution is to use

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

as shown in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala instead of

model.predictOnValues(testData).print()

as written in the documentation. I just wanted to draw attention to this, so that one of the maintainers can fix the documentation. -- Emre Sevinç
Re: Counters in Spark
Except that transformations don't have an exactly-once guarantee, so this way of doing counters may produce different answers across various forms of failures and speculative execution. On Fri, Feb 13, 2015 at 8:56 AM, Sean McNamara sean.mcnam...@webtrends.com wrote: .map is just a transformation, so no work will actually be performed until something takes action against it. Try adding a .count(), like so:

inputRDD.map { x => { counter += 1 } }.count()

In case it is helpful, here are the docs on what exactly the transformations and actions are: http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations http://spark.apache.org/docs/1.2.0/programming-guide.html#actions Cheers, Sean On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement counters in Spark and I guess Accumulators are the way to do it. My motive is to update a counter in a map function and access/reset it in the driver code. However the println statement at the end still yields the value 0 (it should be 9). Am I doing something wrong?

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
  val sc = new SparkContext(conf)
  var counter = sc.accumulable(0, "Counter")
  var inputFilePath = args(0)
  val inputRDD = sc.textFile(inputFilePath)
  inputRDD.map { x => { counter += 1 } }
  println(counter.value)
}
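Putting the two replies together, a rough sketch of the accumulator used from an action instead of a transformation; the file path is a placeholder. Updating it inside foreach both forces the work to run and keeps the update within the case accumulators are designed for:

// Hedged sketch: count records with an accumulator updated in an action.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // accumulator implicits in Spark 1.x

val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
val sc = new SparkContext(conf)

val counter = sc.accumulator(0L, "Counter")
val inputRDD = sc.textFile("/path/to/input")   // illustrative path

inputRDD.foreach { _ => counter += 1L }        // foreach is an action, so this runs immediately

println(counter.value)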
Re: Documentation error in MLlib - Clustering?
Agree, it's correct in the linear methods doc page too. You can open a PR for this simple 'typo' fix; I imagine it just wasn't updated or fixed with the others. On Fri, Feb 13, 2015 at 4:26 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I was trying the streaming k-means clustering example in the official documentation at: http://spark.apache.org/docs/1.2.0/mllib-clustering.html But I got a type error when I tried to compile the code:

[error] found : org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
[error] required: org.apache.spark.streaming.dstream.DStream[(?, org.apache.spark.mllib.linalg.Vector)]
[error] model.predictOnValues(testData).print()
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

And it seems like the solution is to use

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

as shown in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala instead of

model.predictOnValues(testData).print()

as written in the documentation. I just wanted to draw attention to this, so that one of the maintainers can fix the documentation. -- Emre Sevinç
Counters in Spark
I am trying to implement counters in Spark and I guess Accumulators are the way to do it. My motive is to update a counter in a map function and access/reset it in the driver code. However the println statement at the end still yields the value 0 (it should be 9). Am I doing something wrong?

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
  val sc = new SparkContext(conf)
  var counter = sc.accumulable(0, "Counter")
  var inputFilePath = args(0)
  val inputRDD = sc.textFile(inputFilePath)
  inputRDD.map { x => { counter += 1 } }
  println(counter.value)
}
Re: Counters in Spark
.map is just a transformation, so no work will actually be performed until something takes action against it. Try adding a .count(), like so:

inputRDD.map { x => { counter += 1 } }.count()

In case it is helpful, here are the docs on what exactly the transformations and actions are: http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations http://spark.apache.org/docs/1.2.0/programming-guide.html#actions Cheers, Sean On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement counters in Spark and I guess Accumulators are the way to do it. My motive is to update a counter in a map function and access/reset it in the driver code. However the println statement at the end still yields the value 0 (it should be 9). Am I doing something wrong?

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
  val sc = new SparkContext(conf)
  var counter = sc.accumulable(0, "Counter")
  var inputFilePath = args(0)
  val inputRDD = sc.textFile(inputFilePath)
  inputRDD.map { x => { counter += 1 } }
  println(counter.value)
}
Re: Cluster launch
I'm playing around with Spark on Windows and have two worker nodes running by starting them manually using a script that contains the following:

set SPARK_HOME=C:\dev\programs\spark-1.2.0
set SPARK_MASTER_IP=master.brad.com
spark-class org.apache.spark.deploy.worker.Worker spark://master.brad.com:7077

I then create a copy of this script with a different SPARK_HOME defined to run my second worker from. This works, however I am unable to configure a different log4j.properties file for each worker, which is a pain as they then both log to the same log file. The issue seems to be that it's the conf directory on the master that sends the JVM arguments to each worker to execute a Task. Therefore the same -Dlog4j.configuration=file:... argument is sent to both. Does anyone know how the JVM command sent to the workers from the master can read an environment variable, so this can be defined per worker?
Re: An interesting and serious problem I encountered
Hi, I believe SizeOf.jar may calculate the wrong size for you. Spark has a util called SizeEstimator, located in org.apache.spark.util.SizeEstimator, and someone extracted it out in https://github.com/phatak-dev/java-sizeof/blob/master/src/main/scala/com/madhukaraphatak/sizeof/SizeEstimator.scala You can try that out in the scala repl. The size for Array[Int](43) is 192 bytes (12 bytes object size + 4 bytes length variable + 43 * 4 rounded to 176 bytes). And the size for (1, Array[Int](43)) is 240 bytes:

Tuple2 object: 12 bytes object size + 4 bytes field _1 + 4 bytes field _2, rounded to 24 bytes
the key 1, boxed: java.lang.Number is 12 bytes, rounded to 16 bytes; java.lang.Integer adds 4 bytes for the int, 16 + 4 rounded to 24 bytes (Integer extends Number. I thought Scala's Tuple2 would be specialized for Int and this would be 4 bytes, but it seems not)
Array: 192 bytes

So, 24 + 24 + 192 = 240 bytes. This is my calculation based on the Spark SizeEstimator. However I am not sure what an Integer occupies on a 64-bit JVM with compressed oops on. It should be 12 + 4 = 16 bytes, and that would mean the SizeEstimator gives the wrong result. @Sean what do you think? -- Ye Xianjin On Friday, February 13, 2015 at 2:26 PM, Landmark wrote: Hi folks, My Spark cluster has 8 machines, each of which has 377GB physical memory, and thus the total maximum memory that can be used for Spark is more than 2400+GB. In my program, I have to deal with 1 billion (key, value) pairs, where the key is an integer and the value is an integer array with 43 elements. Therefore, the memory cost of this raw dataset is [(1+43) * 10^9 * 4] / (1024 * 1024 * 1024) = 164GB. Since I have to use this dataset repeatedly, I have to cache it in memory. Some key parameter settings are: spark.storage.fraction=0.6 spark.driver.memory=30GB spark.executor.memory=310GB. But it failed on running a simple countByKey() and the error message is java.lang.OutOfMemoryError: Java heap space. Does this mean a Spark cluster of 2400+GB memory cannot keep 164GB raw data in memory? The code of my program is as follows:

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf());
  val rdd = sc.parallelize(0 until 1000000000, 25600).map(i => (i, new Array[Int](43))).cache();
  println("The number of keys is " + rdd.countByKey());
  // some other operations following here ...
}

To figure out the issue, I evaluated the memory cost of the key-value pairs and computed their memory cost using SizeOf.jar. The code is as follows:

val arr = new Array[Int](43);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr)));
val tuple = (1, arr.clone);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(tuple)));

The output is: 192.0b 992.0b *Hard to believe, but it is true!! This result means that, to store a key-value pair, Tuple2 needs more than 5+ times the memory of the simplest method with an array. Even though it may take 5+ times the memory, its size is less than 1000GB, which is still much less than the total memory size of my cluster, i.e., 2400+GB. I really do not understand why this happened.* BTW, if the number of pairs is 1 million, it works well. If the arr contains only 1 integer, then to store a pair Tuple2 needs around 10 times the memory. So I have some questions: 1. Why does Spark choose such a poor data structure, Tuple2, for key-value pairs? Is there any better data structure for storing (key, value) pairs with less memory cost? 2. Given a dataset of size M, in general how many times that amount of memory does Spark need to handle it?
Best, Landmark
How to union RDD and remove duplicated keys
I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g. rdd1: (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”) rdd2: (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”) rdd3: (“id1”, “Long text 1 B”) Then, I want to merge all the RDDs. If there are duplicated docIds, the later RDD should overwrite the previous RDD. In the above case, rdd2 will overwrite rdd1 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final merged RDD should be rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long text 3”) Note that I have many such RDDs and each RDD has lots of elements. How can I do it efficiently? Ningjun
Size exceeds Integer.MAX_VALUE exception when broadcasting large variable
I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the following exception when the size of the broadcast variable exceeds 2GB. Any ideas on how I can resolve this issue?

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
Re: How to union RDD and remove duplicated keys
reduceByKey should work, but you need to define the ordering by using some sort of index. On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g. rdd1: (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”) rdd2: (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”) rdd3: (“id1”, “Long text 1 B”) Then, I want to merge all the RDDs. If there are duplicated docIds, the later RDD should overwrite the previous RDD. In the above case, rdd2 will overwrite rdd1 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final merged RDD should be rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long text 3”) Note that I have many such RDDs and each RDD has lots of elements. How can I do it efficiently? Ningjun
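A rough sketch of that suggestion, assuming rdd1, rdd2, rdd3 are the RDDs from the question listed in "later wins" order; each value is tagged with its source index, and reduceByKey keeps the value carrying the higher index:

// Hedged sketch: union all sources, then keep the value from the latest RDD per key.
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdds: Seq[RDD[(String, String)]] = Seq(rdd1, rdd2, rdd3)   // later entries win

val merged = sc.union(
    rdds.zipWithIndex.map { case (rdd, i) => rdd.mapValues(v => (i, v)) })
  .reduceByKey((a, b) => if (a._1 >= b._1) a else b)            // keep the higher index
  .mapValues(_._2)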
Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable
I think you've hit the nail on the head. Since the serialization ultimately creates a byte array, and arrays can have at most ~2 billion elements in the JVM, the broadcast can be at most ~2GB. At that scale, you might consider whether you really have to broadcast these values, or want to handle them as RDDs and join and so on. Or consider whether you can break it up into several broadcasts? On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote: I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the following exception when the size of the broadcast variable exceeds 2GB. Any ideas on how I can resolve this issue? java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle on joining two RDDs
yeah I thought the same thing at first too, I suggested something equivalent w/ preservesPartitioning = true, but that isn't enough. the join is done by union-ing the two transformed rdds, which is very different from the way it works under the hood in scala to enable narrow dependencies. It really needs a bigger change to pyspark. I filed this issue: https://issues.apache.org/jira/browse/SPARK-5785 (and the somewhat related issue about documentation: https://issues.apache.org/jira/browse/SPARK-5786) partitioning should still work in pyspark, you still need some notion of distributing work, and the pyspark functions have a partitionFunc to decide that. But, I am not an authority on pyspark, so perhaps there are more holes I'm not aware of ... Imran On Fri, Feb 13, 2015 at 8:36 AM, Karlson ksonsp...@siberie.de wrote: In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines vs = rdd.map(lambda (k, v): (k, (1, v))) ws = other.map(lambda (k, v): (k, (2, v))) to vs = rdd.mapValues(lambda v: (1, v)) ws = other.mapValues(lambda v: (2, v)) ? As I understand, this would preserve the original partitioning. On 2015-02-13 12:43, Karlson wrote: Does that mean partitioning does not work in Python? Or does this only effect joining? On 2015-02-12 19:27, Davies Liu wrote: The feature works as expected in Scala/Java, but not implemented in Python. On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com wrote: I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote: ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I thought. My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. its a little tricky, though, you need to look deep into the lineage (example at the end). Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference in the scala python api that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However I thought it would work like groupByKey, which does just take an int. Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies?? 
val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true

val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

// unfortunately, just looking at the immediate dependencies of joined & badJoined is misleading,
// b/c join actually creates one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)

// even with the join that does require a shuffle, we still see a OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

// so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

// full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable
unfortunately this is a known issue: https://issues.apache.org/jira/browse/SPARK-1476 as Sean suggested, you need to think of some other way of doing the same thing, even if its just breaking your one big broadcast var into a few smaller ones On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote: I think you've hit the nail on the head. Since the serialization ultimately creates a byte array, and arrays can have at most ~2 billion elements in the JVM, the broadcast can be at most ~2GB. At that scale, you might consider whether you really have to broadcast these values, or want to handle them as RDDs and join and so on. Or consider whether you can break it up into several broadcasts? On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote: I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the following exception when the size of the broadcast variable exceeds 2GB. Any ideas on how I can resolve this issue? java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
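A rough sketch of that idea (the loader and the slice count are assumptions, not code from this thread): shard one big lookup map by key so each broadcast stays well under the ~2GB serialized limit.

val bigMap: Map[Long, Array[Byte]] = loadBigMap()   // hypothetical: however you build the 5GB map
val numSlices = 4
val slices = (0 until numSlices).map { i =>
  sc.broadcast(bigMap.filter { case (k, _) => (k % numSlices).toInt == i })
}

// look a key up in the slice that owns it
def lookup(k: Long): Option[Array[Byte]] =
  slices((k % numSlices).toInt).value.get(k)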
Re: SparkSQL doesn't seem to like $'s in column names
Try using `backticks` to escape non-standard characters. On Fri, Feb 13, 2015 at 11:30 AM, Corey Nolet cjno...@gmail.com wrote: I don't remember Oracle ever enforcing that I couldn't include a $ in a column name, but I also don't thinking I've ever tried. When using sqlContext.sql(...), I have a SELECT * from myTable WHERE locations_$homeAddress = '123 Elm St' It's telling me $ is invalid. Is this a bug?
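For example, a minimal sketch reusing the query from the question:

sqlContext.sql("SELECT * FROM myTable WHERE `locations_$homeAddress` = '123 Elm St'")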
Spark standalone and HDFS 2.6
Hi guys, Probably a dummy question. Do you know how to compile Spark 0.9 to easily integrate with HDFS 2.6.0? I was trying sbt/sbt -Pyarn -Phadoop-2.6 assembly or mvn -Dhadoop.version=2.6.0 -DskipTests clean package but none of these approaches succeeded. Thanks, Robert
SparkSQL doesn't seem to like $'s in column names
I don't remember Oracle ever enforcing that I couldn't include a $ in a column name, but I also don't thinking I've ever tried. When using sqlContext.sql(...), I have a SELECT * from myTable WHERE locations_$homeAddress = '123 Elm St' It's telling me $ is invalid. Is this a bug?
New ColumnType For Decimal Caching
Thanks Michael for the pointer Sorry for the delayed reply. Taking a quick inventory of scope of change - Is the column type for Decimal caching needed only in the caching layer (4 files in org.apache.spark.sql.columnar - ColumnAccessor.scala, ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala) Or do other SQL components also need to be touched ? Hoping for a quick feedback of top of your head ... Thanks, On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com wrote: You could add a new ColumnType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala . PRs welcome :) On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, As a test, I have same data loaded as another parquet - except with the 2 decimal(14,4) replaced by double. With this, the on disk size is ~345MB, the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the time of uncached query. Would it be possible for Spark to store in-memory decimal in some form of long with decoration ? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in RDD so other semantic does not changes ? Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. 
On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
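Until a Decimal ColumnType exists, a rough sketch (table, column names and path are hypothetical; Spark 1.2-era API) of a variant of the workaround measured above: cast the decimal(14,4) columns to double before caching. Note this changes the semantics from exact decimal to floating point, so it is only a stopgap.

val parquetTable = sqlContext.parquetFile("/path/to/table")   // hypothetical path
parquetTable.registerTempTable("table_raw")
sqlContext.sql("SELECT a, CAST(b AS DOUBLE) AS b, CAST(c AS DOUBLE) AS c FROM table_raw")
  .registerTempTable("table_dbl")
sqlContext.cacheTable("table_dbl")                            // columnar in-memory cache
sqlContext.sql("SELECT a, sum(b), sum(c) FROM table_dbl GROUP BY a").collect()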
Re: Columnar-Oriented RDDs
Shark's in-memory code was ported to Spark SQL and is used by default when you run .cache on a SchemaRDD or CACHE TABLE. I'd also look at parquet which is more efficient and handles nested data better. On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf nightwolf...@gmail.com wrote: Hi all, I'd like to build/use column oriented RDDs in some of my Spark code. A normal Spark RDD is stored as row oriented object if I understand correctly. I'd like to leverage some of the advantages of a columnar memory format. Shark (used to) and SparkSQL uses a columnar storage format using primitive arrays for each column. I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL. Could anyone give me some pointers on where to look to do something like this, either from scratch or using whats there in the SparkSQL libs or elsewhere. I know Evan Chan in a presentation made mention of building a custom RDD of column-oriented blocks of data. Cheers, ~N
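As a concrete starting point, a minimal sketch (Spark 1.2-era API, hypothetical record type) of how you get the columnar in-memory format Michael mentions for your own data: give the RDD a schema via a case class, then cache it as a table.

import org.apache.spark.sql.SQLContext

case class Point(name: String, x: Double, y: Double)   // hypothetical record type

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD                      // implicit RDD[Point] -> SchemaRDD

val points = sc.parallelize(Seq(Point("a", 1.0, 2.0), Point("b", 3.0, 4.0)))
points.registerTempTable("points")
sqlContext.cacheTable("points")                        // stored column-by-column, one primitive array per column
sqlContext.sql("SELECT name, x + y FROM points").collect()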
Has Spark 1.2.0 changed EC2 persistent-hdfs?
I've updated to Spark 1.2.0 and the EC2 and the persistent-hdfs behaviour appears to have changed. My launch script is spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5 --ebs-vol-size=1000 launch myproject When I ssh into master I get: $ df -h FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 2.9G 5.0G 37% / tmpfs 7.3G 0 7.3G 0% /dev/shm /dev/xvdb 37G 1.3G 34G 4% /mnt /dev/xvdc 37G 177M 35G 1% /mnt2 /dev/xvds1000G 33M 1000G 1% /vol0 that /vol0 is the place I want (and assume) persistent-hdfs to go. But when I look at the size I get: $ persistent-hdfs/bin/start-all.sh $ persistent-hdfs/bin/hadoop dfsadmin -report Warning: $HADOOP_HOME is deprecated. Configured Capacity: 42275430400 (39.37 GB) Present Capacity: 2644878 (24.63 GB) DFS Remaining: 26448601088 (24.63 GB) DFS Used: 143360 (140 KB) DFS Used%: 0% Under replicated blocks: 0 Blocks with corrupt replicas: 0 Missing blocks: 0 - Datanodes available: 5 (5 total, 0 dead) Name: 10.46.11.156:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165372416 (2.95 GB) DFS Remaining: 5289684992(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.41.51.155:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165364224 (2.95 GB) DFS Remaining: 5289693184(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.38.30.254:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165249536 (2.95 GB) DFS Remaining: 5289807872(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.204.134.84:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165343744 (2.95 GB) DFS Remaining: 5289713664(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.33.15.134:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165356032 (2.95 GB) DFS Remaining: 5289701376(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 That's tiny. My suspicions are aroused when I see: $ ls /vol persistent-hdfs /vol is on the small /dev/xvda1 not the large EBS /dev/xvds I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change the volume: property namehadoop.tmp.dir/name value/vol0/persistent-hdfs/value !-- was /vol/persistent-hdfs -- /property And then persistent-hdfs/bin/stop-all.sh persistent-hdfs/bin/start-all.sh but when I do that, the persistent HDFS won't start for whatever reason. I can't run $ persistent-hdfs/bin/hadoop dfsadmin -report 15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 0 time(s). 15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 1 time(s). 15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 2 time(s). So, it looks like I can't use the EBS for persistent-hdfs. I was doing it before, so something must have changed in the last couple of weeks (last time I was using 1.1.0). Is this a bug? Has the behaviour of AWS changed? 
Am I doing something stupid? How do I fix it? Thanks in advance! Joe
Re: Spark standalone and HDFS 2.6
I am trying to run BlinkDB(https://github.com/sameeragarwal/blinkdb) which seems to work only with Spark 0.9. However, if I want to access HDFS I need to compile Spark against Hadoop version which is running on my cluster(2.6.0). Hence, the versions problem ... On Friday, February 13, 2015 11:28 AM, Sean Owen so...@cloudera.com wrote: Oh right, you said Spark 0.9. Those profiles won't exist back then. I don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles were introduced later to fix up some compatibility. Why not use 1.2.1? On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote: Thanks Sean for your prompt response. I was trying to compile as following: mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and all hadoop jars are in .m2 repository. Do you have any idea what might happens ? Robert [WARNING] Class com.google.protobuf.Parser not found - continuing with a stub. [ERROR] error while loading RpcResponseHeaderProto, class file '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)' is broken (class java.lang.NullPointerException/null) [WARNING] one warning found [ERROR] one error found [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .. SUCCESS [2.537s] [INFO] Spark Project Core FAILURE [25.917s] [INFO] Spark Project Bagel ... SKIPPED [INFO] Spark Project GraphX .. SKIPPED [INFO] Spark Project ML Library .. SKIPPED [INFO] Spark Project Streaming ... SKIPPED [INFO] Spark Project Tools ... SKIPPED [INFO] Spark Project REPL SKIPPED [INFO] Spark Project Assembly SKIPPED [INFO] Spark Project External Twitter SKIPPED [INFO] Spark Project External Kafka .. SKIPPED [INFO] Spark Project External Flume .. SKIPPED [INFO] Spark Project External ZeroMQ . SKIPPED [INFO] Spark Project External MQTT ... SKIPPED [INFO] Spark Project Examples SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 30.002s [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015 [INFO] Final Memory: 49M/1226M [INFO] [WARNING] The requested profile hadoop-2.4 could not be activated because it does not exist. [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on project spark-core_2.10: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed - [Help 1] On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com wrote: If you just need standalone mode, you don't need -Pyarn. There is no -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set -Dhadoop.version=2.6.0. That should be it. If that still doesn't work, define doesn't succeed. On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, Probably a dummy question. Do you know how to compile Spark 0.9 to easily integrate with HDFS 2.6.0 ? I was trying sbt/sbt -Pyarn -Phadoop-2.6 assembly or mvn -Dhadoop.version=2.6.0 -DskipTests clean package but none of these approaches succeeded. Thanks, Robert On Friday, February 13, 2015 11:28 AM, Sean Owen so...@cloudera.com wrote: Oh right, you said Spark 0.9. Those profiles won't exist back then. I don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles were introduced later to fix up some compatibility. Why not use 1.2.1? 
On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote: Thanks Sean for your prompt response. I was trying to compile as following: mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and all hadoop jars are in .m2 repository. Do you have any idea what might happens ? Robert [WARNING] Class com.google.protobuf.Parser not found - continuing with a stub. [ERROR] error while loading RpcResponseHeaderProto, class file '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)' is broken (class
Re: NaiveBayes classifier causes ShuffleDependency class cast exception
You cannot have two Spark Contexts in the same JVM active at the same time. Just create one SparkContext and then use it for both purposes. TD On Fri, Feb 6, 2015 at 8:49 PM, VISHNU SUBRAMANIAN johnfedrickena...@gmail.com wrote: Can you try creating just a single spark context and then try your code. If you want to use it for streaming pass the same sparkcontext object instead of conf. Note: Instead of just replying to me, try to use reply to all so that the post is visible for the community. That way you can expect immediate responses. On Fri, Feb 6, 2015 at 6:09 AM, aanilpala aanilp...@gmail.com wrote: I have the following code:

SparkConf conf = new SparkConf().setAppName("streamer").setMaster("local[2]");
conf.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(batch_interval));
ssc.checkpoint("/tmp/spark/checkpoint");
SparkConf conf2 = new SparkConf().setAppName("classifier").setMaster("local[1]");
conf2.set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaReceiverInputDStream<String> stream = ssc.socketTextStream("localhost", ...);

// String to Tuple3 conversion
JavaDStream<Tuple3<Long, String, String>> tuple_stream =
    stream.map(new Function<String, Tuple3<Long, String, String>>() { ... });

JavaPairDStream<Integer, DictionaryEntry> raw_dictionary_stream =
    tuple_stream.filter(new Function<Tuple3<Long, String, String>, Boolean>() {
      @Override
      public Boolean call(Tuple3<Long, String, String> tuple) throws Exception {
        if ((tuple._1() / Time.scaling_factor % training_interval) < training_dur)
          NaiveBayes.train(sc.parallelize(training_set).rdd());
        return true;
      }
    });

I am working on a text mining project and I want to use NaiveBayesClassifier of MLlib to classify some stream items. So, I have two Spark contexts one of which is a streaming context. The call to NaiveBayes.train causes the following exception. Any ideas?
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.SparkContext$$anonfun$runJob$4 cannot be cast to org.apache.spark.ShuffleDependency at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
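Following TD's advice, a minimal sketch (in Scala for brevity; the app name, port and batch interval are hypothetical) of driving both streaming and batch-style MLlib work from a single SparkContext:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streamer-and-classifier").setMaster("local[2]")
val sc = new SparkContext(conf)                  // the only SparkContext in the JVM
val ssc = new StreamingContext(sc, Seconds(2))   // streaming context built on top of it
ssc.checkpoint("/tmp/spark/checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD { rdd =>
  // batch-style work (e.g. MLlib training) can reuse the same sc here
  println(s"received ${rdd.count()} records in this batch")
}

ssc.start()
ssc.awaitTermination()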
Re: Counters in Spark
this is more-or-less the best you can do now, but as has been pointed out, accumulators don't quite fit the bill for counters. There is an open issue to do something better, but no progress on that so far https://issues.apache.org/jira/browse/SPARK-603 On Fri, Feb 13, 2015 at 11:12 AM, Mark Hamstra m...@clearstorydata.com wrote: Except that transformations don't have an exactly-once guarantee, so this way of doing counters may produce different answers across various forms of failures and speculative execution. On Fri, Feb 13, 2015 at 8:56 AM, Sean McNamara sean.mcnam...@webtrends.com wrote: .map is just a transformation, so no work will actually be performed until something takes action against it. Try adding a .count(), like so: inputRDD.map { x => { counter += 1 } }.count() In case it is helpful, here are the docs on what exactly the transformations and actions are: http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations http://spark.apache.org/docs/1.2.0/programming-guide.html#actions Cheers, Sean On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement counters in Spark and I guess Accumulators are the way to do it. My motive is to update a counter in map function and access/reset it in the driver code. However the println statement at the end still yields value 0 (it should be 9). Am I doing something wrong?

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
  val sc = new SparkContext(conf)
  var counter = sc.accumulable(0, "Counter")
  var inputFilePath = args(0)
  val inputRDD = sc.textFile(inputFilePath)
  inputRDD.map { x => { counter += 1 } }
  println(counter.value)
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Counters-in-Spark-tp21646.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
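Putting Sean's and Mark's points together, a minimal sketch of the pattern discussed (assuming inputRDD and sc from the original post): trigger the accumulator with an action, and treat the result as approximate because retried or speculative tasks can add to it more than once.

val counter = sc.accumulator(0, "Counter")
inputRDD.foreach(_ => counter += 1)   // foreach is an action, so the increments actually run
println(counter.value)                // driver-side read; may over-count if tasks were retried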
Re: Spark standalone and HDFS 2.6
If you just need standalone mode, you don't need -Pyarn. There is no -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set -Dhadoop.version=2.6.0. That should be it. If that still doesn't work, define "doesn't succeed." On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, Probably a dummy question. Do you know how to compile Spark 0.9 to easily integrate with HDFS 2.6.0 ? I was trying sbt/sbt -Pyarn -Phadoop-2.6 assembly or mvn -Dhadoop.version=2.6.0 -DskipTests clean package but none of these approaches succeeded. Thanks, Robert - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Interact with streams in a non-blocking way
Here is an example of how you can do. Lets say myDStream contains the data that you may want to asynchornously query, say using, Spark SQL. val sqlContext = new SqlContext(streamingContext.sparkContext) myDStream.foreachRDD { rdd = // rdd is a RDD of case class sqlContext.registerRDDAsTable(rdd, mytable) } This will make sure that the table mytable always refers to the latest RDD generated by the DStream. Then from a diffrent thread you can asynchronously query sqlContext.sql(select * from mytable) Hope this helps. TD On Fri, Feb 13, 2015 at 3:59 AM, Sean Owen so...@cloudera.com wrote: Sure it's possible, but you would use Streaming to update some shared state, and create another service that accessed that shared state too. On Fri, Feb 13, 2015 at 11:57 AM, Tamas Jambor jambo...@gmail.com wrote: Thanks for the reply, I am trying to setup a streaming as a service approach, using the framework that is used for spark-jobserver. for that I would need to handle asynchronous operations that are initiated from outside of the stream. Do you think it is not possible? On Fri Feb 13 2015 at 10:14:18 Sean Owen so...@cloudera.com wrote: You call awaitTermination() in the main thread, and indeed it blocks there forever. From there Spark Streaming takes over, and is invoking the operations you set up. Your operations have access to the data of course. That's the model; you don't make external threads that reach in to Spark Streaming's objects, but can easily create operations that take whatever actions you want and invoke them in Streaming. On Fri, Feb 13, 2015 at 10:04 AM, jamborta jambo...@gmail.com wrote: Hi all, I am trying to come up with a workflow where I can query streams asynchronously. The problem I have is a ssc.awaitTermination() line blocks the whole thread, so it is not straightforward to me whether it is possible to get hold of objects from streams once they are started. any suggestion on what is the best way to implement this? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Interact-with-streams-in-a-non-blocking-way-tp21640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
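A self-contained sketch of the pattern TD describes, assuming Spark 1.2-era APIs, that myDStream and streamingContext exist as above, and a hypothetical case class for the stream's records:

import org.apache.spark.sql.SQLContext

case class Event(id: String, value: Double)              // hypothetical record type

val sqlContext = new SQLContext(streamingContext.sparkContext)
import sqlContext.createSchemaRDD                        // implicit RDD[Event] -> SchemaRDD

myDStream.foreachRDD { rdd =>
  // re-register on every batch so "mytable" always points at the latest RDD
  sqlContext.registerRDDAsTable(rdd, "mytable")
}

// from a different thread, query the latest batch asynchronously:
val latest = sqlContext.sql("SELECT * FROM mytable").collect()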
Re: Spark standalone and HDFS 2.6
OK, from scanning the pom.xml, I think you would try: -Pyarn -Dhadoop.version=2.6.0 If it doesn't package or pass tests, then I'd assume it's not supported :( On Fri, Feb 13, 2015 at 7:33 PM, Grandl Robert rgra...@yahoo.com wrote: I am trying to run BlinkDB(https://github.com/sameeragarwal/blinkdb) which seems to work only with Spark 0.9. However, if I want to access HDFS I need to compile Spark against Hadoop version which is running on my cluster(2.6.0). Hence, the versions problem ... On Friday, February 13, 2015 11:28 AM, Sean Owen so...@cloudera.com wrote: Oh right, you said Spark 0.9. Those profiles won't exist back then. I don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles were introduced later to fix up some compatibility. Why not use 1.2.1? On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote: Thanks Sean for your prompt response. I was trying to compile as following: mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and all hadoop jars are in .m2 repository. Do you have any idea what might happens ? Robert [WARNING] Class com.google.protobuf.Parser not found - continuing with a stub. [ERROR] error while loading RpcResponseHeaderProto, class file '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)' is broken (class java.lang.NullPointerException/null) [WARNING] one warning found [ERROR] one error found [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .. SUCCESS [2.537s] [INFO] Spark Project Core FAILURE [25.917s] [INFO] Spark Project Bagel ... SKIPPED [INFO] Spark Project GraphX .. SKIPPED [INFO] Spark Project ML Library .. SKIPPED [INFO] Spark Project Streaming ... SKIPPED [INFO] Spark Project Tools ... SKIPPED [INFO] Spark Project REPL SKIPPED [INFO] Spark Project Assembly SKIPPED [INFO] Spark Project External Twitter SKIPPED [INFO] Spark Project External Kafka .. SKIPPED [INFO] Spark Project External Flume .. SKIPPED [INFO] Spark Project External ZeroMQ . SKIPPED [INFO] Spark Project External MQTT ... SKIPPED [INFO] Spark Project Examples SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 30.002s [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015 [INFO] Final Memory: 49M/1226M [INFO] [WARNING] The requested profile hadoop-2.4 could not be activated because it does not exist. [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on project spark-core_2.10: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed - [Help 1] On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com wrote: If you just need standalone mode, you don't need -Pyarn. There is no -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set -Dhadoop.version=2.6.0. That should be it. If that still doesn't work, define doesn't succeed. On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, Probably a dummy question. Do you know how to compile Spark 0.9 to easily integrate with HDFS 2.6.0 ? I was trying sbt/sbt -Pyarn -Phadoop-2.6 assembly or mvn -Dhadoop.version=2.6.0 -DskipTests clean package but none of these approaches succeeded. Thanks, Robert On Friday, February 13, 2015 11:28 AM, Sean Owen so...@cloudera.com wrote: Oh right, you said Spark 0.9. Those profiles won't exist back then. 
I don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles were introduced later to fix up some compatibility. Why not use 1.2.1? On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote: Thanks Sean for your prompt response. I was trying to compile as following: mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and all hadoop jars are in .m2 repository. Do you have any idea what might happens ? Robert [WARNING] Class com.google.protobuf.Parser not found - continuing with a stub. [ERROR] error while loading
Re: Has Spark 1.2.0 changed EC2 persistent-hdfs?
Looks like this is caused by issue SPARK-5008: https://issues.apache.org/jira/browse/SPARK-5008 On 13 February 2015 at 19:04, Joe Wass jw...@crossref.org wrote: I've updated to Spark 1.2.0 and the EC2 and the persistent-hdfs behaviour appears to have changed. My launch script is spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5 --ebs-vol-size=1000 launch myproject When I ssh into master I get: $ df -h FilesystemSize Used Avail Use% Mounted on /dev/xvda17.9G 2.9G 5.0G 37% / tmpfs 7.3G 0 7.3G 0% /dev/shm /dev/xvdb 37G 1.3G 34G 4% /mnt /dev/xvdc 37G 177M 35G 1% /mnt2 /dev/xvds1000G 33M 1000G 1% /vol0 that /vol0 is the place I want (and assume) persistent-hdfs to go. But when I look at the size I get: $ persistent-hdfs/bin/start-all.sh $ persistent-hdfs/bin/hadoop dfsadmin -report Warning: $HADOOP_HOME is deprecated. Configured Capacity: 42275430400 (39.37 GB) Present Capacity: 2644878 (24.63 GB) DFS Remaining: 26448601088 (24.63 GB) DFS Used: 143360 (140 KB) DFS Used%: 0% Under replicated blocks: 0 Blocks with corrupt replicas: 0 Missing blocks: 0 - Datanodes available: 5 (5 total, 0 dead) Name: 10.46.11.156:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165372416 (2.95 GB) DFS Remaining: 5289684992(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.41.51.155:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165364224 (2.95 GB) DFS Remaining: 5289693184(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.38.30.254:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165249536 (2.95 GB) DFS Remaining: 5289807872(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.204.134.84:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165343744 (2.95 GB) DFS Remaining: 5289713664(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 Name: 10.33.15.134:60010 Decommission Status : Normal Configured Capacity: 8455086080 (7.87 GB) DFS Used: 28672 (28 KB) Non DFS Used: 3165356032 (2.95 GB) DFS Remaining: 5289701376(4.93 GB) DFS Used%: 0% DFS Remaining%: 62.56% Last contact: Fri Feb 13 17:41:46 UTC 2015 That's tiny. My suspicions are aroused when I see: $ ls /vol persistent-hdfs /vol is on the small /dev/xvda1 not the large EBS /dev/xvds I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change the volume: property namehadoop.tmp.dir/name value/vol0/persistent-hdfs/value !-- was /vol/persistent-hdfs -- /property And then persistent-hdfs/bin/stop-all.sh persistent-hdfs/bin/start-all.sh but when I do that, the persistent HDFS won't start for whatever reason. I can't run $ persistent-hdfs/bin/hadoop dfsadmin -report 15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 0 time(s). 15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 1 time(s). 15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 2 time(s). So, it looks like I can't use the EBS for persistent-hdfs. 
I was doing it before, so something must have changed in the last couple of weeks (last time I was using 1.1.0). Is this a bug? Has the behaviour of AWS changed? Am I doing something stupid? How do I fix it? Thanks in advance! Joe
RE: How to union RDD and remove duplicated keys
Do you mean first union all RDDs together and then do a reduceByKey()? Suppose my unioned RDD is rdd : (“id1”, “text 1”), (“id1”, “text 2”), (“id1”, “text 3”) How can I use reduceByKey to return (“id1”, “text 3”)? I mean to take the last entry for the same key. A code snippet is appreciated because I am new to Spark. Ningjun From: Boromir Widas [mailto:vcsub...@gmail.com] Sent: Friday, February 13, 2015 1:28 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to union RDD and remove duplicated keys reduceByKey should work, but you need to define the ordering by using some sort of index. On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g. rdd1: (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”) rdd2: (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”) rdd3: (“id1”, “Long text 1 B”) Then, I want to merge all RDDs. If there is duplicated docids, later RDD should overwrite previous RDD. In the above case, rdd2 will overwrite rddd1 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final merged rdd should be rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long text 3”) Note that I have many such RDDs and each rdd have lots of elements. How can I do it efficiently? Ningjun
Re: Spark standalone and HDFS 2.6
Thanks Sean for your prompt response. I was trying to compile as following: mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and all hadoop jars are in .m2 repository. Do you have any idea what might happens ? Robert [WARNING] Class com.google.protobuf.Parser not found - continuing with a stub. [ERROR] error while loading RpcResponseHeaderProto, class file '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)' is broken (class java.lang.NullPointerException/null) [WARNING] one warning found [ERROR] one error found [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .. SUCCESS [2.537s] [INFO] Spark Project Core FAILURE [25.917s] [INFO] Spark Project Bagel ... SKIPPED [INFO] Spark Project GraphX .. SKIPPED [INFO] Spark Project ML Library .. SKIPPED [INFO] Spark Project Streaming ... SKIPPED [INFO] Spark Project Tools ... SKIPPED [INFO] Spark Project REPL SKIPPED [INFO] Spark Project Assembly SKIPPED [INFO] Spark Project External Twitter SKIPPED [INFO] Spark Project External Kafka .. SKIPPED [INFO] Spark Project External Flume .. SKIPPED [INFO] Spark Project External ZeroMQ . SKIPPED [INFO] Spark Project External MQTT ... SKIPPED [INFO] Spark Project Examples SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 30.002s [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015 [INFO] Final Memory: 49M/1226M [INFO] [WARNING] The requested profile hadoop-2.4 could not be activated because it does not exist. [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on project spark-core_2.10: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed - [Help 1] On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com wrote: If you just need standalone mode, you don't need -Pyarn. There is no -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set -Dhadoop.version=2.6.0. That should be it. If that still doesn't work, define doesn't succeed. On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, Probably a dummy question. Do you know how to compile Spark 0.9 to easily integrate with HDFS 2.6.0 ? I was trying sbt/sbt -Pyarn -Phadoop-2.6 assembly or mvn -Dhadoop.version=2.6.0 -DskipTests clean package but none of these approaches succeeded. Thanks, Robert
Re: Spark standalone and HDFS 2.6
Oh right, you said Spark 0.9. Those profiles won't exist back then. I don't even know if Hadoop 2.6 will work with 0.9 as-is. The profiles were introduced later to fix up some compatibility. Why not use 1.2.1? On Fri, Feb 13, 2015 at 7:26 PM, Grandl Robert rgra...@yahoo.com wrote: Thanks Sean for your prompt response. I was trying to compile as following: mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package but I got a bunch of errors(see below). Hadoop-2.6.0 compiled correctly, and all hadoop jars are in .m2 repository. Do you have any idea what might happens ? Robert [WARNING] Class com.google.protobuf.Parser not found - continuing with a stub. [ERROR] error while loading RpcResponseHeaderProto, class file '/home/rgrandl/.m2/repository/org/apache/hadoop/hadoop-common/2.6.0/hadoop-common-2.6.0.jar(org/apache/hadoop/ipc/protobuf/RpcHeaderProtos$RpcResponseHeaderProto.class)' is broken (class java.lang.NullPointerException/null) [WARNING] one warning found [ERROR] one error found [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .. SUCCESS [2.537s] [INFO] Spark Project Core FAILURE [25.917s] [INFO] Spark Project Bagel ... SKIPPED [INFO] Spark Project GraphX .. SKIPPED [INFO] Spark Project ML Library .. SKIPPED [INFO] Spark Project Streaming ... SKIPPED [INFO] Spark Project Tools ... SKIPPED [INFO] Spark Project REPL SKIPPED [INFO] Spark Project Assembly SKIPPED [INFO] Spark Project External Twitter SKIPPED [INFO] Spark Project External Kafka .. SKIPPED [INFO] Spark Project External Flume .. SKIPPED [INFO] Spark Project External ZeroMQ . SKIPPED [INFO] Spark Project External MQTT ... SKIPPED [INFO] Spark Project Examples SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 30.002s [INFO] Finished at: Fri Feb 13 11:21:36 PST 2015 [INFO] Final Memory: 49M/1226M [INFO] [WARNING] The requested profile hadoop-2.4 could not be activated because it does not exist. [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile (scala-compile-first) on project spark-core_2.10: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.1.5:compile failed. CompileFailed - [Help 1] On Friday, February 13, 2015 11:16 AM, Sean Owen so...@cloudera.com wrote: If you just need standalone mode, you don't need -Pyarn. There is no -Phadoop-2.6; you should use -Phadoop-2.4 for 2.4+. Yes, set -Dhadoop.version=2.6.0. That should be it. If that still doesn't work, define doesn't succeed. On Fri, Feb 13, 2015 at 7:13 PM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, Probably a dummy question. Do you know how to compile Spark 0.9 to easily integrate with HDFS 2.6.0 ? I was trying sbt/sbt -Pyarn -Phadoop-2.6 assembly or mvn -Dhadoop.version=2.6.0 -DskipTests clean package but none of these approaches succeeded. Thanks, Robert - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable
Thanks Sean and Imran, I'll try splitting the broadcast variable into smaller ones. I had tried a regular join but it was failing due to high garbage collection overhead during the shuffle. One of the RDDs is very large and has a skewed distribution where a handful of keys account for 90% of the data. Do you have any pointers on how to handle skewed key distributions during a join. Soila On Fri, Feb 13, 2015 at 10:49 AM, Imran Rashid iras...@cloudera.com wrote: unfortunately this is a known issue: https://issues.apache.org/jira/browse/SPARK-1476 as Sean suggested, you need to think of some other way of doing the same thing, even if its just breaking your one big broadcast var into a few smaller ones On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote: I think you've hit the nail on the head. Since the serialization ultimately creates a byte array, and arrays can have at most ~2 billion elements in the JVM, the broadcast can be at most ~2GB. At that scale, you might consider whether you really have to broadcast these values, or want to handle them as RDDs and join and so on. Or consider whether you can break it up into several broadcasts? On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote: I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the following exception when the size of the broadcast variable exceeds 2GB. Any ideas on how I can resolve this issue? java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
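Not from this thread, but one common way to cope with a skewed key distribution in a join is to salt the keys on the large side and replicate the small side once per salt; a rough sketch (the helper and the salt count are assumptions, not Spark API):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// join a skewed RDD against a much smaller one by spreading each key over `salts` sub-keys
def saltedJoin[K: ClassTag, V: ClassTag, W: ClassTag](
    big: RDD[(K, V)], small: RDD[(K, W)], salts: Int = 10): RDD[(K, (V, W))] = {
  val saltedBig   = big.map { case (k, v) => ((k, scala.util.Random.nextInt(salts)), v) }
  val saltedSmall = small.flatMap { case (k, w) => (0 until salts).map(i => ((k, i), w)) }
  saltedBig.join(saltedSmall).map { case ((k, _), (v, w)) => (k, (v, w)) }
}

The trade-off is that the small side is duplicated `salts` times, so this only helps when one side is genuinely small or the hot keys are separated out first.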
Learning GraphX Questions
Hello, I was looking at GraphX as I believe it can be useful in my research on temporal data and I had a number of questions about the system: 1) How do you actually run programs in GraphX? At the moment I've been doing everything live through the shell, but I'd obviously like to be able to work on it by writing and running scripts. 2) Is there a way to check the status of the partitions of a graph? For example, I want to determine for starters if the number of partitions requested are always made, like if I ask for 8 partitions but only have 4 cores what happens? 3) Would I be able to partition by vertex instead of edges, even if I had to write it myself? I know partitioning by edges is favored in a majority of the cases, but for the sake of research I'd like to be able to do both. 4) Is there a better way to time processes outside of using built-in unix timing through the logs or something? Thank you very much for your insight, Matthew Bucci -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Learning-GraphX-Questions-tp21649.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why are there different parts in my CSV?
Thanks Akhil for the suggestion, it is now only giving me one part - . Is there anyway I can just create a file rather than a directory? It doesn't seem like there is just a saveAsTextFile option for JavaPairRecieverDstream. Also, for the copy/merge api, how would I add that to my spark job? Thanks Akhil! Best, Su On Thu, Feb 12, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: For streaming application, for every batch it will create a new directory and puts the data in it. If you don't want to have multiple files inside the directory as part- then you can do a repartition before the saveAs* call. messages.repartition(1).saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class, String.class, (Class) TextOutputFormat.class); Thanks Best Regards On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I am writing simple word counts to hdfs using messages.saveAsHadoopFiles(hdfs://user/ec2-user/,csv,String.class, String.class, (Class) TextOutputFormat.class); 1) However, each 2 seconds I getting a new *directory *that is titled as a csv. So i'll have test.csv, which will be a directory that has two files inside of it called part-0 and part 1 (something like that). This obv makes it very hard for me to read the data stored in the csv files. I am wondering if there is a better way to store the JavaPairRecieverDStream and JavaPairDStream? 2) I know there is a copy/merge hadoop api for merging files...can this be done inside java? I am not sure the logic behind this api if I am using spark streaming which is constantly making new files. Thanks a lot for the help!
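On the copy/merge question, a rough sketch (in Scala, with hypothetical paths) of collapsing one batch's part-* files into a single file using Hadoop's FileUtil.copyMerge, which can be called from the driver after each batch or as a post-processing step:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
FileUtil.copyMerge(
  fs, new Path("hdfs:///user/ec2-user/test.csv"),        // directory written by saveAsHadoopFiles
  fs, new Path("hdfs:///user/ec2-user/test-merged.csv"), // single merged output file
  false,                                                  // keep the source directory
  hadoopConf, null)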
Re: pyspark: Java null pointer exception when accessing broadcast variables
This is fixed in 1.2.1, could you upgrade to 1.2.1? On Thu, Feb 12, 2015 at 4:55 AM, Rok Roskar rokros...@gmail.com wrote: Hi again, I narrowed down the issue a bit more -- it seems to have to do with the Kryo serializer. When I use it, then this results in a Null Pointer: rdd = sc.parallelize(range(10)) d = {} from random import random for i in range(10) : d[i] = random() rdd.map(lambda x: d[x]).collect() --- Py4JJavaError Traceback (most recent call last) ipython-input-97-7cd5df24206c in module() 1 rdd.map(lambda x: d[x]).collect() /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in collect(self) 674 675 with SCCallSiteSync(self.context) as css: -- 676 bytesInJava = self._jrdd.collect().iterator() 677 return list(self._collect_iterator_through_file(bytesInJava)) 678 /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o768.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 87, e1305.hpc-lca.ethz.ch): java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) If I use a dictionary
Re: Columnar-Oriented RDDs
I wrote a proof of concept to automatically store any RDD of tuples or case classes in columnar format using arrays (and strongly typed, so you get the benefit of primitive arrays). See: https://github.com/tresata/spark-columnar On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust mich...@databricks.com wrote: Shark's in-memory code was ported to Spark SQL and is used by default when you run .cache on a SchemaRDD or CACHE TABLE. I'd also look at Parquet, which is more efficient and handles nested data better. On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf nightwolf...@gmail.com wrote: Hi all, I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as row-oriented objects if I understand correctly. I'd like to leverage some of the advantages of a columnar memory format. Shark used to, and Spark SQL does, use a columnar storage format with primitive arrays for each column. I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL. Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere. I know Evan Chan made mention in a presentation of building a custom RDD of column-oriented blocks of data. Cheers, ~N
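A minimal sketch of the columnar idea (not the spark-columnar API; the ColumnarBlock name and the two-column layout are illustration-only assumptions): each partition of a row-oriented RDD[(Int, Double)] is repacked into a single block holding one primitive array per column, and can be unpacked back into rows on demand.

    import org.apache.spark.rdd.RDD

    // One block per partition, one primitive array per column.
    case class ColumnarBlock(ids: Array[Int], values: Array[Double])

    def toColumnar(rows: RDD[(Int, Double)]): RDD[ColumnarBlock] =
      rows.mapPartitions { iter =>
        val buf = iter.toArray
        Iterator(ColumnarBlock(buf.map(_._1), buf.map(_._2)))
      }

    def toRows(blocks: RDD[ColumnarBlock]): RDD[(Int, Double)] =
      blocks.flatMap(b => b.ids.indices.iterator.map(i => (b.ids(i), b.values(i))))

Caching the RDD[ColumnarBlock] keeps each column in a primitive array, which is where the memory savings over row-oriented caching come from.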
SQLContext.applySchema strictness
Per the documentation: It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. However, it appears that this is not being enforced. import org.apache.spark.sql._ val sqlContext = new SQLContext(sc) val struct = StructType(List(StructField("test", BooleanType, true))) val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff"))) val schemaData = sqlContext.applySchema(myData, struct) //No error schemaData.collect()(0).getBoolean(0) //Only now will I receive an error Is this expected or a bug? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to union RDD and remove duplicated keys
I have not run the following, but it will be on these lines - rdd.zipWithIndex().map(x => (x._1._1, (x._1._2, x._2))).reduceByKey((a, b) => { if(a._2 > b._2) a else b }).map(x => (x._1, x._2._1)) On Fri, Feb 13, 2015 at 3:27 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Do you mean first union all RDDs together and then do a reduceByKey()? Suppose my unioned RDD is rdd : (“id1”, “text 1”), (“id1”, “text 2”), (“id1”, “text 3”) How can I use reduceByKey to return (“id1”, “text 3”)? I mean to take the last entry for the same key. A code snippet is appreciated because I am new to Spark. Ningjun *From:* Boromir Widas [mailto:vcsub...@gmail.com] *Sent:* Friday, February 13, 2015 1:28 PM *To:* Wang, Ningjun (LNG-NPV) *Cc:* user@spark.apache.org *Subject:* Re: How to union RDD and remove duplicated keys reduceByKey should work, but you need to define the ordering by using some sort of index. On Fri, Feb 13, 2015 at 12:38 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have multiple RDD[(String, String)] that store (docId, docText) pairs, e.g. rdd1: (“id1”, “Long text 1”), (“id2”, “Long text 2”), (“id3”, “Long text 3”) rdd2: (“id1”, “Long text 1 A”), (“id2”, “Long text 2 A”) rdd3: (“id1”, “Long text 1 B”) Then, I want to merge all RDDs. If there are duplicated docIds, the later RDD should overwrite the previous RDD. In the above case, rdd2 will overwrite rdd1 for “id1” and “id2”, then rdd3 will overwrite rdd2 for “id1”. The final merged rdd should be rddFinal: (“id1”, “Long text 1 B”), (“id2”, “Long text 2 A”), (“id3”, “Long text 3”) Note that I have many such RDDs and each rdd has lots of elements. How can I do it efficiently? Ningjun
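An alternative sketch of the same idea (assuming docIds are unique within each individual RDD, and run in the spark-shell; a compiled 1.2.x job would also need import org.apache.spark.SparkContext._ for the pair-RDD operations): tag every record with the index of the RDD it came from before the union, so the copy from the latest RDD wins per key without zipping the whole unioned RDD with an index.

    // rdd1, rdd2, rdd3: RDD[(String, String)]; later RDDs take precedence
    val tagged = Seq(rdd1, rdd2, rdd3).zipWithIndex.map { case (rdd, priority) =>
      rdd.map { case (id, text) => (id, (text, priority)) }
    }

    val merged = sc.union(tagged)
      .reduceByKey((a, b) => if (b._2 >= a._2) b else a) // keep the value from the later RDD
      .mapValues(_._1)                                   // drop the priority tag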
Re: SQLContext.applySchema strictness
Hi Justin, It is expected. We do not check if the provided schema matches rows since all rows need to be scanned to give a correct answer. Thanks, Yin On Fri, Feb 13, 2015 at 1:33 PM, Justin Pihony justin.pih...@gmail.com wrote: Per the documentation: It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. However, it appears that this is not being enforced. import org.apache.spark.sql._ val sqlContext = new SQLContext(sc) val struct = StructType(List(StructField("test", BooleanType, true))) val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff"))) val schemaData = sqlContext.applySchema(myData, struct) //No error schemaData.collect()(0).getBoolean(0) //Only now will I receive an error Is this expected or a bug? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SQLContext.applySchema strictness
OK, but what about on an action, like collect()? Shouldn't it be able to determine the correctness at that time? On Fri, Feb 13, 2015 at 4:49 PM, Yin Huai yh...@databricks.com wrote: Hi Justin, It is expected. We do not check if the provided schema matches rows since all rows need to be scanned to give a correct answer. Thanks, Yin On Fri, Feb 13, 2015 at 1:33 PM, Justin Pihony justin.pih...@gmail.com wrote: Per the documentation: It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. However, it appears that this is not being enforced. import org.apache.spark.sql._ val sqlContext = new SQLContext(sc) val struct = StructType(List(StructField("test", BooleanType, true))) val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff"))) val schemaData = sqlContext.applySchema(myData, struct) //No error schemaData.collect()(0).getBoolean(0) //Only now will I receive an error Is this expected or a bug? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
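A minimal sketch of an eager check one could run before applySchema (the validate helper is hypothetical, not part of Spark): it forces a pass over the data so type mismatches surface immediately instead of at the first typed accessor call.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._

    // Hypothetical helper: fail fast if any Row does not match the schema.
    def validate(rdd: RDD[Row], schema: StructType): Unit =
      rdd.foreach { row =>
        require(row.length == schema.fields.length,
          s"Expected ${schema.fields.length} columns, got ${row.length}: $row")
        schema.fields.zipWithIndex.foreach { case (field, i) =>
          val v = row(i)
          val ok = field.dataType match {
            case BooleanType => v == null || v.isInstanceOf[Boolean]
            case StringType  => v == null || v.isInstanceOf[String]
            case IntegerType => v == null || v.isInstanceOf[Int]
            case _           => true // extend with the types you actually use
          }
          require(ok, s"Column '${field.name}': value $v does not match ${field.dataType}")
        }
      }

    // validate(myData, struct)  // would fail here for Row(0) and Row("stuff")
    // val schemaData = sqlContext.applySchema(myData, struct)

The cost is an extra scan of the RDD, which is exactly why Spark does not do this check for you.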
Re: SparkSQL doesn't seem to like $'s in column names
This doesn't seem to have helped. On Fri, Feb 13, 2015 at 2:51 PM, Michael Armbrust mich...@databricks.com wrote: Try using `backticks` to escape non-standard characters. On Fri, Feb 13, 2015 at 11:30 AM, Corey Nolet cjno...@gmail.com wrote: I don't remember Oracle ever enforcing that I couldn't include a $ in a column name, but I also don't think I've ever tried. When using sqlContext.sql(...), I have a SELECT * from myTable WHERE locations_$homeAddress = '123 Elm St', and it's telling me $ is invalid. Is this a bug?
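For reference, this is the escaping being suggested (a sketch assuming a table named myTable has already been registered; as noted above, it did not resolve the issue in this particular case):

    // Backticks quote the identifier so the parser does not try to interpret the $.
    val result = sqlContext.sql(
      "SELECT * FROM myTable WHERE `locations_$homeAddress` = '123 Elm St'")
    result.collect()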
Re: Boolean values as predicates in SQL string
Never mind - I think I may have had a schema-related issue (sometimes booleans were represented as strings and sometimes as raw booleans, but when I populated the schema only one or the other was chosen). On Fri, Feb 13, 2015 at 8:03 PM, Corey Nolet cjno...@gmail.com wrote: Here are the results of a few different SQL strings (let's assume the schemas are valid for the data types used): SELECT * from myTable where key1 = true - no filters are pushed to my PrunedFilteredScan SELECT * from myTable where key1 = true and key2 = 5 - 1 filter (key2) is pushed to my PrunedFilteredScan SELECT * from myTable where key1 = false and key3 = 'val3' - 1 filter (key3) is pushed to my PrunedFilteredScan SELECT * from myTable where key1 = 'false' - (as expected) it passed down an EqualTo Filter that's matching 'false' as a string. I was going to file a bug for this but before I did that I wanted to make sure I wasn't missing something (like a special way to handle booleans). I'm assuming the filter is being optimized out because it's being assigned the literal true but in this case I really want to match against a boolean datatype which is true.
Re: SparkException: Task not serializable - Jackson Json
I'm having the same problem with the same sample code. Any progress on this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkException-Task-not-serializable-Jackson-Json-tp21347p21651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
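A common workaround sketch for this class of error (not the original sample code; the Record case class and the jsonLines RDD[String] are assumptions): construct the non-serializable Jackson ObjectMapper inside mapPartitions, so it is created on the executors rather than captured in the driver-side closure.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    case class Record(id: String, value: Int) // hypothetical JSON payload

    val parsed = jsonLines.mapPartitions { iter =>
      // One mapper per partition; it never travels with the task closure.
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      iter.map(line => mapper.readValue(line, classOf[Record]))
    }

Making the mapper a @transient lazy val on the enclosing object is another common fix, since it is then re-created on each executor instead of being serialized.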
Boolean values as predicates in SQL string
Here are the results of a few different SQL strings (let's assume the schemas are valid for the data types used): SELECT * from myTable where key1 = true - no filters are pushed to my PrunedFilteredScan SELECT * from myTable where key1 = true and key2 = 5 - 1 filter (key2) is pushed to my PrunedFilteredScan SELECT * from myTable where key1 = false and key3 = 'val3' - 1 filter (key3) is pushed to my PrunedFilteredScan SELECT * from myTable where key1 = 'false' - (as expected) it passed down an EqualTo Filter that's matching 'false' as a string. I was going to file a bug for this but before I did that I wanted to make sure I wasn't missing something (like a special way to handle booleans). I'm assuming the filter is being optimized out because it's being assigned the literal true but in this case I really want to match against a boolean datatype which is true.
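To make the behavior above easy to reproduce, here is a minimal sketch of a relation that only reports which filters reach buildScan (written against the Spark 1.2-era data sources API, where the scan classes are extended directly; the class name and three-column schema are assumptions):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import org.apache.spark.sql.sources._

    // Hypothetical relation: returns no rows, just prints the pushed-down filters.
    case class FilterLoggingRelation(sqlContext: SQLContext) extends PrunedFilteredScan {
      override val schema = StructType(Seq(
        StructField("key1", BooleanType, nullable = true),
        StructField("key2", IntegerType, nullable = true),
        StructField("key3", StringType, nullable = true)))

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        // With "WHERE key1 = true" this array comes back empty, matching the report above.
        println("Pushed filters: " + filters.mkString(", "))
        sqlContext.sparkContext.parallelize(Seq.empty[Row])
      }
    }

Wiring it up also needs a small RelationProvider and a CREATE TEMPORARY TABLE ... USING statement, omitted here.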
Re: Learning GraphX Questions
At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote: 1) How do you actually run programs in GraphX? At the moment I've been doing everything live through the shell, but I'd obviously like to be able to work on it by writing and running scripts. You can create your own projects that build against Spark and GraphX through a Maven dependency [1], then run those applications using the bin/spark-submit script included with Spark [2]. These guides assume you already know how to do this using your preferred build tool (SBT or Maven). In short, here's how to do it with SBT: 1. Install SBT locally (`brew install sbt` on OS X). 2. Inside your project directory, create a build.sbt file listing Spark and GraphX as a dependency, as in [3]. 3. Run `sbt package` in a shell. 4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit. [1] http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark [2] http://spark.apache.org/docs/latest/submitting-applications.html [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4 2) Is there a way to check the status of the partitions of a graph? For example, I want to determine for starters if the number of partitions requested are always made, like if I ask for 8 partitions but only have 4 cores what happens? You can look at `graph.vertices` and `graph.edges`, which are both RDDs, so you can do for example: graph.vertices.partitions 3) Would I be able to partition by vertex instead of edges, even if I had to write it myself? I know partitioning by edges is favored in a majority of the cases, but for the sake of research I'd like to be able to do both. If you pass PartitionStrategy.EdgePartition1D, this will partition edges by their source vertices, so all edges with the same source will be co-partitioned, and the communication pattern will be similar to vertex-partitioned (edge-cut) systems like Giraph. 4) Is there a better way to time processes outside of using built-in unix timing through the logs or something? I think the options are Unix timing, log file timestamp parsing, looking at the web UI, or writing timing code within your program (System.currentTimeMillis and System.nanoTime). Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
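As a concrete illustration of step 2 above, a minimal build.sbt might look like the following (the project name and the 1.2.1 version are assumptions; match the Spark version to your cluster):

    name := "graphx-example"

    version := "0.1"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"   % "1.2.1" % "provided",
      "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided")

After `sbt package`, the JAR under target/scala-2.10/ is what you pass to bin/spark-submit as in [2].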
Re: An interesting and serious problem I encountered
Thanks for Ye Xianjin's suggestions. The SizeOf.jar may indeed have some problems. I did a simple test as follows. The code is:

    val n = 1; //5; //10; //100; //1000;
    val arr1 = new Array[(Int, Array[Int])](n);
    for(i <- 0 until arr1.length){ arr1(i) = (i, new Array[Int](43)); }
    println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr1)));
    val arr2 = new Array[Array[Int]](n);
    for(i <- 0 until arr2.length){ arr2(i) = new Array[Int](43); }
    println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr2)));

I changed the value of n, and the results (arr1, then arr2) are:

    n=1     1016.0b         216.0b
    n=5     1.9140625Kb     1000.0b
    n=10    3.0625Kb        1.9296875Kb
    n=100   23.8046875Kb    19.15625Kb
    n=1000  231.2265625Kb   191.421875Kb

As suggested by Ye Xianjin, I tried SizeEstimator (https://github.com/phatak-dev/java-sizeof/blob/master/src/main/scala/com/madhukaraphatak/sizeof/SizeEstimator.scala). The results (arr1, then arr2) are:

    n=1     264     216
    n=5     1240    1000
    n=10    2456    1976
    n=100   24416   19616
    n=1000  227216  182576

It seems that SizeEstimator computes the memory correctly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/An-interesting-and-serious-problem-I-encountered-tp21637p21652.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org