How to enforce RDD to be cached?
Hi, I noticed that rdd.cache() does not happen immediately; because of Spark's lazy evaluation, the RDD is only cached at the moment you perform some map/reduce action on it. Is this true? If so, how can I force Spark to cache the RDD right at its cache() statement? I need this for benchmarking, where I have to separate the RDD caching time from the transformation/action processing time. best, /Shahab
Re: Filter using the Vertex Ids
And one more thing: the given tuples (1, 1.0) (2, 1.0) (3, 2.0) (4, 2.0) (5, 0.0) are part of an RDD, they are not just tuples. graph.vertices returns me the above tuples as part of a VertexRDD. On Wed, Dec 3, 2014 at 3:43 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: This is just an example, but if my graph is big there will be too many tuples to handle. I cannot manually do val a: RDD[(Int, Double)] = sc.parallelize(List((1, 1.0), (2, 1.0), (3, 2.0), (4, 2.0), (5, 0.0))) for all the vertices in the graph. What should I do in that case? We cannot do sc.parallelize(List(VertexRDD)), can we? On Wed, Dec 3, 2014 at 3:32 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-12-02 22:01:20 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: I have a graph which returns the following on doing graph.vertices: (1, 1.0) (2, 1.0) (3, 2.0) (4, 2.0) (5, 0.0) I want to group all the vertices with the same attribute together, like into one RDD or something. You can do this by flipping the tuples so the values become the keys, then using one of the by-key functions in PairRDDFunctions:
val a: RDD[(Int, Double)] = sc.parallelize(List((1, 1.0), (2, 1.0), (3, 2.0), (4, 2.0), (5, 0.0)))
val b: RDD[(Double, Int)] = a.map(kv => (kv._2, kv._1))
val c: RDD[(Double, Iterable[Int])] = b.groupByKey(numPartitions = 5)
c.collect.foreach(println)
// (0.0,CompactBuffer(5))
// (1.0,CompactBuffer(1, 2))
// (2.0,CompactBuffer(3, 4))
Ankur
Re: Help understanding - Not enough space to cache rdd
Set the spark.storage.memoryFraction flag to 1 while creating the SparkContext to utilize up to 73 GB of your memory; the default is 0.6, which is why you are getting 33.6 GB. Also set spark.rdd.compress and use StorageLevel MEMORY_ONLY_SER if your data is larger than your available memory (you could also try MEMORY_AND_DISK_SER). Thanks Best Regards On Wed, Dec 3, 2014 at 12:23 AM, akhandeshi ami.khande...@gmail.com wrote: I am running in local mode. I am using a google n1-highmem-16 (16 vCPU, 104 GB memory) machine. I have allocated SPARK_DRIVER_MEMORY=95g. I see Memory: 33.6 GB Used (73.7 GB Total) for the executor. In the log output below, I see that 33.6 GB of blocks are used by the 2 RDDs that I have cached. I should still have 40.2 GB left. However, I see messages like:
14/12/02 18:15:04 WARN storage.MemoryStore: Not enough space to cache rdd_15_9 in memory! (computed 8.1 GB so far)
14/12/02 18:15:04 INFO storage.MemoryStore: Memory use = 33.6 GB (blocks) + 40.1 GB (scratch space shared across 14 thread(s)) = 73.7 GB. Storage limit = 73.7 GB.
14/12/02 18:15:04 WARN spark.CacheManager: Persisting partition rdd_15_9 to disk instead.
Further down I see:
14/12/02 18:30:08 INFO storage.BlockManagerInfo: Added rdd_15_9 on disk on localhost:41889 (size: 6.9 GB)
14/12/02 18:30:08 INFO storage.BlockManagerMaster: Updated info of block rdd_15_9
14/12/02 18:30:08 ERROR executor.Executor: Exception in task 9.0 in stage 2.0 (TID 348) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I don't understand a couple of things: 1) In this case, I am joining 2 RDDs (sizes 16.3 GB and 17.2 GB); both RDDs are created by reading HDFS files. The size of each part file is 24.87 MB, and I am reading these files into 250 partitions, so I shouldn't have any individual partition over 25 MB. How could rdd_15_9 be 8.1 GB? 2) Even if the data is 8.1 GB, Spark should have enough memory to write it, though I would expect the Integer.MAX_VALUE (2 GB per block) limitation to kick in. However, I don't get that error message at this point, and a partial dataset is written to disk (6.9 GB). I don't understand how and why only a partial dataset is written. 3) Why do I get java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE after writing the partial dataset? I would love to hear from anyone who can shed some light on this... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-understanding-Not-enough-space-to-cache-rdd-tp20186.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
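For reference, here is a minimal Scala sketch of the configuration suggested above; the fraction value, the input path and the choice of MEMORY_AND_DISK_SER are assumptions that would be tuned for the actual workload:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("CacheTuningExample")
  // give more of the heap to the block manager (default is 0.6)
  .set("spark.storage.memoryFraction", "0.9")
  // compress serialized RDD partitions
  .set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)

val rdd = sc.textFile("hdfs:///some/input")          // placeholder path
// store partitions serialized, spilling to disk when memory runs out
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()                                          // materialize the cache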
Re: WordCount fails in .textFile() method
Try running it in local mode. Looks like a jar conflict/missing jar.
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.setMaster("local[2]").setSparkHome(System.getenv("SPARK_HOME"));
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.addJar("myJar.jar");
new JavaWordCount(jsc).doJob();
Thanks Best Regards On Wed, Dec 3, 2014 at 2:49 AM, Rahul Swaminathan rahul.swaminat...@duke.edu wrote: Hi, I am trying to run JavaWordCount without using the spark-submit script. I have copied the source code for JavaWordCount and am using a JavaSparkContext with the following:
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.setMaster("spark://127.0.0.1:7077").setSparkHome(System.getenv("SPARK_HOME"));
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.addJar("myJar.jar");
new JavaWordCount(jsc).doJob();
I am getting the following error in the .textFile() method:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
What can I do to solve this issue? It works fine when running from the command line with the spark-submit script. Thanks, Rahul
Re: Spark with HBase
You could go through these to start with: http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark Thanks Best Regards On Wed, Dec 3, 2014 at 11:51 AM, Jai jaidishhari...@gmail.com wrote: I am trying to use Apache Spark with a pseudo-distributed Hadoop/HBase cluster and I am looking for some links regarding the same. Can someone please guide me through the steps to accomplish this. Thanks a lot for helping -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-HBase-tp20226.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
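As a rough illustration of what those links cover, here is a minimal sketch of reading an HBase table into an RDD through TableInputFormat; the ZooKeeper quorum and table name are placeholders for a pseudo-distributed setup:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("HBaseRead"))

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "localhost")      // assumption: local pseudo-distributed setup
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")   // hypothetical table name

// each record is a (row key, Result) pair
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
println(hbaseRDD.count())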
Re: Low Level Kafka Consumer for Spark
Hi, Yes, as Jerry mentioned, SPARK-3129 (https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature which solves the driver failure problem. The way SPARK-3129 is designed, it solves the driver failure problem agnostic of the source of the stream (like Kafka or Flume, etc.). But with just SPARK-3129 you cannot achieve a complete solution for data loss. You need a reliable receiver which also solves the data loss issue on receiver failure. The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer) for which this email thread was started has solved that problem with the Kafka low-level API. And SPARK-4062, as Jerry mentioned, also recently solved the same problem using the Kafka high-level API. On the Kafka high-level consumer API approach, I would like to mention that Kafka 0.8 has some issues, as mentioned in this wiki (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design), where consumer re-balance sometimes fails, and that is one of the key reasons Kafka is re-writing the consumer API in Kafka 0.9. I know a few folks have already faced these re-balancing issues while using the Kafka high-level API, and if you ask my opinion, we at Pearson are still using the Low Level Consumer as this seems to be more robust and performant, and we have been using it for a few months without any issue... and also I may be a little biased :) Regards, Dibyendu On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi Rod, The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to make all the receivers benefit from this mechanism. Though, as you said, external sources like Kafka have their own checkpoint mechanism, so instead of storing data in the WAL we could store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver with the low-level API involved; also, we would need to take care of rebalance and fault tolerance by ourselves. So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though the performance is not as good, it's much easier to understand and maintain. The design purpose and implementation of the reliable Kafka receiver can be found in (https://issues.apache.org/jira/browse/SPARK-4062). And in the future, improving the reliable Kafka receiver as you mentioned is on our schedule. Thanks Jerry -Original Message- From: RodrigoB [mailto:rodrigo.boav...@aspect.com] Sent: Wednesday, December 3, 2014 5:44 AM To: u...@spark.incubator.apache.org Subject: Re: Low Level Kafka Consumer for Spark Dibyendu, Just to make sure I will not be misunderstood - my concerns are referring to the Spark upcoming solution and not yours. I would like to gather the perspective of someone who implemented recovery with Kafka in a different way. Tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
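For completeness, a minimal sketch of turning on the WAL path discussed above; this assumes Spark 1.2+, where SPARK-3129/SPARK-4062 landed, and the checkpoint path is a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReliableKafkaWordCount")
  // assumption: this is the config key introduced with the receiver WAL work
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(10))
// the WAL and recovery metadata live under the checkpoint directory
ssc.checkpoint("hdfs:///checkpoints/reliable-kafka")   // placeholder path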
How does 2.3.4-spark differ from typesafe 2.3.4 akka?
Using sbt-assembly I'm creating a fat jar that includes Spark and Akka. I've encountered this error:
[error] /home/dev/.ivy2/cache/com.typesafe.akka/akka-actor_2.10/jars/akka-actor_2.10-2.3.4.jar:akka/util/ByteIterator$$anonfun$getLongPart$1.class
[error] /home/dev/.ivy2/cache/org.spark-project.akka/akka-actor_2.10/jars/akka-actor_2.10-2.3.4-spark.jar:akka/util/ByteIterator$$anonfun$getLongPart$1.class
This brought the org.spark-project.akka group to my attention. Can someone explain why the spark-project builds and uses its own release of Akka? How does it differ from the com.typesafe.akka release? Thanks, David -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-2-3-4-spark-differ-from-typesafe-2-3-4-akka-tp20229.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pySpark saveAsSequenceFile append overwrite
You can't append to existing output with Spark using the native saveAs* calls; they always check whether the directory already exists and, if so, throw an error. People usually use Hadoop's getmerge utility to combine the output. Thanks Best Regards On Tue, Dec 2, 2014 at 8:10 PM, Csaba Ragany rag...@gmail.com wrote: Dear Spark community, Does the pySpark saveAsSequenceFile(folder) method have the ability to append the new sequence file to an existing one, or to overwrite an existing sequence file? If the folder already exists then I get an error message... Thank You! Csaba
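A minimal sketch of the usual workaround (shown in Scala, but the same idea applies from pySpark): treat the output directory as immutable and either delete it first or write each run to a fresh directory. The output path is a placeholder:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits for saveAsSequenceFile

val sc = new SparkContext(new SparkConf().setAppName("OverwriteExample").setMaster("local[2]"))
val out = "hdfs:///tmp/sequence-output"   // placeholder path

// "overwrite": remove any previous output before saving
val fs = new Path(out).getFileSystem(sc.hadoopConfiguration)
fs.delete(new Path(out), true)   // recursive; returns false if the path did not exist

sc.parallelize(Seq(("k1", "v1"), ("k2", "v2"))).saveAsSequenceFile(out)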
getting first N messages from a Kafka topic using Spark Streaming
Hi Experts! Is there a way to read the first N messages from a Kafka stream, put them in some collection, return that to the caller for visualization purposes, and close Spark Streaming? I will be glad to hear from you and will be thankful to you. Currently I have the following code:
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
  if (params.contains("zookeeperQourum")) zkQuorum = params.get("zookeeperQourum").get
  if (params.contains("userGroup")) group = params.get("userGroup").get
  if (params.contains("topics")) topics = params.get("topics").get
  if (params.contains("numberOfThreads")) numThreads = params.get("numberOfThreads").get
  if (params.contains("sink")) sink = params.get("sink").get
  if (params.contains("batchInterval")) interval = params.get("batchInterval").get.toInt
  val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
  val ssc = new StreamingContext(sparkConf, Seconds(interval))
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  var consumerConfig = scala.collection.immutable.Map.empty[String, String]
  consumerConfig += ("auto.offset.reset" -> "smallest")
  consumerConfig += ("zookeeper.connect" -> zkQuorum)
  consumerConfig += ("group.id" -> group)
  var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
  val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
  streams.foreach(rdd => rdd.foreachPartition(itr => {
    while (itr.hasNext && size >= 0) {
      var msg = itr.next
      println(msg)
      sample.append(msg)
      sample.append("\n")
      size -= 1
    }
  }))
  ssc.start()
  ssc.awaitTermination(5000)
  ssc.stop(true)
}
Where sample is a StringBuilder; when I print the contents of this string builder after the getSample method call returns, I get nothing in it. Any help will be appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL UDF returning a list?
Hi, Can a UDF return a list of values that can be used in a WHERE clause? Something like:
sqlCtx.registerFunction("myudf", { Array(1, 2, 3) })
val sql = "select doc_id, doc_value from doc_table where doc_id in myudf()"
This does not work:
Exception in thread "main" java.lang.RuntimeException: [1.57] failure: ``('' expected but identifier myudf found
I also tried returning a List of Ints; that did not work either. Is there a way to write a UDF that returns a list? Thanks -Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
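One workaround, sketched below, is to skip the UDF-in-WHERE syntax entirely and build the IN list on the driver before handing the string to sqlCtx.sql; the id values are an example and sqlCtx/doc_table are assumed to exist as in the question:

// build the id list on the driver and splice it into the query string
val ids = Seq(1, 2, 3)
val sql = s"SELECT doc_id, doc_value FROM doc_table WHERE doc_id IN (${ids.mkString(",")})"
val result = sqlCtx.sql(sql)
result.collect().foreach(println)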
Re: Spark with HBase
Which HBase release are you running? If it is 0.98, take a look at: https://issues.apache.org/jira/browse/SPARK-1297 Thanks On Dec 2, 2014, at 10:21 PM, Jai jaidishhari...@gmail.com wrote: I am trying to use Apache Spark with a pseudo-distributed Hadoop/HBase cluster and I am looking for some links regarding the same. Can someone please guide me through the steps to accomplish this. Thanks a lot for helping -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-HBase-tp20226.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Does count() evaluate all mapped functions?
Hi, I have an RDD and a function that should be called once on every item in this RDD (say it updates an external database). So far, I have used rdd.map(myFunction).count() or rdd.mapPartitions(iter => iter.map(myFunction)), but I am wondering if this always triggers the call of myFunction in both cases. Actually, in the first case, the count() will be the same whether or not myFunction is called for each element, so I was wondering if I can rely on count() evaluating the whole pipeline, including functions that cannot change the count. Thanks Tobias
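As a small aside, if the goal is purely the side effect, foreach makes that intent explicit instead of leaning on count(); a minimal sketch (myFunction here is just a stand-in for the real database update):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("SideEffectExample").setMaster("local[2]"))

def myFunction(x: Int): Unit = {
  // placeholder for the real side effect (e.g. an external database update)
  println(s"processing $x")
}

val rdd = sc.parallelize(1 to 100)
// foreach is an action and runs the function once per element,
// without relying on count() to pull the mapped values through
rdd.foreach(myFunction)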
Re: Filter using the Vertex Ids
This is just an example, but if my graph is big, there will be so many tuples to handle. I cannot manually do val a: RDD[(Int, Double)] = sc.parallelize(List((1, 1.0), (2, 1.0), (3, 2.0), (4, 2.0), (5, 0.0))) for all the vertices in the graph. What should I do in that case? We cannot do sc.parallelize(List(VertexRDD)), can we? On Wed, Dec 3, 2014 at 3:32 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-12-02 22:01:20 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: I have a graph which returns the following on doing graph.vertices (1, 1.0) (2, 1.0) (3, 2.0) (4, 2.0) (5, 0.0) I want to group all the vertices with the same attribute together, like into one RDD or something. I want all the vertices with same attribute to be together. You can do this by flipping the tuples so the values become the keys, then using one of the by-key functions in PairRDDFunctions:
val a: RDD[(Int, Double)] = sc.parallelize(List((1, 1.0), (2, 1.0), (3, 2.0), (4, 2.0), (5, 0.0)))
val b: RDD[(Double, Int)] = a.map(kv => (kv._2, kv._1))
val c: RDD[(Double, Iterable[Int])] = b.groupByKey(numPartitions = 5)
c.collect.foreach(println)
// (0.0,CompactBuffer(5))
// (1.0,CompactBuffer(1, 2))
// (2.0,CompactBuffer(3, 4))
Ankur
Re: WordCount fails in .textFile() method
Dump your classpath; it looks like you have multiple versions of the Guava jar in the classpath. Thanks Best Regards On Wed, Dec 3, 2014 at 2:30 PM, Rahul Swaminathan rahul.swaminat...@duke.edu wrote: I've tried that and the same error occurs. Do you have any other suggestions? Thanks! Rahul From: Akhil Das ak...@sigmoidanalytics.com Date: Wednesday, December 3, 2014 at 3:55 AM To: Rahul Swaminathan rahul.swaminat...@duke.edu Cc: u...@spark.incubator.apache.org Subject: Re: WordCount fails in .textFile() method Try running it in local mode. Looks like a jar conflict/missing jar.
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.setMaster("local[2]").setSparkHome(System.getenv("SPARK_HOME"));
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.addJar("myJar.jar");
new JavaWordCount(jsc).doJob();
Thanks Best Regards On Wed, Dec 3, 2014 at 2:49 AM, Rahul Swaminathan rahul.swaminat...@duke.edu wrote: Hi, I am trying to run JavaWordCount without using the spark-submit script. I have copied the source code for JavaWordCount and am using a JavaSparkContext with the following:
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.setMaster("spark://127.0.0.1:7077").setSparkHome(System.getenv("SPARK_HOME"));
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.addJar("myJar.jar");
new JavaWordCount(jsc).doJob();
I am getting the following error in the .textFile() method:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
What can I do to solve this issue? It works fine when running from the command line with the spark-submit script. Thanks, Rahul
Re: WordCount fails in .textFile() method
I've tried that and the same error occurs. Do you have any other suggestions? Thanks! Rahul From: Akhil Das ak...@sigmoidanalytics.com Date: Wednesday, December 3, 2014 at 3:55 AM To: Rahul Swaminathan rahul.swaminat...@duke.edu Cc: u...@spark.incubator.apache.org Subject: Re: WordCount fails in .textFile() method Try running it in local mode. Looks like a jar conflict/missing jar.
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.setMaster("local[2]").setSparkHome(System.getenv("SPARK_HOME"));
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.addJar("myJar.jar");
new JavaWordCount(jsc).doJob();
Thanks Best Regards On Wed, Dec 3, 2014 at 2:49 AM, Rahul Swaminathan rahul.swaminat...@duke.edu wrote: Hi, I am trying to run JavaWordCount without using the spark-submit script. I have copied the source code for JavaWordCount and am using a JavaSparkContext with the following:
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
conf.setMaster("spark://127.0.0.1:7077").setSparkHome(System.getenv("SPARK_HOME"));
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.addJar("myJar.jar");
new JavaWordCount(jsc).doJob();
I am getting the following error in the .textFile() method:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
What can I do to solve this issue? It works fine when running from the command line with the spark-submit script. Thanks, Rahul
Using sparkSQL to convert a collection of python dictionaries of dictionaries to a schema RDD
Hi Guys, I am trying to use SparkSQL to convert an RDD to a SchemaRDD so that I can save it in parquet format. A record in my RDD has the following format:
RDD1 { 'field1': 5, 'field2': 'string', 'field3': {'a': 1, 'c': 2} }
I am using field3 to represent a sparse vector; it can have keys 'a', 'b' or 'c' and any int values. The current approach I am using is:
schemaRDD1 = sqc.jsonRDD(RDD1.map(lambda x: simplejson.dumps(x)))
But when I do this, the dictionary in field3 also gets converted to a SparkSQL Row. This turns field3 into a dense data structure which holds the value None for every key that is not present in field3 for each record. When I do
test = RDD1.map(lambda x: simplejson.dumps(x))
test.first()
my output is: {"field1": 5, "field2": "string", "field3": {"a": 1, "c": 2}}
But then when I do
schemaRDD = sqc.jsonRDD(test)
schemaRDD.first()
my output is: Row(field1=5, field2='string', field3=Row(a=1, b=None, c=2))
In reality, I have 1000s of possible keys in field3 and only 2 to 3 of them occur per record. So when it converts to a Row, it generates thousands of None fields per record. Is there any way for me to store field3 as a dictionary instead of converting it into a Row in the SchemaRDD? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-tp20228.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL table Join, one task is taking long
Hey Venkat, This behavior seems reasonable. According to the table names, I guess here DAgents should be the fact table and ContactDetails is the dim table. Below is an explanation of a similar query; you may read src as DAgents and src1 as ContactDetails.
0: jdbc:hive2://localhost:1> explain extended select * from src, src1 where src.key = src1.key and src.key = 100;
== Parsed Logical Plan ==
'Project [*]
 'Filter (('src.key = 'src1.key) && ('src.key = 100))
  'Join Inner, None
   'UnresolvedRelation None, src, None
   'UnresolvedRelation None, src1, None
== Analyzed Logical Plan ==
Project [key#81,value#82,key#83,value#84]
 Filter ((key#81 = key#83) && (key#81 = 100))
  Join Inner, None
   MetastoreRelation default, src, None
   MetastoreRelation default, src1, None
== Optimized Logical Plan ==
Project [key#81,value#82,key#83,value#84]
 Join Inner, Some((key#81 = key#83))
  Filter (key#81 = 100)
   MetastoreRelation default, src, None
  MetastoreRelation default, src1, None
== Physical Plan ==
Project [key#81,value#82,key#83,value#84]
 ShuffledHashJoin [key#81], [key#83], BuildRight
  Exchange (HashPartitioning [key#81], 200)
   Filter (key#81 = 100)
    HiveTableScan [key#81,value#82], (MetastoreRelation default, src, None), None
  Exchange (HashPartitioning [key#83], 200)
   HiveTableScan [key#83,value#84], (MetastoreRelation default, src1, None), None
Code Generation: false
== RDD ==
Please notice the Filter node in the physical plan. In your case, all the filtered rows are shuffled into a single partition because DAgents.f1 is both the predicate key and the shuffle key, and that partition is handled by the task that lasts for more than 1 second. All other tasks in the count stage cost only a few ms because they don't receive any rows from DAgents. If ContactDetails is small enough, you can cache ContactDetails first and set spark.sql.autoBroadcastJoinThreshold larger than the size of ContactDetails; then a broadcast join rather than a shuffled hash join would be performed, which usually results in better performance. Cheng On 12/2/14 6:35 AM, Venkat Subramanian wrote: Environment: Spark 1.1, 4 node Spark and Hadoop dev cluster - 6 cores, 32 GB RAM each. Default serialization, standalone, no security. Data was sqooped from a relational DB to HDFS, and the data is partitioned across HDFS uniformly. I am reading a fact table of about 8 GB in size and one small dim table from HDFS and then doing a join on them based on a criteria. Running the driver in the Spark shell on the Spark master. ContactDetail and DAgents are read as RDDs and registered as tables already. Each of these tables has 60 to 90 fields and I am using Product classes.
val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count // result is a
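A rough sketch of Cheng's suggestion, assuming ContactDetail is the small dim-table side and that both tables are already registered as in the thread; the 100 MB threshold is only an example value:

sqlContext.cacheTable("ContactDetail")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

val CDJoinQry = sqlContext.sql(
  "SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 AND DAgents.f1 = 902")
CDJoinQry.count()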
Re: Filter using the Vertex Ids
At 2014-12-03 02:13:49 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: We cannot do sc.parallelize(List(VertexRDD)), can we? There's no need to do this, because every VertexRDD is also a pair RDD: class VertexRDD[VD] extends RDD[(VertexId, VD)] You can simply use graph.vertices in place of `a` in my example. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
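Putting the two replies together, a self-contained sketch that builds a toy graph with the attribute values from this thread and groups its vertices by attribute directly from graph.vertices, with no extra sc.parallelize:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits for groupByKey
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("GroupByAttr").setMaster("local[2]"))

// a toy graph whose vertex attributes match the example values in this thread
val vertices: RDD[(Long, Double)] =
  sc.parallelize(Seq((1L, 1.0), (2L, 1.0), (3L, 2.0), (4L, 2.0), (5L, 0.0)))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(3L, 4L, 1)))
val graph = Graph(vertices, edges)

// graph.vertices is already an RDD[(VertexId, Double)]
val byAttr = graph.vertices
  .map { case (id, attr) => (attr, id) }
  .groupByKey()
byAttr.collect().foreach(println)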
Re: Filter using the Vertex Ids
At 2014-12-02 22:01:20 -0800, Deep Pradhan pradhandeep1...@gmail.com wrote: I have a graph which returns the following on doing graph.vertices (1, 1.0) (2, 1.0) (3, 2.0) (4, 2.0) (5, 0.0) I want to group all the vertices with the same attribute together, like into one RDD or something. I want all the vertices with same attribute to be together. You can do this by flipping the tuples so the values become the keys, then using one of the by-key functions in PairRDDFunctions: val a: RDD[(Int, Double)] = sc.parallelize(List( (1, 1.0), (2, 1.0), (3, 2.0), (4, 2.0), (5, 0.0))) val b: RDD[(Double, Int)] = a.map(kv = (kv._2, kv._1)) val c: RDD[(Double, Iterable[Int])] = b.groupByKey(numPartitions = 5) c.collect.foreach(println) // (0.0,CompactBuffer(5)) // (1.0,CompactBuffer(1, 2)) // (2.0,CompactBuffer(3, 4)) Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
textFileStream() issue?
Hi, I am trying to use textFileStream(some_hdfs_location) to pick up new files from an HDFS location. I am seeing pretty strange behavior, though. textFileStream() does not detect new files when I move them from a location within HDFS to the location textFileStream() is watching. But when I copy files from a location in the Linux filesystem into HDFS, textFileStream() does detect the new files. Is this a known issue? Thanks, Baahu
Re: getting first N messages from a Kafka topic using Spark Streaming
You could do something like:
val stream = kafkaStream.getStream().repartition(1).mapPartitions(x => x.take(10))
Here stream will have 10 elements from the kafkaStream. Thanks Best Regards On Wed, Dec 3, 2014 at 1:05 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi Experts! Is there a way to read the first N messages from a Kafka stream, put them in some collection, return that to the caller for visualization purposes, and close Spark Streaming? I will be glad to hear from you and will be thankful to you. Currently I have the following code:
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
  if (params.contains("zookeeperQourum")) zkQuorum = params.get("zookeeperQourum").get
  if (params.contains("userGroup")) group = params.get("userGroup").get
  if (params.contains("topics")) topics = params.get("topics").get
  if (params.contains("numberOfThreads")) numThreads = params.get("numberOfThreads").get
  if (params.contains("sink")) sink = params.get("sink").get
  if (params.contains("batchInterval")) interval = params.get("batchInterval").get.toInt
  val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
  val ssc = new StreamingContext(sparkConf, Seconds(interval))
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  var consumerConfig = scala.collection.immutable.Map.empty[String, String]
  consumerConfig += ("auto.offset.reset" -> "smallest")
  consumerConfig += ("zookeeper.connect" -> zkQuorum)
  consumerConfig += ("group.id" -> group)
  var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
  val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
  streams.foreach(rdd => rdd.foreachPartition(itr => {
    while (itr.hasNext && size >= 0) {
      var msg = itr.next
      println(msg)
      sample.append(msg)
      sample.append("\n")
      size -= 1
    }
  }))
  ssc.start()
  ssc.awaitTermination(5000)
  ssc.stop(true)
}
Where sample is a StringBuilder; when I print the contents of this string builder after the getSample method call returns, I get nothing in it. Any help will be appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: getting first N messages from a Kafka topic using Spark Streaming
Hi Akhil! Thanks for your response. Can you please suggest how to return this sample from a function to the caller, and how to stop Spark Streaming afterwards? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227p20249.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
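One way to do both, sketched below: because the function passed to foreachRDD runs on the driver, rdd.take() there really does fill a driver-side buffer (unlike appending inside foreachPartition, which only mutates executor-side copies), and the context can be stopped once enough messages have arrived. buildStream is a hypothetical helper standing in for the KafkaUtils setup from the earlier message:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def sampleFirstN(conf: SparkConf, n: Int): Seq[String] = {
  val ssc = new StreamingContext(conf, Seconds(10))
  val sample = ArrayBuffer[String]()
  val stream = buildStream(ssc)            // hypothetical helper returning DStream[String]
  stream.foreachRDD { rdd =>
    // runs on the driver once per batch; take() pulls at most n elements back
    if (sample.size < n) sample ++= rdd.take(n - sample.size)
  }
  ssc.start()
  ssc.awaitTermination(30000)              // or poll until sample.size >= n
  ssc.stop(stopSparkContext = true)
  sample.toSeq
}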
Re: How to enforce RDD to be cached?
On Wed, Dec 3, 2014 at 10:52 AM, shahab shahab.mok...@gmail.com wrote: Hi, I noticed that rdd.cache() is not happening immediately; rather, due to Spark's lazy evaluation, it happens just at the moment you perform some map/reduce action. Is this true? Yes, this is correct. If this is the case, how can I force Spark to cache immediately at its cache() statement? I need this to perform some benchmarking and I need to separate RDD caching and RDD transformation/action processing time. The typical solution, I think, is to run rdd.foreach(_ => ()) to trigger a calculation.
Re: Announcing Spark 1.1.1!
Andrew and developers, thank you for an excellent release! It fixed almost all of our issues. Now we are migrating to Spark from a zoo of Python, Java, Hive and Pig jobs. Our Scala/Spark jobs often failed on 1.1; Spark 1.1.1 works like a Swiss watch. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Announcing-Spark-1-1-1-tp20195p20251.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
converting DStream[String] into RDD[String] in spark streaming
Hi everyone! I want to convert a DStream[String] into an RDD[String]. I could not find how to do this.
var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
Now I want to convert streams into a single RDD[String]. Any help please. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
collecting fails - requirements for collecting (clone, hashCode etc?)
The following code is failing on the collect. If I don't do the collect and go with a JavaRDD<Document>, it works fine, except I really would like to collect. At first I was getting an error regarding JDI threads and an index being 0; then it just started locking up. I'm running the Spark context locally on 8 cores.
long count = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES).count();
List<Document> sampledDocuments = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES)
    .sample(false, samplingFraction(count)).collect();
RE: collecting fails - requirements for collecting (clone, hashCode etc?)
I didn't realize I do get a nice stack trace if not running in debug mode. Basically, I believe Document has to be Serializable. But since the question has already been asked: are there other requirements for objects within an RDD that I should be aware of? Serializable is very understandable. How about clone, hashCode, etc.? From: ronalday...@live.com To: user@spark.apache.org Subject: collecting fails - requirements for collecting (clone, hashCode etc?) Date: Wed, 3 Dec 2014 07:48:53 -0600 The following code is failing on the collect. If I don't do the collect and go with a JavaRDD<Document>, it works fine, except I really would like to collect. At first I was getting an error regarding JDI threads and an index being 0; then it just started locking up. I'm running the Spark context locally on 8 cores.
long count = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES).count();
List<Document> sampledDocuments = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES)
    .sample(false, samplingFraction(count)).collect();
Re: converting DStream[String] into RDD[String] in spark streaming
DStream.foreachRDD gives you an RDD[String] for each interval, of course. I don't think it makes sense to say a DStream can be converted into one RDD, since it is a stream: the past elements are inherently not supposed to stick around for a long time, and future elements aren't known. You may consider saving each RDD[String] to HDFS, and then simply loading it from HDFS as an RDD[String]. On Wed, Dec 3, 2014 at 7:45 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi everyone! I want to convert a DStream[String] into an RDD[String]. I could not find how to do this.
var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
Now I want to convert streams into a single RDD[String]. Any help please. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
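A minimal sketch of that save-then-reload idea, assuming streams is the DStream[String] from the question and the HDFS paths are placeholders; a later batch job (with its own SparkContext sc) can then read everything back with a glob:

streams.foreachRDD { (rdd, time) =>
  // write each non-empty interval to its own directory
  if (rdd.count() > 0) rdd.saveAsTextFile(s"hdfs:///tmp/stream-out/batch-${time.milliseconds}")
}

// ... later, in a separate batch job:
val allLines = sc.textFile("hdfs:///tmp/stream-out/batch-*")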
Re: converting DStream[String] into RDD[String] in spark streaming
Thanks Dear, It is good to save this data to HDFS and then load back into an RDD :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253p20258.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to enforce RDD to be cached?
Yes, otherwise you can try: rdd.cache().count() and then run your benchmark. Paolo From: Daniel Darabos daniel.dara...@lynxanalytics.com Sent: Wednesday, December 3, 2014 12:28 To: shahab shahab.mok...@gmail.com Cc: user@spark.apache.org On Wed, Dec 3, 2014 at 10:52 AM, shahab shahab.mok...@gmail.com wrote: Hi, I noticed that rdd.cache() is not happening immediately; rather, due to Spark's lazy evaluation, it happens just at the moment you perform some map/reduce action. Is this true? Yes, this is correct. If this is the case, how can I force Spark to cache immediately at its cache() statement? I need this to perform some benchmarking and I need to separate RDD caching and RDD transformation/action processing time. The typical solution, I think, is to run rdd.foreach(_ => ()) to trigger a calculation.
Re: Low Level Kafka Consumer for Spark
My main complaint about the WAL mechanism in the new reliable Kafka receiver is that you have to enable checkpointing and, for some reason, even if spark.cleaner.ttl is set to a reasonable value, only the metadata is cleaned periodically. In my tests, using a folder in my filesystem as the checkpoint folder, the receivedMetaData folder remains almost constant in size but the receivedData folder keeps growing; spark.cleaner.ttl was configured to 300 seconds. 2014-12-03 10:13 GMT+00:00 Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com: Hi, Yes, as Jerry mentioned, SPARK-3129 (https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature which solves the driver failure problem. The way SPARK-3129 is designed, it solves the driver failure problem agnostic of the source of the stream (like Kafka or Flume, etc.). But with just SPARK-3129 you cannot achieve a complete solution for data loss. You need a reliable receiver which also solves the data loss issue on receiver failure. The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer) for which this email thread was started has solved that problem with the Kafka low-level API. And SPARK-4062, as Jerry mentioned, also recently solved the same problem using the Kafka high-level API. On the Kafka high-level consumer API approach, I would like to mention that Kafka 0.8 has some issues, as mentioned in this wiki (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design), where consumer re-balance sometimes fails, and that is one of the key reasons Kafka is re-writing the consumer API in Kafka 0.9. I know a few folks have already faced these re-balancing issues while using the Kafka high-level API, and if you ask my opinion, we at Pearson are still using the Low Level Consumer as this seems to be more robust and performant, and we have been using it for a few months without any issue... and also I may be a little biased :) Regards, Dibyendu On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi Rod, The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to make all the receivers benefit from this mechanism. Though, as you said, external sources like Kafka have their own checkpoint mechanism, so instead of storing data in the WAL we could store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver with the low-level API involved; also, we would need to take care of rebalance and fault tolerance by ourselves. So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though the performance is not as good, it's much easier to understand and maintain. The design purpose and implementation of the reliable Kafka receiver can be found in (https://issues.apache.org/jira/browse/SPARK-4062). And in the future, improving the reliable Kafka receiver as you mentioned is on our schedule. Thanks Jerry -Original Message- From: RodrigoB [mailto:rodrigo.boav...@aspect.com] Sent: Wednesday, December 3, 2014 5:44 AM To: u...@spark.incubator.apache.org Subject: Re: Low Level Kafka Consumer for Spark Dibyendu, Just to make sure I will not be misunderstood - my concerns are referring to the Spark upcoming solution and not yours. I would like to gather the perspective of someone who implemented recovery with Kafka in a different way.
Tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does filter on an RDD scan every data item ?
take(1000) merely takes the first 1000 elements of an RDD; I don't imagine that's what the OP means. filter() is how you select a subset of elements to work with. Yes, this requires evaluating the predicate on all 10M elements, at least once; I don't think you could avoid this in general, right, in any system? You might be able to take advantage of additional info you have. For example, if you have a particular partitioning scheme and you know that the elements of interest are only in one partition, you could create a more efficient version with mapPartitions that simply drops the other partitions. Same answer to your second question. It sounds like you expect that Spark somehow has an index over keys, but it does not; it has no special notion of where your keys are or what they are. This changes a bit if you mean you are using the SQL APIs, but it doesn't sound like you are. On Tue, Dec 2, 2014 at 10:17 AM, Gen gen.tan...@gmail.com wrote: Hi, For your first question, I think that we can use sc.parallelize(rdd.take(1000)). For your second question, I am not sure, but I don't think that we can restrict filter to certain partitions without scanning every element. Cheers Gen nsareen wrote: Hi, I wanted some clarity on the functioning of the filter function of RDD. 1) Does the filter function scan every element saved in the RDD? If my RDD represents 10 million rows and I want to work on only 1000 of them, is there an efficient way of filtering the subset without having to scan every element? 2) If my RDD represents a key/value data set, when I filter this data set of 10 million rows, can I specify that the search should be restricted to only partitions which contain specific keys? Will Spark run my filter operation on all partitions if the partitions are done by key, irrespective of whether the key exists in a partition or not? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p20174.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
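To make the mapPartitions idea concrete, a sketch that hash-partitions by key and then skips whole partitions instead of filtering every element; the keys and partition count are arbitrary example values:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits for partitionBy

val sc = new SparkContext(new SparkConf().setAppName("PartitionPrune").setMaster("local[4]"))

val partitioner = new HashPartitioner(100)
val pairs = sc.parallelize(1 to 10000000).map(i => (i % 1000, i))
val partitioned = pairs.partitionBy(partitioner).cache()

// the keys we care about, and the partitions they hash to
val wantedKeys = Set(42, 907)
val wantedPartitions = wantedKeys.map(k => partitioner.getPartition(k))

// only the partitions that can contain our keys are scanned at all
val pruned = partitioned.mapPartitionsWithIndex { (idx, iter) =>
  if (wantedPartitions.contains(idx)) iter.filter { case (k, _) => wantedKeys.contains(k) }
  else Iterator.empty
}
println(pruned.count())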
Failed fetch: Could not get block(s)
I am using Spark 1.1.1. I am seeing an issue that only appears when I run in standalone clustered mode with at least 2 workers; the workers are on separate physical machines. I am performing a simple join on 2 RDDs. After the join I run first() on the joined RDD (in Scala) to get the first result. When this first() runs on worker A it works fine; when the first() runs on worker B I get a 'Fetch Failure' error. I looked at the worker stderr log for worker B. It shows the following exception:
INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 2 remote fetches in 2 ms
ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(, )
java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
It is trying to connect to the ConnectionManager for the BlockManager on worker A from worker B. It manages to connect, but it always times out. When I try to connect via telnet I see the same: it connects, but I don't get anything back from the host. I noticed that two other people reported this issue: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Freezing-while-running-TPC-H-query-5-td14902.html . Unfortunately there was no meaningful progress. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Failed-fetch-Could-not-get-block-s-tp20262.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
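One knob worth trying for the "ack was not received within 60 sec" symptom, sketched below with an assumed value; if I remember correctly the relevant setting in Spark 1.1 is spark.core.connection.ack.wait.timeout (in seconds). It only helps if the fetch is genuinely slow rather than blocked by networking, so checking connectivity between the two workers' BlockManager ports still comes first:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("FetchTimeoutExample")
  // give slow remote block fetches more time before failing (default is 60)
  .set("spark.core.connection.ack.wait.timeout", "300")
val sc = new SparkContext(conf)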
Re: MLlib Naive Bayes classifier confidence
Probabilities won't sum to 1 since this expression doesn't incorporate the probability of the evidence, I imagine. It's constant across classes, so it is usually excluded; it would appear as a - log(P(evidence)) term. On Tue, Dec 2, 2014 at 10:44 AM, MariusFS marius.fete...@sien.com wrote: Are we sure that exponentiating will give us the probabilities? I did some tests by cloning the MLlib class and adding the required code, but the calculated probabilities do not add up to 1. I tried something like:
def predictProbs(testData: Vector): (BDV[Double], BDV[Double]) = {
  val logProbs = brzPi + brzTheta * new BDV[Double](testData.toArray)
  val probs = logProbs.map(x => math.exp(x))
  (logProbs, probs)
}
This was because I need the actual probs to process downstream from this... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Naive-Bayes-classifier-confidence-tp18456p20175.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
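To make the normalization explicit, a small sketch that turns the per-class scores (log prior + log likelihood) into probabilities that sum to 1 by subtracting log(P(evidence)), computed here with a log-sum-exp for numerical stability:

def normalizeLogProbs(logProbs: Array[Double]): Array[Double] = {
  val max = logProbs.max
  // log(P(evidence)) = log of the sum of the exponentiated class scores
  val logSumExp = max + math.log(logProbs.map(lp => math.exp(lp - max)).sum)
  logProbs.map(lp => math.exp(lp - logSumExp))   // these now sum to 1
}

// example: two classes with joint log-likelihoods -3.2 and -5.0
println(normalizeLogProbs(Array(-3.2, -5.0)).mkString(", "))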
Spark MOOC by Berkeley and Databricks
Hello everybody, in case you missed it: Databricks and Berkeley have announced a free MOOC on Spark and another one on scalable machine learning using Spark. Both courses are free, but if you want a verified certificate of completion you need to donate at least $50. I did it, it's a great investment! Here's the link with all the info: http://databricks.com/blog/2014/12/02/announcing-two-spark-based-moocs.html Have a nice day. MD
Providing query dsl to Elasticsearch for Spark (2.1.0.Beta3)
Hi, I'm trying the Elasticsearch support for Spark (2.1.0.Beta3). In the following I provide the query (as query dsl):
import org.elasticsearch.spark._
object TryES {
  val sparkConf = new SparkConf().setAppName("Campaigns")
  sparkConf.set("es.nodes", "es_cluster:9200")
  sparkConf.set("es.nodes.discovery", "false")
  val sc = new SparkContext(sparkConf)
  def main(args: Array[String]) {
    val query = """{ "query": { ... } }"""
    val campaigns = sc.esRDD(resource, query)
    campaigns.count()
  }
}
However when I submit this (using spark-1.1.0-bin-hadoop2.4), I am experiencing the following exceptions:
14/12/03 14:55:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/03 14:55:27 INFO scheduler.DAGScheduler: Failed to run count at TryES.scala:...
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot open stream for resource { query: { ... } }
Is the query dsl supported with esRDD, or am I missing something more fundamental? Huge thanks, ian - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL 1.0.0 - RDD from snappy compress avro file
Ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-0-RDD-from-snappy-compress-avro-file-tp19998p20267.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ALS failure with size Integer.MAX_VALUE
Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a smaller number of item blocks in this case. 1200 is definitely too large for ALS; please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2 GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see the error occur when reading a block from disk. I think this is an instance of the 2 GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath - I'm unsure if this is your problem, but the MatrixFactorizationModel in MLlib, which is the underlying component for ALS, expects your user/product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT; could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven't seen this exact problem, but I'm sure we've seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in MLlib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 billion (~30 GB of data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 and 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are at least a couple of tasks that end up shuffle-reading 9.7 GB each in the aggregate stage (ALS.scala:337) and failing with the following exception:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
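For reference, a sketch of passing an explicit block count to ALS as suggested above; the ratings RDD is toy data standing in for the real 1.2B-record input, and the other parameters are the values mentioned in the thread or placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val sc = new SparkContext(new SparkConf().setAppName("ALSBlocks").setMaster("local[4]"))

// toy ratings standing in for the real RDD[Rating]
val ratings = sc.parallelize(Seq(Rating(1, 1, 5.0), Rating(1, 2, 3.0), Rating(2, 2, 4.0)))

val rank = 10
val iterations = 10
val lambda = 0.01
val numBlocks = 30   // 30 user/item blocks, per the suggestion above, instead of 1200
val model = ALS.train(ratings, rank, iterations, lambda, numBlocks)
println(model.predict(1, 2))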
Re: Help understanding - Not enough space to cache rdd
Hmm... 33.6 GB is the sum of the memory used by the two RDDs that are cached. You're right that when I put serialized RDDs in the cache, the memory footprint for these RDDs becomes a lot smaller. The serialized memory footprint is shown below:
RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk
2 | Memory Serialized 1x Replicated | 239 | 100% | 3.1 GB | 0.0 B | 0.0 B
5 | Memory Serialized 1x Replicated | 100 | 100% | 1254.9 MB | 0.0 B | 0.0 B
I don't know what 73.7 GB is reflective of. In the application UI, I am able to see 4.3 GB Used out of (73.7 GB Total) by the cached RDDs; I am not sure how that 73.7 is calculated. I have the following configuration:
conf.set("spark.storage.memoryFraction", "0.9");
conf.set("spark.shuffle.memoryFraction", "0.1");
Based on my understanding, 0.9 * 95 GB (memory allocated to the driver) = 85.5 GB should be the available memory, correct? Out of which 10% is taken out for shuffle (85.5 - 8.55 = 76.95), which would lead to 76.95 GB of usable memory. Is that right? The two RDDs that are cached are not using nearly as much. The two systematic problems that I am trying to avoid are Size exceeds Integer.MAX_VALUE and Requested array size exceeds VM limit. No matter how much I tweak the parallelism/memory configuration, there seems to be little or no impact. Is there someone who can help me understand the internals, so that I can get this working? I know this platform is a great, viable solution for the use case we have in mind, if I can get it running successfully. At this point, the data size is not that huge compared to some published white papers, so I am thinking it boils down to the configuration and validating what I have with an expert. We can take this offline if need be; please feel free to email me directly. Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-understanding-Not-enough-space-to-cache-rdd-tp20186p20269.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Serializing with Kryo NullPointerException - Java
Hi all, I am having troubles using Kryo and, being new to this kind of serialization, I am not sure where to look. Can someone please help me? :-) Here is my custom class:
public class DummyClass implements KryoSerializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DummyClass.class);
    int value;
    public DummyClass() {
    }
    public DummyClass(int value) {
        LOGGER.info("hey I'm dum {}!", value);
        this.value = value;
    }
    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }
    @Override
    public void write(Kryo kryo, Output output) {
        output.writeInt(value);
    }
    @Override
    public void read(Kryo kryo, Input input) {
        this.value = input.readInt();
    }
}
Here is my registrator:
public class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(DummyClass.class);
    }
}
And the Spark code:
SparkConf sparkConf = new SparkConf()
    .setAppName(appName)
    .setMaster(master)
    .setJars(jars)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.roke.main.MyKryoRegistrator");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
List<DummyClass> dummyClasses = Arrays.asList(
    new DummyClass(1), new DummyClass(2), new DummyClass(3), new DummyClass(4));
JavaRDD<DummyClass> rdd = sparkContext.parallelize(dummyClasses);
for (DummyClass dummyClass : rdd.collect())
    LOGGER.info("driver collected {}", dummyClass);
The program fails with the following NullPointerException:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.21.6.68): java.lang.NullPointerException: com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36) com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80) org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123) org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) -- Robin Keunen Software Engineer robin.keu...@lampiris.be www.lampiris.be
Re: what is the best way to implement mini batches?
I am trying to do the same thing and also wondering what the best strategy is. Thanks From: ll duy.huynh@gmail.com Sent: Wednesday, December 3, 2014 10:28 AM To: u...@spark.incubator.apache.org Subject: what is the best way to implement mini batches? hi. what is the best way to pass through a large dataset in small, sequential mini batches? for example, with 1,000,000 data points and a mini batch size of 10, we would need to do some computation at these mini batches: (0..9), (10..19), (20..29), ... (N-9..N). would RDD.repartition(N/10).mapPartitions() work? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
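A minimal sketch of one way to form fixed-size mini batches on an existing RDD; `data`, the batch size, and the per-batch computation are placeholders, not from the thread:

    import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.1

    val batchSize = 10
    val batches = data.zipWithIndex()                          // (element, index)
      .map { case (x, i) => (i / batchSize, x) }               // (batchId, element)
      .groupByKey()
      .sortByKey()
    // each record is now (batchId, Iterable of up to batchSize elements)
    batches.foreach { case (batchId, batch) =>
      println(s"batch $batchId has ${batch.size} elements")    // per-batch computation goes here
    }

Note the batches are processed in parallel; if they must be handled strictly one after another, collect the batch ids on the driver and loop over them, filtering a cached RDD per batch.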
[SQL] Wildcards in SQLContext.parquetFile?
Hi folks, I'm wondering if someone has successfully used wildcards with a parquetFile call? I saw this thread and it makes me think no? http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3CCACA1tWLjcF-NtXj=pqpqm3xk4aj0jitxjhmdqbojj_ojybo...@mail.gmail.com%3E I have a set of parquet files that are partitioned by key. I'd like to issue a query to read in a subset of the files, based on a directory wildcard (the wildcard will be a little more specific than * but this is to show the issue): This call works fine:

sc.textFile("hdfs:///warehouse/hive/*/*/*.parquet").first
res4: String = PAR1? L??? ?\??? , ,a??aL0?xU???e??

but this doesn't:

scala> val parquetFile = sqlContext.parquetFile("hdfs:///warehouse/hive/*/*/*.parquet").first
java.io.FileNotFoundException: File hdfs://cdh4-14822-nn/warehouse/hive/*/*/*.parquet does not exist
Re: Help understanding - Not enough space to cache rdd
I think the memory calculation is correct; what I didn't account for is the memory used. I am still puzzled as to how I can successfully process the RDD in Spark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-understanding-Not-enough-space-to-cache-rdd-tp20186p20273.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
GraphX Pregel halting condition
I'm trying to implement a graph algorithm that does a form of path searching. Once a certain criterion is met on any path in the graph, I wanted to halt the rest of the iterations. But I can't see how to do that with the Pregel API, since a vertex isn't able to know the state of another, arbitrary vertex (if they're not adjacent). Is there a common pattern for doing something like this? I was thinking of using a custom accumulator where the zero is true and the addInPlace is a boolean OR. Each vertex (as part of its vprog) could add to the accumulator, and once a path is found which meets the condition, the accumulator would then have a value of false. But since workers can't read accumulators, I don't see how to use that when deciding whether to iterate again. That is, unless I reimplement the Pregel class with the added check when iterating... Any suggestions? Thanks in advance!
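A sketch of that reimplementation idea (an assumption, not a confirmed GraphX recipe): instead of calling Pregel, drive the superstep loop yourself so the driver can test a global halting condition between iterations. Here sendMsg, mergeMsg, vprog, Msg, pathFound and maxIterations are placeholders for the algorithm's own pieces:

    var g = graph.cache()
    var halted = false
    var i = 0
    while (!halted && i < maxIterations) {
      val msgs = g.mapReduceTriplets[Msg](sendMsg, mergeMsg)   // same shape of functions Pregel uses
      g = g.joinVertices(msgs)(vprog).cache()
      // driver-side check: has any vertex reached the halting criterion yet?
      halted = g.vertices.filter { case (_, attr) => attr.pathFound }.count() > 0
      i += 1
    }

The count() per iteration costs an extra job, but it gives the driver the global view that individual vertices cannot have.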
Re: heterogeneous cluster setup
I don't have a great answer for you. For us, we found a common divisor, not necessarily a whole gigabyte, of the available memory of the different hardware and used that as the amount of memory per worker and scaled the number of cores accordingly so that every core in the system has the same amount of memory. The quotient of the available memory and the common divisor, hopefully a whole number to reduce waste, was the number of workers we spun up. Therefore, if you have 64G, 30G, and 15G available memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1 worker per machine. Every worker on all the machines would have the same number of cores, set to what you think is a good value. Hope that helps. On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote: Hi Victor, I want to setup a heterogeneous stand-alone spark cluster. I have hardware with different memory sizes and varied number of cores per node. I could get all the nodes active in the cluster only when the size of memory per executor is set as the least available memory size of all nodes and is same with no.of cores/executor. As of now, I configure one executor per node. Can you please suggest some path to set up a stand-alone heterogeneous cluster such that I can efficiently use the available hardware. Thank you _ Sent from http://apache-spark-user-list.1001560.n3.nabble.com
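As a sketch of that scheme for the standalone cluster manager, using the example sizes above (the numbers are illustrative, not a recommendation), each machine's conf/spark-env.sh keeps the per-worker memory and cores identical and only varies the worker count:

    # 64G machine
    export SPARK_WORKER_MEMORY=15g
    export SPARK_WORKER_CORES=4
    export SPARK_WORKER_INSTANCES=4    # 2 on the 30G machine, 1 on the 15G machine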
dockerized spark executor on mesos?
Just wondered if anyone had managed to start spark jobs on mesos wrapped in a docker container? At present (i.e. very early testing) I'm able to submit executors to mesos via spark-submit easily enough, but they fall over as we don't have a JVM on our slaves out of the box. I can push one out via our CM system if push comes to shove, but it'd be nice to have that as part of the job (I'm thinking it might be a way to get some of the dependencies deployed too). bear in mind I'm a total clueless newbie at this so please be gentle if I'm doing this completely wrong. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Monitoring Spark
hello, I'm running Spark on a standalone station and I'm trying to view the event log after the run is finished. I turned on the event log as the site said (spark.eventLog.enabled set to true) but I can't find the log files or get the web UI to work. Any idea on how to do this? thanks Isca
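A sketch of the pieces usually needed (the HDFS directory name is an illustrative choice): enable event logging in the application's own SparkConf and point it at a directory that already exists, then browse finished runs either through the standalone Master UI or through the history server (sbin/start-history-server.sh) configured to read the same directory.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("my-app")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-event-logs")   // must already exist
    val sc = new SparkContext(conf)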
Re: SchemaRDD + SQL , loading projection columns
Thanks for the help.. Let me find more info on how to enable statistics in parquet. -Vishnu Michael Armbrust wrote: There is not a super easy way to do what you are asking since in general parquet needs to read all the data in a column. As far as I understand it does not have indexes that would allow you to jump to a specific entry in a column. There is support for pushing down predicates, but unfortunately this is turned off by default (in Spark 1.2) due to bugs in the parquet library. Even with this feature though I believe you still read the data and just skip the cost of materializing the row. One thing that could speed up that particular query is to sort by 'rid before storing to parquet. Then (when filter pushdown is turned on), parquet will keep statistics on the min/max value for each column in a given row group. That would allow it to completely skip row groups that cannot contain a given 'rid. Michael On Tue, Dec 2, 2014 at 12:43 PM, Vishnusaran Ramaswamy vishnusaran@ wrote: Hi, I have 16 GB of parquet files in the /tmp/logs/ folder with the following schema: request_id(String), module(String), payload(Array[Byte]). Most of my 16 GB of data is the payload field; the request_id and module fields take less than 200 MB. I want to load the payload only when my filter condition matches.

val sqlContext = new SQLContext(sc)
val files = sqlContext.parquetFile("/tmp/logs")
files.registerTempTable("logs")
val filteredLogs = sqlContext.sql("select request_id, payload from logs where rid = 'dd4455ee' and module = 'query'")

When I run filteredLogs.collect.foreach(println), I see all of the 16 GB of data loaded. How do I load only the columns used in filters first and then load the payload for the rows matching the filter criteria? Let me know if this can be done in a different way. Thank you, Vishnu. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-SQL-loading-projection-columns-tp20189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscribe@.apache For additional commands, e-mail: user-help@.apache -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-SQL-loading-projection-columns-tp20189p20278.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
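A sketch of the two suggestions above, i.e. sort by the filter column before writing and turn the (default-off) pushdown flag on when reading; the output path is illustrative and the flag name is the one introduced around Spark 1.2:

    // write side: sort so each row group covers a narrow range of request_id
    val sorted = sqlContext.sql("SELECT request_id, module, payload FROM logs ORDER BY request_id")
    sorted.saveAsParquetFile("/tmp/logs_sorted")

    // read side: register the sorted copy and enable parquet filter pushdown
    sqlContext.parquetFile("/tmp/logs_sorted").registerTempTable("logs_sorted")
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
    val filtered = sqlContext.sql(
      "SELECT request_id, payload FROM logs_sorted WHERE request_id = 'dd4455ee'")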
Re: How to enforce RDD to be cached?
Daniel and Paolo, thanks for the comments. best, /Shahab On Wed, Dec 3, 2014 at 3:12 PM, Paolo Platter paolo.plat...@agilelab.it wrote: Yes, otherwise you can try: rdd.cache().count() and then run your benchmark Paolo From: Daniel Darabos daniel.dara...@lynxanalytics.com Sent: Wednesday, 3 December 2014 12:28 To: shahab shahab.mok...@gmail.com Cc: user@spark.apache.org On Wed, Dec 3, 2014 at 10:52 AM, shahab shahab.mok...@gmail.com wrote: Hi, I noticed that rdd.cache() is not happening immediately rather due to lazy feature of Spark, it is happening just at the moment you perform some map/reduce actions. Is this true? Yes, this is correct. If this is the case, how can I enforce Spark to cache immediately at its cache() statement? I need this to perform some benchmarking and I need to separate rdd caching and rdd transformation/action processing time. The typical solution I think is to run rdd.foreach(_ => ()) to trigger a calculation.
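Once the cache has been forced with an action, the two phases can be timed separately; a sketch with an illustrative timing helper (f and g stand in for the real workload):

    def time[T](label: String)(body: => T): T = {
      val start = System.nanoTime
      val result = body
      println(s"$label took ${(System.nanoTime - start) / 1e9} s")
      result
    }

    rdd.cache()
    time("materializing the cache") { rdd.count() }            // count() touches every partition
    time("transformation + action") { rdd.map(f).reduce(g) }

Note that count() (or foreach) computes and stores every partition, whereas first() only touches the first one, so count() is the safer trigger when the goal is to benchmark a fully cached RDD.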
MLLib: loading saved model
Hi All, I am using LinearRegressionWithSGD and then I save the model weights and intercept. The file that contains the weights has this format: 1.204550.13560.000456.. The intercept is 0 since I am using train without setting the intercept, so it can be ignored for the moment. I would now like to initialize a new model object using these saved weights from the above file. We are using CDH 5.1. Something along these lines: val weights = sc.textFile("linear-weights"); val model = new LinearRegressionWithSGD(weights); then use it as: val valuesAndPreds = testData.map { point => val prediction = model.predict(point.features); (point.label, prediction) } Any pointers on how I can do that?
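MLlib at that point does not ship a loader for this, so one approach is to rebuild the model by hand; a sketch assuming one weight value per line in the saved file:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LinearRegressionModel

    val weights = Vectors.dense(sc.textFile("linear-weights").map(_.trim.toDouble).collect())
    val model = new LinearRegressionModel(weights, 0.0)   // intercept was 0 during training

    val valuesAndPreds = testData.map { point =>
      (point.label, model.predict(point.features))
    }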
Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schma RDD
inferSchema() will work better than jsonRDD() in your case:

from pyspark.sql import Row
srdd = sqlContext.inferSchema(rdd.map(lambda x: Row(**x)))
srdd.first()
# Row(field1=5, field2='string', field3={'a': 1, 'c': 2})

On Wed, Dec 3, 2014 at 12:11 AM, sahanbull sa...@skimlinks.com wrote: Hi Guys, I am trying to use SparkSQL to convert an RDD to SchemaRDD so that I can save it in parquet format. A record in my RDD has the following format: RDD1: { 'field1': 5, 'field2': 'string', 'field3': {'a': 1, 'c': 2} } I am using field3 to represent a sparse vector, and it can have keys 'a', 'b' or 'c' with any int values. The current approach I am using is: schemaRDD1 = sqc.jsonRDD(RDD1.map(lambda x: simplejson.dumps(x))) But when I do this, the dictionary in field3 also gets converted to a SparkSQL Row. This converts field3 into a dense data structure where it holds the value None for every key that is not present in field3 for each record. When I do test = RDD1.map(lambda x: simplejson.dumps(x)) and test.first(), my output is: {"field1": 5, "field2": "string", "field3": {"a": 1, "c": 2}} But then when I do schemaRDD = sqc.jsonRDD(test) and schemaRDD.first(), my output is: Row(field1=5, field2='string', field3=Row(a=1, b=None, c=2)) In reality, I have 1000s of probable keys in field3 and only 2 to 3 of them occur per record. So when it converts to a Row, it generates thousands of None fields per record. Is there any way for me to store field3 as a dictionary instead of converting it into a Row in the schemaRDD? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-tp20228.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to enforce RDD to be cached?
shahabm wrote I noticed that rdd.cache() is not happening immediately rather due to lazy feature of Spark, it is happening just at the moment you perform some map/reduce actions. Is this true? Yes, .cache() is a transformation (lazy evaluation) shahabm wrote If this is the case, how can I enforce Spark to cache immediately at its cache() statement? I need this to perform some benchmarking and I need to separate rdd caching and rdd transformation/action processing time. put an action immediately after .cache() .cache().first() may be low impact, as it only returns the first element of the RDD, rather than iterating. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enforce-RDD-to-be-cached-tp20230p20284.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How can I read an avro file in HDFS in Java?
Hi, Try using sc.newAPIHadoopFile("<hdfs path to your file>", AvroSequenceFileInputFormat.class, AvroKey.class, AvroValue.class, <your Configuration>). You will get the Avro-related classes by importing org.apache.avro.*. Thanks. On Tue, Dec 2, 2014 at 9:23 PM, leaviva wrote: How can I read an avro file in HDFS? I tried using newAPIHadoopFile but I don't know how I can use it. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-read-an-avro-file-in-HDFS-in-Java-tp20173p20285.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
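For plain .avro data files (as opposed to Avro sequence files), here is a Scala sketch of the same newAPIHadoopFile approach; the input-format and key classes are assumptions about how the file was written, and avro plus avro-mapred must be on the classpath:

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable

    val records = sc.newAPIHadoopFile(
      "hdfs:///path/to/data.avro",
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      sc.hadoopConfiguration)
    records.map(_._1.datum()).take(5).foreach(println)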
Re: WordCount fails in .textFile() method
For others who may be having a similar problem: The error below occurs when using Yarn, which uses an earlier version of Guava compared to Spark 1.1.0. When packaging using Maven, if you put the Yarn dependency above the Spark dependency, the earlier version of guava is the one that gets recognized, leading to the NoSuchMethodError. I solved the problem by simply switching the order of the dependencies.. Rahul From: Akhil Das ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com Date: Wednesday, December 3, 2014 at 4:02 AM To: Rahul Swaminathan rahul.swaminat...@duke.edumailto:rahul.swaminat...@duke.edu Cc: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: WordCount fails in .textFile() method Resent-From: rahul.swaminat...@duke.edumailto:rahul.swaminat...@duke.edu dump your classpath, looks like you have multiple versions of guava jars in the classpath. Thanks Best Regards On Wed, Dec 3, 2014 at 2:30 PM, Rahul Swaminathan rahul.swaminat...@duke.edumailto:rahul.swaminat...@duke.edu wrote: I've tried that and the same error occurs. Do you have any other suggestions? Thanks! Rahul From: Akhil Das ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com Date: Wednesday, December 3, 2014 at 3:55 AM To: Rahul Swaminathan rahul.swaminat...@duke.edumailto:rahul.swaminat...@duke.edu Cc: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: WordCount fails in .textFile() method Resent-From: rahul.swaminat...@duke.edumailto:rahul.swaminat...@duke.edu Try running it in local mode. Looks like a jar conflict/missing. SparkConf conf = new SparkConf().setAppName(JavaWordCount); conf.set(spark.io.compression.codec,org.apache.spark.io.LZ4CompressionCodec); conf.setMaster(local[2]).setSparkHome(System.getenv(SPARK_HOME)); JavaSparkContext jsc = new JavaSparkContext(conf); jsc.addJar(myJar.jar); new JavaWordCount(jsc).doJob(); Thanks Best Regards On Wed, Dec 3, 2014 at 2:49 AM, Rahul Swaminathan rahul.swaminat...@duke.edumailto:rahul.swaminat...@duke.edu wrote: Hi, I am trying to run JavaWordCount without using the spark-submit script. I have copied the source code for JavaWordCount and am using a JavaSparkContext with the following: SparkConf conf = new SparkConf().setAppName(JavaWordCount); conf.set(spark.io.compression.codec,org.apache.spark.io.LZ4CompressionCodec); conf.setMaster(spark://127.0.0.1:7077http://127.0.0.1:7077).setSparkHome(System.getenv(SPARK_HOME)); JavaSparkContext jsc = new JavaSparkContext(conf); jsc.addJar(myJar.jar); new JavaWordCount(jsc).doJob(); I am getting the following error in the .textFile() method: Exception in thread main java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.orghttp://org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) What can I do to solve this issue? It works fine when running from command line with spark-submit script. Thanks, Rahul
Re: [SQL] Wildcards in SQLContext.parquetFile?
It won't work until this is merged: https://github.com/apache/spark/pull/3407 On Wed, Dec 3, 2014 at 9:25 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, I'm wondering if someone has successfully used wildcards with a parquetFile call? I saw this thread and it makes me think no? http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3CCACA1tWLjcF-NtXj=pqpqm3xk4aj0jitxjhmdqbojj_ojybo...@mail.gmail.com%3E I have a set of parquet files that are partitioned by key. I'd like to issue a query to read in a subset of the files, based on a directory wildcard (the wildcard will be a little more specific than * but this is to show the issue): This call works fine: sc.textFile("hdfs:///warehouse/hive/*/*/*.parquet").first res4: String = PAR1? L??? ?\??? , ,a??aL0?xU???e?? but this doesn't: scala> val parquetFile = sqlContext.parquetFile("hdfs:///warehouse/hive/*/*/*.parquet").first java.io.FileNotFoundException: File hdfs://cdh4-14822-nn/warehouse/hive/*/*/*.parquet does not exist
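Until that change is in, one workaround (a sketch, assuming each matched path can be read by parquetFile on its own) is to expand the glob with the Hadoop FileSystem API and union the per-path SchemaRDDs:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val paths = fs.globStatus(new Path("hdfs:///warehouse/hive/*/*/*.parquet"))
      .map(_.getPath.toString)
    val combined = paths.map(sqlContext.parquetFile).reduce(_ unionAll _)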
Re: object xxx is not a member of package com
Hi, Add the jars to the external libraries of your related project. Right-click on the package or class -> Build Path -> Configure Build Path -> Java Build Path -> select the Libraries tab -> Add External Library -> browse to com.xxx.yyy.zzz._ -> OK. Clean and build your project; most probably you will be able to pull the classes from the com.xxx.yyy.zzz._ package. Thanks. On Wed, Dec 3, 2014 at 4:29 AM, flyson wrote: Hello everyone, Could anybody tell me how to import and call 3rd party Java classes from inside Spark? Here's my case: I have a jar file (the directory layout is com.xxx.yyy.zzz) which contains some Java classes, and I need to call some of them in Spark code. I used the statement import com.xxx.yyy.zzz._ on top of the affected Spark file, set the location of the jar file in the CLASSPATH environment, and used ./sbt/sbt assembly to build the project. As a result, I got an error saying object xxx is not a member of package com. I thought that this could be related to the dependencies, but couldn't figure it out. Any suggestion/solution from you would be appreciated! Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/object-xxx-is-not-a-member-of-package-com-tp20205p20288.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
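Since the build here is sbt assembly rather than Eclipse, a simpler route (a sketch, with an illustrative jar name) is to make the jar an unmanaged dependency by dropping it into the project's lib/ directory, which sbt picks up automatically:

    myproject/
      build.sbt
      lib/
        com-xxx-yyy-zzz.jar     # unmanaged jars in lib/ end up on the compile classpath
      src/main/scala/...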
Re: Does filter on an RDD scan every data item ?
nsareen wrote 1) Does filter function scan every element saved in RDD? if my RDD represents 10 Million rows, and if i want to work on only 1000 of them, is there an efficient way of filtering the subset without having to scan every element ? using .take(1000) may be a biased sample. you may want to consider sampling your RDD (with or without replacement) using a seed for randomization, using .takeSample() eg. rdd.takeSample(false, 1000, 1) this returns an Array, from which you could create another RDD. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p20289.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does filter on an RDD scan every data item ?
also available is .sample(), which will randomly sample your RDD with or without replacement, and returns an RDD. .sample() takes a fraction, so it doesn't return an exact number of elements. eg. rdd.sample(true, .0001, 1) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p20290.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Insert new data into specific partition of an RDD
I'm not sure about .union(), but at least in the case of .join(), as long as you have hash partitioned the original RDDs and persisted them, calls to .join() take advantage of already knowing which partition the keys are on, and will not repartition rdd1. val rdd1 = log.partitionBy(new HashPartitioner(10)).persist() val rdd3 = rdd1.join(rdd2) I suspect you want to use one of the key aware operations anyways, rather than .union() I know other operations are also partitioner aware like this, though I don't know which ones. Perhaps use the partitioner property in order to test your operation? cheers, ds -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Insert-new-data-into-specific-partition-of-an-RDD-tp20156p20291.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Announcing Spark 1.1.1!
About version compatibility and upgrade path - can the Java application dependencies and the Spark server be upgraded separately (i.e. will 1.1.0 library work with 1.1.1 server, and vice versa), or do they need to be upgraded together? Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote: I am happy to announce the availability of Spark 1.1.1! This is a maintenance release with many bug fixes, most of which are concentrated in the core. This list includes various fixes to sort-based shuffle, memory leak, and spilling issues. Contributions from this release came from 55 developers. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-1-1.html [2] http://spark.apache.org/downloads.html Please e-mail me directly for any typo's in the release notes or name listing. Thanks for everyone who contributed, and congratulations! -Andrew
Re: Announcing Spark 1.1.1!
By the Spark server do you mean the standalone Master? It is best if they are upgraded together because there have been changes to the Master in 1.1.1. Although it might just work, it's highly recommended to restart your cluster manager too. 2014-12-03 13:19 GMT-08:00 Romi Kuntsman r...@totango.com: About version compatibility and upgrade path - can the Java application dependencies and the Spark server be upgraded separately (i.e. will 1.1.0 library work with 1.1.1 server, and vice versa), or do they need to be upgraded together? Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote: I am happy to announce the availability of Spark 1.1.1! This is a maintenance release with many bug fixes, most of which are concentrated in the core. This list includes various fixes to sort-based shuffle, memory leak, and spilling issues. Contributions from this release came from 55 developers. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-1-1.html [2] http://spark.apache.org/downloads.html Please e-mail me directly for any typo's in the release notes or name listing. Thanks for everyone who contributed, and congratulations! -Andrew
Re: Problem creating EC2 cluster using spark-ec2
Yeah this is currently broken for 1.1.1. I will submit a fix later today. 2014-12-02 17:17 GMT-08:00 Shivaram Venkataraman shiva...@eecs.berkeley.edu : +Andrew Actually I think this is because we haven't uploaded the Spark binaries to cloudfront / pushed the change to mesos/spark-ec2. Andrew, can you take care of this ? On Tue, Dec 2, 2014 at 5:11 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Interesting. Do you have any problems when launching in us-east-1? What is the full output of spark-ec2 when launching a cluster? (Post it to a gist if it’s too big for email.) On Mon, Dec 1, 2014 at 10:34 AM, Dave Challis dave.chal...@aistemos.com wrote: I've been trying to create a Spark cluster on EC2 using the documentation at https://spark.apache.org/docs/latest/ec2-scripts.html (with Spark 1.1.1). Running the script successfully creates some EC2 instances, HDFS etc., but appears to fail to copy the actual files needed to run Spark across. I ran the following commands: $ cd ~/src/spark-1.1.1/ec2 $ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1 --region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium --no-ganglia launch foocluster I see the following in the script's output: (instance and HDFS set up happens here) ... Persistent HDFS installed, won't start by default... ~/spark-ec2 ~/spark-ec2 Setting up spark-standalone RSYNC'ing /root/spark/conf to slaves... *.eu-west-1.compute.amazonaws.com RSYNC'ing /root/spark-ec2 to slaves... *.eu-west-1.compute.amazonaws.com ./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory ./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory ./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory Setting up tachyon RSYNC'ing /root/tachyon to slaves... ... (Tachyon setup happens here without any problem) I can ssh to the master (using the ./spark-ec2 login), and looking in /root/, it contains: $ ls /root ephemeral-hdfs hadoop-native mapreduce persistent-hdfs scala shark spark spark-ec2 tachyon If I look in /root/spark (where the sbin directory should be found), it only contains a single 'conf' directory: $ ls /root/spark conf Any idea why spark-ec2 might have failed to copy these files across? Thanks, Dave - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: dockerized spark executor on mesos?
I'd suggest asking about this on the Mesos list (CCed). As far as I know, there was actually some ongoing work for this. Matei On Dec 3, 2014, at 9:46 AM, Dick Davies d...@hellooperator.net wrote: Just wondered if anyone had managed to start spark jobs on mesos wrapped in a docker container? At present (i.e. very early testing) I'm able to submit executors to mesos via spark-submit easily enough, but they fall over as we don't have a JVM on our slaves out of the box. I can push one out via our CM system if push comes to shove, but it'd be nice to have that as part of the job (I'm thinking it might be a way to get some of the dependencies deployed too). bear in mind I'm a total clueless newbie at this so please be gentle if I'm doing this completely wrong. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Announcing Spark 1.1.1!
Because this was a maintenance release, we should not have introduced any binary backwards or forwards incompatibilities. Therefore, applications that were written and compiled against 1.1.0 should still work against a 1.1.1 cluster, and vice versa. On Wed, Dec 3, 2014 at 1:30 PM, Andrew Or and...@databricks.com wrote: By the Spark server do you mean the standalone Master? It is best if they are upgraded together because there have been changes to the Master in 1.1.1. Although it might just work, it's highly recommended to restart your cluster manager too. 2014-12-03 13:19 GMT-08:00 Romi Kuntsman r...@totango.com: About version compatibility and upgrade path - can the Java application dependencies and the Spark server be upgraded separately (i.e. will 1.1.0 library work with 1.1.1 server, and vice versa), or do they need to be upgraded together? Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote: I am happy to announce the availability of Spark 1.1.1! This is a maintenance release with many bug fixes, most of which are concentrated in the core. This list includes various fixes to sort-based shuffle, memory leak, and spilling issues. Contributions from this release came from 55 developers. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-1-1.html [2] http://spark.apache.org/downloads.html Please e-mail me directly for any typo's in the release notes or name listing. Thanks for everyone who contributed, and congratulations! -Andrew
Re: Problem creating EC2 cluster using spark-ec2
This should be fixed now. Thanks for bringing this to our attention. 2014-12-03 13:31 GMT-08:00 Andrew Or and...@databricks.com: Yeah this is currently broken for 1.1.1. I will submit a fix later today. 2014-12-02 17:17 GMT-08:00 Shivaram Venkataraman shiva...@eecs.berkeley.edu: +Andrew Actually I think this is because we haven't uploaded the Spark binaries to cloudfront / pushed the change to mesos/spark-ec2. Andrew, can you take care of this ? On Tue, Dec 2, 2014 at 5:11 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Interesting. Do you have any problems when launching in us-east-1? What is the full output of spark-ec2 when launching a cluster? (Post it to a gist if it’s too big for email.) On Mon, Dec 1, 2014 at 10:34 AM, Dave Challis dave.chal...@aistemos.com wrote: I've been trying to create a Spark cluster on EC2 using the documentation at https://spark.apache.org/docs/latest/ec2-scripts.html (with Spark 1.1.1). Running the script successfully creates some EC2 instances, HDFS etc., but appears to fail to copy the actual files needed to run Spark across. I ran the following commands: $ cd ~/src/spark-1.1.1/ec2 $ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1 --region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium --no-ganglia launch foocluster I see the following in the script's output: (instance and HDFS set up happens here) ... Persistent HDFS installed, won't start by default... ~/spark-ec2 ~/spark-ec2 Setting up spark-standalone RSYNC'ing /root/spark/conf to slaves... *.eu-west-1.compute.amazonaws.com RSYNC'ing /root/spark-ec2 to slaves... *.eu-west-1.compute.amazonaws.com ./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory ./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory ./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory Setting up tachyon RSYNC'ing /root/tachyon to slaves... ... (Tachyon setup happens here without any problem) I can ssh to the master (using the ./spark-ec2 login), and looking in /root/, it contains: $ ls /root ephemeral-hdfs hadoop-native mapreduce persistent-hdfs scala shark spark spark-ec2 tachyon If I look in /root/spark (where the sbin directory should be found), it only contains a single 'conf' directory: $ ls /root/spark conf Any idea why spark-ec2 might have failed to copy these files across? Thanks, Dave - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to create a new SchemaRDD which is not based on original SparkPlan?
Hi All, My question is about the lazy running mode for SchemaRDD, I guess. I know lazy mode is good; however, I still have this demand. For example, here is the first SchemaRDD, named results (select * from table where num > 1 and num < 4):

results: org.apache.spark.sql.SchemaRDD = SchemaRDD[59] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Filter ((num#0 > 1) && (num#0 < 4))
 ExistingRdd [num#0,str1#1,str2#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208

Then I create the second RDD from results with: select num, str1 from table

results1: org.apache.spark.sql.SchemaRDD = SchemaRDD[60] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [num#0,str1#1]
 Filter ((num#0 > 1) && (num#0 < 4))
  ExistingRdd [num#0,str1#1,str2#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208

Actually, I want the second RDD's plan to be based on results, not on the original table. How can I create a new SchemaRDD whose plan starts from the last RDD? Thanks, Tim
Re: Alternatives to groupByKey
I think it would depend on the type and amount of information you're collecting. If you're just trying to collect small numbers for each window, and don't have an overwhelming number of windows, you might consider using accumulators. Just make one per value per time window, and for each data point, add it to the accumulators for the time windows in which it belongs. We've found this approach a lot faster than anything involving a shuffle. This should work fine for stuff like max(), min(), and mean() If you're collecting enough data that accumulators are impractical, I think I would try multiple passes. Cache your data, and for each pass, filter to that window, and perform all your operations on the filtered RDD. Because of the caching, it won't be significantly slower than processing it all at once - in fact, it will probably be a lot faster, because the shuffles are shuffling less information. This is similar to what you're suggesting about partitioning your rdd, but probably simpler and easier. That being said, your restriction 3 seems to be in contradiction to the rest of your request - if your aggregation needs to be able to look at all the data at once, then that seems contradictory to viewing the data through an RDD. Could you explain a bit more what you mean by that? -Nathan On Wed, Dec 3, 2014 at 4:26 PM, ameyc ambr...@gmail.com wrote: Hi, So my Spark app needs to run a sliding window through a time series dataset (I'm not using Spark streaming). And then run different types on aggregations on per window basis. Right now I'm using a groupByKey() which gives me Iterables for each window. There are a few concerns I have with this approach: 1. groupByKey() could potentially fail for a key not fitting in the memory. 2. I'd like to run aggregations like max(), mean() on each of the groups, it'd be nice to have the RDD functionality at this point instead of the iterables. 3. I can't use reduceByKey() or aggregateByKey() are some of my aggregations need to have a view of the entire window. Only other way I could think of is partitioning my RDDs into multiple RDDs with each RDD representing a window. Is this a sensible approach? Or is there any other way of going about this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
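A sketch of the multi-pass idea; windows, timestamp and value are placeholders for the real window boundaries and record fields:

    import org.apache.spark.SparkContext._      // DoubleRDDFunctions for mean()

    val data = rawData.cache()                  // pay the read cost once
    val results = windows.map { case (start, end) =>
      val w = data.filter(r => r.timestamp >= start && r.timestamp < end)
      (start, end, w.map(_.value).mean())       // any RDD-level aggregation can go here
    }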
Spark executor lost
We are using the Spark job server to submit Spark jobs (our Spark version is 0.9.1). After running the Spark job server for a while, we often see the following errors (executor lost) in the Spark job server log. As a consequence, the Spark driver (allocated inside the Spark job server) gradually loses executors, and finally the Spark job server is no longer able to submit jobs. We tried to google for solutions but so far no luck. Please help if you have any ideas. Thanks!

[2014-11-25 01:37:36,250] INFO parkDeploySchedulerBackend [] [akka://JobServer/user/context-supervisor/next-staging] - Executor 6 disconnected, so removing it
[2014-11-25 01:37:36,252] ERROR cheduler.TaskSchedulerImpl [] [akka://JobServer/user/context-supervisor/next-staging] - Lost executor 6 on : remote Akka client disassociated
[2014-11-25 01:37:36,252] INFO ark.scheduler.DAGScheduler [] [] - Executor lost: 6 (epoch 8)
[2014-11-25 01:37:36,252] INFO ge.BlockManagerMasterActor [] [] - Trying to remove executor 6 from BlockManagerMaster.
[2014-11-25 01:37:36,252] INFO storage.BlockManagerMaster [] [] - Removed 6 successfully in removeExecutor
[2014-11-25 01:37:36,286] INFO ient.AppClient$ClientActor [] [akka://JobServer/user/context-supervisor/next-staging] - Executor updated: app-20141125002023-0037/6 is now FAILED (Command exited with code 143)
RE: Spark executor lost
You want to look further up the stack (there are almost certainly other errors before this happens) and those other errors may give your better idea of what is going on. Also if you are running on yarn you can run yarn logs -applicationId yourAppId to get the logs from the data nodes. Sent with Good (www.good.com) -Original Message- From: S. Zhou [myx...@yahoo.com.INVALIDmailto:myx...@yahoo.com.INVALID] Sent: Wednesday, December 03, 2014 06:30 PM Eastern Standard Time To: user@spark.apache.org Subject: Spark executor lost We are using Spark job server to submit spark jobs (our spark version is 0.91). After running the spark job server for a while, we often see the following errors (executor lost) in the spark job server log. As a consequence, the spark driver (allocated inside spark job server) gradually loses executors. And finally the spark job server no longer be able to submit jobs. We tried to google the solutions but so far no luck. Please help if you have any ideas. Thanks! [2014-11-25 01:37:36,250] INFO parkDeploySchedulerBackend [] [akka://JobServer/user/context-supervisor/next-staging] - Executor 6 disconnected, so removing it [2014-11-25 01:37:36,252] ERROR cheduler.TaskSchedulerImpl [] [akka://JobServer/user/context-supervisor/next-staging] - Lost executor 6 on : remote Akka client disassociated [2014-11-25 01:37:36,252] INFO ark.scheduler.DAGScheduler [] [] - Executor lost: 6 (epoch 8) [2014-11-25 01:37:36,252] INFO ge.BlockManagerMasterActor [] [] - Trying to remove executor 6 from BlockManagerMaster. [2014-11-25 01:37:36,252] INFO storage.BlockManagerMaster [] [] - Removed 6 successfully in removeExecutor [2014-11-25 01:37:36,286] INFO ient.AppClient$ClientActor [] [akka://JobServer/user/context-supervisor/next-staging] - Executor updated: app-20141125002023-0037/6 is now FAILED (Command exited with code 143) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: Alternatives to groupByKey
do these requirements boils down to a need for foldLeftByKey with sorting of the values? https://issues.apache.org/jira/browse/SPARK-3655 On Wed, Dec 3, 2014 at 6:34 PM, Xuefeng Wu ben...@gmail.com wrote: I have similar requirememt,take top N by key. right now I use groupByKey,but one key would group more than half data in some dataset. Yours, Xuefeng Wu 吴雪峰 敬上 On 2014年12月4日, at 上午7:26, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I think it would depend on the type and amount of information you're collecting. If you're just trying to collect small numbers for each window, and don't have an overwhelming number of windows, you might consider using accumulators. Just make one per value per time window, and for each data point, add it to the accumulators for the time windows in which it belongs. We've found this approach a lot faster than anything involving a shuffle. This should work fine for stuff like max(), min(), and mean() If you're collecting enough data that accumulators are impractical, I think I would try multiple passes. Cache your data, and for each pass, filter to that window, and perform all your operations on the filtered RDD. Because of the caching, it won't be significantly slower than processing it all at once - in fact, it will probably be a lot faster, because the shuffles are shuffling less information. This is similar to what you're suggesting about partitioning your rdd, but probably simpler and easier. That being said, your restriction 3 seems to be in contradiction to the rest of your request - if your aggregation needs to be able to look at all the data at once, then that seems contradictory to viewing the data through an RDD. Could you explain a bit more what you mean by that? -Nathan On Wed, Dec 3, 2014 at 4:26 PM, ameyc ambr...@gmail.com wrote: Hi, So my Spark app needs to run a sliding window through a time series dataset (I'm not using Spark streaming). And then run different types on aggregations on per window basis. Right now I'm using a groupByKey() which gives me Iterables for each window. There are a few concerns I have with this approach: 1. groupByKey() could potentially fail for a key not fitting in the memory. 2. I'd like to run aggregations like max(), mean() on each of the groups, it'd be nice to have the RDD functionality at this point instead of the iterables. 3. I can't use reduceByKey() or aggregateByKey() are some of my aggregations need to have a view of the entire window. Only other way I could think of is partitioning my RDDs into multiple RDDs with each RDD representing a window. Is this a sensible approach? Or is there any other way of going about this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
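For the top-N-by-key case specifically, a sketch that avoids grouping all of a key's values: aggregateByKey with a bounded priority queue, so no key ever holds more than N values in memory (the String/Double types are illustrative):

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable

    def topNByKey(rdd: RDD[(String, Double)], n: Int): RDD[(String, Seq[Double])] = {
      def keep(q: mutable.PriorityQueue[Double], v: Double) = {
        q.enqueue(v)
        while (q.size > n) q.dequeue()          // reverse ordering, so dequeue drops the smallest
        q
      }
      rdd.aggregateByKey(mutable.PriorityQueue.empty[Double](Ordering[Double].reverse))(
        keep,
        (a, b) => { b.foreach(v => keep(a, v)); a }
      ).mapValues(_.toSeq.sorted.reverse)       // largest first
    }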
single key-value pair fitting in memory
Hi, In the talk A Deeper Understanding of Spark Internals, it was mentioned that for some operators, spark can spill to disk across keys (in 1.1 - .groupByKey(), .reduceByKey(), .sortByKey()), but that as a limitation of the shuffle at that time, each single key-value pair must fit in memory. 1) Now that the shuffle is sort-based rather than hash-based, does each pair still need to fit in memory for the shuffle? 2) Also, do other operators, such as .cogroup(), also spill to disk? Or must they fit in memory for the operator to work. thanks, ds -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/single-key-value-pair-fitting-in-memory-tp20305.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SQL query in scala API
I'm wondering how to do this kind of SQL query with PairRDDFunctions. SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip In the Spark scala API, I can make an RDD (called users) of key-value pairs where the keys are zip (as in ZIP code) and the values are user id's. Then I can compute the count and distinct count like this: val count = users.mapValues(_ => 1).reduceByKey(_ + _) val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _) Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or spark SQL)? Arun
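One way to avoid the join (a sketch, not from the thread) is to aggregate a (running count, distinct user set) pair per zip in a single pass; it assumes the user ids are Strings and that the distinct set for any one zip fits in memory:

    import org.apache.spark.SparkContext._

    val counts = users.aggregateByKey((0L, Set.empty[String]))(
      (acc, user) => (acc._1 + 1, acc._2 + user),
      (a, b) => (a._1 + b._1, a._2 ++ b._2)
    ).mapValues { case (count, distinct) => (count, distinct.size) }   // (zip, (count, countDistinct))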
Re: Spark SQL UDF returning a list?
Hi, On Wed, Dec 3, 2014 at 4:31 PM, Jerry Raj jerry@gmail.com wrote: Exception in thread main java.lang.RuntimeException: [1.57] failure: ``('' expected but identifier myudf found I also tried returning a List of Ints, that did not work either. Is there a way to write a UDF that returns a list? You seem to be hitting a parser limitation before your function is even called. The message you are seeing is saying there must be an opening bracket here, and I am afraid you won't get around this whatever function you write... (maybe the HiveContext provides a possibility, though). Tobias
Re: Spark executor lost
bq. to get the logs from the data nodes Minor correction: the logs are collected from machines where node managers run. Cheers On Wed, Dec 3, 2014 at 3:39 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You want to look further up the stack (there are almost certainly other errors before this happens) and those other errors may give your better idea of what is going on. Also if you are running on yarn you can run yarn logs -applicationId yourAppId to get the logs from the data nodes. Sent with Good (www.good.com) -Original Message- *From: *S. Zhou [myx...@yahoo.com.INVALID] *Sent: *Wednesday, December 03, 2014 06:30 PM Eastern Standard Time *To: *user@spark.apache.org *Subject: *Spark executor lost We are using Spark job server to submit spark jobs (our spark version is 0.91). After running the spark job server for a while, we often see the following errors (executor lost) in the spark job server log. As a consequence, the spark driver (allocated inside spark job server) gradually loses executors. And finally the spark job server no longer be able to submit jobs. We tried to google the solutions but so far no luck. Please help if you have any ideas. Thanks! [2014-11-25 01:37:36,250] INFO parkDeploySchedulerBackend [] [akka://JobServer/user/context-supervisor/next-staging] - Executor 6 disconnected, so removing it [2014-11-25 01:37:36,252] ERROR cheduler.TaskSchedulerImpl [] [akka://JobServer/user/context-supervisor/next-staging] - Lost executor 6 on : remote Akka client disassociated [2014-11-25 01:37:36,252] INFO ark.scheduler.DAGScheduler [] [] - *Executor lost*: 6 (epoch 8) [2014-11-25 01:37:36,252] INFO ge.BlockManagerMasterActor [] [] - Trying to remove executor 6 from BlockManagerMaster. [2014-11-25 01:37:36,252] INFO storage.BlockManagerMaster [] [] - Removed 6 successfully in removeExecutor [2014-11-25 01:37:36,286] INFO ient.AppClient$ClientActor [] [akka://JobServer/user/context-supervisor/next-staging] - Executor updated: app-20141125002023-0037/6 is now FAILED (Command exited with code 143) -- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
How can a function running on a slave access the Executor
I have been working on balancing work across a number of partitions and find it would be useful to access information about the current execution environment, much of which (like the executor ID) would be available if there were a way to get the current Executor or the Hadoop TaskAttempt context. Does anyone on the list know how to access this object from a function running on a slave? Currently I am reduced to tracking the MAC address to at least know which machine the code is running on, but there must be a better way.
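A sketch of what can be read from inside a task; the hostname is safe to rely on, while SparkEnv is a developer-level API whose fields may change between releases:

    val tagged = rdd.mapPartitionsWithIndex { (partitionId, iter) =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      val executorId = org.apache.spark.SparkEnv.get.executorId   // developer API, use with care
      iter.map(x => (partitionId, host, executorId, x))
    }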
Re: textFileStream() issue?
Hi, On Wed, Dec 3, 2014 at 5:31 PM, Bahubali Jain bahub...@gmail.com wrote: I am trying to use textFileStream(some_hdfs_location) to pick new files from a HDFS location.I am seeing a pretty strange behavior though. textFileStream() is not detecting new files when I move them from a location with in hdfs to location at which textFileStream() is checking for new files. But when I copy files from a location in linux filesystem to hdfs then the textFileStream is detecting the new files. Is it possible that the timestamp of the moved files is actually older than the ones of previously processed files? I think only new files are picked up. Try moving the file and set the timestamp to now() to see if it makes a difference. Tobias
Re: Alternatives to groupByKey
looks good. My concern is about foldLeftByKey, which seems to break consistency with foldLeft on RDD and aggregateByKey on PairRDD. Yours, Xuefeng Wu 吴雪峰 On December 4, 2014, at 7:47 AM, Koert Kuipers ko...@tresata.com wrote: foldLeftByKey - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Best way to have some singleton per worker
Hi, On Thu, Dec 4, 2014 at 2:59 AM, Ashic Mahtab as...@live.com wrote: I've been doing this with foreachPartition (i.e. have the parameters for creating the singleton outside the loop, do a foreachPartition, create the instance, loop over entries in the partition, close the partition), but it's quite cludgy. Is there a pattern by which I can have an instance of something nonserializable on each worker? I think the pattern you describe is the standard way of doing this, several people on this list (including me) have used it for database access etc. Tobias
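For reference, a sketch of that foreachPartition pattern; Client, host and port are placeholders for whatever non-serializable resource is involved:

    rdd.foreachPartition { records =>
      val client = new Client(host, port)      // built on the worker, never serialized
      try {
        records.foreach(r => client.send(r))
      } finally {
        client.close()
      }
    }

If one instance per executor JVM (rather than per partition) is enough, a lazy val inside an object gives the same effect, since the object is initialized at most once per JVM.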
Re: dockerized spark executor on mesos?
I'd like to tag a question onto this; has anybody attempted to deploy spark under Kubernetes https://github.com/googlecloudplatform/kubernetes or Kubernetes mesos ( https://github.com/mesosphere/kubernetes-mesos ) . On Wednesday, December 3, 2014, Matei Zaharia matei.zaha...@gmail.com wrote: I'd suggest asking about this on the Mesos list (CCed). As far as I know, there was actually some ongoing work for this. Matei On Dec 3, 2014, at 9:46 AM, Dick Davies d...@hellooperator.net javascript:; wrote: Just wondered if anyone had managed to start spark jobs on mesos wrapped in a docker container? At present (i.e. very early testing) I'm able to submit executors to mesos via spark-submit easily enough, but they fall over as we don't have a JVM on our slaves out of the box. I can push one out via our CM system if push comes to shove, but it'd be nice to have that as part of the job (I'm thinking it might be a way to get some of the dependencies deployed too). bear in mind I'm a total clueless newbie at this so please be gentle if I'm doing this completely wrong. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:; For additional commands, e-mail: user-h...@spark.apache.org javascript:; - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:; For additional commands, e-mail: user-h...@spark.apache.org javascript:;
reading dynamoDB with spark
Hi, I try to read data from a DynamoDB table with Spark, but when I run this code I get the error message shown below. I use Spark 1.1.1 and emr-core-1.1.jar, emr-ddb-hive-1.0.jar and emr-ddb-hadoop-1.0.jar.

val sparkConf = SparkConf().setAppName("DynamoRdeader").setMaster("local[4]")
val ctx = JavaSparkContext(sparkConf)
val jobConf = JobConf(ctx.hadoopConfiguration())
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "...")
jobConf.set("dynamodb.endpoint", "...")
jobConf.set("dynamodb.regionid", "...")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1.5")
jobConf.set("dynamodb.max.map.tasks", "2")
jobConf.set("dynamodb.awsAccessKeyId", "...")
jobConf.set("dynamodb.awsSecretAccessKey", "...")
jobConf.set("mapred.input.format.class", javaClass<DynamoDBInputFormat>().getName())
var users = ctx.hadoopRDD(jobConf, javaClass<DynamoDBInputFormat>(), javaClass<Text>(), javaClass<DynamoDBItemWritable>())
users.collect().forEach { println(it) }

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:930)
at org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:867)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
Spark SQL with a sorted file
Hi, If I create a SchemaRDD from a file that I know is sorted on a certain field, is it possible to somehow pass that information on to Spark SQL so that SQL queries referencing that field are optimized? Thanks -Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Having problem with Spark streaming with Kinesis
Guys, On my local machine it consumes a Kinesis stream with 3 shards, but on EC2 it does not consume from the stream. Later we found that the EC2 machine had 2 cores while my local machine has 4 cores. I am using a single machine in Spark standalone mode. We then got a larger machine from EC2 and now the Kinesis stream is getting consumed. 4 cores, single machine - works 2 cores, single machine - does not work 2 cores, 2 workers - does not work So my question is: do we need a cluster of (#KinesisShards + 1) workers to be able to consume from Kinesis? A.K.M. Ashrafuzzaman Lead Software Engineer NewsCred (M) 880-175-5592433 Twitter | Blog | Facebook Check out The Academy, your #1 source for free content marketing resources On Nov 27, 2014, at 10:28 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Did you set the Spark master as local[*]? If so, then the number of executors is equal to the number of cores of the machine. Perhaps your mac machine has more cores (certainly more than the number of Kinesis shards + 1). Try explicitly setting the master as local[N] where N is the number of Kinesis shards + 1. It should then work on both machines. On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman ashrafuzzaman...@gmail.com wrote: I was trying on one machine with just sbt run, and it works in my mac environment with the same configuration. I used the sample code from https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()

/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
val numStreams = numShards

/* Setup the SparkConfig and StreamingContext */
/* Spark Streaming batch interval */
val batchInterval = Milliseconds(2000)
val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
val ssc = new StreamingContext(sparkConfig, batchInterval)

/* Kinesis checkpoint interval. Same as batchInterval for this example. */
val kinesisCheckpointInterval = batchInterval

/* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

/* Union all the streams */
val unionStreams = ssc.union(kinesisStreams)

/* Convert each line of Array[Byte] to String, split into words, and count them */
val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))

/* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

/* Print the first 10 wordCounts */
wordCounts.print()

/* Start the streaming context and await termination */
ssc.start()
ssc.awaitTermination()

A.K.M. Ashrafuzzaman Lead Software Engineer NewsCred (M) 880-175-5592433 Twitter | Blog | Facebook Check out The Academy, your #1 source for free content marketing resources On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: What's your cluster size? For streaming to work, it needs shards + 1 executors. On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman ashrafuzzaman...@gmail.com wrote: Hi guys, When we are using Kinesis with 1 shard then it works fine.
But when we use more than 1 shard, it falls into an infinite loop and no data is processed by Spark Streaming. In the Kinesis DynamoDB table I can see that the leaseCounter keeps increasing, but it does not start processing. I am using: scala: 2.10.4, java version: 1.8.0_25, Spark: 1.1.0, spark-streaming-kinesis-asl: 1.1.0 A.K.M. Ashrafuzzaman Lead Software Engineer NewsCred (M) 880-175-5592433 Twitter | Blog | Facebook Check out The Academy, your #1 source for free content marketing resources
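A minimal sketch of Aniket's suggestion, assuming the shard count is known up front (in practice it would come from describeStream as in the sample above); the values here are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// One core per Kinesis receiver (one receiver per shard) plus at least one
// core left over for processing the received data.
val numShards = 3                       // assumed shard count
val sparkConfig = new SparkConf()
  .setAppName("KinesisWordCount")
  .setMaster(s"local[${numShards + 1}]")
val ssc = new StreamingContext(sparkConfig, Milliseconds(2000))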
Re: SQL query in scala API
You may do this: table("users").groupBy('zip)('zip, count('user), countDistinct('user)) On 12/4/14 8:47 AM, Arun Luthra wrote: I'm wondering how to do this kind of SQL query with PairRDDFunctions. SELECT zip, COUNT(user), COUNT(DISTINCT user) FROM users GROUP BY zip In the Spark scala API, I can make an RDD (called users) of key-value pairs where the keys are zip (as in ZIP code) and the values are user id's. Then I can compute the count and distinct count like this: val count = users.mapValues(_ => 1).reduceByKey(_ + _) val countDistinct = users.distinct().mapValues(_ => 1).reduceByKey(_ + _) Then, if I want count and countDistinct in the same table, I have to join them on the key. Is there a way to do this without doing a join (and without using SQL or Spark SQL)? Arun
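If you want to stay with PairRDDFunctions and avoid the join, one option is to compute both measures in a single pass per key. A sketch, assuming users: RDD[(String, String)] of (zip, userId) pairs (the names are illustrative); note that the per-key Set can get large for very high-cardinality zips:

import org.apache.spark.SparkContext._   // brings PairRDDFunctions into scope on Spark 1.x
import org.apache.spark.rdd.RDD

def countAndDistinct(users: RDD[(String, String)]): RDD[(String, (Long, Int))] =
  users.aggregateByKey((0L, Set.empty[String]))(
    (acc, user) => (acc._1 + 1, acc._2 + user),   // fold one user into the partition-local accumulator
    (a, b) => (a._1 + b._1, a._2 ++ b._2)         // merge accumulators across partitions
  ).mapValues { case (count, distinct) => (count, distinct.size) }

// result rows are (zip, (COUNT(user), COUNT(DISTINCT user)))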
spark-submit on YARN is slow
Hi, I am using spark-submit to submit my application to YARN in yarn-cluster mode. I have both the Spark assembly jar file and my application jar file in HDFS, and I can see from the logging output that both files are used from there. However, it still takes about 10 seconds for my application's yarnAppState to switch from ACCEPTED to RUNNING. I am aware that this is probably not a Spark issue but some YARN configuration setting (or YARN-inherent slowness); I was just wondering if anyone has any advice for how to speed this up. Thanks Tobias
cannot submit python files on EC2 cluster
Hi, I am using Spark version 1.1.0 on an EC2 cluster. After I submitted the job, it returned an error saying that a python module cannot be loaded due to missing files. I am using the same command that used to work on a private cluster before for submitting jobs, and all the source files are located in the current working directory. I also tried to add the current path to $PYTHONPATH but it didn't help. In the outputs, there is nothing coming after the line spark.submit.pyFiles=. The command I used is spark-submit --executor-memory 7G --driver-memory 8G l2_exp.py --py-files a.py,b.py,c.py Does anyone know how to fix this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-submit-python-files-on-EC2-cluster-tp20320.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark SQL UDF returning a list?
Yes, I agree, and it may also be semantically ambiguous: a list of objects vs. a list containing a single List object. I've also tested this; it seems that: a. There is a bug in registerFunction, which doesn't support UDFs that take no arguments. (I just created a PR for this: https://github.com/apache/spark/pull/3595 ) b. It expects the function return type to be immutable.Seq[XX] for List, immutable.Map[X, X] for Map, scala.Product for Struct, and only Array[Byte] for binary. Array[_] is not supported. Cheng Hao From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Thursday, December 4, 2014 9:05 AM To: Jerry Raj Cc: user Subject: Re: Spark SQL UDF returning a list? Hi, On Wed, Dec 3, 2014 at 4:31 PM, Jerry Raj jerry@gmail.com wrote: Exception in thread "main" java.lang.RuntimeException: [1.57] failure: ``('' expected but identifier myudf found I also tried returning a List of Ints, but that did not work either. Is there a way to write a UDF that returns a list? You seem to be hitting a parser limitation before your function is even called. The message you are seeing says there must be an opening bracket there, and I am afraid you won't get around this whatever function you write... (maybe the HiveContext provides a possibility, though). Tobias
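To make the return-type point concrete, here is a small sketch of a list-returning UDF; the function name and the table are invented, and registerFunction is the Spark 1.x SQLContext API:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// The UDF returns an immutable Seq[Int]; Array[_] return types are not
// supported, per the notes above.
sqlContext.registerFunction("upTo", (n: Int) => (1 to n).toSeq)

// Hypothetical usage, assuming a registered table "t" with an Int column "n":
// sqlContext.sql("SELECT upTo(n) FROM t")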
RE: Spark SQL with a sorted file
You can try to write your own Relation with filter push down or use the ParquetRelation2 for workaround. (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala) Cheng Hao -Original Message- From: Jerry Raj [mailto:jerry@gmail.com] Sent: Thursday, December 4, 2014 11:34 AM To: user@spark.apache.org Subject: Spark SQL with a sorted file Hi, If I create a SchemaRDD from a file that I know is sorted on a certain field, is it possible to somehow pass that information on to Spark SQL so that SQL queries referencing that field are optimized? Thanks -Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Issue in executing Spark Application from Eclipse
Hi All, I have a standalone Spark(1.1) cluster on one machine and I have installed scala Eclipse IDE (scala 2.10) on my desktop. I am trying to execute a spark code to execute over my standalone cluster but getting errors. Please guide me to resolve this. Code: val logFile = File Path present on desktop // Should be some file on your system val conf = new SparkConf().setAppName(Simple Application).setMaster(spark://IP:PORT).setSparkHome(/home/stuti/Spark/spark-1.1.0-bin-hadoop1); val sc = new SparkContext(conf) println(sc.master) // Print correct master val logData = sc.textFile(logFile, 2).cache() println(logData.count) // throws error Error : Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/12/04 11:05:38 INFO SecurityManager: Changing view acls to: stutiawasthi, 14/12/04 11:05:38 INFO SecurityManager: Changing modify acls to: stutiawasthi, 14/12/04 11:05:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(stutiawasthi, ); users with modify permissions: Set(stutiawasthi, ) 14/12/04 11:05:39 INFO Slf4jLogger: Slf4jLogger started 14/12/04 11:05:39 INFO Remoting: Starting remoting 14/12/04 11:05:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@HOSTNAME_DESKTOP:62308] 14/12/04 11:05:40 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@HOSTNAME_DESKTOP:62308] 14/12/04 11:05:40 INFO Utils: Successfully started service 'sparkDriver' on port 62308. 14/12/04 11:05:40 INFO SparkEnv: Registering MapOutputTracker 14/12/04 11:05:40 INFO SparkEnv: Registering BlockManagerMaster 14/12/04 11:05:40 INFO DiskBlockManager: Created local directory at C:\Users\STUTIA~1\AppData\Local\Temp\spark-local-20141204110540-ad60 14/12/04 11:05:40 INFO Utils: Successfully started service 'Connection manager for block manager' on port 62311. 14/12/04 11:05:40 INFO ConnectionManager: Bound socket to port 62311 with id = ConnectionManagerId(HOSTNAME_DESKTOP,62311) 14/12/04 11:05:41 INFO MemoryStore: MemoryStore started with capacity 133.6 MB 14/12/04 11:05:41 INFO BlockManagerMaster: Trying to register BlockManager 14/12/04 11:05:41 INFO BlockManagerMasterActor: Registering block manager HOSTNAME_DESKTOP:62311 with 133.6 MB RAM 14/12/04 11:05:41 INFO BlockManagerMaster: Registered BlockManager 14/12/04 11:05:41 INFO HttpFileServer: HTTP File server directory is C:\Users\STUTIA~1\AppData\Local\Temp\spark-b65e69f4-69b9-4bb2-b41f-67165909e4c7 14/12/04 11:05:41 INFO HttpServer: Starting HTTP Server 14/12/04 11:05:41 INFO Utils: Successfully started service 'HTTP file server' on port 62312. 14/12/04 11:05:42 INFO Utils: Successfully started service 'SparkUI' on port 4040. 14/12/04 11:05:42 INFO SparkUI: Started SparkUI at http://HOSTNAME_DESKTOP:4040 14/12/04 11:05:43 INFO AppClient$ClientActor: Connecting to master spark://10.112.67.80:7077... 
14/12/04 11:05:43 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 spark://10.112.67.80:7077 14/12/04 11:05:44 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes 14/12/04 11:05:45 INFO MemoryStore: ensureFreeSpace(31447) called with curMem=0, maxMem=140142182 14/12/04 11:05:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 30.7 KB, free 133.6 MB) 14/12/04 11:05:45 INFO MemoryStore: ensureFreeSpace(3631) called with curMem=31447, maxMem=140142182 14/12/04 11:05:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.5 KB, free 133.6 MB) 14/12/04 11:05:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on HOSTNAME_DESKTOP:62311 (size: 3.5 KB, free: 133.6 MB) 14/12/04 11:05:45 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 14/12/04 11:05:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/12/04 11:05:45 WARN LoadSnappy: Snappy native library not loaded 14/12/04 11:05:46 INFO FileInputFormat: Total input paths to process : 1 14/12/04 11:05:46 INFO SparkContext: Starting job: count at Test.scala:15 14/12/04 11:05:46 INFO DAGScheduler: Got job 0 (count at Test.scala:15) with 2 output partitions (allowLocal=false) 14/12/04 11:05:46 INFO DAGScheduler: Final stage: Stage 0(count at Test.scala:15) 14/12/04 11:05:46 INFO DAGScheduler: Parents of final stage: List() 14/12/04 11:05:46 INFO DAGScheduler: Missing parents: List() 14/12/04 11:05:46 INFO DAGScheduler: Submitting Stage 0 (D:/Workspace/Spark/Test/README MappedRDD[1] at textFile at Test.scala:14), which has no missing parents 14/12/04 11:05:46 INFO MemoryStore: ensureFreeSpace(2408) called with curMem=35078, maxMem=140142182
MLLIB model export: PMML vs MLLIB serialization
Hi All, I am doing model training using Spark MLLIB inside our hadoop cluster. But prediction happens in a different realtime synchronous system(Web application). I am currently exploring different options to export the trained Mllib models from spark. 1. *Export model as PMML:* I found the projects under JPMML: Java PMML API https://github.com/jpmml is quite interesting. Use JPMML https://github.com/jpmml/jpmml to convert the mllib model entity to PMML. And use PMML evaluator https://github.com/jpmml/jpmml-evaluator for prediction in a different system. Or we can also explore openscoring rest api https://github.com/jpmml/openscoring for model deployment and prediction. This could be standard approach if we need to port models across different systems. But converting non linear Mllib models to PMML might be a complex task. Apart from that I need to keep on updating my Mllib to PMML conversion code for any new Mllib models or any change in Mllib entities. I have not evaluated any of these JPMML projects personally and I see there is only single contributor for these projects. Just wondering if enough people have already started using these projects. Please share if any of you have any points on this. 2. *Export MLLIB model as serialized form:* Mllib models can be serialized using Kryo serialization http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAFRXrqdpkfCX41=JyTSmmtt8aNWrSdpJvxE3FmYVZ=uuepe...@mail.gmail.com%3E or normal java serialization http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-mllib-model-to-hdfs-and-reload-it-td11953.html . And the same model can be deserialized by different other standalone applications and use the mllib entity for prediction. This blog http://blog.knoldus.com/2014/07/21/play-with-spark-building-spark-mllib-in-a-play-spark-application/ shows an example how spark mllib can be used inside Play web application. I am expecting, I can use spark mllib in any other JVM based web application in the same way(?). Please share if any one has any experience on this. Advantage of this approach is : - No recurring effort to support any new model or any change in Mllib model entity in future version. - Less dependency on any other tools Disadvantages: - Model can not be ported to non JVM system - Model serialized using one version of Mllib entity, may not be deserializable using a different version of mllib entity(?). I think this is a quite common problem.I am really interested to hear from you people how you are solving this and what are the approaches and pros and cons. Thanks Sourabh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
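As a concrete illustration of option 2, a minimal sketch using plain Java serialization to a local file; the model type, paths, and helper names are placeholders, and for HDFS you would write through the Hadoop FileSystem API instead:

import java.io._
import org.apache.spark.mllib.classification.LogisticRegressionModel

// Training side: serialize the fitted model object.
def saveModel(model: LogisticRegressionModel, path: String): Unit = {
  val oos = new ObjectOutputStream(new FileOutputStream(path))
  try oos.writeObject(model) finally oos.close()
}

// Serving side (any JVM app with the same spark-mllib version on the classpath):
def loadModel(path: String): LogisticRegressionModel = {
  val ois = new ObjectInputStream(new FileInputStream(path))
  try ois.readObject().asInstanceOf[LogisticRegressionModel] finally ois.close()
}

// For linear models, model.predict(features) on a single vector does not need a
// SparkContext; that is not necessarily true for every MLlib model type.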
Re: netty on classpath when using spark-submit
Markus, On Tue, Nov 11, 2014 at 10:40 AM, M. Dale medal...@yahoo.com wrote: I never tried to use this property. I was hoping someone else would jump in. When I saw your original question I remembered that Hadoop has something similar. So I searched and found the link below. A quick JIRA search seems to indicate that there is another property: https://issues.apache.org/jira/browse/SPARK-2996 Maybe that property will work with yarn: spark.yarn.user.classpath.first Thank you very much! That property does in fact load the classes from my jar file first when running on YARN, great! However, in local[N] mode, neither that one nor the spark.files.userClassPathFirst one works. So when using spark-submit with --master local[3] instead of --master yarn-cluster, the value for spark.files.userClassPathFirst is displayed correctly, but the classes are still loaded from the wrong jar... Tobias
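For reference, a sketch of how the flag can be set from the driver side (equivalently via --conf spark.yarn.user.classpath.first=true on spark-submit or in spark-defaults.conf); whether each property is honored depends on the Spark version and deploy mode:

import org.apache.spark.SparkConf

// Prefer classes from the user's jar over Spark's assembly on YARN.
// spark.files.userClassPathFirst, by contrast, only affects class loading on
// executors, which may be why local[N] mode still resolves Spark's own copy.
val conf = new SparkConf()
  .set("spark.yarn.user.classpath.first", "true")
  .set("spark.files.userClassPathFirst", "true")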
Re: Filter using the Vertex Ids
To get that function in scope you have to import org.apache.spark.SparkContext._ Ankur On Wednesday, December 3, 2014, Deep Pradhan pradhandeep1...@gmail.com wrote: But groupByKey() gives me an error saying that it is not a member of org.apache.spark.rdd.RDD[(Double, org.apache.spark.graphx.VertexId)] -- Ankur http://www.ankurdave.com/
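Putting the two replies together, a short sketch; graph is assumed to be the Graph[Double, _] from earlier in the thread:

import org.apache.spark.SparkContext._   // brings PairRDDFunctions (groupByKey) into scope
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Flip (vertexId, attribute) to (attribute, vertexId), then group by attribute.
val byAttr: RDD[(Double, Iterable[VertexId])] =
  graph.vertices.map { case (id, attr) => (attr, id) }.groupByKey()

byAttr.collect().foreach(println)
// e.g. (1.0,CompactBuffer(1, 2)), (2.0,CompactBuffer(3, 4)), (0.0,CompactBuffer(5))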
Spark Streaming empty RDD issue
Hi Experts, I am using Spark Streaming to integrate Kafka for real-time data processing. I am facing some issues related to Spark Streaming, so I want to know how we can detect: 1) our connection has been lost, 2) our receiver is down, 3) Spark Streaming has no new messages to consume. How can we deal with these issues? I will be glad to hear from you and will be thankful to you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-empty-RDD-issue-tp20329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: heterogeneous cluster setup
You'll have to decide which is more expensive in your heterogenous environment and optimize for the utilization of that. For example, you may decide that memory is the only costing factor and you can discount the number of cores. Then you could have 8GB on each worker each with four cores. Note that cores in Spark don't necessarily map to cores on the machine. It's just a configuration setting for how many simultaneous tasks that worker can work on. You are right that each executor gets the same amount of resources and I would add level of parallelization. Your heterogeneity is in the physical layout of your cluster, not in how Spark treats the workers as resources. It's very important for Spark's workers to have the same resources available because it needs to be able to generically divide and conquer your data amongst all those workers. Hope that helps, Victor On Wed, Dec 3, 2014 at 10:04 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Thank you so much for valuable reply, Victor. That's a very clear solution I understood. Right now I have nodes with: 16Gb RAM, 4 cores; 8GB RAM, 4cores; 8GB RAM, 2 cores. From my understanding, the division could be something like, each executor can have 2 cores and 6GB RAM. So, the ones with 16GB RAM and 4 cores can have two executors. Please let me know if my understanding is correct. But, I am not able to see any heterogeneity in this setting as each executor has got the same amount of resources. Can you please clarify this doubt? Regards Karthik On Wed, Dec 3, 2014 at 11:11 PM, Victor Tso-Guillen v...@paxata.com wrote: I don't have a great answer for you. For us, we found a common divisor, not necessarily a whole gigabyte, of the available memory of the different hardware and used that as the amount of memory per worker and scaled the number of cores accordingly so that every core in the system has the same amount of memory. The quotient of the available memory and the common divisor, hopefully a whole number to reduce waste, was the number of workers we spun up. Therefore, if you have 64G, 30G, and 15G available memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1 worker per machine. Every worker on all the machines would have the same number of cores, set to what you think is a good value. Hope that helps. On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote: Hi Victor, I want to setup a heterogeneous stand-alone spark cluster. I have hardware with different memory sizes and varied number of cores per node. I could get all the nodes active in the cluster only when the size of memory per executor is set as the least available memory size of all nodes and is same with no.of cores/executor. As of now, I configure one executor per node. Can you please suggest some path to set up a stand-alone heterogeneous cluster such that I can efficiently use the available hardware. Thank you _ Sent from http://apache-spark-user-list.1001560.n3.nabble.com
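To make that concrete for the hardware listed above, one possible (purely illustrative) standalone layout keeps 8 GB and 2 cores per worker and varies only the number of workers per machine, configured per machine in conf/spark-env.sh:

# On the 16 GB / 4-core machine
SPARK_WORKER_INSTANCES=2   # two workers on this box
SPARK_WORKER_MEMORY=8g     # identical memory per worker on every machine
SPARK_WORKER_CORES=2       # identical cores per worker on every machine

# On the 8 GB machines, use SPARK_WORKER_INSTANCES=1 with the same memory/cores;
# the 8 GB / 4-core node then leaves 2 cores unused, which is the price of uniformity.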
Re: Issue in executing Spark Application from Eclipse
It seems you provided master url as spark://10.112.67.80:7077 , i think you should give spark://ubuntu:7077 instead. Thanks Best Regards On Thu, Dec 4, 2014 at 11:35 AM, Stuti Awasthi stutiawas...@hcl.com wrote: Hi All, I have a standalone Spark(1.1) cluster on one machine and I have installed scala Eclipse IDE (scala 2.10) on my desktop. I am trying to execute a spark code to execute over my standalone cluster but getting errors. Please guide me to resolve this. Code: val logFile = File Path present on desktop // Should be some file on your system val conf = new SparkConf().setAppName(Simple Application).setMaster(spark://IP:PORT).setSparkHome(/home/stuti/Spark/spark-1.1.0-bin-hadoop1); val sc = new SparkContext(conf) println(sc.master) // Print correct master val logData = sc.textFile(logFile, 2).cache() println(logData.count) // throws error Error : Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/12/04 11:05:38 INFO SecurityManager: Changing view acls to: stutiawasthi, 14/12/04 11:05:38 INFO SecurityManager: Changing modify acls to: stutiawasthi, 14/12/04 11:05:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(stutiawasthi, ); users with modify permissions: Set(stutiawasthi, ) 14/12/04 11:05:39 INFO Slf4jLogger: Slf4jLogger started 14/12/04 11:05:39 INFO Remoting: Starting remoting 14/12/04 11:05:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@HOSTNAME_DESKTOP:62308] 14/12/04 11:05:40 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@HOSTNAME_DESKTOP:62308] 14/12/04 11:05:40 INFO Utils: Successfully started service 'sparkDriver' on port 62308. 14/12/04 11:05:40 INFO SparkEnv: Registering MapOutputTracker 14/12/04 11:05:40 INFO SparkEnv: Registering BlockManagerMaster 14/12/04 11:05:40 INFO DiskBlockManager: Created local directory at C:\Users\STUTIA~1\AppData\Local\Temp\spark-local-20141204110540-ad60 14/12/04 11:05:40 INFO Utils: Successfully started service 'Connection manager for block manager' on port 62311. 14/12/04 11:05:40 INFO ConnectionManager: Bound socket to port 62311 with id = ConnectionManagerId(HOSTNAME_DESKTOP,62311) 14/12/04 11:05:41 INFO MemoryStore: MemoryStore started with capacity 133.6 MB 14/12/04 11:05:41 INFO BlockManagerMaster: Trying to register BlockManager 14/12/04 11:05:41 INFO BlockManagerMasterActor: Registering block manager HOSTNAME_DESKTOP:62311 with 133.6 MB RAM 14/12/04 11:05:41 INFO BlockManagerMaster: Registered BlockManager 14/12/04 11:05:41 INFO HttpFileServer: HTTP File server directory is C:\Users\STUTIA~1\AppData\Local\Temp\spark-b65e69f4-69b9-4bb2-b41f-67165909e4c7 14/12/04 11:05:41 INFO HttpServer: Starting HTTP Server 14/12/04 11:05:41 INFO Utils: Successfully started service 'HTTP file server' on port 62312. 14/12/04 11:05:42 INFO Utils: Successfully started service 'SparkUI' on port 4040. 14/12/04 11:05:42 INFO SparkUI: Started SparkUI at http:// HOSTNAME_DESKTOP:4040 14/12/04 11:05:43 INFO AppClient$ClientActor: Connecting to master spark://10.112.67.80:7077... 
14/12/04 11:05:43 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 spark://10.112.67.80:7077 14/12/04 11:05:44 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes 14/12/04 11:05:45 INFO MemoryStore: ensureFreeSpace(31447) called with curMem=0, maxMem=140142182 14/12/04 11:05:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 30.7 KB, free 133.6 MB) 14/12/04 11:05:45 INFO MemoryStore: ensureFreeSpace(3631) called with curMem=31447, maxMem=140142182 14/12/04 11:05:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.5 KB, free 133.6 MB) 14/12/04 11:05:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on HOSTNAME_DESKTOP:62311 (size: 3.5 KB, free: 133.6 MB) 14/12/04 11:05:45 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 14/12/04 11:05:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/12/04 11:05:45 WARN LoadSnappy: Snappy native library not loaded 14/12/04 11:05:46 INFO FileInputFormat: Total input paths to process : 1 14/12/04 11:05:46 INFO SparkContext: Starting job: count at Test.scala:15 14/12/04 11:05:46 INFO DAGScheduler: Got job 0 (count at Test.scala:15) with 2 output partitions (allowLocal=false) 14/12/04 11:05:46 INFO DAGScheduler: Final stage: Stage 0(count at Test.scala:15) 14/12/04 11:05:46 INFO DAGScheduler: Parents of final stage: List() 14/12/04 11:05:46 INFO DAGScheduler: Missing parents: List() 14/12/04 11:05:46 INFO DAGScheduler: Submitting Stage 0
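Following Akhil's suggestion, the driver setup would look roughly like this; the host name "ubuntu" and the file path are placeholders, and the safest value is the exact spark:// URL shown on the master's web UI (port 8080):

import org.apache.spark.{SparkConf, SparkContext}

// Use exactly the URL the standalone master advertises (typically its host name,
// not a raw IP it is not bound to).
val conf = new SparkConf()
  .setAppName("Simple Application")
  .setMaster("spark://ubuntu:7077")
  .setSparkHome("/home/stuti/Spark/spark-1.1.0-bin-hadoop1")
val sc = new SparkContext(conf)

val logData = sc.textFile("/path/on/desktop/README", 2).cache()
println(logData.count)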
Re: Spark Streaming empty RDD issue
You can setup nagios based monitoring for these, also setting up a high availability environment will be more fault tolerant. Thanks Best Regards On Thu, Dec 4, 2014 at 12:17 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi Experts I am using Spark Streaming to integrate Kafka for real time data processing. I am facing some issues related to Spark Streaming So I want to know how can we detect 1) Our connection has been lost 2) Our receiver is down 3) Spark Streaming has no new messages to consume. how can we deal these issues? I will be glad to hear from you and will be thankful to you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-empty-RDD-issue-tp20329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
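Besides external monitoring, two of the three conditions can also be detected from the application itself. A rough sketch, assuming ssc is the StreamingContext and stream is the Kafka DStream; the handler bodies are placeholders:

import org.apache.spark.streaming.scheduler._

// 2) Receiver failures: register a StreamingListener and watch receiver events.
ssc.addStreamingListener(new StreamingListener {
  override def onReceiverError(event: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${event.receiverInfo}")   // e.g. alert or restart logic here
  }
  override def onReceiverStopped(event: StreamingListenerReceiverStopped): Unit = {
    println(s"Receiver stopped: ${event.receiverInfo}")
  }
})

// 3) No new messages: check each micro-batch for emptiness before doing work.
stream.foreachRDD { rdd =>
  if (rdd.take(1).isEmpty) {
    // nothing was consumed in this batch
  } else {
    // process rdd
  }
}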
Re: dockerized spark executor on mesos?
Oh great, thanks Tim - I'll give that a whirl then. A Spark rollout isn't in our immediate future, I'm just looking for a good framework for compute alongside our marathon deployment. So a good time to experiment! On 4 December 2014 at 02:36, Tim Chen t...@mesosphere.io wrote: Hi Dick, There is a PR on Spark to enable launching Spark executors with Docker containers, and pending a committer to review and merge (https://github.com/apache/spark/pull/3074) If you like to try it out please take the patch and try it! Tim On Wed, Dec 3, 2014 at 6:31 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'd like to tag a question onto this; has anybody attempted to deploy spark under Kubernetes https://github.com/googlecloudplatform/kubernetes or Kubernetes mesos ( https://github.com/mesosphere/kubernetes-mesos ) . On Wednesday, December 3, 2014, Matei Zaharia matei.zaha...@gmail.com wrote: I'd suggest asking about this on the Mesos list (CCed). As far as I know, there was actually some ongoing work for this. Matei On Dec 3, 2014, at 9:46 AM, Dick Davies d...@hellooperator.net wrote: Just wondered if anyone had managed to start spark jobs on mesos wrapped in a docker container? At present (i.e. very early testing) I'm able to submit executors to mesos via spark-submit easily enough, but they fall over as we don't have a JVM on our slaves out of the box. I can push one out via our CM system if push comes to shove, but it'd be nice to have that as part of the job (I'm thinking it might be a way to get some of the dependencies deployed too). bear in mind I'm a total clueless newbie at this so please be gentle if I'm doing this completely wrong. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
serialization issue in case of case class is more than 1
Hi, I am a newbie in Spark and performed the following steps during a POC: 1. Map a csv file to an object-file after some transformations (done once). 2. Read the object-file back into an RDD for operations, as needed. With 2 csv/object-files, the first object-file is read back into an RDD successfully, but reading the second object-file fails with the error below. This error occurs only when spark-shell is restarted between step 1 and step 2. Please suggest how to make this work with 2 object-files. The code executed on spark-shell is below:

//#1 // Start spark-shell; csv to object-file creation
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
case class person(id: Int, name: String, fathername: String, officeid: Int)
val baseperson = sc.textFile("person_csv").flatMap(line => line.split("\n")).map(_.split(","))
baseperson.map(p => person(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt)).saveAsObjectFile("person_obj")
case class office(id: Int, name: String, landmark: String, areacode: String)
val baseoffice = sc.textFile("office_csv").flatMap(line => line.split("\n")).map(_.split(","))
baseoffice.map(p => office(p(0).trim.toInt, p(1), p(2), p(3))).saveAsObjectFile("office_obj")

//#2 // Stop spark-shell

//#3 // Start spark-shell and map the object-files
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
case class person(id: Int, name: String, fathername: String, officeid: Int)
case class office(id: Int, name: String, landmark: String, areacode: String)
sc.objectFile[person]("person_obj").count   [OK]
sc.objectFile[office]("office_obj").count   [FAILS]

The stack trace is attached: stacktrace.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n20334/stacktrace.txt rahul@... Regards, Rahul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-issue-in-case-of-case-class-is-more-than-1-tp20334.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Necessity for rdd replication.
Hi, I was just thinking about the necessity of RDD replication. One use case could be a large number of threads requiring the same RDD. Even though a single RDD can be shared by multiple threads belonging to the same application, I believe we could extract better parallelism if the RDD were replicated; am I right? I am eager to know if there are any real-life applications or other scenarios that force an RDD to be replicated. Can someone please throw some light on the necessity of RDD replication? Thank you
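For context, this is how replication is requested in code, a one-line sketch where rdd stands for any RDD in the application:

import org.apache.spark.storage.StorageLevel

// Keep two copies of every cached partition on different executors: tasks can read
// whichever copy is closer, and losing one executor does not force recomputation.
rdd.persist(StorageLevel.MEMORY_ONLY_2)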
Re: cannot submit python files on EC2 cluster
On Wed, Dec 3, 2014 at 8:17 PM, chocjy jiyanyan...@gmail.com wrote: Hi, I am using Spark version 1.1.0 on an EC2 cluster. After I submitted the job, it returned an error saying that a python module cannot be loaded due to missing files. I am using the same command that used to work on a private cluster before for submitting jobs, and all the source files are located in the current working directory. I also tried to add the current path to $PYTHONPATH but it didn't help. In the outputs, there is nothing coming after the line spark.submit.pyFiles=. The command I used is spark-submit --executor-memory 7G --driver-memory 8G l2_exp.py --py-files a.py,b.py,c.py --py-files should be placed before your script l2_exp.py, or they will become arguments of l2_exp.py. Does anyone know how to fix this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-submit-python-files-on-EC2-cluster-tp20320.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
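For example, keeping the same options and file names as in the original command, the corrected invocation would be: spark-submit --executor-memory 7G --driver-memory 8G --py-files a.py,b.py,c.py l2_exp.py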