spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
Hi all,

I am unable to access s3n:// URLs using sc.textFile(); I get a 'No FileSystem for scheme: s3n' error. Is this a bug, or are some conf settings missing? See below for details:

env variables: AWS_SECRET_ACCESS_KEY=set, AWS_ACCESS_KEY_ID=set

spark/RELEASE: Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0, Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034

./bin/spark-shell
val f = sc.textFile("s3n://bucket/file")
f.count

error ==
java.io.IOException: No FileSystem for scheme: s3n at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512) at org.apache.spark.rdd.RDD.count(RDD.scala:1006) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:24) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:29) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:33) at $iwC$$iwC$$iwC$$iwC.init(console:35) at $iwC$$iwC$$iwC.init(console:37) at $iwC$$iwC.init(console:39) at $iwC.init(console:41) at init(console:43) at .init(console:47) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Sujee Maniyam
RE: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
Below is my code to access s3n without problems (only for 1.3.1; there is a bug in 1.3.0).

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);

Regards,

Shuai

From: Sujee Maniyam [mailto:su...@sujee.net]
Sent: Wednesday, April 22, 2015 12:45 PM
To: Spark User List
Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

Hi all I am unable to access s3n:// urls using sc.textFile().. getting 'no file system for scheme s3n://' error. a bug or some conf settings missing?
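The same settings can be applied from the spark-shell session in the original post; a minimal Scala sketch, assuming the credential values are available in placeholder names awsAccessKeyId / awsSecretAccessKey and that the Hadoop S3 classes are on the classpath:

  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)          // placeholder credential values
  hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
  val f = sc.textFile("s3n://bucket/file")
  f.count()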
Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
This thread from the hadoop mailing list should give you some clue: http://search-hadoop.com/m/LgpTk2df7822

On Wed, Apr 22, 2015 at 9:45 AM, Sujee Maniyam su...@sujee.net wrote:

Hi all I am unable to access s3n:// urls using sc.textFile().. getting 'no file system for scheme s3n://' error. a bug or some conf settings missing?
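For context: in Hadoop 2.6 the s3n filesystem classes were moved out of hadoop-common into the separate hadoop-aws module, which is likely why a Spark 1.3.1 build against Hadoop 2.6 no longer finds the scheme. A hedged workaround (jar paths and versions are illustrative and must match your Hadoop build) is to put those jars on the shell's classpath:

  ./bin/spark-shell --jars /path/to/hadoop-aws-2.6.0.jar,/path/to/aws-java-sdk-1.7.4.jar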
Can I index a column in parquet file to make it join faster
I have two RDDs, each saved in a parquet file. I need to join these two RDDs by the id column. Can I create an index on the id column so they can join faster? Here is the code:

case class Example(val id: String, val category: String)
case class DocVector(val id: String, val vector: Vector)

val examples: RDD[Example] = ...     // First RDD, which contains a few thousand items
val docVectors: RDD[DocVector] = ... // Second RDD, which contains 10 million items

// These two RDDs are saved in parquet files as follows
examples.toDF().saveAsParquetFile("file:///c:/temp/examples.parquet")
docVectors.toDF().saveAsParquetFile("file:///c:/temp/docVectors.parquet")

// Now I need to join these two RDDs stored in the parquet files
val dfExamples = sqlContext.parquetFile("file:///c:/temp/examples.parquet")
val dfDocVectors = sqlContext.parquetFile(docVectorsParquet) // DataFrame of (id, vector)

dfExamples.join(dfDocVectors, dfExamples("id") === dfDocVectors("id"))
  .select(dfDocVectors("id"), dfDocVectors("vector"), dfExamples("category"))

I need to perform this kind of join many times. To speed up the join, can I create an index on the id column in the parquet file, like what I can do with a database table?

Ningjun
Re: Convert DStream to DataFrame
I tried the solution from the guide, but I exceeded the size limit of the case class Row:

2015-04-22 15:22 GMT+02:00 Tathagata Das tathagata.das1...@gmail.com: Did you check out the latest streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations You also need to be aware that to convert JSON RDDs to a DataFrame, sqlContext has to make a pass on the data to learn the schema. This will fail if a batch has no data, so you have to safeguard against that.

On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote: What about sqlContext.createDataFrame(rdd)?

On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi, I am using Kafka with Spark Streaming to send JSON to Apache Spark: val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) Now, I want to parse the DStream created to a DataFrame, but I don't know if Spark 1.3 has some easy way for this. Any suggestion? I can get the message with: val lines = messages.map(_._2) Thank you for all. Sergio J.
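A minimal Scala sketch of one way to do this in 1.3, assuming a DStream of JSON strings like the lines above (jsonRDD infers the schema per batch, so the empty-batch guard TD mentions is included; the table name is illustrative):

  import org.apache.spark.sql.SQLContext

  val lines = messages.map(_._2)              // DStream[String] of JSON payloads
  lines.foreachRDD { rdd =>
    if (rdd.take(1).nonEmpty) {               // guard against empty batches
      // in practice you would reuse a single SQLContext rather than create one per batch
      val sqlContext = new SQLContext(rdd.sparkContext)
      val df = sqlContext.jsonRDD(rdd)        // schema inferred from this batch
      df.registerTempTable("events")          // illustrative table name
      // DataFrame / SQL operations on df go here
    }
  }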
Re: Clustering algorithms in Spark
Does anybody have any thoughts on this?

On 21 April 2015 at 20:57, Jeetendra Gangele gangele...@gmail.com wrote: The problem with k-means is that we have to define the number of clusters, which I don't want in this case. So I am thinking of something like hierarchical clustering. Any ideas and suggestions?

On 21 April 2015 at 20:51, Jeetendra Gangele gangele...@gmail.com wrote: I have a requirement in which I want to match the company name, and I am thinking to solve this using a clustering technique. Can anybody suggest which algorithm I should use in Spark, and how to evaluate the running time and accuracy for this particular problem? I checked k-means, which looks good. Any ideas or suggestions?
Re: Convert DStream to DataFrame
Did you check out the latest streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations You also need to be aware that to convert JSON RDDs to a DataFrame, sqlContext has to make a pass on the data to learn the schema. This will fail if a batch has no data, so you have to safeguard against that. On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote: What about sqlContext.createDataFrame(rdd)? On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi, I am using Kafka with Spark Streaming to send JSON to Apache Spark: val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) Now, I want to parse the DStream created to a DataFrame, but I don't know if Spark 1.3 has some easy way for this. Any suggestion? I can get the message with: val lines = messages.map(_._2) Thank you for all. Sergio J.
Re: Building Spark : Building just one module.
One way is to use export SPARK_PREPEND_CLASSES=true. This will instruct the launcher to prepend the target directories for each project to the Spark assembly. I've had mixed experiences with it lately, but in principle that's the only way I know.

On Wed, Apr 22, 2015 at 3:42 PM, zia_kayani zia.kay...@platalytics.com wrote: Hi, I have to add custom things into the Spark SQL and Catalyst modules... But every time I change a line of code I have to compile the whole of Spark; if I only compile the sql/core and sql/catalyst modules, those changes aren't visible when I run the job over that Spark. What am I missing? Is there any other way to overcome this, as this is time consuming... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-Building-just-one-module-tp22607.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

-- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com
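As a rough sketch of that workflow (the module list and Maven flags are illustrative; adjust to the modules you actually touch): rebuild only those modules, export the flag, and the freshly compiled classes are picked up ahead of the assembly jar:

  export SPARK_PREPEND_CLASSES=true
  mvn -pl sql/catalyst,sql/core -DskipTests package
  ./bin/spark-shell    # the modules' target/ classes now shadow the assembly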
Re: Auto Starting a Spark Job on Cluster Starup
This thread seems related: http://search-hadoop.com/m/JW1q51W02V Cheers

On Wed, Apr 22, 2015 at 6:09 AM, James King jakwebin...@gmail.com wrote: What's the best way to start up a Spark job as part of starting up the Spark cluster? I have a single uber jar for my job and want to make the start-up as easy as possible. Thanks jk
Re: Multiple HA spark clusters managed by 1 ZK cluster?
You can run multiple Spark clusters against one ZK cluster. Just use this config to set independent ZK roots for each cluster: spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark). -Jeff From: Sean Owen so...@cloudera.com To: Akhil Das ak...@sigmoidanalytics.com Cc: Michal Klos michal.klo...@gmail.com, User user@spark.apache.org Date: Wed, 22 Apr 2015 11:05:46 +0100 Subject: Re: Multiple HA spark clusters managed by 1 ZK cluster? Not that i've tried it, but, why couldn't you use one ZK server? I don't see a reason. On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It isn't mentioned anywhere in the doc, but you will probably need separate ZK for each of your HA cluster. Thanks Best Regards
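As an illustrative sketch (hosts and paths are hypothetical), each cluster's masters would point at the same ZooKeeper ensemble but use a distinct recovery directory, e.g. via SPARK_DAEMON_JAVA_OPTS in spark-env.sh:

  # masters of cluster A
  SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark-clusterA"
  # masters of cluster B
  SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark-clusterB"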
Re: Spark Performance on Yarn
In master branch, overhead is now 10%. That would be 500 MB FYI On Apr 22, 2015, at 8:26 AM, nsalian neeleshssal...@gmail.com wrote: +1 to executor-memory to 5g. Do check the overhead space for both the driver and the executor as per Wilfred's suggestion. Typically, 384 MB should suffice. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22610.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
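The overhead is also tunable directly on YARN; a hedged example of setting both on submit (values are illustrative; the overhead value is in MB):

  ./bin/spark-submit --master yarn --executor-memory 5g --conf spark.yarn.executor.memoryOverhead=1024 ...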
Master -chatter - Worker
Is there a good resource that covers what kind of chatter (communication) goes on between the driver, master and worker processes? Thanks
the indices of SparseVector must be ordered while computing SVD
Hi all, I am using Spark 1.3.1 to write a Spectral Clustering algorithm. This really confused me today. At first I thought my implementation was wrong; it turns out it's an issue in MLlib. Fortunately, I've figured it out. I suggest adding a hint to the MLlib user documentation (as far as I know, there is no such hint yet) that the indices of a local SparseVector must be ordered in ascending manner. Because I was unaware of this point, I spent a lot of time looking for reasons why computeSVD of RowMatrix did not run correctly on sparse data. I don't know the influence of a SparseVector without ordered indices on other functions, but I believe it is necessary to let the users know, or to fix it. Actually, it's very easy to fix: just add a sortBy in the internal construction of SparseVector. Here is an example to reproduce the effect of an unordered SparseVector on computeSVD.

//in spark-shell, Spark 1.3.1
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors}

val sparseData_ordered = Seq(
  Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
  Vectors.sparse(3, Array(0,1,2), Array(3.0, 4.0, 5.0)),
  Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
  Vectors.sparse(3, Array(0,2), Array(9.0, 1.0))
)
val sparseMat_ordered = new RowMatrix(sc.parallelize(sparseData_ordered, 2))

val sparseData_not_ordered = Seq(
  Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
  Vectors.sparse(3, Array(2,1,0), Array(5.0,4.0,3.0)),
  Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)),
  Vectors.sparse(3, Array(2,0), Array(1.0,9.0))
)
val sparseMat_not_ordered = new RowMatrix(sc.parallelize(sparseData_not_ordered, 2))

// Apparently, sparseMat_ordered and sparseMat_not_ordered are essentially the same matrix.
// However, the computeSVD results of these two matrices are different. Users should be notified about this situation.
println(sparseMat_ordered.computeSVD(2, true).U.rows.collect.mkString("\n"))
println("=====")
println(sparseMat_not_ordered.computeSVD(2, true).U.rows.collect.mkString("\n"))

==============
The results are:
ordered:
[-0.10972870132786407,-0.18850811494220537]
[-0.44712472003608356,-0.24828866611663725]
[-0.784520738744303,-0.3080692172910691]
[-0.4154110101064339,0.8988385762953358]

not ordered:
[-0.10830447119599484,-0.1559341848984378]
[-0.4522713511277327,-0.23449829541447448]
[-0.7962382310594706,-0.3130624059305111]
[-0.43131320303494614,0.8453864703362308]

Looking into this issue, I can see that its cause lies in RowMatrix.scala (line 629). The implementation of the sparse dspr there requires ordered indices, because it scans the indices consecutively to skip empty columns.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-indices-of-SparseVector-must-be-ordered-while-computing-SVD-tp22611.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
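Until that behaviour is fixed or documented, a small defensive helper that sorts the (index, value) pairs before building the vector may help; a minimal Scala sketch (the helper name is illustrative):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  // Build a SparseVector whose indices are in ascending order, as computeSVD expects.
  def sortedSparse(size: Int, indices: Array[Int], values: Array[Double]): Vector = {
    val sorted = indices.zip(values).sortBy(_._1)
    Vectors.sparse(size, sorted.map(_._1), sorted.map(_._2))
  }

  // same vector as Vectors.sparse(3, Array(0, 2), Array(9.0, 1.0))
  val v = sortedSparse(3, Array(2, 0), Array(1.0, 9.0))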
Re: Parquet Hive table become very slow on 1.3?
Xudong and Rex, Can you try 1.3.1? With PR 5339 http://github.com/apache/spark/pull/5339 , after we get a hive parquet table from the metastore and convert it to our native parquet code path, we will cache the converted relation. For now, the first access to that hive parquet table reads all of the footers (when you first refer to that table in a query or call sqlContext.table(hiveParquetTable)). All of your later accesses will hit the metadata cache. Thanks, Yin

On Tue, Apr 21, 2015 at 1:13 AM, Rex Xiong bycha...@gmail.com wrote: We have a similar issue with massive parquet files. Cheng Lian, could you have a look?

2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com: Hi Cheng, I tried both of these patches, but it seems they still do not resolve my issue. I found that most of the time is spent on this line in newParquet.scala: ParquetFileReader.readAllFootersInParallel(sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData) This needs to read all the files under the Parquet folder, and our Parquet folder has a lot of Parquet files (nearly 2000); reading one file needs about 2 seconds, so it becomes very slow ... And PR 5231 does not skip this step, so it does not resolve my issue. Our Parquet files are generated by a Spark job, so the number of .parquet files is the same as the number of tasks; that is why we have so many files. But these files actually have the same schema. Is there any way to merge these files into one, or to avoid scanning each of them?

On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Xudong, We have been digging into this issue for a while, and believe PR 5339 http://github.com/apache/spark/pull/5339 and PR 5334 http://github.com/apache/spark/pull/5334 should fix it. There are two problems: 1. Normally we cache Parquet table metadata for better performance, but when converting Hive metastore Parquet tables, the cache is not used. Thus heavy operations like schema discovery are done every time a metastore Parquet table is converted. 2. With Parquet task side metadata reading (which is turned on by default), we can actually skip the row group information in the footer. However, we accidentally called a Parquet function which doesn't skip row group information. For your question about schema merging: Parquet allows different part-files to have different but compatible schemas. For example, part-1.parquet has columns a and b, while part-2.parquet may have columns a and c. In some cases, the summary files (_metadata and _common_metadata) contain the merged schema (a, b, and c), but it's not guaranteed. For example, when the user-defined metadata stored in different part-files contains different values for the same key, Parquet simply gives up writing summary files. That's why all part-files must be touched to get a precise merged schema. However, in scenarios where a centralized arbitrative schema is available (e.g. the Hive metastore schema, or the schema provided by the user via data source DDL), we don't need to do schema merging on the driver side, but can defer it to the executor side, and each task only needs to reconcile those part-files it needs to touch. This is also what the Parquet developers did recently for parquet-hadoop https://github.com/apache/incubator-parquet-mr/pull/45. Cheng

On 3/31/15 11:49 PM, Zheng, Xudong wrote: Thanks Cheng! Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues, but the PR 5231 does not seem to. Not sure what else I did wrong ...
BTW, actually, we are very interested in the schema merging feature in Spark 1.3, and both of these two solutions will disable this feature, right? It seems that Parquet metadata is stored in a file named _metadata in the Parquet file folder (each folder is a partition, as we use a partitioned table), so why do we need to scan all Parquet part-files? Is there any other solution that could keep the schema merging feature at the same time? We really like this feature :)

On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com wrote: Hi Xudong, This is probably because Parquet schema merging is turned on by default. This is generally useful for Parquet files with different but compatible schemas. But it needs to read metadata from all Parquet part-files. This can be problematic when reading Parquet files with lots of part-files, especially when the user doesn't need schema merging. This issue is tracked by SPARK-6575, and here is a PR for it: https://github.com/apache/spark/pull/5231. This PR adds a configuration to disable schema merging by default when doing Hive metastore Parquet table conversion. Another workaround is to fall back to the old Parquet code by setting spark.sql.parquet.useDataSourceApi to false. Cheng

On 3/31/15 2:47 PM, Zheng, Xudong wrote: Hi all, We are using a Parquet Hive table, and we are upgrading to Spark 1.3. But we find that just a simple COUNT(*) query is much slower (100x) than Spark
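For completeness, a small Scala sketch of the two workarounds discussed in this thread, using the configuration key named above (whether they are appropriate depends on whether you rely on schema merging):

  // fall back to the old (non data source API) Parquet code path
  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
  // alternatively, once the flag added by SPARK-6575 / PR 5231 is available in your build,
  // schema merging for converted metastore Parquet tables can be disabled instead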
Re: Spark Performance on Yarn
+1 to executor-memory to 5g. Do check the overhead space for both the driver and the executor as per Wilfred's suggestion. Typically, 384 MB should suffice. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22610.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Multiple HA spark clusters managed by 1 ZK cluster?
nice, thanks for the information. Thanks Best Regards On Wed, Apr 22, 2015 at 8:53 PM, Jeff Nadler jnad...@srcginc.com wrote: You can run multiple Spark clusters against one ZK cluster. Just use this config to set independent ZK roots for each cluster: spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark). -Jeff From: Sean Owen so...@cloudera.com To: Akhil Das ak...@sigmoidanalytics.com Cc: Michal Klos michal.klo...@gmail.com, User user@spark.apache.org Date: Wed, 22 Apr 2015 11:05:46 +0100 Subject: Re: Multiple HA spark clusters managed by 1 ZK cluster? Not that i've tried it, but, why couldn't you use one ZK server? I don't see a reason. On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It isn't mentioned anywhere in the doc, but you will probably need separate ZK for each of your HA cluster. Thanks Best Regards
Re: Spark Performance on Yarn
Does it still hit the memory limit for the container? An expensive transformation? On Wed, Apr 22, 2015 at 8:45 AM, Ted Yu yuzhih...@gmail.com wrote: In master branch, overhead is now 10%. That would be 500 MB FYI On Apr 22, 2015, at 8:26 AM, nsalian neeleshssal...@gmail.com wrote: +1 to executor-memory to 5g. Do check the overhead space for both the driver and the executor as per Wilfred's suggestion. Typically, 384 MB should suffice. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22610.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RDD.filter vs. RDD.join--advice please
Hi Everyone, I have two options for filtering the RDD resulting from the Graph.vertices method, as illustrated with the following pseudo code:

1. Filter
val vertexSet = Set("vertexOne", "vertexTwo", ...)
val filteredVertices = Graph.vertices.filter(x => vertexSet.contains(x._2.vertexName))

2. Join
val filterVertexIdsRDD = ...
val filteredVertices = Graph.vertices.join(filterVertexIdsRDD)

Either one of these should work, correct? Thanks --John

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-filter-vs-RDD-join-advice-please-tp22612.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Trouble working with Spark-CSV package (error: object databricks is not a member of package com)
Afternoon all, I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via: `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package` The error is encountered when running spark shell via: `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3` The full trace of the commands can be found at https://gist.github.com/momer/9d1ca583f9978ec9739d Not sure if I've done something wrong, or if the documentation is outdated, or...? Would appreciate any input or push in the right direction! Thank you, Mo
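For reference, once the package actually resolves on the classpath, the 1.3-style usage being attempted would look roughly like the sketch below (the file path and options are illustrative):

  val df = sqlContext.load(
    "com.databricks.spark.csv",
    Map("path" -> "file:///tmp/data.csv", "header" -> "true"))
  df.printSchema()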
Re: Map Question
Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcast, it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD

On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
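For what it's worth, the broadcast pattern itself is small; a Scala sketch of the idea (the original question is PySpark, and the two-argument form of myfunc is an assumption here):

  val mylist = Seq("some", "metadata")   // stands in for the metadata list
  val bcList = sc.broadcast(mylist)
  val result = myrdd.map { x =>
    myfunc(x, bcList.value)              // executors read the broadcast value
  }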
Re: regarding ZipWithIndex
Sure, thanks. If you can guide me on how to do this, it will be a great help.

On 17 April 2015 at 22:05, Ted Yu yuzhih...@gmail.com wrote: I have some assignments on hand at the moment. Will try to come up with sample code after I clear the assignments. FYI

On Thu, Apr 16, 2015 at 2:00 PM, Jeetendra Gangele gangele...@gmail.com wrote: Can you please guide me how I can extend RDD and convert it in the way you are suggesting?

On 16 April 2015 at 23:46, Jeetendra Gangele gangele...@gmail.com wrote: For type T I already have Object ... I have RDD<Object> and then I am calling zipWithIndex on this RDD and getting RDD<Object, Long>; on this I am running mapToPair and converting into RDD<Long, Object> so that I can use it later for other operations like lookup and join.

On 16 April 2015 at 23:42, Ted Yu yuzhih...@gmail.com wrote: The Long in RDD[(T, Long)] is a type parameter. You can create an RDD with Integer as the first type parameter. Cheers

On Thu, Apr 16, 2015 at 11:07 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi Ted. This works for me. But since Long takes 8 bytes here, can I reduce it to 4 bytes? It's just an index and I feel 4 bytes is more than enough. Is there any method which takes Integer or similar for the index?

On 13 April 2015 at 01:59, Ted Yu yuzhih...@gmail.com wrote: bq. will return something like JavaPairRDD<Object, Long> The Long component of the pair fits your description of index. What other requirement does zipWithIndex not provide you? Cheers

On Sun, Apr 12, 2015 at 1:16 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, I have a JavaRDD<Object> and I want to convert it to JavaPairRDD<Index, Object>. The index should be unique and it should maintain the order: for the first object it should have 1, and then 2 for the second, like that. I tried using zipWithIndex but it will return something like JavaPairRDD<Object, Long>. I want to use this RDD for lookup and join operations later in my workflow, so ordering is important. Regards jeet
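A minimal sketch of the conversion being discussed, in Scala for brevity (zipWithIndex assigns indices in RDD order; narrowing the index to Int assumes fewer than Int.MaxValue elements):

  val indexed = rdd.zipWithIndex()                      // RDD[(T, Long)]
    .map { case (value, idx) => (idx.toInt, value) }    // RDD[(Int, T)] for later lookup/join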
Spark streaming action running the same work in parallel
Hi, I'm running a unit test that keeps failing to work with the code I wrote in Spark. Here is the output logs from my test that I ran that gets the customers from incoming events in the JSON called QueueEvent and I am trying to convert the incoming events for each customer to an alert. From the logs you can see that there is one RDD in the stream with 6 events (3 for each customer). Here is the code snippet that gets the customers and gets the alerts for all the customers. Spark is doing the same work but on different threads as you can see in the logs Executor task launch worker-x. This is throwing off the results when comparing against the expected results. Here is some of the code in the test for troubleshooting -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-action-running-the-same-work-in-parallel-tp22613.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Not able run multiple tasks in parallel, spark streaming
Furthermore, just to explain, doing arr.par.foreach does not help because it is not really running anything, it is only doing the setup of the computation. Doing the setup in parallel does not mean that the jobs will be executed concurrently. Also, from your code it seems like your pairs of DStreams don't interact with each other (that is, pair1 doesn't interact with pair2). Then you could run them in separate applications, which would allow them to run in parallel.

On Tue, Apr 21, 2015 at 11:53 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can enable this flag to run multiple jobs concurrently. It might not be production ready, but you can give it a try: sparkConf.set("spark.streaming.concurrentJobs", "2") Refer to TD's answer here http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header for more information. Thanks Best Regards

On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal abhaybansal.1...@gmail.com wrote: Hi, I have a use case wherein I have to join multiple Kafka topics in parallel. So if there are 2n topics, there is a one-to-one mapping of topics which need to be joined.

val arr = ...
for (condition) {
  val dStream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics1).map(a => (getKey1(a._2), a._2))
  val dStream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics2).map(a => (getKey2(a._2), a._2))
  arr(counter) = (dStream1, dStream2); counter += 1;
}
arr.par.foreach {
  case (dStream1, dStream2) =>
    try {
      val joined = dStream1.join(dStream2, 4)
      joined.saveAsTextFiles("joinedData")
    } catch {
      case t: Exception => t.printStackTrace();
    }
}
ssc.start()
ssc.awaitTermination()

Doing so, the streams are getting joined sequentially. Is there a way out of this? I am new to Spark; would appreciate any suggestions around this. Thanks, -Abhay
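To be concrete, the flag goes on the SparkConf used to build the StreamingContext; a hedged sketch (app name and batch interval are illustrative):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("multi-join")
    .set("spark.streaming.concurrentJobs", "2")   // experimental: allow 2 concurrent jobs
  val ssc = new StreamingContext(conf, Seconds(10))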
Re: Map Question
Can I use broadcast vars in local mode? ᐧ On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variable were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim ᐧ On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: Map Question
Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? ᐧ On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variable were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim ᐧ On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: Map Question
Yep. Not efficient. Pretty bad actually. That's why broadcast variable were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim ᐧ On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: ElasticSearch for Spark times out
will you be able to paste the code?

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?)
RE: ElasticSearch for Spark times out
Hi,

The gist of it is this: I have data indexed into ES. Each index stores monthly data, and the query will get data for some date range (across several ES indexes, or within 1 if the date range is within 1 month). Then I merge these RDDs into an uberRdd, perform some operations, and print the result with foreach. The simplified code is below.

{
  // esRdds: List[RDD] contains mentions count per post
  val esRdds = (startDate.getYear to endDate.getYear).flatMap { year =>
    val sMonth = if (year == startDate.getYear) startDate.getMonthOfYear else 1
    val eMonth = if (year == endDate.getYear) endDate.getMonthOfYear else 12
    (sMonth to eMonth).map { i =>
      sc.esRDD(s"$year-${i.formatted("%02d")}_nlpindex/nlp",
        ESQueries.generateQueryString(Some(startDate), Some(endDate), mentionsToFind, siteNames))
        .map { case (str, map) => unwrapAndCountMentionsPerPost(map) }
    }
  }
  var uberRdd = esRdds(0)
  for (rdd <- esRdds) { uberRdd = uberRdd ++ rdd }
  uberRdd.map ... join ... foreach(x => println(x))
}

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: April 22, 2015 2:52 PM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: ElasticSearch for Spark times out

will you be able to paste the code?
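One small side note on the merging step: the per-month RDDs can be combined in a single call instead of folding with ++; a sketch, assuming esRdds is the sequence built above (it also avoids including esRdds(0) twice, which the fold above does):

  val uberRdd = sc.union(esRdds)   // equivalent to chaining ++ over all of them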
Re: ElasticSearch for Spark times out
Basically, a read timeout means that no data arrived within the specified receive timeout period. A few things I would suggest: 1. Is your ES cluster up and running? 2. If 1 is yes, then reduce the size of the index, make it a few KB, and then test.

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?)
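If the cluster is up and the reads are just slow, the elasticsearch-hadoop timeout and scroll size can also be raised on the SparkConf; a hedged sketch (host and values are illustrative; tune to your cluster):

  val conf = new SparkConf()
    .set("es.nodes", "es-host:9200")   // illustrative host
    .set("es.http.timeout", "5m")      // raise the default HTTP/read timeout
    .set("es.scroll.size", "1000")     // fewer, larger scroll round trips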
Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error
It could very well be that your executor memory is not enough to store the state RDDs AND operate on the data. 1G per executor is quite low. Definitely give more memory. And have you tried increasing the number of partitions (specify the number of partitions in updateStateByKey)? On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra sourav.chan...@livestream.com wrote: Anyone? On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi Olivier, the update function is as below:

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
  val previousCount = state.getOrElse((0L, 0L))._2
  var startValue: IConcurrentUsers = ConcurrentViewers(0)
  var currentCount = 0L
  val lastIndexOfConcurrentUsers =
    values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
  val subList = values.slice(0, lastIndexOfConcurrentUsers)
  val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
  val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
  if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
    logger.error(
      s"Count using state updation $currentCountFromSubList, " +
      s"ConcurrentUsers count $lastConcurrentViewersCount " +
      s"resetting to $lastConcurrentViewersCount"
    )
    currentCount = lastConcurrentViewersCount
  }
  val remainingValuesList = values.diff(subList)
  startValue = ConcurrentViewers(currentCount)
  currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
  if (currentCount < 0) {
    logger.error(
      s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
    )
    currentCount = 0
  }
  // to stop pushing subsequent 0 after receiving first 0
  if (currentCount == 0 && previousCount == 0) None
  else Some(previousCount, currentCount)
}

trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers) =>
      ConcurrentViewers(b.count)
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
      ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
      ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

also the error stack trace copied from executor logs is:

java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) at
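For illustration, a hedged sketch of the two suggestions above (more executor memory and an explicit partition count for updateStateByKey). The numbers, the checkpoint path and the eventsByKey name are assumptions, not values from this thread:

  // give executors more memory on submit, e.g. spark-submit --executor-memory 4g ...
  // and spread the state RDDs over more partitions inside the job:
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")   // checkpointing is required for updateStateByKey
  // eventsByKey: DStream[(String, IConcurrentUsers)], built elsewhere in the job
  val state = eventsByKey.updateStateByKey(updateFunc, numPartitions = 64)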
ElasticSearch for Spark times out
Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?) Here's the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library.jar:na] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:122) ~[na:1.7.0_75] at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75] at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75] at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69) ~[commons-httpclient-3.1.jar:na] at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170) 
~[commons-httpclient-3.1.jar:na] at java.io.FilterInputStream.read(FilterInputStream.java:133) ~[na:1.7.0_75] at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108) ~[commons-httpclient-3.1.jar:na] at org.elasticsearch.hadoop.rest.DelegatingInputStream.read(DelegatingInputStream.java:57) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.codehaus.jackson.impl.Utf8StreamParser.loadMore(Utf8StreamParser.java:172) ~[jackson-core-asl-1.9.11.jar:1.9.11] at org.codehaus.jackson.impl.Utf8StreamParser.parseEscapedFieldName(Utf8StreamParser.java:1502) ~[jackson-core-asl-1.9.11.jar:1.9.11] at org.codehaus.jackson.impl.Utf8StreamParser.slowParseFieldName(Utf8StreamParser.java:1404) ~[jackson-core-asl-1.9.11.jar:1.9.11] at
Re: Efficient saveAsTextFile by key, directory for each key?
I ended up post-processing the result in Hive with a dynamic partition insert query to get a table partitioned by the key. Looking further, it seems that 'dynamic partition' insert is in Spark SQL and working well since Spark SQL 1.2.0: https://issues.apache.org/jira/browse/SPARK-3007 On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra arun.lut...@gmail.com wrote: Is there an efficient way to save an RDD with saveAsTextFile in such a way that the data gets shuffled into separate directories according to a key? (My end goal is to wrap the result in a multi-partitioned Hive table.) Suppose you have: case class MyData(val0: Int, val1: String, directory_name: String) and an RDD called myrdd with type RDD[MyData]. Suppose that you already have an array of the distinct directory_name's, called distinct_directories. A very inefficient way to do this is:

distinct_directories.foreach( dir_name =>
  myrdd.filter( mydata => mydata.directory_name == dir_name )
    .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
    .coalesce(5)
    .saveAsTextFile("base_dir_name/" + f"$dir_name")
)

I tried this solution, and it does not do the multiple myrdd.filter's in parallel. I'm guessing partitionBy might be in the efficient solution, if an easy efficient solution exists... Thanks, Arun
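A hedged sketch of the partitionBy-style approach hinted at above (not something worked out in the thread itself): key the records by directory name and let a MultipleTextOutputFormat route each record to a per-key directory in a single pass. The class name and partition count are illustrative assumptions:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// writes each record to base_dir_name/<directory_name>/<part-file> and drops the key from the output line
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}

myrdd
  .map(mydata => (mydata.directory_name, Array(mydata.val0.toString, mydata.val1).mkString(",")))
  .partitionBy(new HashPartitioner(distinct_directories.length))
  .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String], classOf[KeyBasedOutputFormat])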
Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
Thanks all... btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop 2.4. I tried this on 1.3.1-hadoop26:

sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
val f = sc.textFile("s3n://bucket/file")
f.count

Now it can't find the implementation class. Looks like some jar is missing? java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam ) On Wed, Apr 22, 2015 at 9:49 AM, Shuai Zheng szheng.c...@gmail.com wrote: Below is my code to access s3n without problem (only for 1.3.1; there is a bug in 1.3.0).

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);

Regards, Shuai From: Sujee Maniyam [mailto:su...@sujee.net] Sent: Wednesday, April 22, 2015 12:45 PM To: Spark User List Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:) [...]
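A hedged side note on the ClassNotFoundException above: in Hadoop 2.6 builds the s3n classes live in the separate hadoop-aws module rather than in hadoop-common, so one workaround is to put that jar (and its jets3t dependency) on the classpath when launching the shell. The jar paths and versions below are assumptions:

  ./bin/spark-shell --jars /path/to/hadoop-aws-2.6.0.jar,/path/to/jets3t-0.9.0.jar

after which the fs.s3n.impl setting shown above should be able to load NativeS3FileSystem.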
Spark SQL: SchemaRDD, DataFrame. Multi-value, Nested attributes
Hi! I'm trying to query a dataset that reads data from csv and provides SQL on top of it. The problem I have is that I have a hierarchy of objects that I need to represent as a table so that users might use SQL to query it and do some aggregations. I do have multi-value attributes (which in the csv file look like column_1, column_2, …, column_n) and I do have particular entities that split into several columns, like an Address (city, street, etc). And each row (let's say it represents a Person) might have several Addresses. It's pretty clear that it's not simple to flatten everything into one long list of columns, as I would end up with some weird stuff by doing that. So my question is the following: 1. Does SchemaRDD support something like multi-value attributes? It might look like an array of values that lives in just one column, although it's not clear how I'd aggregate over it. Maybe there is some custom type API I can utilise? 2. Does the newly supported DataFrame provide something in this regard? My understanding is that columns in a DataFrame do need to be actual columns (as in a relation), but they may be of different types (like arrays or objects). Maybe the DataFrame implementation itself provides some sort of custom types or something pluggable that I might consider. Any clue would be really appreciated. Thanks -- Eugene Morozov fathers...@list.ru
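For what it's worth, a hedged sketch of how a multi-valued, nested attribute can be modelled as a single DataFrame column in Spark 1.3; the field names are illustrative assumptions:

import org.apache.spark.sql.types._

// one "addresses" column holding an array of nested Address structs per Person row
val addressType = StructType(Seq(
  StructField("city", StringType),
  StructField("street", StringType)))

val personSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("addresses", ArrayType(addressType))))

// build with sqlContext.createDataFrame(rowRDD, personSchema); nested fields can then be
// selected with something like df.select("name", "addresses.city")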
RE: Scheduling across applications - Need suggestion
Yes. Fair scheduler only helps concurrency within an application. With multiple shells you'd either need something like Yarn/Mesos or careful math on resources, as you said. Original message: From: Arun Patel arunp.bigd...@gmail.com Date: 04/22/2015 6:28 AM (GMT-05:00) To: user user@spark.apache.org Subject: Scheduling across applications - Need suggestion I believe we can use properties like --executor-memory and --total-executor-cores to configure the resources allocated to each application. But in a multi-user environment, shells and applications are being submitted by multiple users at the same time. All users are requesting resources with different properties. At times, some users are not getting any resources of the cluster. How do we control resource usage in this case? Please share any best practices followed. As per my understanding, the Fair scheduler can be used for scheduling tasks within an application but not across multiple applications. Is this correct? Regards, Arun
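A hedged illustration of the "careful math on resources" approach on a standalone cluster: cap what each application can take so one shell cannot starve the others. The numbers are assumptions, not recommendations from the thread:

  ./bin/spark-shell --master spark://master:7077 \
    --conf spark.cores.max=8 \
    --executor-memory 4g

With spark.cores.max set per application (or a cluster-wide default via spark.deploy.defaultCores), the standalone master leaves cores free for later applications instead of handing everything to the first one.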
RE: ElasticSearch for Spark times out
Hi Thanks for the help. My ES is up. Out of curiosity, do you know what the timeout value is? There are probably other things happening to cause the timeout; I don't think my ES is that slow but it's possible that ES is taking too long to find the data. What I see happening is that it uses scroll to get the data from ES; about 150 items at a time. Usual delay when I perform the same query from a browser plugin ranges from 1-5sec. Thanks From: Jeetendra Gangele [mailto:gangele...@gmail.com] Sent: April 22, 2015 3:09 PM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: ElasticSearch for Spark times out Basically ready timeout means hat no data arrived within the specified receive timeout period. Few thing I would suggest 1.are your ES cluster Up and running? 2. if 1 is yes then reduce the size of the Index make it few kbps and then test? On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?) Here's the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library.jar:na] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:122) ~[na:1.7.0_75] at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75] at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75] at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69) ~[commons-httpclient-3.1.jar:na] at
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
I have now a fair understanding of the situation after looking at javap output. So as a reminder:

dstream.map(tuple => {
  val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
  (tuple._1, (tuple._2, w))
})

And StreamState being a very simple standalone object:

object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None
}

Basically the serialization failed because the ClassTag[K] came from the enclosing class, in which the dstream.map() code is running, e.g.:

class A[K : ClassTag](val dstream: DStream[K]) {
  [...]
  def fun = dstream.map(tuple => {
    val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
    (tuple._1, (tuple._2, w))
  })
}

therefore the instance of class A is being serialized, and it fails when the dstream field calls writeObject() and checks for the graph field... The fact that graph is not set might be expected given that I have not started the context yet... Cheers, On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: It is kind of unexpected, I can imagine a real scenario under which it should trigger. But obviously I am missing something :) TD On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Sure. But in general, I am assuming this Graph is unexpectedly null when DStream is being serialized must mean something. Under which circumstances would such an exception trigger? On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote: Yeah, I am not sure what is going on. The only way to figure it out is to take a look at the disassembled bytecodes using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solves the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what Graph is unexpectedly null when DStream is being serialized means? 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread Driver org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)

The operation comes down to something like this:

dstream.map(tuple => {
  val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
  (tuple._1, (tuple._2, w))
})

And StreamState being a very simple standalone object:

object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None
}

However if I remove the context bounds from K in fetch, e.g. removing ClassTag and Ordering, then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: Map Question
Can you give the full code? Especially the myfunc? On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' is not defined. The myfunc function is in a different module. How do I make it aware of broadcastVar? On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: RE: ElasticSearch for Spark times out
Hi, If you get ES response back in 1-5 seconds that's pretty slow. Are these ES aggregation queries? Costin may be right about GC possibly causing timeouts. SPM http://sematext.com/spm/ can give you all Spark and all key Elasticsearch metrics, including various JVM metrics. If the problem is GC, you'll see it. If you monitor both Spark side and ES side, you should be able to find some correlation with SPM. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Wed, Apr 22, 2015 at 5:43 PM, Costin Leau costin.l...@gmail.com wrote: Hi, First off, for Elasticsearch questions is worth pinging the Elastic mailing list as that is closer monitored than this one. Back to your question, Jeetendra is right that the exception indicates nodata is flowing back to the es-connector and Spark. The default is 1m [1] which should be more than enough for a typical scenario. As a side note the scroll size is 50 per tasks (so 150 suggests 3 tasks). Once the query is made, scrolling the document is fast - likely there's something else at hand that causes the connection to timeout. In such cases, you can enable logging on the REST package and see what type of data transfer occurs between ES and Spark. Do note that if a GC occurs, that can freeze Elastic (or Spark) which might trigger the timeout. Consider monitoring Elasticsearch during the query and see whether anything jumps - in particular the memory pressure. Hope this helps, [1] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network On 4/22/15 10:44 PM, Adrian Mocanu wrote: Hi Thanks for the help. My ES is up. Out of curiosity, do you know what the timeout value is? There are probably other things happening to cause the timeout; I don’t think my ES is that slow but it’s possible that ES is taking too long to find the data. What I see happening is that it uses scroll to get the data from ES; about 150 items at a time.Usual delay when I perform the same query from a browser plugin ranges from 1-5sec. Thanks *From:*Jeetendra Gangele [mailto:gangele...@gmail.com] *Sent:* April 22, 2015 3:09 PM *To:* Adrian Mocanu *Cc:* u...@spark.incubator.apache.org *Subject:* Re: ElasticSearch for Spark times out Basically ready timeout means hat no data arrived within the specified receive timeout period. Few thing I would suggest 1.are your ES cluster Up and running? 2. if 1 is yes then reduce the size of the Index make it few kbps and then test? On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com mailto:amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn’t it? Bug?) 
Here’s the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Re: StackOverflow Error when run ALS with 100 iterations
Hi, would you please explain how to checkpoint the training set RDD, since everything is done inside the ALS.train method? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StackOverflow-Error-when-run-ALS-with-100-iterations-tp4296p22619.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
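For what it's worth, a heavily hedged sketch: in Spark versions where ALS checkpoints its own intermediate RDDs (later 1.x releases expose a checkpoint interval for this), it is enough to set a checkpoint directory on the SparkContext before training, rather than checkpointing the training RDD yourself. The path and parameters are assumptions:

  // assumes a version of ALS that checkpoints internally once a checkpoint dir is set
  sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")
  val model = org.apache.spark.mllib.recommendation.ALS.train(ratings, 10, 100, 0.01)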
How to access HBase on Spark SQL
I notice that Databricks provides an external data source API for Spark SQL, but I can't find any reference documents explaining how to access HBase directly through it. Does anyone know how? Or please share some related links. Thanks. ZhangYi (张逸) BigEye website: http://www.bigeyedata.com blog: http://zhangyi.farbox.com tel: 15023157626
Re: Parquet Hive table become very slow on 1.3?
Yin, Thanks for your reply. We already applied this PR to our 1.3.0. As Xudong mentioned, we have thousands of parquet files; the first read is very, very slow, and another app will add more files and refresh the table regularly. Cheng Lian's PR 5334 seems like it can resolve this issue; it will skip reading all the footers if we set auto merge to false. But it's not done yet. Thanks 2015-04-22 23:10 GMT+08:00 Yin Huai yh...@databricks.com: Xudong and Rex, Can you try 1.3.1? With PR 5339 http://github.com/apache/spark/pull/5339 , after we get a Hive Parquet table from the metastore and convert it to our native parquet code path, we will cache the converted relation. For now, the first access to that Hive Parquet table reads all of the footers (when you first refer to that table in a query or call sqlContext.table(hiveParquetTable)). All of your later accesses will hit the metadata cache. Thanks, Yin On Tue, Apr 21, 2015 at 1:13 AM, Rex Xiong bycha...@gmail.com wrote: We have a similar issue with massive parquet files. Cheng Lian, could you have a look? 2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com: Hi Cheng, I tried both these patches, and it seems they still do not resolve my issue. And I found that most of the time is spent on this line in newParquet.scala: ParquetFileReader.readAllFootersInParallel( sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData) which needs to read all the files under the Parquet folder, while our Parquet folder has a lot of Parquet files (near 2000) and reading one file needs about 2 seconds, so it becomes very slow ... And PR 5231 did not skip this step, so it does not resolve my issue. As our Parquet files are generated by a Spark job, the number of .parquet files is the same as the number of tasks, which is why we have so many files. But these files actually have the same schema. Is there any way to merge these files into one, or to avoid scanning each of them? On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Xudong, We had been digging this issue for a while, and believe PR 5339 http://github.com/apache/spark/pull/5339 and PR 5334 http://github.com/apache/spark/pull/5334 should fix this issue. There are two problems: 1. Normally we cache Parquet table metadata for better performance, but when converting Hive metastore Hive tables, the cache is not used. Thus heavy operations like schema discovery are done every time a metastore Parquet table is converted. 2. With Parquet task side metadata reading (which is turned on by default), we can actually skip the row group information in the footer. However, we accidentally called a Parquet function which doesn't skip row group information. For your question about schema merging, Parquet allows different part-files to have different but compatible schemas. For example, part-1.parquet has columns a and b, while part-2.parquet may have columns a and c. In some cases, the summary files (_metadata and _common_metadata) contain the merged schema (a, b, and c), but it's not guaranteed. For example, when the user-defined metadata stored in different part-files contains different values for the same key, Parquet simply gives up writing summary files. That's why all part-files must be touched to get a precise merged schema. However, in scenarios where a centralized arbitrative schema is available (e.g. the Hive metastore schema, or the schema provided by the user via data source DDL), we don't need to do schema merging on the driver side, but defer it to the executor side, and each task only needs to reconcile those part-files it needs to touch.
This is also what the Parquet developers did recently for parquet-hadoop https://github.com/apache/incubator-parquet-mr/pull/45. Cheng On 3/31/15 11:49 PM, Zheng, Xudong wrote: Thanks Cheng! Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues, but PR 5231 seems not to. Not sure what else I did wrong ... BTW, actually, we are very interested in the schema merging feature in Spark 1.3, so both of these two solutions will disable this feature, right? It seems that Parquet metadata is stored in a file named _metadata in the Parquet file folder (each folder is a partition, as we use a partitioned table), so why do we need to scan all Parquet part-files? Is there any other solution that could keep the schema merging feature at the same time? We really like this feature :) On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com wrote: Hi Xudong, This is probably because Parquet schema merging is turned on by default. This is generally useful for Parquet files with different but compatible schemas. But it needs to read metadata from all Parquet part-files. This can be problematic when reading Parquet files with lots of part-files, especially when the user doesn't need schema merging. This issue is tracked by SPARK-6575, and here is a PR for it: https://github.com/apache/spark/pull/5231. This PR adds a
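A hedged illustration of the workaround quoted above (switching off the data source code path so the Hive Parquet table is read through the old path); the table name is an assumption:

  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
  val df = sqlContext.table("hive_parquet_table")   // hypothetical table name
  df.count()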
Re: Start ThriftServer Error
You may need to specify the Hive port itself. For example, my own Thrift start command is in the form: ./sbin/start-thriftserver.sh --master spark://$myserver:7077 --driver-class-path $CLASSPATH --hiveconf hive.server2.thrift.bind.host $myserver --hiveconf hive.server2.thrift.port 1 HTH! On Wed, Apr 22, 2015 at 5:27 AM Yiannis Gkoufas johngou...@gmail.com wrote: Hi Himanshu, I am using: ./start-thriftserver.sh --master spark://localhost:7077 Do I need to specify something additional to the command? Thanks! On 22 April 2015 at 13:14, Himanshu Parashar himanshu.paras...@gmail.com wrote: What command are you using to start the Thrift server? On Wed, Apr 22, 2015 at 3:52 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi all, I am trying to start the thriftserver and I get some errors. I have Hive running and placed hive-site.xml under the conf directory. From the logs I can see that the error is: Call From localhost to localhost:54310 failed I am assuming that it tries to connect to the wrong port for the namenode, which in my case is running on 9000 instead of 54310. Any help would be really appreciated. Thanks a lot! -- [HiM]
Re: RE: ElasticSearch for Spark times out
Hi, First off, for Elasticsearch questions is worth pinging the Elastic mailing list as that is closer monitored than this one. Back to your question, Jeetendra is right that the exception indicates nodata is flowing back to the es-connector and Spark. The default is 1m [1] which should be more than enough for a typical scenario. As a side note the scroll size is 50 per tasks (so 150 suggests 3 tasks). Once the query is made, scrolling the document is fast - likely there's something else at hand that causes the connection to timeout. In such cases, you can enable logging on the REST package and see what type of data transfer occurs between ES and Spark. Do note that if a GC occurs, that can freeze Elastic (or Spark) which might trigger the timeout. Consider monitoring Elasticsearch during the query and see whether anything jumps - in particular the memory pressure. Hope this helps, [1] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network On 4/22/15 10:44 PM, Adrian Mocanu wrote: Hi Thanks for the help. My ES is up. Out of curiosity, do you know what the timeout value is? There are probably other things happening to cause the timeout; I don’t think my ES is that slow but it’s possible that ES is taking too long to find the data. What I see happening is that it uses scroll to get the data from ES; about 150 items at a time.Usual delay when I perform the same query from a browser plugin ranges from 1-5sec. Thanks *From:*Jeetendra Gangele [mailto:gangele...@gmail.com] *Sent:* April 22, 2015 3:09 PM *To:* Adrian Mocanu *Cc:* u...@spark.incubator.apache.org *Subject:* Re: ElasticSearch for Spark times out Basically ready timeout means hat no data arrived within the specified receive timeout period. Few thing I would suggest 1.are your ES cluster Up and running? 2. if 1 is yes then reduce the size of the Index make it few kbps and then test? On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com mailto:amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn’t it? Bug?) 
Here’s the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library.jar:na] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at
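For completeness, a hedged sketch of where the timeout and scroll size discussed above are configured when building the Spark job; the endpoint, index name and timeout value are assumptions (es.http.timeout defaults to 1m per the configuration page referenced in this thread):

  import org.apache.spark.SparkConf
  import org.elasticsearch.spark._

  val conf = new SparkConf()
    .set("es.nodes", "es-host:9200")   // assumption: your ES endpoint
    .set("es.http.timeout", "5m")      // raise the REST read timeout from the 1m default
    .set("es.scroll.size", "50")       // scroll page size per task (50 is the default)
  // then, with a SparkContext built from this conf:
  // val rdd = sc.esRDD("myindex/mytype", "?q=*")   // index/type are assumptions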
Re: Hive table creation - possible bug in Spark 1.3?
Sorry for the confusion. We should be more clear about the semantics in the documentation. (PRs welcome :) ) .saveAsTable does not create a hive table, but instead creates a Spark Data Source table. Here the metadata is persisted into Hive, but hive cannot read the tables (as this API support MLlib vectors, schema discovery, and other things that hive does not). If you want to create a hive table, use HiveQL and run a CREATE TABLE AS SELECT ... On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote: I wrote few mails here regarding this issue. After further investigation I think there is a bug in Spark 1.3 in saving Hive tables. (hc is HiveContext) 1. Verify the needed configuration exists: scala hc.sql(set hive.exec.compress.output).collect res4: Array[org.apache.spark.sql.Row] = Array([hive.exec.compress.output=true]) scala hc.sql(set mapreduce.output.fileoutputformat.compress.codec).collect res5: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec]) scala hc.sql(set mapreduce.output.fileoutputformat.compress.type).collect res6: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.type=BLOCK]) 2. Loading DataFrame and save as table (path point to exists file): val saDF = hc.parquetFile(path) saDF.count (count yield 229764 - i.e. the rdd exists) saDF.saveAsTable(test_hive_ms) Now for few interesting outputs: 1. Trying to query Hive CLI, the table exists but with wrong output format: Failed with exception java.io.IOException:java.io.IOException: hdfs:// 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet not a SequenceFile 2. Looking at the output files found that files are '.parquet' and not '.snappy' 3. Looking at the saveAsTable output shows that it actually store the table in both, wrong output format and without compression: 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:test_hive_ms, dbName:default, owner:hadoop, createTime:1429687014, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs:// 10.166.157.97:9000/user/hive/warehouse/test_hive_ms} http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE) So, the question is: do I miss some configuration here or should I open a bug? Thanks, Ophir
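A hedged sketch of the HiveQL route suggested above (register the DataFrame and create the table with CTAS so Hive can read it); the temp table name, target table name and storage clause are assumptions:

  saDF.registerTempTable("sa_tmp")
  hc.sql("CREATE TABLE test_hive_ms_ctas STORED AS SEQUENCEFILE AS SELECT * FROM sa_tmp")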
spark-ec2 s3a filesystem support and hadoop versions
I would like to easily launch a cluster that supports s3a file systems. If I launch a cluster with `spark-ec2 --hadoop-major-version=2`, what determines the minor version of Hadoop? Does it depend on the Spark version being launched? Are there other allowed values for --hadoop-major-version besides 1 and 2? How can I get a cluster that supports s3a filesystems? Thanks, Daniel
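Not an answer from the list, but a hedged sketch of what wiring up s3a typically looks like once a Hadoop 2.6+ hadoop-aws jar is on the cluster's classpath; the property names are the standard s3a ones, and the credential variables are assumptions:

  sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  sc.hadoopConfiguration.set("fs.s3a.access.key", awsAccessKeyId)      // assumption: defined elsewhere
  sc.hadoopConfiguration.set("fs.s3a.secret.key", awsSecretAccessKey)  // assumption: defined elsewhere
  val rdd = sc.textFile("s3a://bucket/path")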
why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
Hi, just a quick question regarding the source code of groupByKey: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453 In the end, it casts CompactBuffer to Iterable. But why? Any advantage? Thank you. Hao. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
setting cost in linear SVM [Python]
Is there a way to set the cost value C when using linear SVM?
beeline that comes with spark 1.3.0 doesn't work with --hiveconf or ''--hivevar which substitutes variables at hive scripts.
Hello, I am using Spark 1.3 for SparkSQL (hive) ThriftServer Beeline. The Beeline doesn't work with --hiveconf or ''--hivevar which substitutes variables at hive scripts. I found the following jiras saying that Hive 0.13 resolved that issue. I wonder if this is well-known issue? https://issues.apache.org/jira/browse/HIVE-4568 Beeline needs to support resolving variables https://issues.apache.org/jira/browse/HIVE-6173 Beeline doesn't accept --hiveconf option as Hive CLI does Thanks, Okehee -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/beeline-that-comes-with-spark-1-3-0-doesn-t-work-with-hiveconf-or-hivevar-which-substitutes-variable-tp22615.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Vaguely makes sense. :) Wow that's an interesting corner case. On Wed, Apr 22, 2015 at 1:57 PM, Jean-Pascal Billaud j...@tellapart.com wrote: I have now a fair understanding of the situation after looking at javap output. So as a reminder: dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } Basically the serialization failed because the ClassTag[K] came from the enclosing class, in which the dstream.map() code is running e.g. : class A[K : ClassTag](val dstream: DStream[K]) { [...] def fun = dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) } therefore the instance of class A is being serialized and it fails when the dstream field call writeObject() when it checks for the graph field... The fact that graph is not set might be expected given that I have not started the context yet... Cheers, On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: It is kind of unexpected, i can imagine a real scenario under which it should trigger. But obviously I am missing something :) TD On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Sure. But in general, I am assuming this Graph is unexpectedly null when DStream is being serialized must mean something. Under which circumstances, such an exception would trigger? On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote: Yeah, I am not sure what is going on. The only way to figure to take a look at the disassembled bytecodes using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solve the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what Graph is unexpectedly null when DStream is being serialized means? 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread Driver org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$. ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean( ClosureCleaner.scala:158) at org.apache.spark.SparkContext. clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream. scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$ writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$. 
tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.dstream.DStream.writeObject( DStream.scala:403) The operation comes down to something like this: dstream.map(tuple = { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However if I remove the context bounds from K in fetch e.g. removing ClassTag and Ordering then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: RE: ElasticSearch for Spark times out
Is your ES cluster reachable from your Spark cluster via network / firewall? Can you run the same query from the spark master and slave nodes via curl / one of the other clients? Seems odd that GC issues would be a problem from the scan but not when running query from a browser plugin... Sounds like it could be a network issue. — Sent from Mailbox On Thu, Apr 23, 2015 at 5:11 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, If you get ES response back in 1-5 seconds that's pretty slow. Are these ES aggregation queries? Costin may be right about GC possibly causing timeouts. SPM http://sematext.com/spm/ can give you all Spark and all key Elasticsearch metrics, including various JVM metrics. If the problem is GC, you'll see it. If you monitor both Spark side and ES side, you should be able to find some correlation with SPM. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Wed, Apr 22, 2015 at 5:43 PM, Costin Leau costin.l...@gmail.com wrote: Hi, First off, for Elasticsearch questions is worth pinging the Elastic mailing list as that is closer monitored than this one. Back to your question, Jeetendra is right that the exception indicates nodata is flowing back to the es-connector and Spark. The default is 1m [1] which should be more than enough for a typical scenario. As a side note the scroll size is 50 per tasks (so 150 suggests 3 tasks). Once the query is made, scrolling the document is fast - likely there's something else at hand that causes the connection to timeout. In such cases, you can enable logging on the REST package and see what type of data transfer occurs between ES and Spark. Do note that if a GC occurs, that can freeze Elastic (or Spark) which might trigger the timeout. Consider monitoring Elasticsearch during the query and see whether anything jumps - in particular the memory pressure. Hope this helps, [1] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network On 4/22/15 10:44 PM, Adrian Mocanu wrote: Hi Thanks for the help. My ES is up. Out of curiosity, do you know what the timeout value is? There are probably other things happening to cause the timeout; I don’t think my ES is that slow but it’s possible that ES is taking too long to find the data. What I see happening is that it uses scroll to get the data from ES; about 150 items at a time.Usual delay when I perform the same query from a browser plugin ranges from 1-5sec. Thanks *From:*Jeetendra Gangele [mailto:gangele...@gmail.com] *Sent:* April 22, 2015 3:09 PM *To:* Adrian Mocanu *Cc:* u...@spark.incubator.apache.org *Subject:* Re: ElasticSearch for Spark times out Basically ready timeout means hat no data arrived within the specified receive timeout period. Few thing I would suggest 1.are your ES cluster Up and running? 2. if 1 is yes then reduce the size of the Index make it few kbps and then test? On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com mailto:amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn’t it? Bug?) 
Here’s the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
Loading lots of .parquet files in Spark 1.3.1 (Hadoop 2.4)
I am trying to read a few hundred .parquet files from S3 into an EMR cluster. The .parquet files are structured by date and have _common_metadata in each of the folders (as well as _metadata). The sqlContext.parquetFile operation takes a very long time, opening each of the .parquet files for reading. I would have expected that the *_metadata files would be used for the structure so that Spark does not have to go through all the files in a folder. I have also tried this experiment for a single folder: all the .parquet files were opened and the *_metadata was apparently ignored. What can I do to speed up the loading process? Can I load the .parquet files in parallel? What is the purpose of the *_metadata files? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-lots-of-parquet-files-in-Spark-1-3-1-Hadoop-2-4-tp22624.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
LDA code little error @Xiangrui Meng
Hi: there is a little error in the source code of LDA.scala at line 180, as follows: def setBeta(beta: Double): this.type = setBeta(beta) which causes java.lang.StackOverflowError. It's easy to see the error. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LDA-code-little-error-Xiangrui-Meng-tp22621.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
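For context, a hedged guess at the intended definition: beta is documented as an alias for the topic concentration parameter in LDA, so the recursive call presumably should delegate to setTopicConcentration instead of calling itself:

  // hedged guess at the fix, not the committed patch
  def setBeta(beta: Double): this.type = setTopicConcentration(beta)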
Re: MLlib - Collaborative Filtering - trainImplicit task size
This is the size of the serialized task closure. Is stage 246 part of ALS iterations, or something before or after it? -Xiangrui On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone christian.per...@gmail.com wrote: Hi Sean, thanks for the answer. I tried to call repartition() on the input with many different sizes and it still continues to show that warning message. On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote: I think maybe you need more partitions in your input, which might make for smaller tasks? On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone christian.per...@gmail.com wrote: I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue ? Thanks ! -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
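For reference, a hedged sketch of what Sean's repartitioning suggestion looks like in code (which the poster reports already trying); the partition count and ALS parameters are arbitrary assumptions:

  import org.apache.spark.mllib.recommendation.ALS

  val repartitioned = ratings.repartition(200)   // ratings: RDD[Rating], assumed built earlier
  val model = ALS.trainImplicit(repartitioned, rank = 20, iterations = 10, lambda = 0.01, alpha = 40.0)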
How to debug Spark on Yarn?
I submit a Spark app to YARN and I get these messages: 15/04/22 22:45:04 INFO yarn.Client: Application report for application_1429087638744_101363 (state: RUNNING) 15/04/22 22:45:04 INFO yarn.Client: Application report for application_1429087638744_101363 (state: RUNNING). ... 1) I can go to the Spark UI and see the status of the app, but I cannot see the logs as the job progresses. How can I see logs of executors as they progress? 2) In case the app fails/completes, the Spark UI vanishes and I get a YARN job page that says the job failed; there is no link to the Spark UI again. I then take the job ID and run yarn logs -applicationId <appId>, and my console fills up (with huge scrolling) with the logs of all executors. Then I copy and paste into a text editor and search for the keywords "Exception" and "Job aborted due to". Is this the right way to view logs? -- Deepak
Re: Spark 1.3.1 Dataframe breaking ALS.train?
The patch was merged and it will be included in 1.3.2 and 1.4.0. Thanks for reporting the bug! -Xiangrui On Tue, Apr 21, 2015 at 2:51 PM, ayan guha guha.a...@gmail.com wrote: Thank you all. On 22 Apr 2015 04:29, Xiangrui Meng men...@gmail.com wrote: SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in 1.3. We should allow DataFrames in ALS.train. I will submit a patch. You can use `ALS.train(training.rdd, ...)` for now as a workaround. -Xiangrui On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com wrote: Hi Ayan, If you want to use DataFrame, then you should use the Pipelines API (org.apache.spark.ml.*) which will take DataFrames: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS In the examples/ directory for ml/, you can find a MovieLensALS example. Good luck! Joseph On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote: Hi, I am getting an error in the MLlib ALS.train function when passing a dataframe (do I need to convert the DF to an RDD?). Code: training = ssc.sql("select userId,movieId,rating from ratings where partitionKey 6").cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: <class 'pyspark.sql.dataframe.DataFrame'> Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in <module> bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), "ratings should be RDD" AssertionError: ratings should be RDD It was working fine in 1.2.0 (till last night :)) Any solution? I am thinking of mapping the training dataframe back to an RDD, but I will lose the schema information. Best Ayan On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote: Hi, Just upgraded to Spark 1.3.1. I am getting a warning: Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn("inferSchema is deprecated, please use createDataFrame instead") UserWarning: inferSchema is deprecated, please use createDataFrame instead However, the documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.html Also, I am getting an error in the MLlib ALS.train function when passing a dataframe (do I need to convert the DF to an RDD?)
Code: training = ssc.sql("select userId,movieId,rating from ratings where partitionKey 6").cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: <class 'pyspark.sql.dataframe.DataFrame'> Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in <module> bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), "ratings should be RDD" AssertionError: ratings should be RDD -- Best Regards, Ayan Guha
Re: Problem with using Spark ML
Please try reducing the step size. The native BLAS library is not required. -Xiangrui On Tue, Apr 21, 2015 at 5:15 AM, Staffan staffan.arvids...@gmail.com wrote: Hi, I've written an application that performs some machine learning on some data. I've validated that the data _should_ give a good output with a decent RMSE by using Lib-SVM: Mean squared error = 0.00922063 (regression) Squared correlation coefficient = 0.9987 (regression) When I try to use Spark ML to do the exact same thing I get: Mean Squared Error = 8.466193152067944E224 Which is somewhat worse.. I've tried to look at the data before it's inputted to the model, and printed that data to file (which is actually the data used when I got the result from Lib-SVM above). Somewhere there must be a huge mistake, but I cannot find it in my code (see below). traningLP and testLP are training and test data, in RDD[LabeledPoint]. // Generate model val model_gen = new RidgeRegressionWithSGD(); val model = model_gen.run(trainingLP); // Predict on the test data val valuesAndPreds = testLP.map { point => val prediction = model.predict(point.features); println("label: " + point.label + ", pred: " + prediction); (point.label, prediction); } val MSE = valuesAndPreds.map{ case (v, p) => math.pow((v - p), 2) }.mean(); println("Mean Squared Error = " + MSE) I've printed the label and prediction values for each data point in the test set, and the result is something like this; label: 5.04, pred: -4.607899000641277E112 label: 3.59, pred: -3.96787105480399E112 label: 5.06, pred: -2.8263294374576145E112 label: 2.85, pred: -1.1536508029072844E112 label: 2.1, pred: -4.269312783707508E111 label: 2.75, pred: -3.0072665148591558E112 label: -0.29, pred: -2.035681731641989E112 label: 1.98, pred: -3.163404340354783E112 So there is obviously something wrong with the prediction step. I'm using the SparseVector representation of the Vector in LabeledPoint, looking something like this for reference (shortened for convenience); (-1.59,(2080,[29,59,62,74,127,128,131,144,149,175,198,200,239,247,267,293,307,364,374,393,410,424,425,431,448,469,477,485,501,525,532,533,538,560,..],[1.0,1.0,2.0,8.0,1.0,1.0,6.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,8.0,2.0,1.0,1.0,..])) (-1.75,(2080,[103,131,149,208,296,335,520,534,603,620,661,694,709,748,859,1053,1116,1156,1186,1207,1208,1223,1256,1278,1356,1375,1399,1480,1569,..],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0,1.0,7.0,1.0,3.0,2.0,1.0])) I do get one type of warning, but that's about it! (And to my understanding, this native code is not required to get correct results, only to improve performance.) 6010 [main] WARN com.github.fommil.netlib.BLAS - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 6011 [main] WARN com.github.fommil.netlib.BLAS - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS So where do I go from here?
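A minimal sketch of the step-size suggestion (the values are illustrative, not tuned):

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

val algo = new RidgeRegressionWithSGD()
// A step size that is too large makes SGD diverge, which produces
// astronomically large predictions like the ones above
algo.optimizer
  .setStepSize(0.01)
  .setNumIterations(100)
val model = algo.run(trainingLP)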
Re: [MLlib] fail to run word2vec
We store the vectors on the driver node, so it is hard to handle a really large vocabulary. You can use setMinCount to filter out infrequent words to reduce the model size. -Xiangrui On Wed, Apr 22, 2015 at 12:32 AM, gm yu husty...@gmail.com wrote: When using MLlib Word2Vec, I met the following error: allocating large array--thread_id[0x7ff2741ca000]--thread_name[Driver]--array_size[1146093680 bytes]--array_length[1146093656 elememts] prio=10 tid=0x7ff2741ca000 nid=0x1405f runnable at java.util.Arrays.copyOf(Arrays.java:2786) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94) - locked 0x7ff33f7fafd0 (a java.io.ByteArrayOutputStream) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1812) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1504) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1627) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:270) at com.taobao.changrui.SynonymFind$.main(SynonymFind.scala:79) at com.taobao.changrui.SynonymFind.main(SynonymFind.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:516) The data size is: 100M+ sentences, 100M+ words. Job setting is: 50 executors with 20GB and 4 cores; the driver memory is 30GB. Any ideas? Thank you.
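A sketch of the setMinCount suggestion (the threshold is arbitrary; setMinCount is available in recent versions of MLlib):

import org.apache.spark.mllib.feature.Word2Vec

// Dropping words seen fewer than 50 times shrinks the vocabulary,
// and with it the vector table held on the driver
val word2vec = new Word2Vec().setMinCount(50)
val model = word2vec.fit(sentences) // sentences: RDD[Seq[String]]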
Re: the indices of SparseVector must be ordered while computing SVD
Having ordered indices is a contract of SparseVector: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector. We do not verify it, for performance. -Xiangrui On Wed, Apr 22, 2015 at 8:26 AM, yaochunnan yaochun...@gmail.com wrote: Hi all, I am using Spark 1.3.1 to write a Spectral Clustering algorithm. This really confused me today. At first I thought my implementation was wrong. It turns out it's an issue in MLlib. Fortunately, I've figured it out. I suggest adding a hint to the user documentation of MLlib (as far as I know, there is no such hint yet) that the indices of a local Sparse Vector must be ordered in ascending manner. Because I was unaware of this point, I spent a lot of time looking for reasons why computeSVD of RowMatrix did not run correctly on sparse data. I don't know the influence of Sparse Vectors with unordered indices on other functions, but I believe it is necessary to let the users know, or to fix it. Actually, it's very easy to fix: just add a sortBy in the internal construction of SparseVector. Here is an example to reproduce the effect of unordered Sparse Vectors on computeSVD. //in spark-shell, Spark 1.3.1 import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors} val sparseData_ordered = Seq( Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), Vectors.sparse(3, Array(0,1,2), Array(3.0, 4.0, 5.0)), Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)), Vectors.sparse(3, Array(0,2), Array(9.0, 1.0)) ) val sparseMat_ordered = new RowMatrix(sc.parallelize(sparseData_ordered, 2)) val sparseData_not_ordered = Seq( Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), Vectors.sparse(3, Array(2,1,0), Array(5.0,4.0,3.0)), Vectors.sparse(3, Array(0,1,2), Array(6.0, 7.0, 8.0)), Vectors.sparse(3, Array(2,0), Array(1.0,9.0)) ) val sparseMat_not_ordered = new RowMatrix(sc.parallelize(sparseData_not_ordered, 2)) // apparently, sparseMat_ordered and sparseMat_not_ordered are essentially the same matrix // however, the computeSVD results of these two matrices are different. Users should be notified about this situation. println(sparseMat_ordered.computeSVD(2, true).U.rows.collect.mkString("\n")) println("===") println(sparseMat_not_ordered.computeSVD(2, true).U.rows.collect.mkString("\n")) == The results are: ordered: [-0.10972870132786407,-0.18850811494220537] [-0.44712472003608356,-0.24828866611663725] [-0.784520738744303,-0.3080692172910691] [-0.4154110101064339,0.8988385762953358] not ordered: [-0.10830447119599484,-0.1559341848984378] [-0.4522713511277327,-0.23449829541447448] [-0.7962382310594706,-0.3130624059305111] [-0.43131320303494614,0.8453864703362308] Looking into this issue, I can see the reason lies in RowMatrix.scala (line 629): the implementation of sparse dspr there requires ordered indices, because it scans the indices consecutively to skip empty columns.
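Until such a hint lands in the docs, sorting by index before constructing the vector is a cheap guard; a minimal sketch:

import org.apache.spark.mllib.linalg.Vectors

// Unsorted (index, value) pairs
val pairs = Array((2, 5.0), (0, 3.0), (1, 4.0))

// Sort by index so the SparseVector contract holds
val (indices, values) = pairs.sortBy(_._1).unzip
val v = Vectors.sparse(3, indices, values)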
Re: LDA code little error @Xiangrui Meng
Thanks! That's a bug. -Xiangrui On Wed, Apr 22, 2015 at 9:34 PM, buring qyqb...@gmail.com wrote: Hi: there is a little error in the source code, LDA.scala at line 180, as follows: def setBeta(beta: Double): this.type = setBeta(beta) which causes a java.lang.StackOverflowError. It's easy to see the error.
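For reference, the infinite self-call can be fixed by delegating to the parameter that beta aliases; a sketch of the presumable intent (assuming beta is an alias for topicConcentration, as the corresponding getter suggests):

// Before: setBeta calls itself forever, overflowing the stack
def setBeta(beta: Double): this.type = setBeta(beta)

// After: delegate to the aliased setter
def setBeta(beta: Double): this.type = setTopicConcentration(beta)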
Re: StandardScaler failing with OOM errors in PySpark
What is the feature dimension? Did you set the driver memory? -Xiangrui On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote: I'm trying to use the StandardScaler in pyspark on a relatively small (a few hundred Mb) dataset of sparse vectors with 800k features. The fit method of StandardScaler crashes with Java heap space or Direct buffer memory errors. There should be plenty of memory around -- 10 executors with 2 cores each and 8 Gb per core. I'm giving the executors 9g of memory and have also tried lots of overhead (3g), thinking it might be the array creation in the aggregators that's causing issues. The bizarre thing is that this isn't always reproducible -- sometimes it actually works without problems. Should I be setting up executors differently? Thanks, Rok
Re: RDD.filter vs. RDD.join--advice please
Test it out, but I would be willing to bet the join is going to be a good deal faster.
Re: Scheduling across applications - Need suggestion
The YARN capacity scheduler supports hierarchical queues, to which you can assign cluster resources as percentages. Your Spark applications/shells can be submitted to different queues. Mesos supports a fine-grained mode, which allows the machines/cores used by each executor to ramp up and down. Lan On Wed, Apr 22, 2015 at 2:32 PM, yana yana.kadiy...@gmail.com wrote: Yes. Fair scheduler only helps concurrency within an application. With multiple shells you'd either need something like Yarn/Mesos or careful math on resources, as you said. Original message From: Arun Patel Date: 04/22/2015 6:28 AM (GMT-05:00) To: user Subject: Scheduling across applications - Need suggestion I believe we can use properties like --executor-memory and --total-executor-cores to configure the resources allocated to each application. But in a multi-user environment, shells and applications are being submitted by multiple users at the same time. All users are requesting resources with different properties. At times, some users do not get resources on the cluster. How can resource usage be controlled in this case? Please share any best practices followed. As per my understanding, the Fair scheduler can be used for scheduling tasks within an application but not across multiple applications. Is this correct? Regards, Arun
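For illustration, an application can be pinned to a specific capacity-scheduler queue through the spark.yarn.queue property (the queue name here is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

// Submit this application to the "analytics" queue; the queue's
// configured capacity then caps how much of the cluster it can claim
val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.yarn.queue", "analytics")
val sc = new SparkContext(conf)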
Re: Map Question
Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' is not defined. The myfunc function is in a different module. How do I make it aware of broadcastVar? On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally... and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist... it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
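The usual fix for the NameError is to pass the broadcast handle into the function explicitly rather than relying on a global defined in another module. The same pattern in Scala, as a sketch (the types of myrdd and mylist are assumptions):

// Build the broadcast once on the driver...
val broadcastVar = sc.broadcast(mylist) // mylist: Seq[String]

// ...and hand its value to the function instead of relying on a global
def myfunc(x: String, meta: Seq[String]): (String, Int) = (x, meta.size)
val result = myrdd.map(x => myfunc(x, broadcastVar.value))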
Re: SparkSQL performance
https://github.com/databricks/spark-avro On Tue, Apr 21, 2015 at 3:09 PM, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Thanks Michael! I have tried applying my schema programmatically but I didn't get any improvement in performance :( Could you point me to some code examples using Avro please? Many thanks again! Renato M. 2015-04-21 20:45 GMT+02:00 Michael Armbrust mich...@databricks.com: Here is an example using rows directly: https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema Avro or parquet input would likely give you the best performance. On Tue, Apr 21, 2015 at 4:28 AM, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Thanks for the hints guys! Much appreciated! Even if I just do something like Select * from tableX where attribute1 5, I see similar behaviour. @Michael Could you point me to any sample code that uses Spark's Rows? We are at a phase where we can actually change our JavaBeans for something that provides better performance than what we are seeing now. Would you recommend using the Avro representation then? Thanks again! Renato M. 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com: There is a cost to converting from JavaBeans to Rows and this code path has not been optimized. That is likely what you are seeing. On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote: SparkSQL optimizes better by column pruning and predicate pushdown, primarily. Here you are not taking advantage of either. I am curious to know what goes in your filter function, as you are not using a filter on the SQL side. Best Ayan On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com wrote: Does anybody have an idea? A clue? A hint? Thanks! Renato M. 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com: Hi all, I have a simple query Select * from tableX where attribute1 between 0 and 5 that I run over a Kryo file with four partitions that ends up being around 3.5 million rows in our case. If I run this query by doing a simple map().filter() it takes around ~9.6 seconds, but when I apply the schema, register the table into a SqlContext, and then run the query, it takes around ~16 seconds. This is using Spark 1.2.1 with Scala 2.10.0. I am wondering why there is such a big gap in performance if it is just a filter. Internally, the relation files are mapped to a JavaBean. Could this different data representation (JavaBeans vs. SparkSQL's internal representation) lead to such a difference? Is there anything I could do to make the performance get closer to the hard-coded option? Thanks in advance for any suggestions or ideas. Renato M.
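A minimal sketch of reading Avro through the spark-avro package (the path and package version are assumptions):

// started with: spark-shell --packages com.databricks:spark-avro_2.10:1.0.0
val df = sqlContext.load("path/to/tableX.avro", "com.databricks.spark.avro")
df.registerTempTable("tableX")
sqlContext.sql("select * from tableX where attribute1 between 0 and 5").count()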
RE: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)
You can try pulling the jar with wget and using it with --jars with spark-shell. I used 1.0.3 with Spark 1.3.0 but with a different version of Scala. From the stack trace it looks like spark-shell is just not seeing the csv jar... Original message From: Mohammed Omer beancinemat...@gmail.com Date: 04/22/2015 2:01 PM (GMT-05:00) To: user@spark.apache.org Subject: Trouble working with Spark-CSV package (error: object databricks is not a member of package com) Afternoon all, I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via: `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package` The error is encountered when running spark shell via: `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3` The full trace of the commands can be found at https://gist.github.com/momer/9d1ca583f9978ec9739d Not sure if I've done something wrong, or if the documentation is outdated, or...? Would appreciate any input or push in the right direction! Thank you, Mo
Re: Join on DataFrames from the same source (Pyspark)
DataFrames do not have the attributes 'alias' or 'as' in the Python API. On 2015-04-21 20:41, Michael Armbrust wrote: This is https://issues.apache.org/jira/browse/SPARK-6231 Unfortunately this is pretty hard to fix, as it's hard for us to differentiate these without aliases. However you can add an alias as follows: from pyspark.sql.functions import * df.alias("a").join(df.alias("b"), col("a.col1") == col("b.col1")) On Tue, Apr 21, 2015 at 8:10 AM, Karlson ksonsp...@siberie.de wrote: Sorry, my code actually was df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') But in Spark 1.4.0 this does not seem to make any difference anyway and the problem is the same with both versions. On 2015-04-21 17:04, ayan guha wrote: your code should be df_one = df.select('col1', 'col2') df_two = df.select('col1', 'col3') Your current code is generating a tuple, and of course df_1 and df_2 are different, so the join is yielding a cartesian product. Best Ayan On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote: Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk: df = sql_context.parquetFile('data.parquet') Then I create two DataFrames from that source. df_one = df.select(['col1', 'col2']) df_two = df.select(['col1', 'col3']) Finally I want to (inner) join them back together: df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') The key in col1 is unique. The resulting DataFrame should have n rows, however it does have n*n rows. That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
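For what it's worth, the same alias workaround in the Scala API, as a sketch against the parquet source above:

import org.apache.spark.sql.functions.col

val df = sqlContext.parquetFile("data.parquet")
// Alias each side so the planner can tell the two uses of df apart
val joined = df.as("a").join(df.as("b"), col("a.col1") === col("b.col1"), "inner")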
Building Spark : Adding new DataType in Catalyst
Hi, I am working on adding Geometry, i.e. a new DataType, into Spark Catalyst, so that a Row can hold that object as well. I've made progress, but it is time-consuming because I have to compile the whole Spark project; otherwise the changes aren't visible. I've tried to build just the Spark SQL and Catalyst modules, but that doesn't have any impact unless I compile the whole of Spark. What am I missing? Is there a better way?
[MLlib] fail to run word2vec
When using MLlib Word2Vec, I met the following error: allocating large array--thread_id[0x7ff2741ca000]--thread_name[Driver]--array_size[1146093680 bytes]--array_length[1146093656 elememts] prio=10 tid=0x7ff2741ca000 nid=0x1405f runnable at java.util.Arrays.copyOf(Arrays.java:2786) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94) - locked 0x7ff33f7fafd0 (a java.io.ByteArrayOutputStream) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1812) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1504) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1627) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:270) at com.taobao.changrui.SynonymFind$.main(SynonymFind.scala:79) at com.taobao.changrui.SynonymFind.main(SynonymFind.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:516) The data size is: 100M+ sentences, 100M+ words. Job setting is: 50 executors with 20GB and 4 cores; the driver memory is 30GB. Any ideas? Thank you.
Re: sparksql - HiveConf not found during task deserialization
I see; now try a slightly tricky approach: add the hive jar to SPARK_CLASSPATH (in the conf/spark-env.sh file on all machines) and make sure that jar is available on all the machines in the cluster at the same path. Thanks Best Regards On Wed, Apr 22, 2015 at 11:24 AM, Manku Timma manku.tim...@gmail.com wrote: Akhil, Thanks for the suggestions. I tried out sc.addJar, --jars, --conf spark.executor.extraClassPath and none of them helped. I added stuff into compute-classpath.sh. That did not change anything. I checked the classpath of the running executor and made sure that the hive jars are in that dir. For me the most confusing thing is that the executor can actually create HiveConf objects, but cannot find the class when the task deserializer is at work. On 20 April 2015 at 14:18, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try sc.addJar("/path/to/your/hive/jar")? I think it will resolve it. Thanks Best Regards On Mon, Apr 20, 2015 at 12:26 PM, Manku Timma manku.tim...@gmail.com wrote: Akhil, But the first case of creating HiveConf on the executor works fine (map case). Only the second case fails. I was suspecting some foul play with classloaders. On 20 April 2015 at 12:20, Akhil Das ak...@sigmoidanalytics.com wrote: Looks like a missing jar; try to print the classpath and make sure the hive jar is present. Thanks Best Regards On Mon, Apr 20, 2015 at 11:52 AM, Manku Timma manku.tim...@gmail.com wrote: I am using spark-1.3 with hadoop-provided and hive-provided and hive-0.13.1 profiles. I am running a simple spark job on a yarn cluster by adding all hadoop2 and hive13 jars to the spark classpaths. If I remove the hive-provided while building spark, I don't face any issue. But with hive-provided I am getting a java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf in the yarn executor. Code is below: import org.apache.spark._ import org.apache.spark.sql._ import org.apache.hadoop.hive.conf.HiveConf object Simple { def main(args: Array[String]) = { val sc = new SparkContext(new SparkConf()) val sqlC = new org.apache.spark.sql.hive.HiveContext(sc) val x = sc.parallelize(1 to 2).map(x => { val h = new HiveConf; h.getBoolean("hive.test", false) }) x.collect.foreach(x => println(s"- $x ")) val result = sqlC.sql("select * from products_avro order by month, name, price") result.collect.foreach(println) } } The first job (involving map) runs fine. HiveConf is instantiated and the conf variable is looked up etc. But the second job (involving the select * query) throws the class-not-found exception. The task deserializer is the one throwing the exception. It is unable to find the class in its classpath. Not sure what is different from the first job, which also involved HiveConf.
157573 [task-result-getter-3] 2015/04/20 11:01:48:287 WARN TaskSetManager: Lost task 0.2 in stage 2.0 (TID 4, localhost): java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.init(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
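A sketch of the spark-env.sh suggestion from above (the jar names and paths are hypothetical; the same path must exist on every node):

# conf/spark-env.sh on all machines
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/hive/lib/hive-exec-0.13.1.jar:/opt/hive/lib/hive-metastore-0.13.1.jar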
Re: How does GraphX store the routing table?
Hi Ankur, Thanks for the answer. However, I still have the following queries. On Wed, Apr 22, 2015 at 8:39 AM, Ankur Dave ankurd...@gmail.com wrote: On Tue, Apr 21, 2015 at 10:39 AM, mas mas.ha...@gmail.com wrote: How does GraphX store the routing table? Is it stored on the master node, or are chunks of the routing table sent to each partition that maintains the record of vertices and edges at that node? The latter: the routing table is stored alongside the vertices, and for each vertex it stores the set of edge partitions that reference that vertex. *Then how does the master node track where (in which partition) a particular vertex or edge is?* *Further, does it mean that to fetch a particular edge we first have to find its source or destination vertex?* If only customized edge partitioning is performed, will the corresponding vertices be sent to the same partition or not? If I understand correctly, you're asking whether it's possible to colocate the vertices with the edges so they don't have to move during replication. It's possible to do this in some cases by partitioning each edge based on a hash partitioner of its source or destination vertex. GraphX will still do replication using a shuffle, but most of the shuffle files should be local in this case. I tried this a while ago but didn't find a very big improvement for PageRank. Ultimately a more general solution would be to unify the vertex and edge RDDs by designating one replica of each vertex as the master. This would also reduce the storage cost by a factor of (average degree - 1)/(average degree). *What exactly do you mean here by designating one replica of each vertex as the master? How can we perform this?* Ankur http://www.ankurdave.com/ -- Regards, Muhammad Aamir
Re: Not able to run multiple tasks in parallel, spark streaming
You can enable this flag to run multiple jobs concurrently. It might not be production ready, but you can give it a try: sc.set("spark.streaming.concurrentJobs", "2") Refer to TD's answer here http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header for more information. Thanks Best Regards On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal abhaybansal.1...@gmail.com wrote: Hi, I have a use case wherein I have to join multiple kafka topics in parallel. So if there are 2n topics, there is a one-to-one mapping of topics which need to be joined. val arr = ... for(condition) { val dStream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics1).map(a => (getKey1(a._2), a._2)) val dStream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics2).map(a => (getKey2(a._2), a._2)) arr(counter) = (dStream1, dStream2); counter += 1; } arr.par.foreach { case (dStream1, dStream2) => try { val joined = dStream1.join(dStream2, 4); joined.saveAsTextFiles("joinedData") } catch { case t: Exception => t.printStackTrace(); } } ssc.start() ssc.awaitTermination() Doing so, the streams are getting joined sequentially. Is there a way out of this? I am new to spark, would appreciate any suggestions around this. Thanks, -Abhay
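A sketch of setting the flag; it is a SparkConf property, so it goes on the conf the StreamingContext is built from (the batch interval is arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Experimental: allow up to 2 jobs from the streaming scheduler
// to run concurrently instead of strictly one at a time
val conf = new SparkConf().set("spark.streaming.concurrentJobs", "2")
val ssc = new StreamingContext(conf, Seconds(10))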
Hive table creation - possible bug in Spark 1.3?
I wrote a few mails here regarding this issue. After further investigation I think there is a bug in Spark 1.3 in saving Hive tables. (hc is a HiveContext.) 1. Verify the needed configuration exists: scala> hc.sql("set hive.exec.compress.output").collect res4: Array[org.apache.spark.sql.Row] = Array([hive.exec.compress.output=true]) scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect res5: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec]) scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect res6: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.type=BLOCK]) 2. Load a DataFrame and save it as a table (path points to an existing file): val saDF = hc.parquetFile(path) saDF.count (count yields 229764, i.e. the RDD exists) saDF.saveAsTable("test_hive_ms") Now for a few interesting outputs: 1. Querying from the Hive CLI, the table exists but with the wrong output format: Failed with exception java.io.IOException:java.io.IOException: hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet not a SequenceFile 2. Looking at the output files, the files are '.parquet' and not '.snappy' 3. Looking at the saveAsTable output shows that it actually stores the table with both the wrong output format and no compression: 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:test_hive_ms, dbName:default, owner:hadoop, createTime:1429687014, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array<string>, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE) So, the question is: do I miss some configuration here or should I open a bug? Thanks, Ophir
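One cross-check worth running (a sketch; it assumes Hive 0.13's Parquet support is present on the cluster): create the table through HiveQL CTAS instead of saveAsTable, so the metastore entry carries real Parquet input/output formats:

// Register the DataFrame and create a Hive-managed Parquet table via DDL
saDF.registerTempTable("saDF_tmp")
hc.sql("CREATE TABLE test_hive_ms STORED AS PARQUET AS SELECT * FROM saDF_tmp")

If the Hive CLI can read that table, the difference points at saveAsTable writing a Spark SQL data-source table (with the schema kept in table properties and a placeholder SerDe) rather than a classic Hive table.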
Re: Addition of new Metrics for killed executors.
Hi, Looks interesting. It would be good to know the reason for not showing these stats in the UI. The description by Patrick W in https://spark-project.atlassian.net/browse/SPARK-999 does not mention anything specific to failed tasks/executors. Can somebody please comment on whether this is a bug or intended behaviour, w.r.t. performance or some other bottleneck? --Twinkle On Mon, Apr 20, 2015 at 2:47 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi Twinkle, We have a use case where we want to debug the reason of how and why an executor got killed. It could be because of a stack overflow, GC, or any other unexpected scenario. If I look at the driver UI there is no information present around killed executors, so I was just curious how people usually debug those things apart from scanning logs and understanding them. The metrics we are planning to add are similar to what we have for non-killed executors - [data per stage, specifically] numFailedTasks, executorRunTime, inputBytes, memoryBytesSpilled, etc. Apart from that we also intend to add all information present in the executors tab for running executors. Thanks, Archit Thakur. On Mon, Apr 20, 2015 at 1:31 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi Archit, What is your use case and what kind of metrics are you planning to add? Thanks, Twinkle On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi, We are planning to add new metrics in Spark for the executors that got killed during execution. I was just curious why this info is not already present. Is there some reason for not adding it? Any ideas are welcome. Thanks and Regards, Archit Thakur.
Re: Multiple HA spark clusters managed by 1 ZK cluster?
The key thing would be to use different ZK paths for each cluster. You shouldn't need more than 2 ZK quorums even for a large (few-thousand-node) Hadoop cluster: one for the HA bits of the infrastructure (HDFS, YARN) and one for the applications to abuse. It's easy for apps using ZK to stick too much stuff in it, with too high a rate of change for ZK to be happy; overloading ZK can then impact those core infrastructure services. I don't believe that Spark is in the category of antisocial ZK apps. On 22 Apr 2015, at 11:05, Sean Owen so...@cloudera.com wrote: Not that I've tried it, but why couldn't you use one ZK server? I don't see a reason. On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It isn't mentioned anywhere in the doc, but you will probably need a separate ZK for each of your HA clusters. Thanks Best Regards On Wed, Apr 22, 2015 at 12:02 AM, Michal Klos michal.klo...@gmail.com wrote: Hi, I'm trying to set up multiple spark clusters with high availability and I was wondering if I can re-use a single ZK cluster to manage them? It's not very clear in the docs and it seems like the answer may be that I need a separate ZK cluster for each spark cluster? thanks, M
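Concretely, standalone HA clusters can share one quorum by giving each its own spark.deploy.zookeeper.dir; a sketch for spark-env.sh (the hostnames and paths are hypothetical):

# Masters of cluster A
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark_cluster_a"

# Masters of cluster B point at the same quorum with a different dir,
# e.g. -Dspark.deploy.zookeeper.dir=/spark_cluster_b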
Scheduling across applications - Need suggestion
I believe we can use properties like --executor-memory and --total-executor-cores to configure the resources allocated to each application. But in a multi-user environment, shells and applications are being submitted by multiple users at the same time. All users are requesting resources with different properties. At times, some users do not get resources on the cluster. How can resource usage be controlled in this case? Please share any best practices followed. As per my understanding, the Fair scheduler can be used for scheduling tasks within an application but not across multiple applications. Is this correct? Regards, Arun
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
Anyone? On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi Olivier, the update function is as below:

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
  val previousCount = state.getOrElse((0L, 0L))._2
  var startValue: IConcurrentUsers = ConcurrentViewers(0)
  var currentCount = 0L
  val lastIndexOfConcurrentUsers =
    values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
  val subList = values.slice(0, lastIndexOfConcurrentUsers)
  val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
  val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
  if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
    logger.error(
      s"Count using state updation $currentCountFromSubList, " +
      s"ConcurrentUsers count $lastConcurrentViewersCount " +
      s"resetting to $lastConcurrentViewersCount"
    )
    currentCount = lastConcurrentViewersCount
  }
  val remainingValuesList = values.diff(subList)
  startValue = ConcurrentViewers(currentCount)
  currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
  if (currentCount < 0) {
    logger.error(
      s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
    )
    currentCount = 0
  }
  // to stop pushing subsequent 0 after receiving first 0
  if (currentCount == 0 && previousCount == 0) None
  else Some(previousCount, currentCount)
}

trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers) =>
      ConcurrentViewers(b.count)
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
      ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
      ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

also the error stack trace copied from executor logs is: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at
Re: Multiple HA spark clusters managed by 1 ZK cluster?
Not that I've tried it, but why couldn't you use one ZK server? I don't see a reason. On Wed, Apr 22, 2015 at 7:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It isn't mentioned anywhere in the doc, but you will probably need a separate ZK for each of your HA clusters. Thanks Best Regards On Wed, Apr 22, 2015 at 12:02 AM, Michal Klos michal.klo...@gmail.com wrote: Hi, I'm trying to set up multiple spark clusters with high availability and I was wondering if I can re-use a single ZK cluster to manage them? It's not very clear in the docs and it seems like the answer may be that I need a separate ZK cluster for each spark cluster? thanks, M
How to write Hive's map(key, value, ...) in Spark SQL DSL
Hi, I want to write this in Spark SQL DSL: select map('c1', c1, 'c2', c2) as m from table Is there a way? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
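One hedged possibility, assuming a DataFrame df with columns c1 and c2: route the expression string through selectExpr, which parses SQL expressions inside the DSL (whether the parser accepts Hive's map() depends on the Spark version):

// Attempt inside the DSL; parser support for map() may vary
val m1 = df.selectExpr("map('c1', c1, 'c2', c2) as m")

// Reliable fallback: plain SQL through the HiveContext
df.registerTempTable("t")
val m2 = sqlContext.sql("select map('c1', c1, 'c2', c2) as m from t")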
Error in creating spark RDD
Hi, I am creating a Spark RDD over Accumulo like: JavaPairRDD<Key, Value> accumuloRDD = sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); But I am getting the following error and it is not getting compiled: Bound mismatch: The generic method newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>) of type JavaSparkContext is not applicable for the arguments (Configuration, Class<AccumuloInputFormat>, Class<Key>, Class<Value>). The inferred type AccumuloInputFormat is not a valid substitute for the bounded parameter F extends InputFormat<K,V> I am using the following import statements: import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; I am not getting what the problem is here. Thanks Madhvi
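The mismatch is likely between Hadoop's old and new APIs: newAPIHadoopRDD expects an org.apache.hadoop.mapreduce.InputFormat, while the import above pulls AccumuloInputFormat from the client.mapred (old-API) package. A sketch of the new-API variant, written in Scala here for brevity:

// note: client.mapreduce, not client.mapred
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}

val accumuloRDD = sc.newAPIHadoopRDD(
  accumuloJob.getConfiguration,
  classOf[AccumuloInputFormat],
  classOf[Key],
  classOf[Value])

(Conversely, the old-API hadoopRDD call pairs with the client.mapred import.)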
Re: Shuffle question
Anyone? On Tue, Apr 21, 2015 at 3:38 PM Marius Danciu marius.dan...@gmail.com wrote: Hello anyone, I have a question regarding the sort shuffle. Roughly I'm doing something like: rdd.mapPartitionsToPair(f1).groupByKey().mapPartitionsToPair(f2) The problem is that in f2 I don't see the keys being sorted. The keys are Java Comparable, not scala.math.Ordered or scala.math.Ordering (it would be weird for each key to implement Ordering, as mentioned in the JIRA item https://issues.apache.org/jira/browse/SPARK-2045). Questions: 1. Do I need to explicitly sortByKey? (If I do this I can see the keys correctly sorted in f2) ... but I'm worried about the extra costs, since Spark 1.3.0 is supposed to use the SORT shuffle manager by default, right? 2. Do I need each key to be a scala.math.Ordered? ... is Java Comparable used at all? ... btw I'm using Spark from Java ... don't ask me why :) Best, Marius
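For reference, a sketch of the two options with String keys assumed (Java Comparable alone is not enough; Spark's sort paths want a scala.math.Ordering, which Scala derives implicitly for types like String):

import org.apache.spark.HashPartitioner

// pairs: RDD[(String, Int)]
// Option 1: explicit sort after the shuffle; the sort-based shuffle
// manager does not by itself guarantee key order inside a partition
val sorted = pairs.sortByKey()

// Option 2: shuffle once and sort within each partition in the same pass
val sortedWithin = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8))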