Kafka + Spark Streaming with checkpointing fails to restart
Hi,

I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution):

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
  at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
  at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
  at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
  at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
  at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
  at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
  at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
  at scala.App$$anonfun$main$1.apply(App.scala:71)
  at scala.App$$anonfun$main$1.apply(App.scala:71)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
  at scala.App$class.main(App.scala:71)
  at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
  at com.brightcove.analytics.tacoma.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:483)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  // KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

  val eventStream = rawlogDStream
    .map({ case (key, rawlogVal) =>
      val record = rawlogVal.asInstanceOf[GenericData.Record]
      val rlog = RawLog.newBuilder()
        .setId(record.get("id").asInstanceOf[String])
        .setAccount(record.get("account").asInstanceOf[String])
        .setEvent(record.get("event").asInstanceOf[String])
        .setTimestamp(record.get("timestamp").asInstanceOf[Long])
        .setUserAgent(record.get("user_agent").asInstanceOf[String])
        .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
        .build()
      val norm = Normalizer(rlog)
      (key,
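For anyone landing on this thread: a common cause of "has not been initialized" on restart is building the DStream graph outside the factory function passed to StreamingContext.getOrCreate, so that on recovery nothing re-wires the restored graph. A minimal sketch of the checkpoint-recovery pattern from the streaming guide (broker, topic, and paths are illustrative, not the poster's actual code):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf().setAppName("checkpoint-example")
    val checkpointDir = "/tmp/checkpoint" // illustrative
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // illustrative
    val topic = "mytopic"

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      ssc.checkpoint(checkpointDir)
      // ALL DStream setup (createDirectStream, maps, output operations)
      // must happen inside this factory, not after getOrCreate returns
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
      stream.map(_._2).count().print() // the full DStream graph goes here
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()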
Backward compatibility with org.apache.spark.sql.api.java.Row class
Hello everyone, I'm adopting the latest version of Apache Spark in my project, moving from 1.2.x to 1.3.x, and the only significant incompatibility so far is related to the Row class. Any idea what happened to the org.apache.spark.sql.api.java.Row class in Apache Spark 1.3? The migration guide in the Spark SQL and DataFrames documentation does not mention it: https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13 Looking around, there is a new Row interface in the org.apache.spark.sql package, but I'm not 100% sure whether this is related to my question or how to proceed with the upgrade. Note that this new Row interface was not available in the previous Spark versions 1.0.0, 1.1.0 and 1.2.0. Thanks in advance, Emerson
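For what it's worth (my understanding of the 1.3 change, not confirmed in a reply here): Spark 1.3 unified the Java and Scala SQL APIs, so org.apache.spark.sql.Row replaces the old Java-specific class, usually as a pure import change. A minimal sketch:

    // Spark 1.2.x (Java API):
    //   import org.apache.spark.sql.api.java.Row
    // Spark 1.3.x: the unified Row lives in org.apache.spark.sql
    import org.apache.spark.sql.Row

    val row = Row("alice", 42)      // construct a row
    val name = row.getString(0)     // positional accessors are unchanged
    val age  = row.getInt(1)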
Increase maximum amount of columns for covariance matrix for principal components
Hello, in order to process a huge dataset, the number of columns allowed when calculating the covariance matrix is limited: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L129 What is the reason behind this limitation, and can it be extended? Greetings Sebastian
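For context, my reading of the linked code (an inference, not an authoritative answer): the covariance/Gramian matrix is materialized as a local dense matrix on the driver, so memory grows quadratically with the column count. A quick sketch of the arithmetic:

    // Driver-side memory for a dense n x n covariance matrix of doubles
    def covarianceBytes(n: Long): Long = n * n * 8L

    covarianceBytes(10000L)  // ~0.8 GB -> feasible on a large driver
    covarianceBytes(65535L)  // ~34 GB  -> why the column count is capped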
Re: value toDF is not a member of RDD object
No, creating a DF using createDataFrame won't work:

val peopleDF = sqlContext.createDataFrame(people)

the code compiles but raises the same error as toDF at the line above.

On Wed, May 13, 2015 at 6:22 PM Sebastian Alfers sebastian.alf...@googlemail.com wrote: I use:

val conf = new SparkConf()...
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rdd: RDD[...] = ...
val schema: StructType = ...
sqlContext.createDataFrame(rdd, schema)

2015-05-13 12:00 GMT+02:00 SLiZn Liu sliznmail...@gmail.com: Additionally, after I successfully packaged the code and submitted it via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() is called:

Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
  at WebcatApp$.main(webcat.scala:49)
  at WebcatApp.main(webcat.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Unsurprisingly, if I remove toDF, no error occurs. I have moved the case class definition outside of main but inside the outer object scope, and removed the "provided" specification in build.sbt. However, when I tried Dean Wampler's suggestion of using sc.createDataFrame(), the compiler says this function is not a member of sc, and I cannot find any reference to it in the latest documents. What else should I try? REGARDS, Todd Leo

On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote: Thanks folks, really appreciate all your replies! I tried each of your suggestions, and in particular Animesh's second suggestion of making the case class definition global helped me get out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis. REGARDS, Todd Leo

On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com wrote: It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote: you need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

On Tue, 12 May 2015 at 12:29, SLiZn Liu sliznmail...@gmail.com wrote: I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote: toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method. Regards, Olivier.

On Tue, 12 May 2015 at 11:36, SLiZn Liu sliznmail...@gmail.com wrote: Hi User Group, I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the spark mailing list, and even tried to use toDF("col1", "col2") suggested by
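For readers hitting the same compile error, here is a minimal self-contained sketch of the pattern this thread converges on (top-level case class, SQLContext, and the implicits import); the file path comes from the programming guide, the other names are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // the case class must be defined outside main (top level or object scope)
    case class Person(name: String, age: Int)

    object ToDFExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("toDF-example"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._ // brings toDF into scope on RDDs of case classes

        val people = sc.textFile("examples/src/main/resources/people.txt")
          .map(_.split(","))
          .map(p => Person(p(0), p(1).trim.toInt))
          .toDF()
        people.printSchema()
        sc.stop()
      }
    }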
Fwd: value toDF is not a member of RDD object
Are you sure that you are submitting it correctly? Can you post the entire command you are using to run the .jar file via spark-submit?

Ok, here it is:

/opt/spark-1.3.1-bin-hadoop2.6/bin/spark-submit target/scala-2.11/webcat_2.11-1.0.jar

However, on the server I somehow have to specify the main class, using spark-submit --class WebcatApp --verbose webcat_2.11-1.0.jar, to make spark recognize the main class. The error and stack trace remain the same.

-- Forwarded message -- From: Animesh Baranawal animeshbarana...@gmail.com Date: Wed, May 13, 2015 at 6:49 PM Subject: Re: value toDF is not a member of RDD object To: SLiZn Liu sliznmail...@gmail.com

Are you sure that you are submitting it correctly? Can you post the entire command you are using to run the .jar file via spark-submit?

On Wed, May 13, 2015 at 4:07 PM, SLiZn Liu sliznmail...@gmail.com wrote: No, creating a DF using createDataFrame won't work:

val peopleDF = sqlContext.createDataFrame(people)

the code compiles but raises the same error as toDF at the line above.

On Wed, May 13, 2015 at 6:22 PM Sebastian Alfers sebastian.alf...@googlemail.com wrote: I use:

val conf = new SparkConf()...
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rdd: RDD[...] = ...
val schema: StructType = ...
sqlContext.createDataFrame(rdd, schema)

2015-05-13 12:00 GMT+02:00 SLiZn Liu sliznmail...@gmail.com: Additionally, after I successfully packaged the code and submitted it via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() is called:

Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
  at WebcatApp$.main(webcat.scala:49)
  at WebcatApp.main(webcat.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Unsurprisingly, if I remove toDF, no error occurs. I have moved the case class definition outside of main but inside the outer object scope, and removed the "provided" specification in build.sbt. However, when I tried Dean Wampler's suggestion of using sc.createDataFrame(), the compiler says this function is not a member of sc, and I cannot find any reference to it in the latest documents. What else should I try? REGARDS, Todd Leo

On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote: Thanks folks, really appreciate all your replies! I tried each of your suggestions, and in particular Animesh's second suggestion of making the case class definition global helped me get out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis. REGARDS, Todd Leo

On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com wrote: It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote: you need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

On Tue, 12 May 2015 at 12:29, SLiZn Liu sliznmail...@gmail.com wrote: I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote: toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method. Regards, Olivier.

On Tue, 12 May 2015 at 11:36, SLiZn Liu sliznmail...@gmail.com wrote: Hi User Group, I'm trying to reproduce the example in the Spark SQL Programming Guide
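One more observation on the submit command above (mine, not from the thread): the jar is built for Scala 2.11 (webcat_2.11-1.0.jar), while the prebuilt spark-1.3.1-bin-hadoop2.6 distribution is compiled against Scala 2.10; that mismatch alone produces exactly this kind of NoSuchMethodError in scala.reflect at runtime. A build.sbt sketch that keeps everything on 2.10:

    scalaVersion := "2.10.4"

    // all Spark artifacts must match the Scala version of the Spark build on the cluster;
    // %% appends the _2.10 suffix automatically
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-sql"  % "1.3.1"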
Re: value toDF is not a member of RDD object
Additionally, after I successfully packaged the code and submitted it via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() is called:

Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
  at WebcatApp$.main(webcat.scala:49)
  at WebcatApp.main(webcat.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Unsurprisingly, if I remove toDF, no error occurs. I have moved the case class definition outside of main but inside the outer object scope, and removed the "provided" specification in build.sbt. However, when I tried Dean Wampler's suggestion of using sc.createDataFrame(), the compiler says this function is not a member of sc, and I cannot find any reference to it in the latest documents. What else should I try? REGARDS, Todd Leo

On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote: Thanks folks, really appreciate all your replies! I tried each of your suggestions, and in particular Animesh's second suggestion of making the case class definition global helped me get out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis. REGARDS, Todd Leo

On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com wrote: It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote: you need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

On Tue, 12 May 2015 at 12:29, SLiZn Liu sliznmail...@gmail.com wrote: I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote: toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method. Regards, Olivier.

On Tue, 12 May 2015 at 11:36, SLiZn Liu sliznmail...@gmail.com wrote: Hi User Group, I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the spark mailing list, and even tried to use toDF("col1", "col2") as suggested by Xiangrui Meng in that post, and got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas on the cause of this error? REGARDS, Todd Leo
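Note for anyone comparing against this build.sbt: as Olivier points out further up the thread, toDF lives in Spark SQL, and the dependency list above has only spark-core and spark-mllib. A sketch of the missing line, version-aligned with the others:

    libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"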
Kafka Direct Approach + Zookeeper
From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. The above page mentions: "In the Kafka parameters, you must specify either metadata.broker.list or bootstrap.servers." ... Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
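For what it's worth, a minimal sketch of the parameters the direct approach needs (broker host/ports are illustrative); per Cody's reply later in this digest, either key works:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("direct-example"), Seconds(10))
    // a broker list is the only connection info the direct API needs
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))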
[Spark SQL 1.3.1] data frame saveAsTable returns exception
Hi, I am using Spark SQL 1.3.1. I have created a DataFrame using the jdbc data source and am using the saveAsTable() method, but got the following 2 exceptions:

java.lang.RuntimeException: Unsupported datatype DecimalType()
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
  at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
  at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
  at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
  at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
  at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
  at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218)
  at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
  at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
  at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
  at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
  at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
  at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121)
  at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071)
  at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037)
  at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015)

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
  at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
  at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
  at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
  at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
  at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
  at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
  at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
  at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
  at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
  at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
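A workaround consistent with both error messages, sketched with hypothetical column names (cast the unsupported DecimalType and java.sql.Date columns to types the 1.3.1 Parquet writer handles before saving); this is a suggestion, not a confirmed fix from the thread:

    val cleaned = df.select(
      df("id"),                                   // supported columns pass through unchanged
      df("amount").cast("double").as("amount"),   // DecimalType  -> DoubleType
      df("created").cast("string").as("created")) // java.sql.Date -> StringType
    cleaned.saveAsTable("my_table")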
Re: how to monitor multi directories in spark streaming task
Hi,

You could retroactively union an existing DStream with one from a newly created file. Then, when another file is detected, you would need to re-union the stream and create another DStream. It seems like the implementation of FileInputDStream only looks for files in the directory itself, and the filtering is applied using the FileSystem.listStatus(dir, filter) method, which does not provide recursive listing. A cleaner solution would be to extend FileInputDStream and override findNewFiles(...) with the ability to recursively list files (probably by using FileSystem.listFiles). Refer: http://stackoverflow.com/a/25645225/113411

-- Ankur

On 13/05/2015 02:03, lisendong wrote: but in fact the directories are not ready at the beginning of my task. for example:

/user/root/2015/05/11/data.txt
/user/root/2015/05/12/data.txt
/user/root/2015/05/13/data.txt

like this, and one new directory each day. how to create the new DStream for tomorrow's new directory (/user/root/2015/05/13/)?

On 13 May 2015, at 16:59, Ankur Chauhan achau...@brightcove.com wrote: I would suggest creating one DStream per directory and then using StreamingContext#union(...) to get a union DStream. -- Ankur

On 13/05/2015 00:53, hotdog wrote: I want to use fileStream in spark streaming to monitor multiple hdfs directories, such as:

val list_join_action_stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/root/*/*", check_valid_file(_), false).map(_._2.toString).print

By the way, I could not understand the meaning of the three classes: LongWritable, Text, TextInputFormat. but it doesn't work...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-monitor-multi-directories-in-spark-streaming-task-tp22863.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
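A sketch of the one-DStream-per-directory + union idea from this thread (directory list and types illustrative; assumes an existing StreamingContext ssc). Note it only covers directories known when the context starts, which is exactly the limitation lisendong ran into:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // one fileStream per directory, then union them into a single DStream
    val dirs = Seq("/user/root/2015/05/11", "/user/root/2015/05/12", "/user/root/2015/05/13")
    val perDir = dirs.map { d =>
      ssc.fileStream[LongWritable, Text, TextInputFormat](d, (p: Path) => true, false)
        .map(_._2.toString)
    }
    val unioned = ssc.union(perDir)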
Re: how to monitor multi directories in spark streaming task
I would suggest creating one DStream per directory and then using StreamingContext#union(...) to get a union DStream.

-- Ankur

On 13/05/2015 00:53, hotdog wrote: I want to use fileStream in spark streaming to monitor multiple hdfs directories, such as:

val list_join_action_stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/root/*/*", check_valid_file(_), false).map(_._2.toString).print

By the way, I could not understand the meaning of the three classes: LongWritable, Text, TextInputFormat. but it doesn't work...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-monitor-multi-directories-in-spark-streaming-task-tp22863.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)
Earthson wrote: Finally, I've found two ways: 1. search the output for something like "Submitted application application_1416319392519_0115" 2. use a specific AppName. We could query the ApplicationId (YARN). Hi Earthson, Can you explain more about case 2? How can we query the ApplicationId by the specific AppName? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462p22865.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
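One concrete way to do option 2 (a sketch using the Hadoop YarnClient API; this is my suggestion rather than something from the original thread, and the app name is hypothetical):

    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import scala.collection.JavaConverters._

    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // look up applications whose name matches the Spark app name we submitted with
    val appIds = yarnClient.getApplications().asScala
      .filter(_.getName == "MySparkAppName") // hypothetical app name
      .map(_.getApplicationId)

    yarnClient.stop()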
com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted
Hi, I was running our GraphX application (which worked fine on Spark 1.2.0), but it failed on Spark 1.3.1 with the exception below. Anyone have an idea on this issue? I guess it was caused by using the LZ4 codec?

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 7.6 failed 128 times, most recent failure: Lost task 54.127 in stage 7.6 (TID 5311, small15-tap1.common.lip6.fr): com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted
  at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
  at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
  at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
  at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
  at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
  at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300)
  at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Stream is corrupted
  at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152)
  at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116)
  at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
  ... 35 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Best, Yifan LI
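If you want to test the LZ4 guess cheaply, one option (a sketch of a diagnostic step, not a confirmed fix) is to switch the IO compression codec and see whether the corruption disappears:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("graphx-app")
      // try "snappy" (or "lzf") instead of lz4 to see if the stream corruption goes away
      .set("spark.io.compression.codec", "snappy")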
Re: Building Spark
Hi Akhil, Building with sbt tends to need around 3.5GB, whereas Maven's requirements are much lower, around 1.7GB. So try using Maven. For reference, I have the following settings and both do compile; sbt would not work with lower values.

$ echo $SBT_OPTS
-Xmx3012m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
$ echo $MAVEN_OPTS
-Xmx1280m -XX:MaxPermSize=384m

2015-05-13 5:57 GMT-07:00 Heisenberg Bb hbbalg...@gmail.com: I tried to build Spark on my local machine, Ubuntu 14.04 (4 GB RAM), and my system is hanging (frozen). When I monitored system processes, the build process was found to consume 85% of my memory. Why does it need a lot of resources? Is there a more efficient method to build Spark? Thanks Akhil
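For reference, a sketch of the full Maven recipe with the settings above (the package command follows the standard Spark 1.x build documentation; adjust -Xmx to taste):

    $ export MAVEN_OPTS="-Xmx1280m -XX:MaxPermSize=384m"
    $ mvn -DskipTests clean package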
applications are still in progress?
Hi, I have some applications that have finished (but actually failed earlier), which the WebUI shows as "Application myApp is still in progress." And in the eventlog folder, there are several log files like this: app-20150512***.inprogress So, I am wondering what "inprogress" means... Thanks! :) Best, Yifan LI
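For what it's worth, my understanding (not a reply from the list): the event log is written with the .inprogress suffix and only renamed when the SparkContext shuts down cleanly, so applications that crash or never call stop() stay "in progress" in the UI. A defensive sketch:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("my-app"))
    try {
      // ... job logic ...
    } finally {
      sc.stop() // lets Spark close and rename the event log on shutdown
    }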
Removing FINISHED applications and shuffle data
Hi, Please help me with the two issues below:

Environment: I am running my Spark cluster in standalone mode. I am initializing the Spark context from inside my Tomcat server. I am setting the properties below in environment.sh in the $SPARK_HOME/conf directory:

SPARK_MASTER_OPTS="-Dspark.deploy.retainedApplications=1 -Dspark.deploy.retainedDrivers=1"
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=600 -Dspark.worker.cleanup.appDataTtl=600"
SPARK_LOCAL_DIRS=$user.home/tmp

Issue 1: Still, in my $SPARK_HOME/work folder, application folders continue to grow as and when I restart Tomcat. I also tried to stop the Spark context (sc.stop()) in Tomcat's contextDestroyed listener, but I am still not able to remove the undesired application folders.

Issue 2: The 'tmp' folder is getting filled up with shuffle data and eating my entire hard disk. Is there any setting to remove shuffle data of FINISHED applications? Thanks in advance. Sayantini
Spark and Flink
hi, i use spark and flink in the same maven project. now i get an exception when working with spark; flink works well. the problem is transitive dependencies. maybe somebody knows a solution, or versions which work together. best regards paul

ps: a cloudera maven repo for flink would be desirable

my pom:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>mgm.tp.bigdata</groupId>
  <artifactId>tempGeoKmeans</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>tempGeoKmeans</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.5.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-core</artifactId>
      <version>0.9-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>0.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>0.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>0.8.1</version>
    </dependency>
  </dependencies>
</project>

my exception:

15/05/13 15:00:48 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.servlet.DefaultServlet-461261579: java.lang.NoSuchMethodError: org.eclipse.jetty.server.ResourceCache.<init>(Lorg/eclipse/jetty/http/MimeTypes;)V
java.lang.NoSuchMethodError: org.eclipse.jetty.server.ResourceCache.<init>(Lorg/eclipse/jetty/http/MimeTypes;)V
  at org.eclipse.jetty.servlet.NIOResourceCache.<init>(NIOResourceCache.java:41)
  at org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
  at javax.servlet.GenericServlet.init(GenericServlet.java:244)
  at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
  at org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
  at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
  at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
  at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
  at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
  at org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
  at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
  at org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229)
  at org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172)
  at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
  at org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95)
  at org.eclipse.jetty.server.Server.doStart(Server.java:282)
  at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
  at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
  at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
  at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
  at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
  at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209)
  at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:224)
  at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:53)
  at mgm.tp.bigdata.tempGeoKmeans.Spark.SparkMain.main(SparkMain.java:37)
15/05/13 15:00:48 WARN component.AbstractLifeCycle: FAILED
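A pattern that often resolves this kind of jetty clash, sketched here as a starting point rather than a verified fix for these exact versions: exclude jetty from one side so a single version wins (which side to exclude, and whether flink still runs afterwards, would need testing):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>0.8.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>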
Re: Kafka Direct Approach + Zookeeper
Many thanks Cody and contributors for the help. On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org wrote: Either one will work, there is no semantic difference. The reason I designed the direct api to accept both of those keys is that they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote: From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. The above page mentions: "In the Kafka parameters, you must specify either metadata.broker.list or bootstrap.servers." ... Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
JavaPairRDD
Hi, I want to get a JavaPairRDD<String, String> from the tuple part of a JavaPairRDD<String, Tuple2<String, String>>. As an example: (http://www.koctas.com.tr/reyon/el-aletleri/7,(0,1,0,0,0,0,0,0,46551)) is in my JavaPairRDD<String, Tuple2<String, String>>, and I want to get ((46551), (0,1,0,0,0,0,0,0)). I tried to split tuple._2() and create a new JavaPairRDD, but I can't. How can I get that? Have a nice day yasemin -- hiç ender hiç
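Sketched in Scala to keep the digest's examples in one language (the Java API equivalent is a mapToPair over the same logic). It assumes, as the printed example suggests but the post leaves ambiguous, that the tuple's two parts join into one comma-separated string whose last field is the desired key:

    import org.apache.spark.rdd.RDD

    val pairs: RDD[(String, (String, String))] = ??? // url -> tuple printed as "(0,1,0,0,0,0,0,0,46551)"

    val byId = pairs.map { case (_, tuple) =>
      val csv = tuple._1 + "," + tuple._2 // re-join the tuple into "0,1,0,0,0,0,0,0,46551"
      val idx = csv.lastIndexOf(',')
      (csv.substring(idx + 1), csv.substring(0, idx)) // ("46551", "0,1,0,0,0,0,0,0")
    }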
Building Spark
I tried to build Spark on my local machine, Ubuntu 14.04 (4 GB RAM), and my system is hanging (frozen). When I monitored system processes, the build process was found to consume 85% of my memory. Why does it need a lot of resources? Is there a more efficient method to build Spark? Thanks Akhil
Re: Spark and Flink
You can run the following command:

mvn dependency:tree

and see what jetty versions are brought in.

Cheers

On May 13, 2015, at 6:07 AM, Pa Rö paul.roewer1...@googlemail.com wrote: hi, i use spark and flink in the same maven project. now i get an exception when working with spark; flink works well. the problem is transitive dependencies. maybe somebody knows a solution, or versions which work together. best regards paul ps: a cloudera maven repo for flink would be desirable
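As a small addition (standard maven-dependency-plugin usage, not mentioned in the thread), the tree can be filtered down to just the conflicting group:

    $ mvn dependency:tree -Dincludes=org.eclipse.jetty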
Kafka + Direct + Zookeeper
I'm trying the Kafka direct approach (for consuming), but when I use only this config:

kafkaParams.put("group.id", groupdid);
kafkaParams.put("zookeeper.connect", zookeeperHostAndPort + "/cb_kafka");

I get this:

Exception in thread "main" org.apache.spark.SparkException: Must specify metadata.broker.list or bootstrap.servers

Shouldn't Zookeeper have enough information to provide the connection details, or am I missing something?
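Based on the exception text (and Cody's explanation of the direct approach earlier in this digest), the direct consumer fetches offsets and data from the brokers directly rather than discovering them through Zookeeper, so a broker list must be supplied explicitly. A sketch of the missing line, continuing the snippet above (host/ports illustrative):

// the direct API connects straight to brokers; zookeeper.connect alone is not enough
kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");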
Re: Building Spark
My 2 cents: If you have Java 8, you don't need any extra settings for Maven. -- Emre Sevinç On Wed, May 13, 2015 at 3:02 PM, Stephen Boesch java...@gmail.com wrote: Hi Akhil, Building with sbt tends to need around 3.5GB, whereas the Maven requirements are much lower, around 1.7GB, so try using Maven. For reference I have the following settings and both do compile; sbt would not work with lower values. $ echo $SBT_OPTS -Xmx3012m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m $ echo $MAVEN_OPTS -Xmx1280m -XX:MaxPermSize=384m 2015-05-13 5:57 GMT-07:00 Heisenberg Bb hbbalg...@gmail.com: I tried to build Spark on my local machine, Ubuntu 14.04 (4 GB RAM), and my system hangs (freezes). When I monitored the system processes, the build process was consuming 85% of my memory. Why does it need so many resources? Is there a more efficient way to build Spark? Thanks Akhil -- Emre Sevinc
Re: Kafka Direct Approach + Zookeeper
Either one will work; there is no semantic difference. The reason I designed the direct api to accept both of those keys is that they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote: From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. On the above page it mentions: In the Kafka parameters, you must specify either *metadata.broker.list* or *bootstrap.servers*. ... Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
how to read lz4 compressed data using fileStream of spark streaming?
In Spark Streaming, I want to use fileStream to monitor a directory. But the files in that directory are compressed using lz4, so new lz4 files are not detected by the following code. How can I detect these new files?

val list_join_action_stream = ssc.fileStream[LongWritable, Text, TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-read-lz4-compressed-data-using-fileStream-of-spark-streaming-tp22868.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
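One thing worth checking, offered as a guess rather than a confirmed fix: TextInputFormat only decompresses files whose extension matches a codec registered in the Hadoop configuration, and Hadoop's Lz4Codec expects the .lz4 extension and its own block framing (files produced by the standalone lz4 CLI tool may not be readable by it). A minimal sketch that registers the codec before creating the stream; the directory path and extension filter are hypothetical:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// register the LZ4 codec so TextInputFormat can decode *.lz4 files
ssc.sparkContext.hadoopConfiguration.set("io.compression.codecs",
  "org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.Lz4Codec")

val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "/data/incoming",                        // hypothetical directory
    (p: Path) => p.getName.endsWith(".lz4"), // pick up only lz4 files
    false)                                   // also process files already present
  .map(_._2.toString)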
Re: Reading Real Time Data only from Kafka
As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's not accurate to imply that either one of those things is inherently a con of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly if you do a transformation that repartitions the stream, that no longer holds. Thing is, that doesn't matter if you're saving offsets and results for each rdd in the driver. The offset ranges for the original rdd don't change as a result of the transformation you executed; they're immutable. Sure, you can get into trouble if you're trying to save offsets / results per partition on the executors, after a shuffle of some kind. You can avoid this pretty easily by just using normal scala code to do your transformation on the iterator inside a foreachPartition. Again, this isn't a con of the direct stream api; this is just a need to understand how Spark works. On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: The low-level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think it is the most reliable receiver-based Kafka consumer for Spark as of today, if you want to put it that way. Prior to Spark 1.3, other receiver-based consumers used the Kafka high-level API, which has serious issues with re-balancing, weaker fault tolerance, and data loss. Cody's direct-stream implementation is definitely a good approach, but both the direct-stream approach and the receiver-based low-level consumer approach have pros and cons. For example, the receiver-based approach needs a WAL for recovery from driver failure, which is an overhead for a system like Kafka. For the direct stream, the offsets stored in the checkpoint directory are lost if the driver code is modified; you can manage offsets from your driver, but for a derived stream generated from the direct stream there is no guarantee that batches are processed (and offsets committed) in order, etc. So whoever uses either consumer needs to study the pros and cons of both approaches before making a call. Regards, Dibyendu On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that I found more success and higher throughput with the low-level kafka api prior to KafkaRDDs, which it seems is the future. My apologies if it came across that way. :) On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote: Akhil, I hope I'm misreading the tone of this. If you have personal issues at stake, please take them up outside of the public list. If you have actual factual concerns about the kafka integration, please share them in a jira. Regarding reliability, here's a screenshot of a current production job with a 3 week uptime. It was a month before that; I only took it down to change code. http://tinypic.com/r/2e4vkht/8 Regarding flexibility, both of the apis available in spark will do what James needs, as I described. On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, If you are so sure, can you share a benchmark (which you ran for days, maybe?) that you have done with the Kafka APIs provided by Spark? Thanks Best Regards On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote: I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than what's available in Spark at this point. James, what you're describing is the default behavior for the createDirectStream api available as part of spark since 1.3. The kafka parameter auto.offset.reset defaults to largest, i.e. start at the most recent available message. This is described at http://spark.apache.org/docs/latest/streaming-kafka-integration.html The createDirectStream api implementation is described in detail at https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md If for some reason you're stuck using an earlier version of spark, you can accomplish what you want simply by starting the job using a new consumer group (there will be no prior state in zookeeper, so it will start consuming according to auto.offset.reset) On Tue, May 12, 2015 at 7:26 AM, James King jakwebin...@gmail.com wrote: Very nice! Will try it and let you know, thanks. On Tue, May 12, 2015 at 2:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Yep, you can try this
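To make Cody's point about driver-side offset handling concrete, here is a minimal sketch of reading each direct-stream RDD's offset ranges in the driver, before any shuffle. The stream setup and the saveOffsets helper are hypothetical, but HasOffsetRanges is the interface the direct-stream RDDs implement in Spark 1.3+:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directStream.foreachRDD { rdd =>
  // the cast is valid only on the original, untransformed direct-stream RDD
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // persist offsets transactionally alongside results; saveOffsets is a hypothetical helper
  offsetRanges.foreach(r => saveOffsets(r.topic, r.partition, r.untilOffset))
}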
Worker Spark Port
I understand that this port value is randomly selected. Is there a way to enforce which Spark port a Worker should use?
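For a standalone-mode Worker, the port can be pinned in conf/spark-env.sh instead of being chosen at random. A sketch under the assumption that this is a standalone deployment; the port numbers are just examples:

# conf/spark-env.sh on each worker machine
SPARK_WORKER_PORT=7078        # fixes the worker's RPC port
SPARK_WORKER_WEBUI_PORT=8081  # optionally fix the worker web UI port as well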
Spark Sorted DataFrame Repartitioning
Hi guys, If I load a dataframe via a SQL context with a SORT BY in the query, and I then repartition the data frame, will it keep the sort order in each partition? I want to repartition because I'm going to run a map that generates lots of data internally, so to avoid Out Of Memory errors I need to create smaller partitions. The source of the table is a parquet file.

sqlContext.sql("select * from tblx sort by colA")
  .repartition(defaultParallelism * 40)
  .map { /* ..make all the rows */ }

Cheers, ~N
Re: Kafka Direct Approach + Zookeeper
In my mind, this isn't really a producer vs consumer distinction, this is a broker vs zookeeper distinction. The producer apis talk to brokers. The low level consumer api (what direct stream uses) also talks to brokers. The high level consumer api talks to zookeeper, at least initially. TLDR; don't worry about it, just specify either of metadata.broker.list or bootstrap.servers, using the exact same host:port,host:port format, and you're good to go. On Wed, May 13, 2015 at 9:03 AM, James King jakwebin...@gmail.com wrote: Looking at Consumer Configs in http://kafka.apache.org/documentation.html#consumerconfigs The properties *metadata.broker.list* or *bootstrap.servers* are not mentioned. Do I need these on the consumer side? On Wed, May 13, 2015 at 3:52 PM, James King jakwebin...@gmail.com wrote: Many thanks Cody and contributors for the help. On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org wrote: Either one will work; there is no semantic difference. The reason I designed the direct api to accept both of those keys is that they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote: From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. On the above page it mentions: In the Kafka parameters, you must specify either *metadata.broker.list* or *bootstrap.servers*. ... Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
Re: value toDF is not a member of RDD object
I believe what Dean Wampler was suggesting is to use the sqlContext, not the sparkContext (sc), which is where the createDataFrame function resides: https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.SQLContext HTH. -Todd On Wed, May 13, 2015 at 6:00 AM, SLiZn Liu sliznmail...@gmail.com wrote: Additionally, after I successfully packaged the code and submitted it via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() is called: Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at WebcatApp$.main(webcat.scala:49) at WebcatApp.main(webcat.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Unsurprisingly, if I remove toDF, no error occurs. I have moved the case class definition outside of main but inside the outer object scope, and removed the provided specification in build.sbt. However, when I tried *Dean Wampler*'s suggestion of using sc.createDataFrame(), the compiler says this function is not a member of sc, and I cannot find any reference to it in the latest documents. What else should I try? REGARDS, Todd Leo On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote: Thanks folks, really appreciate all your replies! I tried each of your suggestions and in particular, *Animesh*'s second suggestion of *making the case class definition global* got me out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis. REGARDS, Todd Leo On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com wrote: It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote: you need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

On Tue, May 12, 2015 at 12:29, SLiZn Liu sliznmail...@gmail.com wrote: I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`? On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote: toDF is part of spark SQL, so you need the Spark SQL dependency plus import sqlContext.implicits._ to get the toDF method. Regards, Olivier. On Tue, May 12, 2015 at 11:36, SLiZn Liu sliznmail...@gmail.com wrote: Hi User Group, I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt: [error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person] [error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() [error] ^ [error] one error found [error] (compile:compileIncremental) Compilation failed [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM I double checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the spark mailing list, and even tried toDF("col1", "col2") as suggested by Xiangrui Meng in that post, and got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas about the cause of this error? REGARDS, Todd Leo
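Pulling the thread's resolution together, a minimal sketch that compiles: the case class is defined at the top level (outside main), a SQLContext is instantiated, and its implicits are imported before toDF is available. The file path and app name are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int) // top-level, not inside main

object ToDFExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("toDF-example"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF into scope for RDDs of case classes

    val people = sc.textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    people.show()
  }
}

As for the runtime NoSuchMethodError on JavaUniverse.runtimeMirror seen above: that error often indicates a Scala binary-version mismatch (for example, a _2.11 application jar submitted to a Spark distribution built for Scala 2.10), which is worth checking separately from the compile-time toDF issue.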
Re: Kafka Direct Approach + Zookeeper
Many thanks Cody! On Wed, May 13, 2015 at 4:22 PM, Cody Koeninger c...@koeninger.org wrote: In my mind, this isn't really a producer vs consumer distinction, this is a broker vs zookeeper distinction. The producer apis talk to brokers. The low level consumer api (what direct stream uses) also talks to brokers. The high level consumer api talks to zookeeper, at least initially. TLDR; don't worry about it, just specify either of metadata.broker.list or bootstrap.servers, using the exact same host:port,host:port format, and you're good to go. On Wed, May 13, 2015 at 9:03 AM, James King jakwebin...@gmail.com wrote: Looking at Consumer Configs in http://kafka.apache.org/documentation.html#consumerconfigs The properties *metadata.broker.list* or *bootstrap.servers* are not mentioned. Do I need these on the consumer side? On Wed, May 13, 2015 at 3:52 PM, James King jakwebin...@gmail.com wrote: Many thanks Cody and contributors for the help. On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org wrote: Either one will work; there is no semantic difference. The reason I designed the direct api to accept both of those keys is that they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote: From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. On the above page it mentions: In the Kafka parameters, you must specify either *metadata.broker.list* or *bootstrap.servers*. ... Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
Re: kafka + Spark Streaming with checkPointing fails to restart
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing shows setting up your stream and calling .checkpoint(checkpointDir) inside the functionToCreateContext. It looks to me like you're setting up your stream and calling checkpoint outside, after getOrCreate. I'm not sure that's the issue (someone who knows checkpoints better than I do should chime in), but that's the first thing I noticed. On Wed, May 13, 2015 at 4:06 AM, ankurcha achau...@brightcove.com wrote: Hi, I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution):

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
at com.brightcove.analytics.tacoma.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  // KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)
  val
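For reference, the checkpoint-compatible setup the reply describes: all stream creation and the checkpoint call live inside the factory passed to getOrCreate, so that on restart the DStream graph is rebuilt from the checkpoint rather than redefined outside it. A minimal sketch; the checkpoint directory, batch interval, and the socket stream standing in for the Kafka stream are hypothetical:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/checkpoints" // hypothetical path

def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  // define ALL DStreams and output operations here, inside the factory
  val lines = ssc.socketTextStream("localhost", 9999) // stand-in for the Kafka stream
  lines.count().print()
  ssc.checkpoint(checkpointDir) // checkpoint set inside the factory, not after getOrCreate
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()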
Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception
Hi, I am using spark-shell, and the steps with which I can reproduce the issue are as follows:

scala> val dateDimDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:teradata://192.168.145.58/DBS_PORT=1025,DATABASE=BENCHQADS,LOB_SUPPORT=OFF,USER=BENCHQADS,PASSWORD=abc",
  "dbtable" -> "date_dim"))

scala> dateDimDF.printSchema()
root
 |-- d_date_sk: integer (nullable = false)
 |-- d_date_id: string (nullable = false)
 |-- d_date: date (nullable = true)
 |-- d_month_seq: integer (nullable = true)
 |-- d_week_seq: integer (nullable = true)
 |-- d_quarter_seq: integer (nullable = true)
 |-- d_year: integer (nullable = true)
 |-- d_dow: integer (nullable = true)
 |-- d_moy: integer (nullable = true)
 |-- d_dom: integer (nullable = true)
 |-- d_qoy: integer (nullable = true)
 |-- d_fy_year: integer (nullable = true)
 |-- d_fy_quarter_seq: integer (nullable = true)
 |-- d_fy_week_seq: integer (nullable = true)
 |-- d_day_name: string (nullable = true)
 |-- d_quarter_name: string (nullable = true)
 |-- d_holiday: string (nullable = true)
 |-- d_weekend: string (nullable = true)
 |-- d_following_holiday: string (nullable = true)
 |-- d_first_dom: integer (nullable = true)
 |-- d_last_dom: integer (nullable = true)
 |-- d_same_day_ly: integer (nullable = true)
 |-- d_same_day_lq: integer (nullable = true)
 |-- d_current_day: string (nullable = true)
 |-- d_current_week: string (nullable = true)
 |-- d_current_month: string (nullable = true)
 |-- d_current_quarter: string (nullable = true)
 |-- d_current_year: string (nullable = true)

scala> dateDimDF.saveAsTable("date_dim_tera_save")
15/05/13 19:57:05 INFO JDBCRDD: closed connection
15/05/13 19:57:05 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
15/05/13 19:57:05 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

scala> val
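The ClassCastException suggests the Parquet writer in 1.3.1 mishandles the DATE column (and the other entry in this thread shows DecimalType failing as well). A possible workaround, not a confirmed fix: cast the problematic columns to a supported type before saving. A sketch using the column names from the schema above; the remaining columns would be listed the same way:

// cast d_date to string so the 1.3.x Parquet writer can handle it
val safeDF = dateDimDF.selectExpr(
  "d_date_sk",
  "d_date_id",
  "cast(d_date as string) as d_date"
  /* ... list the remaining columns unchanged ... */)
safeDF.saveAsTable("date_dim_tera_save")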
Re: Reading Real Time Data only from Kafka
Thanks Cody for your email. I think my concern was not about getting the ordering of messages within a partition, which, as you said, is possible if one knows how Spark works. The issue is how Spark schedules jobs for every batch, which may not be in the same order the batches were generated. If that is not guaranteed, it does not matter whether you manage order within your partition, and depending on per-partition ordering to commit offsets may lead to offsets being committed in the wrong order. In this thread you have discussed this as well, along with some workarounds: https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again, one needs to understand every detail of a consumer to decide whether it solves their use case. Regards, Dibyendu On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote: As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's not accurate to imply that either one of those things is inherently a con of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly if you do a transformation that repartitions the stream, that no longer holds. Thing is, that doesn't matter if you're saving offsets and results for each rdd in the driver. The offset ranges for the original rdd don't change as a result of the transformation you executed; they're immutable. Sure, you can get into trouble if you're trying to save offsets / results per partition on the executors, after a shuffle of some kind. You can avoid this pretty easily by just using normal scala code to do your transformation on the iterator inside a foreachPartition. Again, this isn't a con of the direct stream api; this is just a need to understand how Spark works. On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: The low-level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think it is the most reliable receiver-based Kafka consumer for Spark as of today, if you want to put it that way. Prior to Spark 1.3, other receiver-based consumers used the Kafka high-level API, which has serious issues with re-balancing, weaker fault tolerance, and data loss. Cody's direct-stream implementation is definitely a good approach, but both the direct-stream approach and the receiver-based low-level consumer approach have pros and cons. For example, the receiver-based approach needs a WAL for recovery from driver failure, which is an overhead for a system like Kafka. For the direct stream, the offsets stored in the checkpoint directory are lost if the driver code is modified; you can manage offsets from your driver, but for a derived stream generated from the direct stream there is no guarantee that batches are processed (and offsets committed) in order, etc. So whoever uses either consumer needs to study the pros and cons of both approaches before making a call. Regards, Dibyendu On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that I found more success and higher throughput with the low-level kafka api prior to KafkaRDDs, which it seems is the future. My apologies if it came across that way. :) On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote: Akhil, I hope I'm misreading the tone of this. If you have personal issues at stake, please take them up outside of the public list. If you have actual factual concerns about the kafka integration, please share them in a jira. Regarding reliability, here's a screenshot of a current production job with a 3 week uptime. It was a month before that; I only took it down to change code. http://tinypic.com/r/2e4vkht/8 Regarding flexibility, both of the apis available in spark will do what James needs, as I described. On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, If you are so sure, can you share a benchmark (which you ran for days, maybe?) that you have done with the Kafka APIs provided by Spark? Thanks Best Regards On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote: I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than what's available in Spark at this point. James, what you're describing is the default behavior for the createDirectStream api available as part of spark
Re: Kafka Direct Approach + Zookeeper
Looking at Consumer Configs in http://kafka.apache.org/documentation.html#consumerconfigs The properties *metadata.broker.list* or *bootstrap.servers* are not mentioned. Do I need these on the consumer side? On Wed, May 13, 2015 at 3:52 PM, James King jakwebin...@gmail.com wrote: Many thanks Cody and contributors for the help. On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org wrote: Either one will work; there is no semantic difference. The reason I designed the direct api to accept both of those keys is that they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote: From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. On the above page it mentions: In the Kafka parameters, you must specify either *metadata.broker.list* or *bootstrap.servers*. ... Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
Re: Reading Real Time Data only from Kafka
You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single job at a time, in the order they were defined, unless you turn on an experimental option for more parallelism (TD or someone more knowledgeable can chime in on this). If you're talking about the possibility of the next job starting before the prior one has fully finished because your processing is lagging behind... I'm not 100% sure this is possible, because I've never observed it. The thing is, it's a moot point, because if you're saving offsets yourself transactionally, you already need to be verifying that offsets are correct (increasing without gaps) in order to handle restarts correctly. If you're super concerned about how batches get generated, the direct api gives you access to KafkaUtils.createRDD... just schedule your own rdds in the order you want. Again, flexible. On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Cody for your email. I think my concern was not about getting the ordering of messages within a partition, which, as you said, is possible if one knows how Spark works. The issue is how Spark schedules jobs for every batch, which may not be in the same order the batches were generated. If that is not guaranteed, it does not matter whether you manage order within your partition, and depending on per-partition ordering to commit offsets may lead to offsets being committed in the wrong order. In this thread you have discussed this as well, along with some workarounds: https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again, one needs to understand every detail of a consumer to decide whether it solves their use case. Regards, Dibyendu On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote: As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's not accurate to imply that either one of those things is inherently a con of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly if you do a transformation that repartitions the stream, that no longer holds. Thing is, that doesn't matter if you're saving offsets and results for each rdd in the driver. The offset ranges for the original rdd don't change as a result of the transformation you executed; they're immutable. Sure, you can get into trouble if you're trying to save offsets / results per partition on the executors, after a shuffle of some kind. You can avoid this pretty easily by just using normal scala code to do your transformation on the iterator inside a foreachPartition. Again, this isn't a con of the direct stream api; this is just a need to understand how Spark works. On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: The low-level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think it is the most reliable receiver-based Kafka consumer for Spark as of today, if you want to put it that way. Prior to Spark 1.3, other receiver-based consumers used the Kafka high-level API, which has serious issues with re-balancing, weaker fault tolerance, and data loss. Cody's direct-stream implementation is definitely a good approach, but both the direct-stream approach and the receiver-based low-level consumer approach have pros and cons. For example, the receiver-based approach needs a WAL for recovery from driver failure, which is an overhead for a system like Kafka. For the direct stream, the offsets stored in the checkpoint directory are lost if the driver code is modified; you can manage offsets from your driver, but for a derived stream generated from the direct stream there is no guarantee that batches are processed (and offsets committed) in order, etc. So whoever uses either consumer needs to study the pros and cons of both approaches before making a call. Regards, Dibyendu On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that I found more success and higher throughput with the low-level kafka api prior to KafkaRDDs, which it seems is the future. My apologies if it came across that way. :) On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote: Akhil, I hope I'm misreading the tone of this. If you
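To illustrate Cody's last suggestion, a minimal sketch of building batch RDDs over explicit offset ranges with KafkaUtils.createRDD (available since Spark 1.3), so you control the order yourself; the broker, topic, partitions, and offsets are hypothetical, and sc is an existing SparkContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // hypothetical broker
val ranges = Array(
  OffsetRange("mytopic", 0, 100, 200), // topic, partition, fromOffset, untilOffset
  OffsetRange("mytopic", 1, 100, 200)
)
// each RDD covers exactly the offsets requested; schedule them in whatever order you need
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, ranges)
rdd.map(_._2).take(10).foreach(println)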
Re: Trouble trying to run ./spark-ec2 script
Is python installed on the machine where you ran ./spark-ec2 ? Cheers On Wed, May 13, 2015 at 1:33 PM, Su She suhsheka...@gmail.com wrote: I'm trying to set up my own cluster and am having trouble running this script: ./spark-ec2 --key-pair=xx --identity-file=xx.pem --region=us-west-2 --zone=us-west-2c --num-slaves=1 launch my-spark-cluster based off: https://spark.apache.org/docs/latest/ec2-scripts.html It just tries to open the spark-ec2 file instead of running it (it asks me if I want to open the file and what program to use to open it). I'm guessing this is something small that I'm doing wrong? Thanks!
Re: how to use rdd.countApprox
I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD {...}. After calling cancelJobGroup(), the spark context seems to no longer be valid, which crashes subsequent jobs. My spark version is 1.1.1. I will do more investigation into this issue, perhaps after upgrading to 1.3.1, and then file a JIRA if it persists. Is there a way to get the stage or task id of a particular transformation or action on an RDD and then selectively kill the stage or tasks? It would be necessary and useful in situations similar to countApprox. Thanks, Du On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com wrote: That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would be good to file a JIRA. This may be worth debugging. On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote: Actually I tried that before asking. However, it killed the spark context. :-) Du On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote: That is a good question. I don't see a direct way to do that. You could try the following:

val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId, "approx count") // jobs launched with the set job group
val approxCount = rdd.countApprox(5000).initialValue
rdd.sparkContext.cancelJobGroup(jobGroupId) // cancel the job

On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote: Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote: From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct value that you would have received from rdd.count() On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after the count finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way to use this? Thanks, Du On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDDs in a spark streaming app. When the data grows large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Spark performance in cluster mode using yarn
Hi Friends, can someone please give me an idea of what the expected time (complete job execution) should be for a Spark job? I have data in a Hive table; the amount of data is about 1 GB, roughly 200,000 rows for a whole month. I want to do monthly aggregation using SQL queries with GROUP BY. I have only one node, 1 cluster, and the below configuration for running the job:

--num-executors 2 --driver-memory 3g --driver-java-options -XX:MaxPermSize=1G --executor-memory 2g --executor-cores 2

Approximately how much time should the job take to finish? Or can someone suggest the best way to get results quickly? Thanks in advance, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-performance-in-cluster-mode-using-yarn-tp22877.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
PostgreSQL JDBC Classpath Issue
Hey all, I seem to be having an issue with the PostgreSQL JDBC jar on my classpath. I've outlined the issue on Stack Overflow (http://stackoverflow.com/questions/30221677/spark-sql-postgresql-data-source-issues). I'm not sure how to fix this, since I built the uber jar using sbt-assembly and the final jar does have org/postgresql/Driver.class. — George Adams, IV Software Craftsman Brand Networks, Inc. (585) 902-8822
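A hedged pointer, since the linked question can't be reproduced here: in Spark 1.3.x the JDBC data source loads the driver through java.sql.DriverManager, which resolves classes via the primordial classloader, so having org/postgresql/Driver.class inside the application uber jar is often not enough. The usual workaround is to put the driver jar on the driver and executor classpaths explicitly; the jar name and paths below are hypothetical:

spark-submit \
  --driver-class-path /path/to/postgresql-9.4-1201.jdbc41.jar \
  --conf spark.executor.extraClassPath=/path/to/postgresql-9.4-1201.jdbc41.jar \
  --class com.example.Main app-assembly.jar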
Problem with current spark
Hi, I'm trying to run an application that uses a Hive context to perform some queries over JSON files. The code of the application is here: https://github.com/GiovanniPaoloGibilisco/spark-log-processor/tree/fca93d95a227172baca58d51a4d799594a0429a1 I can run it on Spark 1.3.1 after rebuilding it with Hive support using: mvn -Phive -Phive-thriftserver -DskipTests clean package but when I try to run the same application on one built from the current master branch (at this commit from today: https://github.com/apache/spark/tree/bec938f777a2e18757c7d04504d86a5342e2b49e), again built with Hive support, I get stuck at Stage 2, which is never submitted, and after a while the application is killed. The logs look like this: 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 15/05/13 16:54:37 INFO DAGScheduler: Got job 2 (run at unknown:0) with 2 output partitions (allowLocal=false) 15/05/13 16:54:37 INFO DAGScheduler: Final stage: ResultStage 4(run at unknown:0) 15/05/13 16:54:37 INFO DAGScheduler: Parents of final stage: List() 15/05/13 16:54:37 INFO Exchange: Using SparkSqlSerializer2. 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 ^C15/05/13 16:54:42 INFO SparkContext: Invoking stop() from shutdown hook 15/05/13 16:54:42 INFO SparkUI: Stopped Spark web UI at http://192.168.230.130:4040 15/05/13 16:54:42 INFO DAGScheduler: Stopping DAGScheduler 15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Shutting down all executors 15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 15/05/13 16:54:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/05/13 16:54:52 ERROR TaskSchedulerImpl: Lost executor 0 on 192.168.230.130: remote Rpc client disassociated 15/05/13 16:54:53 INFO AppClient$ClientActor: Executor updated: app-20150513165402-/0 is now EXITED (Command exited with code 0) 15/05/13 16:54:53 INFO SparkDeploySchedulerBackend: Executor app-20150513165402-/0 removed: Command exited with code 0 15/05/13 16:54:53 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 15/05/13 16:56:42 WARN AkkaRpcEndpointRef: Error sending message [message = StopExecutors] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266) at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.stop(SparkDeploySchedulerBackend.scala:95) at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1404) at org.apache.spark.SparkContext.stop(SparkContext.scala:1562) at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551) at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2252) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2204) at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) Should I submit an Issue for this? What is the best way to do it? Best
Re: how to use rdd.countApprox
That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would be good to file a JIRA. This may be worth debugging. On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote: Actually I tried that before asking. However, it killed the spark context. :-) Du On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote: That is a good question. I don't see a direct way to do that. You could try the following:

val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId, "approx count") // jobs launched with the set job group
val approxCount = rdd.countApprox(5000).initialValue
rdd.sparkContext.cancelJobGroup(jobGroupId) // cancel the job

On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote: Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote: From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct value that you would have received from rdd.count() On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after the count finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way to use this? Thanks, Du On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDDs in a spark streaming app. When the data grows large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Trouble trying to run ./spark-ec2 script
I'm trying to set up my own cluster and am having trouble running this script: ./spark-ec2 --key-pair=xx --identity-file=xx.pem --region=us-west-2 --zone=us-west-2c --num-slaves=1 launch my-spark-cluster based off: https://spark.apache.org/docs/latest/ec2-scripts.html It just tries to open the spark-ec2 file instead of running it (it asks me if I want to open the file and what program to use to open it). I'm guessing this is something small that I'm doing wrong? Thanks!
Re: Trouble trying to run ./spark-ec2 script
Hmm, I just tried to run it again, but opened the script with Python; the command line seemed to pop up really quickly and then exited. On Wed, May 13, 2015 at 2:06 PM, Su She suhsheka...@gmail.com wrote: Hi Ted, Yes, I do have Python 3.5 installed. I just ran py from the ec2 directory and it started up the Python shell. Thanks! On Wed, May 13, 2015 at 2:02 PM, Ted Yu yuzhih...@gmail.com wrote: Is python installed on the machine where you ran ./spark-ec2 ? Cheers On Wed, May 13, 2015 at 1:33 PM, Su She suhsheka...@gmail.com wrote: I'm trying to set up my own cluster and am having trouble running this script: ./spark-ec2 --key-pair=xx --identity-file=xx.pem --region=us-west-2 --zone=us-west-2c --num-slaves=1 launch my-spark-cluster based off: https://spark.apache.org/docs/latest/ec2-scripts.html It just tries to open the spark-ec2 file instead of running it (it asks me if I want to open the file and what program to use to open it). I'm guessing this is something small that I'm doing wrong? Thanks!
Re: how to use rdd.countApprox
You might get stage information through SparkListener, but I am not sure whether you can use that information to easily kill stages. I do highly recommend using Spark 1.3.1 (or even Spark master), though. Things move really fast between releases; 1.1.1 feels really old to me :P TD On Wed, May 13, 2015 at 1:25 PM, Du Li l...@yahoo-inc.com wrote: I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD {...}. After calling cancelJobGroup(), the spark context seems to no longer be valid, which crashes subsequent jobs. My spark version is 1.1.1. I will do more investigation into this issue, perhaps after upgrading to 1.3.1, and then file a JIRA if it persists. Is there a way to get the stage or task id of a particular transformation or action on an RDD and then selectively kill the stage or tasks? It would be necessary and useful in situations similar to countApprox. Thanks, Du On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com wrote: That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would be good to file a JIRA. This may be worth debugging. On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote: Actually I tried that before asking. However, it killed the spark context. :-) Du On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote: That is a good question. I don't see a direct way to do that. You could try the following:

val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId, "approx count") // jobs launched with the set job group
val approxCount = rdd.countApprox(5000).initialValue
rdd.sparkContext.cancelJobGroup(jobGroupId) // cancel the job

On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote: Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote: From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct value that you would have received from rdd.count() On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after the count finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDDs in a spark streaming app. When the data grows large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
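Putting TD's suggestion into one piece, a minimal sketch of the cancel-after-timeout pattern inside a streaming job. The job-group naming and timeout are arbitrary, and, as the thread notes, cancelJobGroup crashed the context on 1.1.1, so treat this as the intended 1.3.x usage rather than a verified fix:

dstream.foreachRDD { rdd =>
  val jobGroupId = s"count-approx-${System.currentTimeMillis()}"
  // tag the jobs launched by countApprox so they can be cancelled as a group
  rdd.sparkContext.setJobGroup(jobGroupId, "approximate count with 5s timeout")
  val approx = rdd.countApprox(5000).initialValue // returns after ~5000 ms with a BoundedDouble
  println(s"approx count: low=${approx.low} mean=${approx.mean} high=${approx.high}")
  rdd.sparkContext.cancelJobGroup(jobGroupId) // stop the still-running exact count
}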
force the kafka consumer process to different machines
I'm using Spark Streaming integrated with spark-streaming-kafka. My kafka topic has 80 partitions, while my machines have 40 cores. I found that when the job is running, the kafka consumer processes are deployed to only 2 machines, and the bandwidth on those 2 machines becomes very, very high. Is there any way to control where the kafka consumers are dispatched? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: force the kafka consumer process to different machines
With this low-level Kafka API https://github.com/dibbhatt/kafka-spark-consumer/, you can specify how many receivers you want to spawn, and most of the time they are spawned evenly. Usually you can add a short sleep just after creating the context, so the executors have time to connect to the driver; Spark will then distribute the receivers evenly. Thanks Best Regards On Wed, May 13, 2015 at 9:03 PM, hotdog lisend...@163.com wrote: I'm using Spark Streaming integrated with spark-streaming-kafka. My kafka topic has 80 partitions, while my machines have 40 cores. I found that when the job is running, the kafka consumer processes are deployed to only 2 machines, and the bandwidth on those 2 machines becomes very, very high. Is there any way to control where the kafka consumers are dispatched? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
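For the built-in receiver-based API, the standard way to spread consumption across machines is to create several receiver streams and union them, roughly one receiver per executor core you want to dedicate. A sketch following the pattern in the Spark streaming guide; the zookeeper address, topic, group, and receiver count are hypothetical:

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 8 // hypothetical; each receiver occupies one executor core
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "consumer-group", Map("mytopic" -> 10))
}
// one unified stream backed by receivers that Spark schedules across the cluster
val unified = ssc.union(kafkaStreams)
unified.map(_._2).count().print()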
Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception
Your stack trace says it can't convert date to integer. You sure about column positions? On 13 May 2015 21:32, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi , I am using Spark SQL 1.3.1. I have created a dataFrame using jdbc data source and am using saveAsTable() method but got the following 2 exceptions: java.lang.RuntimeException: Unsupported datatype DecimalType() at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393) at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269) at org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:391) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015) java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) at 
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215) at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171) at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.ParquetRelation2.org $apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689) at
Re: Worker Spark Port
I believe most ports are configurable at this point; look at http://spark.apache.org/docs/latest/configuration.html and search for ".port". On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote: I understand that this port value is randomly selected. Is there a way to enforce which Spark port a Worker should use?
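For the standalone Worker specifically, the port can also be pinned in conf/spark-env.sh; a minimal sketch with illustrative values:

# conf/spark-env.sh on each worker machine
export SPARK_WORKER_PORT=7078        # fixed port for the worker process
export SPARK_WORKER_WEBUI_PORT=8081  # worker web UI port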
Re: force the kafka consumer process to different machines
or you can use this Receiver as well: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer where you can specify how many receivers you need for your topic, and it will divide the partitions among the receivers and return the joined stream for you. Say you specified 20 receivers: in that case each receiver can handle 4 partitions and you get a consumer parallelism of 20 receivers. Dibyendu

On Wed, May 13, 2015 at 9:28 PM, 李森栋 lisend...@163.com wrote: thank you very much (Sent from a Meizu MX4 Pro)

Original message -- From: Cody Koeninger c...@koeninger.org Date: Wed, May 13, 23:52 To: hotdog lisend...@163.com Cc: user@spark.apache.org Subject: Re: force the kafka consumer process to different machines -- I assume you're using the receiver-based approach? Have you tried the createDirectStream API? https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html If you're sticking with the receiver-based approach, I think your only option would be to create more consumer streams and union them. That doesn't give you control over where they're run, but it should increase the consumer parallelism. On Wed, May 13, 2015 at 10:33 AM, hotdog lisend...@163.com wrote: I'm using Spark Streaming integrated with streaming-kafka. My Kafka topic has 80 partitions, while my machines have 40 cores. I found that when the job is running, the Kafka consumer processes are deployed to only 2 machines, and the bandwidth of those 2 machines becomes very high. Is there any way to control how the Kafka consumers are dispatched?
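A minimal Scala sketch of Cody's union suggestion (the ZooKeeper quorum, group id, topic name and receiver count are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-union"), Seconds(10))
// one receiver per stream; more streams spread the consuming load over more executors
val streams = (1 to 20).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("mytopic" -> 4))
}
val unioned = ssc.union(streams)
unioned.count().print()
ssc.start()
ssc.awaitTermination()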
Re: Spark SQL: preferred syntax for column reference?
Is the $"foo" or mydf("foo") form (or both) checked at compile time to verify that the column reference is valid? Thx. Dean

On Wednesday, May 13, 2015, Michael Armbrust mich...@databricks.com wrote: I would not say that either method is preferred (neither is old/deprecated). One advantage of the second is that you are referencing a column from a specific dataframe, instead of just providing a string that will be resolved much like an identifier in a SQL query. This means given:

df1 = [id: int, name: string]
df2 = [id: int, zip: int]

I can do something like:

df1.join(df2, df1("id") === df2("id"))

whereas I would need aliases if I was only using strings:

df1.as("a").join(df2.as("b"), $"a.id" === $"b.id")

On Wed, May 13, 2015 at 9:55 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just getting started with Spark SQL and DataFrames in 1.3.0. I notice that the Spark API shows a different syntax for referencing columns in a dataframe than the Spark SQL Programming Guide. For instance, the API docs for the select method show this: df.select($"colA", $"colB") Whereas the programming guide shows this: df.filter(df("name") > 21).show() I tested and both the $"column" and df("column") syntax work, but I'm wondering which is *preferred*. Is one the original and one a new feature we should be using? Thanks, Diana (Spark Curriculum Developer for Cloudera) -- Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
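For reference, a minimal sketch showing both forms side by side (assumes an existing SparkContext sc; note the $ syntax requires the implicits import, and neither form is checked at compile time, a bad column name fails at analysis time):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc) // assumes an existing SparkContext `sc`
import sqlContext.implicits._       // enables $"col" and rdd.toDF(...)

val df = sc.parallelize(Seq(("alice", 30), ("bob", 18))).toDF("name", "age")
df.select($"name", $"age").show() // $-interpolator style
df.filter(df("age") > 21).show()  // dataframe-apply style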
Spark recovery takes long
Hello Spark gurus, We have a Spark Streaming application that is consuming from a Flume stream and has some window operations. The input batch size is 1 minute and the intermediate window operations have window sizes of 1 minute, 1 hour and 6 hours. I enabled checkpointing and the write-ahead log so that we can recover from any failures. I also added explicit checkpoint duration directives for each of the intermediate window streams and tried a 2-minute duration. However, I am running into a couple of issues at the moment: 1. The recovery time is long. It's close to 15 minutes after running the application for only a couple of hours and then restarting it to test recovery. I tried running for a shorter time, about half an hour, and restarting, and the recovery process still took 15 minutes (I had started with a fresh checkpoint folder). 2. After recovery is completed, the input stream does seem to recover just fine and continue where it left off at the crash; however, as the computations continue, I am getting incorrect results for every batch after recovery, e.g. some double values are NaNs etc. This does not happen if I let the application just continue. It seems to me that some of the intermediate streams were not recovered properly and caused some of our computations to produce incorrect values. Any help/insights into how to go about tackling this will be appreciated. Thanks NB.
spark-streaming with flume error
Hi all, I want to use spark-streaming with Flume, but now I am in trouble: I don't know how to configure Flume. I configure Flume like this:

a1.sources = r1
a1.channels = c1 c2
a1.sources.r1.type = avro
a1.sources.r1.channels = c1 c2
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4
a1.sources.r1.selector.type = replicating
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1
a1.channels.c1.transactionCapacity = 1
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 80
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1
a1.channels.c2.transactionCapacity = 1
a1.channels.c2.byteCapacityBufferPercentage = 20
a1.channels.c2.byteCapacity = 80
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /user/hxf/flume
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.channels = c1

but when I send data to the 4 port I get an error like this: org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 154218761 items! Connection closed. Can anybody help me? Thanks!
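For what it's worth, in the push-based Flume integration Spark itself runs the Avro receiver, and the Flume agent needs an avro sink (not just an avro source) pointed at that receiver's host and port. A minimal sketch of the Spark side, assuming that setup (host, port and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(new SparkConf().setAppName("flume-test"), Seconds(10))
// Flume side would need e.g.: a1.sinks.k2.type = avro pointing at this host:port
val stream = FlumeUtils.createStream(ssc, "receiver-host", 41414)
stream.map(e => new String(e.event.getBody.array())).print()
ssc.start()
ssc.awaitTermination()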
Re: JavaPairRDD
You could use a map() operation, but the easiest way is probably to just call the values() method on the JavaPairRDD<A, B> to get a JavaRDD<B>. See this link: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html Tristan

On 13 May 2015 at 23:12, Yasemin Kaya godo...@gmail.com wrote: Hi, I want to get a JavaPairRDD<String, String> from the tuple part of a JavaPairRDD<String, Tuple2<String, String>>. As an example: (http://www.koctas.com.tr/reyon/el-aletleri/7, (0,1,0,0,0,0,0,0,46551)) in my JavaPairRDD<String, Tuple2<String, String>>, and I want to get ((46551), (0,1,0,0,0,0,0,0)). I try to split tuple._2() and create a new JavaPairRDD but I can't. How can I get that? Have a nice day yasemin -- hiç ender hiç
Re: --jars works in yarn-client but not yarn-cluster mode, why?
I looked into the Environment in both modes.
yarn-client: spark.jars local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar,file:/home/xxx/my-app.jar
yarn-cluster: spark.yarn.secondary.jars local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
I wonder why htrace exists in spark.yarn.secondary.jars but is still not found by the URLClassLoader. I tried both local: and file: schemes for the jar; still the same error. 2015-05-14 11:37 GMT+08:00 Fengyun RAO raofeng...@gmail.com: Hadoop version: CDH 5.4. We need to connect to HBase, thus we need the extra /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar dependency. It works in yarn-client mode: spark-submit --class xxx.xxx.MyApp --master yarn-client --num-executors 10 --executor-memory 10g --jars /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar my-app.jar /input /output However, if we change yarn-client to yarn-cluster, it throws a ClassNotFoundException (actually the class exists in htrace-core-3.1.0-incubating.jar): Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:218) at org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481) at org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65) at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:850) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.init(ConnectionManager.java:635) ... 21 more Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) Why doesn't --jars work in yarn-cluster mode? How do we add an extra dependency in yarn-cluster mode?
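The thread has no resolution here. One workaround sometimes used in this situation (an untested sketch, not from this thread, and it assumes the jar already exists at the same path on every node) is to put the jar on the driver and executor classpaths explicitly instead of relying on --jars:

spark-submit --class xxx.xxx.MyApp --master yarn-cluster \
  --num-executors 10 --executor-memory 10g \
  --conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
  --conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
  my-app.jar /input /output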
Re: JavaPairRDD
Thank you Tristan. It is totally what I am looking for :) 2015-05-14 5:05 GMT+03:00 Tristan Blakers tris...@blackfrog.org: You could use a map() operation, but the easiest way is probably to just call the values() method on the JavaPairRDD<A, B> to get a JavaRDD<B>. See this link: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html Tristan On 13 May 2015 at 23:12, Yasemin Kaya godo...@gmail.com wrote: Hi, I want to get a JavaPairRDD<String, String> from the tuple part of a JavaPairRDD<String, Tuple2<String, String>>. As an example: (http://www.koctas.com.tr/reyon/el-aletleri/7, (0,1,0,0,0,0,0,0,46551)) in my JavaPairRDD<String, Tuple2<String, String>>, and I want to get ((46551), (0,1,0,0,0,0,0,0)). I try to split tuple._2() and create a new JavaPairRDD but I can't. How can I get that? Have a nice day yasemin -- hiç ender hiç -- hiç ender hiç
spark sql hive-shims
Hi, I am using Spark SQL with HiveContext. The Spark version is 1.3.1. When running Spark locally everything works fine, but when running on the Spark cluster I get a ClassNotFound error for org.apache.hadoop.hive.shims.Hadoop23Shims. This class belongs to hive-shims-0.23 and is a runtime dependency of spark-hive:

[INFO] org.apache.spark:spark-hive_2.10:jar:1.3.1
[INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime
[INFO] |  |  \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime

My Spark distribution is built with: make-distribution.sh --tgz -Phive -Phive-thriftserver -DskipTests If I add this dependency to my driver project the exception disappears, but then the task gets stuck when registering an RDD as a table (I get a timeout after 30 seconds). I should emphasize that the first RDD I register as a table is a very small one (about 60K rows), and as I said, it runs swiftly locally. I suspect other dependencies are missing but fail silently. Would be grateful if anyone knows how to solve it. Lior
How to run multiple jobs in one sparkcontext from separate threads in pyspark?
Hi all, Quote: "Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads." How do I run multiple jobs in one SparkContext using separate threads in pyspark? I found some examples in Scala and Java, but couldn't find Python code. Can anyone help me with a pyspark example? Thanks Regards, Meethu M
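No pyspark answer appeared in this thread. For reference, a minimal Scala sketch of the pattern the quoted documentation describes (a Python version would use the threading module the same way; job names and sizes are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object ParallelJobs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallel-jobs"))
    // two independent jobs submitted from separate threads share one SparkContext
    val threads = Seq(
      new Thread(new Runnable {
        def run(): Unit = println("sum: " + sc.parallelize(1 to 1000000).sum())
      }),
      new Thread(new Runnable {
        def run(): Unit = println("count: " + sc.parallelize(1 to 1000000).count())
      })
    )
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}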
Re: Kryo serialization of classes in additional jars
I cherry-picked this commit into my local 1.2 branch. It fixed the problem with setting spark.serializer, but I get a similar problem with spark.closure.serializer org.apache.spark.SparkException: Failed to register classes with Kryo at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:100) at org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:152) at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:114) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:73) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ClassNotFoundException: com.foo.Foo at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93) at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:93) ... 21 more On Mon, May 4, 2015 at 5:43 PM, Akshat Aranya aara...@gmail.com wrote: Actually, after some digging, I did find a JIRA for it: SPARK-5470. The fix for this has gone into master, but it isn't in 1.2. On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote: Oh, this seems like a real pain. You should file a jira, I didn't see an open issue -- if nothing else just to document the issue. 
As you've noted, the problem is that the serializer is created immediately in the executors, right when the SparkEnv is created, but the other jars aren't downloaded until later. I think you could work around it with some combination of pushing the jars to the cluster manually and then using spark.executor.extraClassPath. On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com wrote: Hi, Is it possible to register Kryo serialization for classes contained in jars that are added with spark.jars? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do:

val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register classes contained in foo.jar
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))
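A sketch of the workaround Imran describes, as a config line (the path is illustrative; the jar must already be present at that path on every worker, e.g. pushed out by your deployment tooling):

# conf/spark-defaults.conf
spark.executor.extraClassPath  /opt/myapp/foo.jar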
Re: Backward compatibility with org.apache.spark.sql.api.java.Row class
Sorry for missing that in the upgrade guide. As part of unifying the Java and Scala interfaces we got rid of the Java-specific Row. You are correct in assuming that you want to use the Row in org.apache.spark.sql from both Scala and Java now. On Wed, May 13, 2015 at 2:48 AM, Emerson Castañeda eme...@gmail.com wrote: Hello everyone, I'm adopting the latest version of Apache Spark in my project, moving from 1.2.x to 1.3.x, and the only significant incompatibility for now is related to the Row class. Any idea what happened to the org.apache.spark.sql.api.java.Row class in Apache Spark 1.3? The migration guide in "Spark SQL and DataFrames - Spark 1.3.0 Documentation" does not mention anything about it: https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13 Looking around, there is a new Row interface in the org.apache.spark.sql package, but I'm not 100% sure whether this is related to my question and how to proceed with the upgrade. Note that this new Row interface was not available in the previous Spark versions 1.0.0, 1.1.0, 1.2.0, and even 1.3.0. Thanks in advance, Emerson
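For anyone hitting the same migration, a minimal sketch of the unified Row usage in 1.3 (the values are illustrative):

import org.apache.spark.sql.Row

val row = Row(1, "alice")   // the one Row type, now shared by Scala and Java
val id = row.getInt(0)
val name = row.getString(1)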
data schema and serialization format suggestions
Hi all, I want to get a general idea about best practices around data format serialization. Currently I am using Avro as the data serialization format, but the emitted types aren't very Scala-friendly. So I was wondering how others deal with this problem. At the high level, the requirements are fairly simple:
1. Simple and easy to understand and extend.
2. Usable in places other than Spark (I would want to use them in other applications and tools).
3. Ability to play nicely with Parquet and Kafka (nice to have).
-- Ankur Chauhan
Re: Spark on Mesos
Sander, I eventually solved this problem via the --[no-]switch_user flag, which is set to true by default. I set this to false, which has the user that owns the process run the job; otherwise it was my username (scarman) running the job, which would fail because obviously my username didn't exist there. When run as root, it ran totally fine with no problems whatsoever. Hopefully this works for you too, Steve

On May 13, 2015, at 11:45 AM, Sander van Dijk sgvand...@gmail.com wrote: Hey all, I seem to be experiencing the same thing as Stephen. I run Spark 1.2.1 with Mesos 0.22.1, with Spark coming from the spark-1.2.1-bin-hadoop2.4.tgz prebuilt package, and Mesos installed from the Mesosphere repositories. I have been running Spark standalone successfully for a while and am now trying to set up Mesos. Mesos is up and running, and the UI at port 5050 reports all slaves alive. I then run the Spark shell with: `spark-shell --master mesos://1.1.1.1:5050` (with 1.1.1.1 the master's IP address), which starts up fine, with output: I0513 15:02:45.340287 28804 sched.cpp:448] Framework registered with 20150512-150459-2618695596-5050-3956-0009 15/05/13 15:02:45 INFO mesos.MesosSchedulerBackend: Registered as framework ID 20150512-150459-2618695596-5050-3956-0009 and the framework shows up in the Mesos UI. Then trying to run something (e.g. 'val rdd = sc.textFile(path); rdd.count') fails with lost executors. In /var/log/mesos-slave.ERROR on the slave instances there are entries like: E0513 14:57:01.198995 13077 slave.cpp:3112] Container 'eaf33d36-dde5-498a-9ef1-70138810a38c' for executor '20150512-145720-2618695596-5050-3082-S10' of framework '20150512-150459-2618695596-5050-3956-0009' failed to start: Failed to execute mesos-fetcher: Failed to chown work directory. From what I can find, the work directory is in /tmp/mesos, where indeed I see a directory structure with executor and framework IDs, with at the leaves stdout and stderr files of size 0. Everything there is owned by root, but I assume the processes are also run by root, so any chowning in there should be possible. I was thinking maybe it fails to fetch the Spark package executor? I uploaded spark-1.2.1-bin-hadoop2.4.tgz to HDFS, SPARK_EXECUTOR_URI is set in spark-env.sh, and in the Environment section of the web UI I see this picked up in the spark.executor.uri parameter. I checked and the URI is reachable by the slaves: an `hdfs dfs -stat $SPARK_EXECUTOR_URI` is successful. Any pointers? Many thanks, Sander

On Fri, May 1, 2015 at 8:35 AM Tim Chen t...@mesosphere.io wrote: Hi Stephen, It looks like the Mesos slave was most likely not able to launch some Mesos helper processes (the fetcher, probably?). How did you install Mesos? Did you build from source yourself? Please install Mesos through a package, or if building from source, run make install and run from the installed binary.
Tim On Mon, Apr 27, 2015 at 11:11 AM, Stephen Carman scar...@coldlight.com wrote: So I installed spark on each of the slaves 1.3.1 built with hadoop2.6 I just basically got the pre-built from the spark website… I placed those compiled spark installs on each slave at /opt/spark My spark properties seem to be getting picked up on my side fine… Screen Shot 2015-04-27 at 10.30.01 AM.png The framework is registered in Mesos, it shows up just fine, it doesn’t matter if I turn off the executor uri or not, but I always get the same error… org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 (TID 23, 10.253.1.117): ExecutorLostFailure (executor 20150424-104711-1375862026-5050-20113-S1 lost) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) These boxes
Re: Spark on Mesos
Hi Stephen, You probably didn't run the Spark driver/shell as root; the Mesos scheduler will pick up your local user, try to impersonate the same user, and chown the directory before executing any task. If you run the Spark driver as root it should resolve the problem. no-switch_user can also work, as it won't try to switch user for you. Tim

On Wed, May 13, 2015 at 10:50 AM, Stephen Carman scar...@coldlight.com wrote: Sander, I eventually solved this problem via the --[no-]switch_user flag, which is set to true by default. I set this to false, which has the user that owns the process run the job; otherwise it was my username (scarman) running the job, which would fail because obviously my username didn't exist there. When run as root, it ran totally fine with no problems whatsoever. Hopefully this works for you too, Steve
Re: Spark on Mesos
Yup, exactly as Tim mentioned. I went back and tried what you just suggested and that was also perfectly fine. Steve

On May 13, 2015, at 1:58 PM, Tim Chen t...@mesosphere.io wrote: Hi Stephen, You probably didn't run the Spark driver/shell as root; the Mesos scheduler will pick up your local user, try to impersonate the same user, and chown the directory before executing any task. If you run the Spark driver as root it should resolve the problem. no-switch_user can also work, as it won't try to switch user for you. Tim
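For reference, the flag Stephen and Tim mention is set on each Mesos slave at startup; a sketch (the master URL is illustrative):

mesos-slave --master=zk://1.1.1.1:2181/mesos --no-switch_user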
Word2Vec with billion-word corpora
Hi all, I'm experimenting with Spark's Word2Vec implementation on a relatively large corpus (5B words, vocabulary size 4M, 400-dimensional vectors). Has anybody had success running it at this scale? Thanks in advance for your guidance! -Shilad -- Shilad W. Sen Associate Professor Mathematics, Statistics, and Computer Science Dept. Macalester College s...@macalester.edu http://www.shilad.com https://www.linkedin.com/in/shilad 651-696-6273
Re: Worker Spark Port
Indeed, many thanks. On Wednesday, 13 May 2015, Cody Koeninger c...@koeninger.org wrote: I believe most ports are configurable at this point; look at http://spark.apache.org/docs/latest/configuration.html and search for ".port". On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote: I understand that this port value is randomly selected. Is there a way to enforce which Spark port a Worker should use?
Re: how to use rdd.countApprox
Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du

On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app is standing a much better chance to complete processing each batch within the batch interval. Du

On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote: From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue() to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct value that you would have received from rdd.count().

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return after it finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDDs in a spark streaming app. When data goes large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Re: how to set random seed
Easiest way is to broadcast it. On 13 May 2015 10:40, Charles Hayden charles.hay...@atigeo.com wrote: In pySpark, I am writing a map with a lambda that calls random.shuffle. For testing, I want to be able to give it a seed, so that successive runs will produce the same shuffle. I am looking for a way to set this same random seed once on each worker. Is there any simple way to do it?
Re: RE: Re: sparksql running slow while joining 2 tables
Hi Hao: I tried broadcast join with the following steps, and found that my query is still running slowly; I'm not sure whether I'm doing the broadcast join right:
1. Add spark.sql.autoBroadcastJoinThreshold 104857600 (100MB) to conf/spark-defaults.conf; 100MB is larger than either of my 2 tables.
2. Start bin/spark-sql and confirm this setting took effect, both in the Environment page of my Spark cluster web UI and in the spark-sql console.
3. Run ANALYZE TABLE db1 COMPUTE STATISTICS noscan and ANALYZE TABLE sample3 COMPUTE STATISTICS noscan, and cache both of these tables.
4. Use EXPLAIN EXTENDED on my query and confirm BroadcastHashJoin is used in the physical plan.
5. Run my query: select a.chrname, a.startpoint, a.endpoint, a.piece from db1 a join sample3 b on (a.chrname = b.name) where (b.startpoint > a.startpoint + 25) and b.endpoint = a.endpoint;
So, if there are mistakes in my operation please point them out. Thanks & Best regards! San.Luo

----- Original Message -----
From: Cheng, Hao hao.ch...@intel.com
To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org
Subject: RE: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-05 08:38
Or, have you ever tried a broadcast join? From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, May 5, 2015 8:33 AM To: luohui20...@sina.com; Olivier Girardot; user Subject: RE: Re: sparksql running slow while joining 2 tables. Can you print out the physical plan? EXPLAIN SELECT xxx... From: luohui20...@sina.com Sent: Monday, May 4, 2015 9:08 PM To: Olivier Girardot; user Subject: Re: sparksql running slow while joining 2 tables. hi Olivier, Spark 1.3.1 with Java 1.8.0_45; I attached 2 pics. It seems like a GC issue. I also tried different parameters like memory size of driver and executor, memory fraction, java opts... but this issue still happens. Thanks & Best regards! 罗辉 San.Luo

----- Original Message -----
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-04 20:46
Hi, what is your Spark version? Regards, Olivier. On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote: hi guys, when I am running a sql like select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint > a.startpoint + 25); I found sparksql running slow for minutes, which may be caused by very long GC and shuffle times. Table db is created from a txt file sized at 56MB, while table sample is sized at 26MB, both small. My spark cluster is a standalone pseudo-distributed cluster with an 8g executor and a 4g driver. Any advice? thank you guys. Thanks & Best regards! 罗辉 San.Luo
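Condensing San's steps above into the actual commands (a sketch; the 100MB threshold, table names and the reconstructed comparison operators are taken from the thread):

# conf/spark-defaults.conf -- tables smaller than this may be broadcast
spark.sql.autoBroadcastJoinThreshold  104857600

-- in bin/spark-sql: refresh table size statistics, then inspect the plan
ANALYZE TABLE db1 COMPUTE STATISTICS noscan;
ANALYZE TABLE sample3 COMPUTE STATISTICS noscan;
EXPLAIN EXTENDED SELECT a.chrname, a.startpoint, a.endpoint, a.piece
FROM db1 a JOIN sample3 b ON (a.chrname = b.name)
WHERE b.startpoint > a.startpoint + 25 AND b.endpoint = a.endpoint;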