kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread Ankur Chauhan

Hi,

I have a simple application which fails with the following exception
only when the application is restarted (i.e. the checkpointDir has
entries from a previous execution):

Exception in thread main org.apache.spark.SparkException:
org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not
been initialized
at
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266
)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
(DStream.scala:287)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:28
4)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDSt
ream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
ala:116)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
ala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
e.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
e.scala:251)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sca
la:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251
)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:
116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
ly(JobGenerator.scala:227)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
ly(JobGenerator.scala:222)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.s
cala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.s
cala:222)
at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.sca
la:90)
at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.sca
la:67)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala
:512)
at
com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.s
cala:115)
at
com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:1
5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableF
orwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
at com.brightcove.analytics.tacoma.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
Impl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit
$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  //KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

  val eventStream = rawlogDStream
    .map({
      case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
          .setId(record.get("id").asInstanceOf[String])
          .setAccount(record.get("account").asInstanceOf[String])
          .setEvent(record.get("event").asInstanceOf[String])
          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
          .setUserAgent(record.get("user_agent").asInstanceOf[String])
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
          .build()
        val norm = Normalizer(rlog)
        (key,
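
For checkpoint-based restart, the Spark Streaming guide requires the whole DStream graph to be built inside the function handed to StreamingContext.getOrCreate, so that it can be rebuilt from the checkpoint on restart. A minimal sketch of that pattern (the object name and checkpoint path below are illustrative, not taken from the post above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedMain {
  val checkpointDir = "hdfs:///tmp/tacoma-checkpoints"  // hypothetical path

  // Everything that defines the streaming computation goes in here, so that a
  // restart can recreate the same DStream graph from the checkpoint data.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RawLogProcessor")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... create the Kafka input stream, maps and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Reuses the checkpointed graph if one exists, otherwise builds a fresh one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}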
 

Backward compatibility with org.apache.spark.sql.api.java.Row class

2015-05-13 Thread Emerson Castañeda
Hello everyone

I'm adopting the latest version of Apache Spark on my project, moving from
*1.2.x* to *1.3.x*, and the only significant incompatibility for now is
related to the *Row* class.

Any idea what happened to the *org.apache.spark.sql.api.java.Row* class
in Apache Spark 1.3?


Migration guide on Spark SQL and DataFrames - Spark 1.3.0 Documentation
does not mention anything about it.
https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13


Looking around, there is a new *Row* interface in the *org.apache.spark.sql*
package, but I'm not 100% sure whether this is related to my question, or
how to proceed with the upgrade.

Note that this new *Row* interface was not available in the previous Spark
versions *1.0.0*, *1.1.0*, *1.2.0*, and even *1.3.0*.
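
For reference, if that new interface is indeed the intended replacement (nothing in this thread confirms it), the change on the consuming side would be roughly just the import; a sketch:

// Before (Spark 1.2.x, Java API): import org.apache.spark.sql.api.java.Row
// After (Spark 1.3.x): one Row interface shared by the Scala and Java APIs
import org.apache.spark.sql.Row

// Illustrative accessor usage only; the positional getters look the same as before.
def describe(row: Row): String =
  s"${row.getString(0)} (${row.getInt(1)})"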

Thanks in advance

Emerson


Increase maximum amount of columns for covariance matrix for principal components

2015-05-13 Thread Sebastian Alfers
Hello,


in order to process a huge dataset, the number of columns allowed when calculating the
covariance matrix is limited:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L129

What is the reason behind this limitation and can it be extended?
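
For what it's worth, computeCovariance returns a local (dense) Matrix, so the column count translates directly into driver memory; a rough back-of-the-envelope sketch (my own arithmetic, not from the linked code):

// Rough driver-side memory for a dense n x n covariance matrix of doubles
// (illustrative arithmetic only; the actual limit check is in the linked RowMatrix code).
def covarianceBytes(nCols: Long): Long = nCols * nCols * 8L

// e.g. 65535 columns already implies roughly 32 GiB held on the driver
val gib = covarianceBytes(65535L).toDouble / (1L << 30)
println(f"$gib%.1f GiB")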

Greetings

Sebastian


Re: value toDF is not a member of RDD object

2015-05-13 Thread SLiZn Liu
No, creating DF using createDataFrame won’t work:

val peopleDF = sqlContext.createDataFrame(people)

the code compiles but raises the same error as toDF at the line
above.

On Wed, May 13, 2015 at 6:22 PM Sebastian Alfers
sebastian.alf...@googlemail.com
wrote:

I use:

 val conf = new SparkConf()...
 val sc = new SparkContext(conf)
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 val rdd: RDD[...] = ...
 val schema: StructType = ...

 sqlContext.createDataFrame(rdd, schema)



 2015-05-13 12:00 GMT+02:00 SLiZn Liu sliznmail...@gmail.com:

 Additionally, after I successfully packaged the code, and submitted via 
 spark-submit
 webcat_2.11-1.0.jar, the following error was thrown at the line where
 toDF() been called:

 Exception in thread main java.lang.NoSuchMethodError: 
 scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
   at WebcatApp$.main(webcat.scala:49)
   at WebcatApp.main(webcat.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:569)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 Unsurprisingly, if I remove toDF, no error occurred.

 I have moved the case class definition outside of main but inside the
 outer object scope, and removed the provided specification in build.sbt.
 However, when I tried *Dean Wampler*‘s suggestion of using
 sc.createDataFrame() the compiler says this function is not a member of
 sc, and I cannot find any reference in the latest documents. What else
 should I try?

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com
 wrote:

 Thanks folks, really appreciate all your replies! I tried each of your
 suggestions and in particular, *Animesh*‘s second suggestion of *making
 case class definition global* helped me getting off the trap.

 Plus, I should have paste my entire code with this mail to help the
 diagnose.

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com
 wrote:

 It's the import statement Olivier showed that makes the method
 available.

 Note that you can also use `sc.createDataFrame(myRDD)`, without the
 need for the import statement. I personally prefer this approach.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com
 wrote:

 you need to instantiate a SQLContext :
 val sc : SparkContext = ...
 val sqlContext = new SQLContext(sc)
 import sqlContext.implicits._

 Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 I added `libraryDependencies += org.apache.spark % spark-sql_2.11
 % 1.3.1` to `build.sbt` but the error remains. Do I need to import
 modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

 On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com
 wrote:

 toDF is part of spark SQL so you need Spark SQL dependency + import
 sqlContext.implicits._ to get the toDF method.

 Regards,

 Olivier.

 Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 Hi User Group,

 I’m trying to reproduce the example on Spark SQL Programming Guide
 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
 and got a compile error when packaging with sbt:

 [error] myfile.scala:30: value toDF is not a member of 
 org.apache.spark.rdd.RDD[Person]
 [error] val people = 
 sc.textFile(examples/src/main/resources/people.txt).map(_.split(,)).map(p
  = Person(p(0), p(1).trim.toInt)).toDF()
 [error]
   ^
 [error] one error found
 [error] (compile:compileIncremental) Compilation failed
 [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

 I double checked my code includes import sqlContext.implicits._
 after reading this post
 https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E
 on spark mailing list, even tried to use toDF(col1, col2)
 suggested by 

Fwd: value toDF is not a member of RDD object

2015-05-13 Thread SLiZn Liu
Are you sure that you are submitting it correctly? Can you post the entire
command you are using to run the .jar file via spark-submit?

Ok, here it is:

/opt/spark-1.3.1-bin-hadoop2.6/bin/spark-submit
target/scala-2.11/webcat_2.11-1.0.jar

However, on the server somehow I have to specify the main class using spark-submit
--class WebcatApp --verbose webcat_2.11-1.0.jar to make Spark recognize the
main class. The error and stack trace remain the same.
​

-- Forwarded message -
From: Animesh Baranawal animeshbarana...@gmail.com
Date: Wed, May 13, 2015 at 6:49 PM
Subject: Re: value toDF is not a member of RDD object
To: SLiZn Liu sliznmail...@gmail.com


Are you sure that you are submitting it correctly? Can you post the entire
command you are using to run the .jar file via spark-submit?

On Wed, May 13, 2015 at 4:07 PM, SLiZn Liu sliznmail...@gmail.com wrote:

 No, creating DF using createDataFrame won’t work:

 val peopleDF = sqlContext.createDataFrame(people)

 the code can be compiled but raised the same error as toDF at the line
 above.

 On Wed, May 13, 2015 at 6:22 PM Sebastian Alfers
 sebastian.alf...@googlemail.com
 wrote:

 I use:

 val conf = new SparkConf()...
 val sc = new SparkContext(conf)
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 val rdd: RDD[...] = ...
 val schema: StructType = ...

 sqlContext.createDataFrame(rdd, schema)



 2015-05-13 12:00 GMT+02:00 SLiZn Liu sliznmail...@gmail.com:

 Additionally, after I successfully packaged the code, and submitted via 
 spark-submit
 webcat_2.11-1.0.jar, the following error was thrown at the line where
 toDF() been called:

 Exception in thread main java.lang.NoSuchMethodError: 
 scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
   at WebcatApp$.main(webcat.scala:49)
   at WebcatApp.main(webcat.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:569)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 Unsurprisingly, if I remove toDF, no error occurred.

 I have moved the case class definition outside of main but inside the
 outer object scope, and removed the provided specification in build.sbt.
 However, when I tried *Dean Wampler*‘s suggestion of using
 sc.createDataFrame() the compiler says this function is not a member of
 sc, and I cannot find any reference in the latest documents. What else
 should I try?

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com
 wrote:

 Thanks folks, really appreciate all your replies! I tried each of your
 suggestions and in particular, *Animesh*‘s second suggestion of *making
 case class definition global* helped me getting off the trap.

 Plus, I should have paste my entire code with this mail to help the
 diagnose.

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com
 wrote:

 It's the import statement Olivier showed that makes the method
 available.

 Note that you can also use `sc.createDataFrame(myRDD)`, without the
 need for the import statement. I personally prefer this approach.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com
 wrote:

 you need to instantiate a SQLContext :
 val sc : SparkContext = ...
 val sqlContext = new SQLContext(sc)
 import sqlContext.implicits._

 Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 I added `libraryDependencies += org.apache.spark %
 spark-sql_2.11 % 1.3.1` to `build.sbt` but the error remains. Do I 
 need
 to import modules other than `import org.apache.spark.sql.{ Row, 
 SQLContext
 }`?

 On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com
 wrote:

 toDF is part of spark SQL so you need Spark SQL dependency + import
 sqlContext.implicits._ to get the toDF method.

 Regards,

 Olivier.

 Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 Hi User Group,

 I’m trying to reproduce the example on Spark SQL Programming Guide
 

Re: value toDF is not a member of RDD object

2015-05-13 Thread SLiZn Liu
Additionally, after I successfully packaged the code and submitted it
via spark-submit
webcat_2.11-1.0.jar, the following error was thrown at the line where toDF()
was called:

Exception in thread main java.lang.NoSuchMethodError:
scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
  at WebcatApp$.main(webcat.scala:49)
  at WebcatApp.main(webcat.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Unsurprisingly, if I remove toDF, no error occurred.

I have moved the case class definition outside of main but inside the outer
object scope, and removed the "provided" specification in build.sbt. However,
when I tried *Dean Wampler*'s suggestion of using sc.createDataFrame(), the
compiler says this function is not a member of sc, and I cannot find any
reference in the latest documents. What else should I try?
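
For reference, the variant mentioned above lives on SQLContext rather than on the SparkContext itself; a self-contained sketch (the case class and data are toy examples, not from the original code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)  // toy schema for illustration

object CreateDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("create-df-sketch"))
    val sqlContext = new SQLContext(sc)
    val rdd = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
    // createDataFrame is defined on SQLContext, not on the SparkContext itself
    val df = sqlContext.createDataFrame(rdd)
    df.show()
  }
}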

REGARDS,
Todd Leo
​

On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote:

 Thanks folks, really appreciate all your replies! I tried each of your
 suggestions and in particular, *Animesh*‘s second suggestion of *making
 case class definition global* helped me get out of the trap.

 Plus, I should have pasted my entire code with this mail to help the
 diagnosis.

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com
 wrote:

 It's the import statement Olivier showed that makes the method available.

 Note that you can also use `sc.createDataFrame(myRDD)`, without the need
 for the import statement. I personally prefer this approach.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com
 wrote:

 you need to instantiate a SQLContext :
 val sc : SparkContext = ...
 val sqlContext = new SQLContext(sc)
 import sqlContext.implicits._

 Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" %
 "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules
 other than `import org.apache.spark.sql.{ Row, SQLContext }`?

 On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com
 wrote:

 toDF is part of spark SQL so you need Spark SQL dependency + import
 sqlContext.implicits._ to get the toDF method.

 Regards,

 Olivier.

 Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 Hi User Group,

 I’m trying to reproduce the example on Spark SQL Programming Guide
 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
 and got a compile error when packaging with sbt:

 [error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 [error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
 [error]                                                                                                                                  ^
 [error] one error found
 [error] (compile:compileIncremental) Compilation failed
 [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

 I double checked my code includes import sqlContext.implicits._
 after reading this post
 https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E
 on spark mailing list, even tried to use toDF("col1", "col2")
 suggested by Xiangrui Meng in that post and got the same error.

 The Spark version is specified in build.sbt file as follows:

 scalaVersion := "2.11.6"
 libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
 libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

 Anyone have ideas the cause of this error?

 REGARDS,
 Todd Leo
 ​





Kafka Direct Approach + Zookeeper

2015-05-13 Thread James King
From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html

I'm trying to use the direct approach to read messages from Kafka.

Kafka is running as a cluster and configured with Zookeeper.

 On the above page it mentions:

In the Kafka parameters, you must specify either *metadata.broker.list* or
*bootstrap.servers*.  ...

Can someone please explain the difference between the two config
parameters?

And which one is more relevant in my case?
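
For context, a minimal sketch of the direct stream with the broker list supplied explicitly (hypothetical hosts and topic; as I understand it, the two keys are just two ways of passing the same host:port list):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("direct-kafka"), Seconds(5))
    // "bootstrap.servers" -> "broker1:9092,broker2:9092" would carry the same list
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))
    messages.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}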

Regards
jk


[Spark SQL 1.3.1] data frame saveAsTable returns exception

2015-05-13 Thread Ishwardeep Singh
Hi ,

I am using Spark SQL 1.3.1.

I have created a DataFrame using the JDBC data source and am using the saveAsTable()
method, but got the following 2 exceptions:

java.lang.RuntimeException: Unsupported datatype DecimalType()
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
at
org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:391)
at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
at
org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
at
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037)
at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015)

java.lang.ClassCastException: java.sql.Date cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
at
org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
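
For context, a minimal sketch of the flow described above (JDBC source into saveAsTable), with hypothetical connection details; the source table is assumed to contain DECIMAL and DATE columns, which are the types the two exceptions complain about:

import org.apache.spark.sql.hive.HiveContext

// Sketch only: load a JDBC table as a DataFrame and persist it as a managed table
// (which, per the stack traces above, goes through the Parquet data source).
def copyTable(hiveContext: HiveContext): Unit = {
  val df = hiveContext.load("jdbc", Map(
    "url"     -> "jdbc:postgresql://dbhost:5432/mydb",  // hypothetical connection
    "dbtable" -> "source_table"))                       // hypothetical table name
  df.saveAsTable("target_table")
}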
  

Re: how to monitor multi directories in spark streaming task

2015-05-13 Thread Ankur Chauhan

Hi,

You could retroactively union an existing DStream with one from a
newly created file. Then, when another file is detected, you would
need to re-union the stream and create another DStream. It seems like
the implementation of FileInputDStream only looks for files in the
directory, and the filtering is applied using the
FileSystem.listStatus(dir, filter) method, which does not provide
recursive listing.

A cleaner solution would be to extend FileInputDStream and override
findNewFiles(...) with the ability to recursively list files
(probably by using FileSystem.listFiles(...)).

Refer: http://stackoverflow.com/a/25645225/113411

- -- Ankur


On 13/05/2015 02:03, lisendong wrote:
 but in fact the directories are not ready at the beginning of my
 task.
 
 for example:
 
 /user/root/2015/05/11/data.txt /user/root/2015/05/12/data.txt 
 /user/root/2015/05/13/data.txt
 
 like this.
 
 and one new directory one day.
 
 how to create the new DStream for tomorrow’s new
 directory(/user/root/2015/05/13/) ??
 
 
 在 2015年5月13日,下午4:59,Ankur Chauhan achau...@brightcove.com 写道:
 
 I would suggest creating one DStream per directory and then using 
 StreamingContext#union(...) to get a union DStream.
 
 -- Ankur
 
 On 13/05/2015 00:53, hotdog wrote:
 I want to use use fileStream in spark streaming to monitor
 multi hdfs directories, such as:
 
 val list_join_action_stream = ssc.fileStream[LongWritable,
 Text, TextInputFormat]("/user/root/*/*", check_valid_file(_),
  false).map(_._2.toString).print
 
 
 By the way, I could not understand the meaning of the three classes:
 LongWritable, Text, TextInputFormat
 
 but it doesn't work...
 
 
 
 -- View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-monitor-multi-directories-in-spark-streaming-task-tp22863.html
 
 
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 
 
 




Re: how to monitor multi directories in spark streaming task

2015-05-13 Thread Ankur Chauhan

I would suggest creating one DStream per directory and then using
StreamingContext#union(...) to get a union DStream.
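
A minimal sketch of that per-directory union (the directory names and helper are illustrative, not from this thread):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// One text file stream per directory, folded into a single DStream.
def unionOfDirs(ssc: StreamingContext, dirs: Seq[String]): DStream[String] = {
  val perDir = dirs.map(dir => ssc.textFileStream(dir))
  ssc.union(perDir)
}

// e.g. unionOfDirs(ssc, Seq("/user/root/2015/05/11", "/user/root/2015/05/12"))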

- -- Ankur

On 13/05/2015 00:53, hotdog wrote:
 I want to use use fileStream in spark streaming to monitor multi
 hdfs directories, such as:
 
 val list_join_action_stream = ssc.fileStream[LongWritable, Text, 
 TextInputFormat]("/user/root/*/*", check_valid_file(_), 
 false).map(_._2.toString).print
 
 
 By the way, I could not understand the meaning of the three classes: 
 LongWritable, Text, TextInputFormat
 
 but it doesn't work...
 
 
 
 -- View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-monitor-multi-directories-in-spark-streaming-task-tp22863.html

 
Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 




Re: How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2015-05-13 Thread thanhtien522
Earthson wrote
 Finally, I've found two ways:
 
 1. search the output with something like Submitted application
 application_1416319392519_0115
 2. use specific AppName. We could query the ApplicationID(yarn)

Hi Earthson,
Can you explain more about case 2? How can we query the ApplicationID by the
specific AppName?
Thanks!
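
My reading of option 2 is that you look the application up through the YARN client API by the AppName you set; a sketch of that (assuming the standard Hadoop YarnClient API, nothing Spark-specific):

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConverters._

// Look up YARN application ids whose name matches the Spark AppName we set.
def appIdsByName(appName: String): Seq[String] = {
  val yarn = YarnClient.createYarnClient()
  yarn.init(new YarnConfiguration())
  yarn.start()
  try {
    yarn.getApplications().asScala
      .filter(_.getName == appName)
      .map(_.getApplicationId.toString)
  } finally {
    yarn.stop()
  }
}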



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462p22865.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted

2015-05-13 Thread Yifan LI
Hi,

I was running our GraphX application (it worked fine on Spark 1.2.0) but it failed 
on Spark 1.3.1 with the exception below.

Does anyone have an idea about this issue? I guess it was caused by using the LZ4 codec?

Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 54 in stage 7.6 failed 128 times, most recent failure: Lost 
task 54.127 in stage 7.6 (TID 5311, small15-tap1.common.lip6.fr): 
com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is 
corrupted
at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at 
org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300)
at 
org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Stream is corrupted
at 
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152)
at 
net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
... 35 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
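
If the LZ4 guess above is worth testing, one low-effort check (a sketch only; "snappy" is just an alternative codec to try, not a known fix) is to switch the I/O compression codec and re-run:

import org.apache.spark.SparkConf

// spark.io.compression.codec controls compression of internal data such as
// shuffle outputs and broadcast blocks; accepted values include "lz4", "lzf" and "snappy".
val conf = new SparkConf()
  .setAppName("graphx-app")  // illustrative name
  .set("spark.io.compression.codec", "snappy")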

Best,
Yifan LI







Re: Building Spark

2015-05-13 Thread Stephen Boesch
Hi Akhil, building with sbt tends to need around 3.5GB whereas maven
requirements are much lower, around 1.7GB. So try using maven.

For reference I have the following settings and both do compile.  sbt would
not work with lower values.


$echo $SBT_OPTS
-Xmx3012m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
$echo $MAVEN_OPTS
-Xmx1280m -XX:MaxPermSize=384m

2015-05-13 5:57 GMT-07:00 Heisenberg Bb hbbalg...@gmail.com:

 I tried to build Spark in my local machine Ubuntu 14.04 ( 4 GB Ram), my
 system  is getting hanged (freezed). When I monitered system processes, the
 build process is found to consume 85% of my memory. Why does it need lot of
 resources. Is there any efficient method to build Spark.

 Thanks
 Akhil



applications are still in progress?

2015-05-13 Thread Yifan LI
Hi,

I have some applications that finished (but actually failed before); for them the WebUI
shows "Application myApp is still in progress."

and, in the eventlog folder, there are several log files like this:

app-20150512***.inprogress

So, I am wondering what the “inprogress” means…

Thanks! :)


Best,
Yifan LI







Removing FINISHED applications and shuffle data

2015-05-13 Thread sayantini
Hi,



Please help me with below two issues:



*Environment:*



I am running my spark cluster in stand alone mode.



I am initializing the spark context from inside my tomcat server.



I am setting below properties in environment.sh in $SPARK_HOME/conf
directory



SPARK_MASTER_OPTS=-Dspark.deploy.retainedApplications=1
-Dspark.deploy.retainedDrivers=1

SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true
-Dspark.worker.cleanup.interval=600 -Dspark.worker.cleanup.appDataTtl=600



SPARK_LOCAL_DIRS=$user.home/tmp



*Issue 1:*



Still in my $SPARK_HOME/work folder, application-folders continue to grow
as and when I restart the tomcat.



I also tried to stop the spark context (sc.stop()) in tomcat’s
contextDestroyed listener but still I am not able to remove the undesired
application folders.



*Issue 2:*

The ‘tmp’ folder is getting filled up with shuffle data and eating my
entire hard disk. Is there any setting to remove shuffle data of ‘FINISHED’
applications.



Thanks in advance.

 Sayantini


Spark and Flink

2015-05-13 Thread Pa Rö
hi,

I use Spark and Flink in the same Maven project.

Now I get an exception when working with Spark; Flink works well.

The problem is the transitive dependencies.

Maybe somebody knows a solution, or versions which work together.

best regards
paul

PS: a Cloudera Maven repo for Flink would be desirable

my pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>mgm.tp.bigdata</groupId>
  <artifactId>tempGeoKmeans</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>tempGeoKmeans</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.5.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-core</artifactId>
      <version>0.9-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>0.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>0.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>0.8.1</version>
    </dependency>
  </dependencies>
</project>

my exception:

15/05/13 15:00:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.servlet.DefaultServlet-461261579:
java.lang.NoSuchMethodError:
org.eclipse.jetty.server.ResourceCache.<init>(Lorg/eclipse/jetty/http/MimeTypes;)V
java.lang.NoSuchMethodError:
org.eclipse.jetty.server.ResourceCache.<init>(Lorg/eclipse/jetty/http/MimeTypes;)V
at
org.eclipse.jetty.servlet.NIOResourceCache.init(NIOResourceCache.java:41)
at
org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
at javax.servlet.GenericServlet.init(GenericServlet.java:244)
at
org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
at
org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
at
org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
at
org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
at
org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229)
at
org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95)
at org.eclipse.jetty.server.Server.doStart(Server.java:282)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at
org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
at org.apache.spark.SparkContext.init(SparkContext.scala:224)
at
org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
at mgm.tp.bigdata.tempGeoKmeans.Spark.SparkMain.main(SparkMain.java:37)
15/05/13 15:00:48 WARN component.AbstractLifeCycle: FAILED

Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread James King
Many thanks Cody and contributors for the help.


On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org wrote:

 Either one will work, there is no semantic difference.

 The reason I designed the direct api to accept both of those keys is
 because they were used to define lists of brokers in pre-existing Kafka
 project apis.  I don't know why the Kafka project chose to use 2 different
 configuration keys.

 On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote:

 From:
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html

 I'm trying to use the direct approach to read messages form Kafka.

 Kafka is running as a cluster and configured with Zookeeper.

  On the above page it mentions:

 In the Kafka parameters, you must specify either *metadata.broker.list*
  or *bootstrap.servers*.  ...

 Can someone please explain the difference of between the two config
 parameters?

 And which one is more relevant in my case?

 Regards
 jk





JavaPairRDD

2015-05-13 Thread Yasemin Kaya
Hi,

I want to get a *JavaPairRDD<String, String>* from the tuple part of a
*JavaPairRDD<String, Tuple2<String, String>>*.

As an example: (
http://www.koctas.com.tr/reyon/el-aletleri/7, (0,1,0,0,0,0,0,0,46551))
is in my *JavaPairRDD<String, Tuple2<String, String>>* and I want to get
*( (46551), (0,1,0,0,0,0,0,0) )*

I try to split tuple._2() and create a new JavaPairRDD but I can't.
How can I get that?

Have a nice day
yasemin
-- 
hiç ender hiç


Building Spark

2015-05-13 Thread Heisenberg Bb
I tried to build Spark on my local machine, Ubuntu 14.04 (4 GB RAM), and my
system is getting hung (frozen). When I monitored system processes, the
build process was found to consume 85% of my memory. Why does it need a lot of
resources? Is there any efficient method to build Spark?

Thanks
Akhil


Re: Spark and Flink

2015-05-13 Thread Ted Yu
You can run the following command:
mvn dependency:tree

and see what Jetty versions are brought in. 

Cheers



 On May 13, 2015, at 6:07 AM, Pa Rö paul.roewer1...@googlemail.com wrote:
 
 hi,
 
 i use spark and flink in the same maven project,
 
 now i get a exception on working with spark, flink work well
 
 the problem are transitiv dependencies.
 
 maybe somebody know a solution, or versions, which work together.
 
 best regards
 paul
 
 ps: a cloudera maven repo flink would be desirable
 
 my pom:
 
 project xmlns=http://maven.apache.org/POM/4.0.0; 
 xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance;
   xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 
 http://maven.apache.org/xsd/maven-4.0.0.xsd;
   modelVersion4.0.0/modelVersion
 
   groupIdmgm.tp.bigdata/groupId
   artifactIdtempGeoKmeans/artifactId
   version0.0.1-SNAPSHOT/version
   packagingjar/packaging
 
   nametempGeoKmeans/name
   urlhttp://maven.apache.org/url
 
   properties
 project.build.sourceEncodingUTF-8/project.build.sourceEncoding
   /properties
 
 repositories
 repository
   idcloudera/id
   
 urlhttps://repository.cloudera.com/artifactory/cloudera-repos//url
 /repository
   /repositories
 
   dependencies
 dependency
 groupIdorg.apache.spark/groupId
 artifactIdspark-core_2.10/artifactId
 version1.1.0-cdh5.2.5/version
 /dependency
 dependency
 groupIdorg.apache.hadoop/groupId
 artifactIdhadoop-mapreduce-client-common/artifactId
 version2.5.0-cdh5.2.5/version
 /dependency
 dependency
 groupIdorg.apache.hadoop/groupId
 artifactIdhadoop-common/artifactId
 version2.5.0-cdh5.2.5/version
 /dependency
 dependency
 groupIdorg.apache.mahout/groupId
 artifactIdmahout-core/artifactId
 version0.9-cdh5.2.5/version
 /dependency
 dependency
 groupIdjunit/groupId
 artifactIdjunit/artifactId
 version3.8.1/version
 scopetest/scope
 /dependency
 dependency
 groupIdorg.apache.flink/groupId
 artifactIdflink-core/artifactId
 version0.8.1/version
 /dependency
 dependency
 groupIdorg.apache.flink/groupId
 artifactIdflink-java/artifactId
 version0.8.1/version
 /dependency
 dependency
 groupIdorg.apache.flink/groupId
 artifactIdflink-clients/artifactId
 version0.8.1/version
 /dependency
   /dependencies
 /project
 
 my exception:
 
 5/05/13 15:00:48 WARN component.AbstractLifeCycle: FAILED 
 org.eclipse.jetty.servlet.DefaultServlet-461261579: 
 java.lang.NoSuchMethodError: 
 org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V
 java.lang.NoSuchMethodError: 
 org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V
 at 
 org.eclipse.jetty.servlet.NIOResourceCache.init(NIOResourceCache.java:41)
 at org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
 at javax.servlet.GenericServlet.init(GenericServlet.java:244)
 at 
 org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
 at org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
 at 
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at 
 org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
 at 
 org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
 at 
 org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
 at 
 org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
 at 
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at 
 org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229)
 at 
 org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172)
 at 
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at 
 org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95)
 at org.eclipse.jetty.server.Server.doStart(Server.java:282)
 at 
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at 
 org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
 at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
 at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
 at 
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
 at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209)
 

Kafka + Direct + Zookeeper

2015-05-13 Thread James King
I'm trying the Kafka direct approach (for consuming), but when I use only this
config:

kafkaParams.put("group.id", groupdid);
kafkaParams.put("zookeeper.connect", zookeeperHostAndPort + "/cb_kafka");

I get this

Exception in thread main org.apache.spark.SparkException: Must specify
metadata.broker.list or bootstrap.servers

Shouldn't ZooKeeper have enough information to provide the connection details?

Or am I missing something?
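
A minimal sketch in Scala (host names, topic, and ssc are placeholders) of the parameters the direct approach expects; as discussed later in this thread, the direct consumer talks to the brokers, not to ZooKeeper, so a broker list is required:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// metadata.broker.list (or bootstrap.servers) must be present for the direct approach.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "group.id" -> "mygroup")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))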


Re: Building Spark

2015-05-13 Thread Emre Sevinc
My 2 cents: If you have Java 8, you don't need any extra settings for
Maven.

--
Emre Sevinç

On Wed, May 13, 2015 at 3:02 PM, Stephen Boesch java...@gmail.com wrote:

 Hi Akhil,   Building with sbt tends to need around 3.5GB, whereas Maven
 requirements are much lower, around 1.7GB. So try using Maven.

 For reference I have the following settings and both do compile.  sbt
 would not work with lower values.


 $echo $SBT_OPTS
 -Xmx3012m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
 $echo $MAVEN_OPTS
 -Xmx1280m -XX:MaxPermSize=384m

 2015-05-13 5:57 GMT-07:00 Heisenberg Bb hbbalg...@gmail.com:

 I tried to build Spark in my local machine Ubuntu 14.04 ( 4 GB Ram), my
 system  is getting hanged (freezed). When I monitered system processes, the
 build process is found to consume 85% of my memory. Why does it need lot of
 resources. Is there any efficient method to build Spark.

 Thanks
 Akhil





-- 
Emre Sevinc


Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread Cody Koeninger
Either one will work, there is no semantic difference.

The reason I designed the direct api to accept both of those keys is
because they were used to define lists of brokers in pre-existing Kafka
project apis.  I don't know why the Kafka project chose to use 2 different
configuration keys.

On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com wrote:

 From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html

 I'm trying to use the direct approach to read messages form Kafka.

 Kafka is running as a cluster and configured with Zookeeper.

  On the above page it mentions:

 In the Kafka parameters, you must specify either *metadata.broker.list*
  or *bootstrap.servers*.  ...

 Can someone please explain the difference of between the two config
 parameters?

 And which one is more relevant in my case?

 Regards
 jk



how to read lz4 compressed data using fileStream of spark streaming?

2015-05-13 Thread hotdog
In Spark Streaming, I want to use fileStream to monitor a directory. But the
files in that directory are compressed using lz4, and the new lz4 files are
not detected by the following code. How can I detect these new files?

val list_join_action_stream = ssc.fileStream[LongWritable, Text,
TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
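
One thing to try (a sketch under assumptions, not a confirmed fix from this thread): make the filter accept the .lz4 suffix explicitly and keep newFilesOnly enabled. Actually decoding the records additionally requires an LZ4 codec to be available to Hadoop, which this snippet does not cover.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Only pick up *.lz4 files created after the stream starts.
val lz4Stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  gc.input_dir,
  (path: Path) => path.getName.endsWith(".lz4"),
  newFilesOnly = true
).map(_._2.toString)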







Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
As far as I can tell, Dibyendu's cons boil down to:

1. Spark checkpoints can't be recovered if you upgrade code
2. Some Spark transformations involve a shuffle, which can repartition data

It's not accurate to imply that either one of those things is inherently a
con of the direct stream api.

Regarding checkpoints, nothing about the direct stream requires you to use
checkpoints.  You can save offsets in a checkpoint, your own database, or
not save offsets at all (as James wants).  One might even say that the
direct stream api is . . . flexible . . . in that regard.

Regarding partitions, the direct stream api gives you the same ordering
guarantee as Kafka, namely that within a given partition messages will be
in increasing offset order.   Clearly if you do a transformation that
repartitions the stream, that no longer holds.  Thing is, that doesn't
matter if you're saving offsets and results for each rdd in the driver.
The offset ranges for the original rdd don't change as a result of the
transformation you executed, they're immutable.

Sure, you can get into trouble if you're trying to save offsets / results
per partition on the executors, after a shuffle of some kind. You can avoid
this pretty easily by just using normal scala code to do your
transformation on the iterator inside a foreachPartition.  Again, this
isn't a con of the direct stream api, this is just a need to understand
how Spark works.
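
As a concrete illustration of the save-offsets-yourself option, a minimal sketch from the driver side (directStream and saveOffsets are placeholders, not code from this thread):

import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  // Offset ranges are attached to the RDDs produced by the direct stream.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd ...

  // Persist the ranges in your own storage (hypothetical helper).
  offsetRanges.foreach { o =>
    saveOffsets(o.topic, o.partition, o.fromOffset, o.untilOffset)
  }
}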



On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 The low level consumer which Akhil mentioned , has been running in Pearson
 for last 4-5 months without any downtime. I think this one is the reliable
 Receiver Based Kafka consumer as of today for Spark .. if you say it that
 way ..

 Prior to Spark 1.3 other Receiver based consumers have used Kafka High
 level APIs which has serious issue with re-balancing and lesser fault
 tolerant aspect and data loss .

 Cody's implementation is definitely a good approach using direct stream ,
 but both direct stream based approach and receiver based low level consumer
 approach has pros and cons. Like Receiver based approach need to use WAL
 for recovery from Driver failure which is a overhead for Kafka like system
 . For direct stream the offsets stored as check-pointed directory got lost
 if driver code is modified ..you can manage offset from your driver but for
 derived stream generated from this direct stream , there is no guarantee
 that batches are processed is order ( and offsets commits in order ) .. etc
 ..

 So whoever use whichever consumer need to study pros and cons of both
 approach before taking a call ..

 Regards,
 Dibyendu







 On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,
 I was just saying that i found more success and high throughput with the
 low level kafka api prior to KafkfaRDDs which is the future it seems. My
 apologies if you felt it that way. :)
 On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote:

 Akhil, I hope I'm misreading the tone of this. If you have personal
 issues at stake, please take them up outside of the public list.  If you
 have actual factual concerns about the kafka integration, please share them
 in a jira.

 Regarding reliability, here's a screenshot of a current production job
 with a 3 week uptime  Was a month before that, only took it down to change
 code.

 http://tinypic.com/r/2e4vkht/8

 Regarding flexibility, both of the apis available in spark will do what
 James needs, as I described.



 On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,

 If you are so sure, can you share a bench-marking (which you ran for
 days maybe?) that you have done with Kafka APIs provided by Spark?

 Thanks
 Best Regards

 On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I don't think it's accurate for Akhil to claim that the linked library
 is much more flexible/reliable than what's available in Spark at this
 point.

 James, what you're describing is the default behavior for the
 createDirectStream api available as part of spark since 1.3.  The kafka
 parameter auto.offset.reset defaults to largest, ie start at the most
 recent available message.

 This is described at
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html
  The createDirectStream api implementation is described in detail at
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

 If for some reason you're stuck using an earlier version of spark, you
 can accomplish what you want simply by starting the job using a new
 consumer group (there will be no prior state in zookeeper, so it will 
 start
 consuming according to auto.offset.reset)

 On Tue, May 12, 2015 at 7:26 AM, James King jakwebin...@gmail.com
 wrote:

 Very nice! will try and let you know, thanks.

 On Tue, May 12, 2015 at 2:25 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Yep, you can try this 

Worker Spark Port

2015-05-13 Thread James King
I understand that this port value is randomly selected.

Is there a way to enforce which Spark port a Worker should use?


Spark Sorted DataFrame Repartitioning

2015-05-13 Thread Night Wolf
Hi guys,

If I load a dataframe via a SQL context using a query that has a SORT BY, and
I then repartition the data frame, will it keep the sort order within each
partition?

I want to repartition because I'm going to run a map that generates lots of
data internally, so to avoid Out Of Memory errors I need to create smaller
partitions.

The source of the table is a parquet file.

sqlContext.sql("select * from tblx sort by colA")
  .repartition(defaultParallelism * 40)
  .map { //..make all the rows }
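
If per-partition ordering has to survive the shuffle, one option to consider (a sketch only, assuming colA is the first column and is a string) is to drop to the RDD level and use repartitionAndSortWithinPartitions, which sorts by key while repartitioning:

import org.apache.spark.HashPartitioner

val rows = sqlContext.sql("select * from tblx")
  .rdd
  .keyBy(row => row.getString(0))   // key by colA (assumed to be column 0)
  .repartitionAndSortWithinPartitions(new HashPartitioner(defaultParallelism * 40))
  .map { case (_, row) => row /* ..make all the rows */ }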

Cheers,
~N


Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread Cody Koeninger
In my mind, this isn't really a producer vs consumer distinction, this is a
broker vs zookeeper distinction.

The producer apis talk to brokers. The low level consumer api (what direct
stream uses) also talks to brokers.  The high level consumer api talks to
zookeeper, at least initially.

TLDR; don't worry about it, just specify either of metadata.broker.list or
bootstrap.servers, using the exact same host:port,host:port format, and
you're good to go.


On Wed, May 13, 2015 at 9:03 AM, James King jakwebin...@gmail.com wrote:

 Looking at Consumer Configs in
 http://kafka.apache.org/documentation.html#consumerconfigs

 The properties  *metadata.broker.list* or *bootstrap.servers *are not
 mentioned.

 Should I need these for consume side?

 On Wed, May 13, 2015 at 3:52 PM, James King jakwebin...@gmail.com wrote:

 Many thanks Cody and contributors for the help.


 On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Either one will work, there is no semantic difference.

 The reason I designed the direct api to accept both of those keys is
 because they were used to define lists of brokers in pre-existing Kafka
 project apis.  I don't know why the Kafka project chose to use 2 different
 configuration keys.

 On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com
 wrote:

 From:
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html

 I'm trying to use the direct approach to read messages form Kafka.

 Kafka is running as a cluster and configured with Zookeeper.

  On the above page it mentions:

 In the Kafka parameters, you must specify either
 *metadata.broker.list* or *bootstrap.servers*.  ...

 Can someone please explain the difference of between the two config
 parameters?

 And which one is more relevant in my case?

 Regards
 jk







Re: value toDF is not a member of RDD object

2015-05-13 Thread Todd Nist
I believe what Dean Wampler was suggesting is to use the sqlContext not the
sparkContext (sc), which is where the createDataFrame function resides:

https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.SQLContext
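
For example, a short sketch along the lines of the Person example elsewhere in this thread:

// Defined at the top level (outside main), as also discussed in this thread.
case class Person(name: String, age: Int)

val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

// createDataFrame lives on the SQLContext, not on the SparkContext.
val peopleDF = sqlContext.createDataFrame(peopleRDD)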

HTH.

-Todd

On Wed, May 13, 2015 at 6:00 AM, SLiZn Liu sliznmail...@gmail.com wrote:

 Additionally, after I successfully packaged the code, and submitted via 
 spark-submit
 webcat_2.11-1.0.jar, the following error was thrown at the line where
 toDF() been called:

 Exception in thread main java.lang.NoSuchMethodError: 
 scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
   at WebcatApp$.main(webcat.scala:49)
   at WebcatApp.main(webcat.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 Unsurprisingly, if I remove toDF, no error occurred.

 I have moved the case class definition outside of main but inside the
 outer object scope, and removed the provided specification in build.sbt.
 However, when I tried *Dean Wampler*‘s suggestion of using
 sc.createDataFrame() the compiler says this function is not a member of sc,
 and I cannot find any reference in the latest documents. What else should I
 try?

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote:

 Thanks folks, really appreciate all your replies! I tried each of your
 suggestions and in particular, *Animesh*‘s second suggestion of *making
 case class definition global* helped me getting off the trap.

 Plus, I should have paste my entire code with this mail to help the
 diagnose.

 REGARDS,
 Todd Leo
 ​

 On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com
 wrote:

 It's the import statement Olivier showed that makes the method
 available.

 Note that you can also use `sc.createDataFrame(myRDD)`, without the need
 for the import statement. I personally prefer this approach.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com
 wrote:

 you need to instantiate a SQLContext :
 val sc : SparkContext = ...
 val sqlContext = new SQLContext(sc)
 import sqlContext.implicits._

 Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11"
 % "1.3.1"` to `build.sbt` but the error remains. Do I need to import
 modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

 On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com
 wrote:

 toDF is part of spark SQL so you need Spark SQL dependency + import
 sqlContext.implicits._ to get the toDF method.

 Regards,

 Olivier.

 Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 Hi User Group,

 I’m trying to reproduce the example on Spark SQL Programming Guide
 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
 and got a compile error when packaging with sbt:

 [error] myfile.scala:30: value toDF is not a member of
 org.apache.spark.rdd.RDD[Person]
 [error] val people =
 sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
  => Person(p(0), p(1).trim.toInt)).toDF()
 [error]
  ^
 [error] one error found
 [error] (compile:compileIncremental) Compilation failed
 [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

 I double checked my code includes import sqlContext.implicits._
 after reading this post
 https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E
 on spark mailing list, even tried to use toDF(col1, col2)
 suggested by Xiangrui Meng in that post and got the same error.

 The Spark version is specified in build.sbt file as follows:

 scalaVersion := "2.11.6"
 libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1"
 % "provided"
 libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

 Anyone have ideas the cause of this error?

 REGARDS,
 Todd Leo
 ​





Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread James King
Many thanks Cody!

On Wed, May 13, 2015 at 4:22 PM, Cody Koeninger c...@koeninger.org wrote:

 In my mind, this isn't really a producer vs consumer distinction, this is
 a broker vs zookeeper distinction.

 The producer apis talk to brokers. The low level consumer api (what direct
 stream uses) also talks to brokers.  The high level consumer api talks to
 zookeeper, at least initially.

 TLDR; don't worry about it, just specify either of metadata.broker.list or
 bootstrap.servers, using the exact same host:port,host:port format, and
 you're good to go.


 On Wed, May 13, 2015 at 9:03 AM, James King jakwebin...@gmail.com wrote:

 Looking at Consumer Configs in
 http://kafka.apache.org/documentation.html#consumerconfigs

 The properties  *metadata.broker.list* or *bootstrap.servers *are not
 mentioned.

 Should I need these for consume side?

 On Wed, May 13, 2015 at 3:52 PM, James King jakwebin...@gmail.com
 wrote:

 Many thanks Cody and contributors for the help.


 On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Either one will work, there is no semantic difference.

 The reason I designed the direct api to accept both of those keys is
 because they were used to define lists of brokers in pre-existing Kafka
 project apis.  I don't know why the Kafka project chose to use 2 different
 configuration keys.

 On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com
 wrote:

 From:
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html

 I'm trying to use the direct approach to read messages form Kafka.

 Kafka is running as a cluster and configured with Zookeeper.

  On the above page it mentions:

 In the Kafka parameters, you must specify either
 *metadata.broker.list* or *bootstrap.servers*.  ...

 Can someone please explain the difference of between the two config
 parameters?

 And which one is more relevant in my case?

 Regards
 jk








Re: kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

shows setting up your stream and calling .checkpoint(checkpointDir) inside
the functionToCreateContext.  It looks to me like you're setting up your
stream and calling checkpoint outside, after getOrCreate.

I'm not sure that's the issue (someone who knows checkpoints better than I
do should chime in), but that's the first thing I noticed.
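
For reference, a minimal sketch of the pattern from that page (sparkConf, checkpointDir, and createKafkaStream are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))   // sparkConf is a placeholder
  val stream = createKafkaStream(ssc)                      // hypothetical: set up all DStreams here
  stream.foreachRDD(rdd => println(rdd.count()))           // placeholder output operation
  ssc.checkpoint(checkpointDir)                            // checkpoint configured inside the factory
  ssc
}

val context = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
context.start()
context.awaitTermination()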

On Wed, May 13, 2015 at 4:06 AM, ankurcha achau...@brightcove.com wrote:

 Hi,

 I have a simple application which fails with the following exception
 only when the application is restarted (i.e. the checkpointDir has
 entires from a previous execution):

 Exception in thread main org.apache.spark.SparkException:
 org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not
 been initialized
 at
 org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266
 )
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
 (DStream.scala:287)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
 (DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:28
 4)
 at
 org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDSt
 ream.scala:38)
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
 ala:116)
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
 ala:116)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
 e.scala:251)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
 e.scala:251)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sca
 la:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251
 )
 at
 scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at
 org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:
 116)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
 ly(JobGenerator.scala:227)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
 ly(JobGenerator.scala:222)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.s
 cala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.s
 cala:222)
 at
 org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.sca
 la:90)
 at
 org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.sca
 la:67)
 at
 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala
 :512)
 at
 com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.s
 cala:115)
 at
 com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:1
 5)
 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
 at
 scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
 at scala.App$$anonfun$main$1.apply(App.scala:71)
 at scala.App$$anonfun$main$1.apply(App.scala:71)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.generic.TraversableForwarder$class.foreach(TraversableF
 orwarder.scala:32)
 at scala.App$class.main(App.scala:71)
 at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
 at com.brightcove.analytics.tacoma.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
 a:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
 Impl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit
 $$runMain(SparkSubmit.scala:569)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 The relevant source is:

 class RawLogProcessor(ssc: StreamingContext, topic: String,
 kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object,
 StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  //KafkaUtils.createStream[String, Object, StringDecoder,
 KafkaAvroDecoder](ssc, kafkaParams, Map(qa-rawlogs - 10),
 StorageLevel.MEMORY_AND_DISK_2)

  val 

Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception

2015-05-13 Thread Ishwardeep Singh
Hi,

I am using spark-shell, and the steps to reproduce the issue are as
follows:

scala> val dateDimDF =
sqlContext.load("jdbc", Map("url" -> "jdbc:teradata://192.168.145.58/DBS_PORT=1025,DATABASE=BENCHQADS,LOB_SUPPORT=OFF,USER=BENCHQADS,PASSWORD=abc",
"dbtable" -> "date_dim"))

scala> dateDimDF.printSchema()

root
 |-- d_date_sk: integer (nullable = false)
 |-- d_date_id: string (nullable = false)
 |-- d_date: date (nullable = true)
 |-- d_month_seq: integer (nullable = true)
 |-- d_week_seq: integer (nullable = true)
 |-- d_quarter_seq: integer (nullable = true)
 |-- d_year: integer (nullable = true)
 |-- d_dow: integer (nullable = true)
 |-- d_moy: integer (nullable = true)
 |-- d_dom: integer (nullable = true)
 |-- d_qoy: integer (nullable = true)
 |-- d_fy_year: integer (nullable = true)
 |-- d_fy_quarter_seq: integer (nullable = true)
 |-- d_fy_week_seq: integer (nullable = true)
 |-- d_day_name: string (nullable = true)
 |-- d_quarter_name: string (nullable = true)
 |-- d_holiday: string (nullable = true)
 |-- d_weekend: string (nullable = true)
 |-- d_following_holiday: string (nullable = true)
 |-- d_first_dom: integer (nullable = true)
 |-- d_last_dom: integer (nullable = true)
 |-- d_same_day_ly: integer (nullable = true)
 |-- d_same_day_lq: integer (nullable = true)
 |-- d_current_day: string (nullable = true)
 |-- d_current_week: string (nullable = true)
 |-- d_current_month: string (nullable = true)
 |-- d_current_quarter: string (nullable = true)
 |-- d_current_year: string (nullable = true)

scala> dateDimDF.saveAsTable("date_dim_tera_save")

15/05/13 19:57:05 INFO JDBCRDD: closed connection
15/05/13 19:57:05 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: java.sql.Date cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
at
org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
15/05/13 19:57:05 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
localhost): java.lang.ClassCastException: java.sql.Date cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at
org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
at
org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at
org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)


scala  val 

Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Dibyendu Bhattacharya
Thanks Cody for your email. I think my concern was not about getting the
ordering of messages within a partition, which as you said is possible if one
knows how Spark works. The issue is how Spark schedules jobs for every batch,
which is not in the same order they were generated. If that is not guaranteed,
it does not matter whether you manage order within your partition, and
depending on per-partition ordering to commit offsets may lead to offsets
being committed in the wrong order.

In this thread you have discussed this as well, along with some workarounds:

https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

So again, one needs to understand every detail of a consumer to decide
whether it solves their use case.

Regards,
Dibyendu

On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote:

 As far as I can tell, Dibyendu's cons boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition data

 It's not accurate to imply that either one of those things are inherently
 cons of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to use
 checkpoints.  You can save offsets in a checkpoint, your own database, or
 not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same ordering
 guarantee as Kafka, namely that within a given partition messages will be
 in increasing offset order.   Clearly if you do a transformation that
 repartitions the stream, that no longer holds.  Thing is, that doesn't
 matter if you're saving offsets and results for each rdd in the driver.
 The offset ranges for the original rdd don't change as a result of the
 transformation you executed, they're immutable.

 Sure, you can get into trouble if you're trying to save offsets / results
 per partition on the executors, after a shuffle of some kind. You can avoid
 this pretty easily by just using normal scala code to do your
 transformation on the iterator inside a foreachPartition.  Again, this
 isn't a con of the direct stream api, this is just a need to understand
 how Spark works.



 On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 The low level consumer which Akhil mentioned , has been running in
 Pearson for last 4-5 months without any downtime. I think this one is the
 reliable Receiver Based Kafka consumer as of today for Spark .. if you
 say it that way ..

 Prior to Spark 1.3 other Receiver based consumers have used Kafka High
 level APIs which has serious issue with re-balancing and lesser fault
 tolerant aspect and data loss .

 Cody's implementation is definitely a good approach using direct stream ,
 but both direct stream based approach and receiver based low level consumer
 approach has pros and cons. Like Receiver based approach need to use WAL
 for recovery from Driver failure which is a overhead for Kafka like system
 . For direct stream the offsets stored as check-pointed directory got lost
 if driver code is modified ..you can manage offset from your driver but for
 derived stream generated from this direct stream , there is no guarantee
 that batches are processed is order ( and offsets commits in order ) .. etc
 ..

 So whoever use whichever consumer need to study pros and cons of both
 approach before taking a call ..

 Regards,
 Dibyendu







 On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,
 I was just saying that i found more success and high throughput with the
 low level kafka api prior to KafkfaRDDs which is the future it seems. My
 apologies if you felt it that way. :)
 On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote:

 Akhil, I hope I'm misreading the tone of this. If you have personal
 issues at stake, please take them up outside of the public list.  If you
 have actual factual concerns about the kafka integration, please share them
 in a jira.

 Regarding reliability, here's a screenshot of a current production job
 with a 3 week uptime  Was a month before that, only took it down to change
 code.

 http://tinypic.com/r/2e4vkht/8

 Regarding flexibility, both of the apis available in spark will do what
 James needs, as I described.



 On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,

 If you are so sure, can you share a bench-marking (which you ran for
 days maybe?) that you have done with Kafka APIs provided by Spark?

 Thanks
 Best Regards

 On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I don't think it's accurate for Akhil to claim that the linked
 library is much more flexible/reliable than what's available in Spark 
 at
 this point.

 James, what you're describing is the default behavior for the
 createDirectStream api available as part of spark 

Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread James King
Looking at Consumer Configs in
http://kafka.apache.org/documentation.html#consumerconfigs

The properties *metadata.broker.list* and *bootstrap.servers* are not
mentioned.

Do I need these on the consumer side?

On Wed, May 13, 2015 at 3:52 PM, James King jakwebin...@gmail.com wrote:

 Many thanks Cody and contributors for the help.


 On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Either one will work, there is no semantic difference.

 The reason I designed the direct api to accept both of those keys is
 because they were used to define lists of brokers in pre-existing Kafka
 project apis.  I don't know why the Kafka project chose to use 2 different
 configuration keys.

 On Wed, May 13, 2015 at 5:00 AM, James King jakwebin...@gmail.com
 wrote:

 From:
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html

 I'm trying to use the direct approach to read messages form Kafka.

 Kafka is running as a cluster and configured with Zookeeper.

  On the above page it mentions:

 In the Kafka parameters, you must specify either *metadata.broker.list*
  or *bootstrap.servers*.  ...

 Can someone please explain the difference of between the two config
 parameters?

 And which one is more relevant in my case?

 Regards
 jk






Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
You linked to a google mail tab, not a public archive, so I don't know
exactly which conversation you're referring to.

As far as I know, streaming only runs a single job at a time in the order
they were defined, unless you turn on an experimental option for more
parallelism (TD or someone more knowledgeable can chime in on this).  If
you're talking about the possibility of the next job starting before the
prior one has fully finished, because your processing is lagging behind...
I'm not 100% sure this is possible because I've never observed it.

The thing is, it's a moot point, because if you're saving offsets yourself
transactionally, you already need to be verifying that offsets are correct
(increasing without gaps) in order to handle restarts correctly.

If you're super concerned about how batches get generated, the direct api
gives you access to KafkaUtils.createRDD... just schedule your own rdds in
the order you want.  Again, flexible.
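
A small sketch of that batch-oriented route (brokers, topic, and offsets are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// One explicit range per topic-partition: (topic, partition, fromOffset, untilOffset).
val offsetRanges = Array(OffsetRange("mytopic", 0, 100L, 200L))

val batchRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)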




On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Thanks Cody for your email. I think my concern was not to get the ordering
 of message within a partition , which as you said is possible if one knows
 how Spark works. The issue is how Spark schedule jobs on every batch  which
 is not on the same order they generated. So if that is not guaranteed it
 does not matter if you manege order within your partition. So depends on
 par-partition ordering to commit offset may leads to offsets commit in
 wrong order.

 In this thread you have discussed this as well and some workaround  :


 https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

 So again , one need to understand every details of a Consumer to take a
 decision if that solves their use case.

 Regards,
 Dibyendu

 On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 As far as I can tell, Dibyendu's cons boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition
 data

 It's not accurate to imply that either one of those things are inherently
 cons of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to
 use checkpoints.  You can save offsets in a checkpoint, your own database,
 or not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same ordering
 guarantee as Kafka, namely that within a given partition messages will be
 in increasing offset order.   Clearly if you do a transformation that
 repartitions the stream, that no longer holds.  Thing is, that doesn't
 matter if you're saving offsets and results for each rdd in the driver.
 The offset ranges for the original rdd don't change as a result of the
 transformation you executed, they're immutable.

 Sure, you can get into trouble if you're trying to save offsets / results
 per partition on the executors, after a shuffle of some kind. You can avoid
 this pretty easily by just using normal scala code to do your
 transformation on the iterator inside a foreachPartition.  Again, this
 isn't a con of the direct stream api, this is just a need to understand
 how Spark works.



 On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 The low level consumer which Akhil mentioned , has been running in
 Pearson for last 4-5 months without any downtime. I think this one is the
 reliable Receiver Based Kafka consumer as of today for Spark .. if you
 say it that way ..

 Prior to Spark 1.3 other Receiver based consumers have used Kafka High
 level APIs which has serious issue with re-balancing and lesser fault
 tolerant aspect and data loss .

 Cody's implementation is definitely a good approach using direct stream
 , but both direct stream based approach and receiver based low level
 consumer approach has pros and cons. Like Receiver based approach need to
 use WAL for recovery from Driver failure which is a overhead for Kafka like
 system . For direct stream the offsets stored as check-pointed directory
 got lost if driver code is modified ..you can manage offset from your
 driver but for derived stream generated from this direct stream , there is
 no guarantee that batches are processed is order ( and offsets commits in
 order ) .. etc ..

 So whoever use whichever consumer need to study pros and cons of both
 approach before taking a call ..

 Regards,
 Dibyendu







 On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Cody,
 I was just saying that i found more success and high throughput with
 the low level kafka api prior to KafkfaRDDs which is the future it seems.
 My apologies if you felt it that way. :)
 On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote:

 Akhil, I hope I'm misreading the tone of this. If you 

Re: Trouble trying to run ./spark-ec2 script

2015-05-13 Thread Ted Yu
Is python installed on the machine where you ran ./spark-ec2 ?

Cheers

On Wed, May 13, 2015 at 1:33 PM, Su She suhsheka...@gmail.com wrote:

 I'm trying to set up my own cluster and am having trouble running this
 script:

 ./spark-ec2 --key-pair=xx --identity-file=xx.pem --region=us-west-2
 --zone=us-west-2c --num-slaves=1 launch my-spark-cluster

 based off: https://spark.apache.org/docs/latest/ec2-scripts.html

 It just tries to open the spark-ec2 file instead of trying to run it
 (asks me if I want to open the file and what program to use to open
 it), I'm guessing this is something small that i'm doing wrong?

 Thanks!





Re: how to use rdd.countApprox

2015-05-13 Thread Du Li
I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside 
dstream.foreachRDD{...}. After calling cancelJobGroup(), the spark context 
seems no longer valid, which crashes subsequent jobs.
My spark version is 1.1.1. I will do more investigation into this issue, 
perhaps after upgrading to 1.3.1, and then file a JIRA if it persists.
Is there a way to get stage or task id of a particular transformation or action 
on RDD and then selectively kill the stage or tasks? It would be necessary and 
useful in situations similar to countApprox.
Thanks,
Du


 On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com 
wrote:
   

 That is not supposed to happen :/ That is probably a bug. If you have the log4j
logs, would be good to file a JIRA. This may be worth debugging.
On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

Actually I tried that before asking. However, it killed the spark context. :-)
Du 


 On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com 
wrote:
   

 That is a good question. I dont see a direct way to do that. 
You could do try the following 
val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId)
val approxCount = rdd.countApprox().getInitialValue   // job launched with the set job group
rdd.sparkContext.cancelJobGroup(jobGroupId)           // cancel the job


On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

Hi TD,
Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? 
Otherwise it keeps running until completion, producing results not used but 
consuming resources.
Thanks,Du 


 On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID 
wrote:
   

  Hi TD,
Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app 
is standing a much better chance to complete processing each batch within the 
batch interval.
Du


 On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com 
wrote:
   

 From the code it seems that as soon as the  rdd.countApprox(5000) returns, 
you can call pResult.initialValue() to get the approximate count at that 
point of time (that is after timeout). Calling pResult.getFinalValue() will 
further block until the job is over, and give the final correct values that you 
would have received by rdd.count()
On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

HI,
I tested the following in my streaming app and hoped to get an approximate 
count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed 
to always return after it finishes completely, just like rdd.count(), which 
often exceeded 5 seconds. The values for low, mean, and high were the same.
val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong},
mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way of usage?
Thanks,Du


 



 On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID 
wrote:
   

 I have to count RDD's in a spark streaming app. When data goes large, count() 
becomes expensive. Did anybody have experience using countApprox()? How 
accurate/reliable is it? 
The documentation is pretty modest. Suppose the timeout parameter is in 
milliseconds. Can I retrieve the count value by calling getFinalValue()? Does 
it block and return only after the timeout? Or do I need to define 
onComplete/onFail handlers to extract count value from the partial results?
Thanks,
Du

Spark performance in cluster mode using yarn

2015-05-13 Thread sachin Singh
Hi Friends,
Can someone please give me an idea of what the ideal time (complete job
execution) should be for a Spark job?

I have data in a Hive table; the amount of data is about 1GB, roughly 2 lakh
(200,000) rows for the whole month.
I want to do monthly aggregation using SQL queries (group by).

I have only one node and one cluster, with the configuration below for running the job:
--num-executors 2 --driver-memory 3g --driver-java-options
-XX:MaxPermSize=1G --executor-memory 2g --executor-cores 2

Approximately how much time should it take to finish the job?

Or can someone suggest the best way to get results quickly?

Thanks in advance,







PostgreSQL JDBC Classpath Issue

2015-05-13 Thread George Adams
Hey all, I seem to be having an issue with PostgreSQL JDBC jar on my classpath. 
I’ve outlined the issue on Stack Overflow 
(http://stackoverflow.com/questions/30221677/spark-sql-postgresql-data-source-issues).
 I’m not sure how to fix this since I built the uber jar using sbt-assembly and 
the final jar does have org/postgresql/Driver.class.


—
George Adams, IV
Software Craftsman
Brand Networks, Inc.
(585) 902-8822

Problem with current spark

2015-05-13 Thread Giovanni Paolo Gibilisco
Hi,
I'm trying to run an application that uses a Hive context to perform some
queries over JSON files.
The code of the application is here:
https://github.com/GiovanniPaoloGibilisco/spark-log-processor/tree/fca93d95a227172baca58d51a4d799594a0429a1

I can run it on Spark 1.3.1 after rebuilding it with hive support
using: mvn -Phive -Phive-thriftserver -DskipTests clean package
but when I try to run the same application on the one built from the
current master branch (at this commit from today:
https://github.com/apache/spark/tree/bec938f777a2e18757c7d04504d86a5342e2b49e),
again built with Hive support, I get an error at Stage 2, which is not
submitted, and after a while the application is killed.
The logs look like this:

15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
15/05/13 16:54:37 INFO DAGScheduler: Got job 2 (run at unknown:0) with 2
output partitions (allowLocal=false)
15/05/13 16:54:37 INFO DAGScheduler: Final stage: ResultStage 4(run at
unknown:0)
15/05/13 16:54:37 INFO DAGScheduler: Parents of final stage: List()
15/05/13 16:54:37 INFO Exchange: Using SparkSqlSerializer2.
15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
^C15/05/13 16:54:42 INFO SparkContext: Invoking stop() from shutdown hook
15/05/13 16:54:42 INFO SparkUI: Stopped Spark web UI at
http://192.168.230.130:4040
15/05/13 16:54:42 INFO DAGScheduler: Stopping DAGScheduler
15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Shutting down all
executors
15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Asking each executor to
shut down
15/05/13 16:54:52 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
15/05/13 16:54:52 ERROR TaskSchedulerImpl: Lost executor 0 on
192.168.230.130: remote Rpc client disassociated
15/05/13 16:54:53 INFO AppClient$ClientActor: Executor updated:
app-20150513165402-/0 is now EXITED (Command exited with code 0)
15/05/13 16:54:53 INFO SparkDeploySchedulerBackend: Executor
app-20150513165402-/0 removed: Command exited with code 0
15/05/13 16:54:53 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 0
15/05/13 16:56:42 WARN AkkaRpcEndpointRef: Error sending message [message =
StopExecutors] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
at
org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.stop(SparkDeploySchedulerBackend.scala:95)
at
org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1404)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1562)
at
org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551)
at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2252)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:)
at
org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2204)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

Should I submit an Issue for this?
What is the best way to do it?
Best


Re: how to use rdd.countApprox

2015-05-13 Thread Tathagata Das
That is not supposed to happen :/ That is probably a bug.
If you have the log4j logs, would be good to file a JIRA. This may be worth
debugging.

On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

 Actually I tried that before asking. However, it killed the spark context.
 :-)

 Du



   On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com
 wrote:


 That is a good question. I dont see a direct way to do that.

 You could do try the following

 val jobGroupId = "group-id-based-on-current-time"
 rdd.sparkContext.setJobGroup(jobGroupId)
 val approxCount = rdd.countApprox().getInitialValue   // job launched with
 the set job group
 rdd.sparkContext.cancelJobGroup(jobGroupId)   // cancel the job


 On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

 Hi TD,

 Do you know how to cancel the rdd.countApprox(5000) tasks after the
 timeout? Otherwise it keeps running until completion, producing results not
 used but consuming resources.

 Thanks,
 Du



   On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


  Hi TD,

 Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming
 app is standing a much better chance to complete processing each batch
 within the batch interval.

 Du


   On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com
 wrote:


 From the code it seems that as soon as the  rdd.countApprox(5000)
 returns, you can call pResult.initialValue() to get the approximate count
 at that point of time (that is after timeout). Calling
 pResult.getFinalValue() will further block until the job is over, and
 give the final correct values that you would have received by rdd.count()

 On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 HI,

 I tested the following in my streaming app and hoped to get an approximate
 count within 5 seconds. However, rdd.countApprox(5000).getFinalValue()
 seemed to always return after it finishes completely, just like
 rdd.count(), which often exceeded 5 seconds. The values for low, mean, and
 high were the same.

 val pResult = rdd.countApprox(5000)
 val bDouble = pResult.getFinalValue()
 logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong},
 mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

 Can any expert here help explain the right way of usage?

 Thanks,
 Du







   On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


 I have to count RDD's in a spark streaming app. When data goes large,
 count() becomes expensive. Did anybody have experience using countApprox()?
 How accurate/reliable is it?

 The documentation is pretty modest. Suppose the timeout parameter is in
 milliseconds. Can I retrieve the count value by calling getFinalValue()?
 Does it block and return only after the timeout? Or do I need to define
 onComplete/onFail handlers to extract count value from the partial results?

 Thanks,
 Du













Trouble trying to run ./spark-ec2 script

2015-05-13 Thread Su She
I'm trying to set up my own cluster and am having trouble running this script:

./spark-ec2 --key-pair=xx --identity-file=xx.pem --region=us-west-2
--zone=us-west-2c --num-slaves=1 launch my-spark-cluster

based off: https://spark.apache.org/docs/latest/ec2-scripts.html

It just tries to open the spark-ec2 file instead of trying to run it
(asks me if I want to open the file and what program to use to open
it). I'm guessing this is something small that I'm doing wrong?

Thanks!




Re: Trouble trying to run ./spark-ec2 script

2015-05-13 Thread Su She
Hmm, I just tried to run it again, but opened the script with Python;
the cmd window seemed to pop up really quickly and then exited.


On Wed, May 13, 2015 at 2:06 PM, Su She suhsheka...@gmail.com wrote:
 Hi Ted, Yes I do have Python 3.5 installed. I just ran py from the
 ec2 directory and it started up the python shell.

 Thanks!


 On Wed, May 13, 2015 at 2:02 PM, Ted Yu yuzhih...@gmail.com wrote:
 Is python installed on the machine where you ran ./spark-ec2 ?

 Cheers

 On Wed, May 13, 2015 at 1:33 PM, Su She suhsheka...@gmail.com wrote:

 I'm trying to set up my own cluster and am having trouble running this
 script:

 ./spark-ec2 --key-pair=xx --identity-file=xx.pem --region=us-west-2
 --zone=us-west-2c --num-slaves=1 launch my-spark-cluster

 based off: https://spark.apache.org/docs/latest/ec2-scripts.html

 It just tries to open the spark-ec2 file instead of trying to run it
 (asks me if I want to open the file and what program to use to open
 it), I'm guessing this is something small that i'm doing wrong?

 Thanks!







Re: how to use rdd.countApprox

2015-05-13 Thread Tathagata Das
You might get stage information through SparkListener. But I am not sure
whether you can use that information to easily kill stages.
Though I highly recommend using Spark 1.3.1 (or even Spark master). Things
move really fast between releases. 1.1.1 feels really old to me :P
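
A rough sketch of listening for stage submissions (whether those stage ids can then be used to cancel work selectively is the open question here):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

sc.addSparkListener(new SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val info = stageSubmitted.stageInfo
    println(s"Stage ${info.stageId} submitted: ${info.name}")
  }
})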

TD

On Wed, May 13, 2015 at 1:25 PM, Du Li l...@yahoo-inc.com wrote:

 I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside
 dstream.foreachRDD{...}. After calling cancelJobGroup(), the spark context
 seems no longer valid, which crashes subsequent jobs.

 My spark version is 1.1.1. I will do more investigation into this issue,
 perhaps after upgrading to 1.3.1, and then file a JIRA if it persists.

 Is there a way to get stage or task id of a particular transformation or
 action on RDD and then selectively kill the stage or tasks? It would be
 necessary and useful in situations similar to countApprox.

 Thanks,
 Du



   On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com
 wrote:


 That is not supposed to happen :/ That is probably a bug.
 If you have the log4j logs, would be good to file a JIRA. This may be
 worth debugging.

 On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

 Actually I tried that before asking. However, it killed the spark context.
 :-)

 Du



   On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com
 wrote:


 That is a good question. I don't see a direct way to do that.

 You could try the following:

 val jobGroupId = "group-id-based-on-current-time"
 rdd.sparkContext.setJobGroup(jobGroupId)
 val approxCount = rdd.countApprox().getInitialValue   // job launched with
 the set job group
 rdd.sparkContext.cancelJobGroup(jobGroupId)   // cancel the job
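
For completeness, a minimal sketch of that pattern inside foreachRDD, assuming
Spark 1.2+/1.3-era APIs (dstream is an existing DStream, the group id and the
5000 ms timeout are illustrative values, setJobGroup also takes a description
argument, and the PartialResult accessor is initialValue). As noted elsewhere
in this thread, cancelJobGroup caused problems on 1.1.1, so this is worth
testing on a newer release:

    dstream.foreachRDD { rdd =>
      val jobGroupId = s"countApprox-${System.currentTimeMillis}"
      // Tag the approximate-count job with its own group so it can be cancelled.
      rdd.sparkContext.setJobGroup(jobGroupId, "approximate count with timeout")
      val approx = rdd.countApprox(5000).initialValue   // countApprox returns after ~5s
      println(s"approx count: ${approx.mean.toLong}")   // low/mean/high are also available
      rdd.sparkContext.cancelJobGroup(jobGroupId)       // stop the still-running exact count
    }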


 On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

 Hi TD,

 Do you know how to cancel the rdd.countApprox(5000) tasks after the
 timeout? Otherwise it keeps running until completion, producing results not
 used but consuming resources.

 Thanks,
 Du



   On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


  Hi TD,

 Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming
 app is standing a much better chance to complete processing each batch
 within the batch interval.

 Du


   On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com
 wrote:


 From the code it seems that as soon as the  rdd.countApprox(5000)
 returns, you can call pResult.initialValue() to get the approximate count
 at that point of time (that is after timeout). Calling
 pResult.getFinalValue() will further block until the job is over, and
 give the final correct values that you would have received by rdd.count()

 On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 HI,

 I tested the following in my streaming app and hoped to get an approximate
 count within 5 seconds. However, rdd.countApprox(5000).getFinalValue()
 seemed to always return after it finishes completely, just like
 rdd.count(), which often exceeded 5 seconds. The values for low, mean, and
 high were the same.

 val pResult = rdd.countApprox(5000)
 val bDouble = pResult.getFinalValue()
 logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong},
 mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

 Can any expert here help explain the right way of usage?

 Thanks,
 Du







   On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


 I have to count RDD's in a spark streaming app. When data goes large,
 count() becomes expensive. Did anybody have experience using countApprox()?
 How accurate/reliable is it?

 The documentation is pretty modest. Suppose the timeout parameter is in
 milliseconds. Can I retrieve the count value by calling getFinalValue()?
 Does it block and return only after the timeout? Or do I need to define
 onComplete/onFail handlers to extract count value from the partial results?

 Thanks,
 Du
















force the kafka consumer process to different machines

2015-05-13 Thread hotdog
I'm using Spark Streaming integrated with streaming-kafka.

My kafka topic has 80 partitions, while my machines have 40 cores. I found
that when the job is running, the kafka consumer processes are only deployed
to 2 machines, and the bandwidth of those 2 machines becomes very high.

I wonder whether there is any way to control how the kafka consumers are dispatched?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: force the kafka consumer process to different machines

2015-05-13 Thread Akhil Das
With this low-level Kafka API
(https://github.com/dibbhatt/kafka-spark-consumer/) you can specify how many
receivers you want to spawn, and most of the time they are spread evenly.
You can also put a sleep just after creating the context so the executors
have time to connect to the driver; Spark will then distribute the receivers
evenly.

Thanks
Best Regards
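
For comparison, with the built-in receiver-based Kafka API the usual way to get
more consumer parallelism is to create several input streams and union them
(as also suggested elsewhere in this thread). A minimal sketch, where ssc, the
ZooKeeper quorum, group id, topic name and receiver count are illustrative
assumptions:

    import org.apache.spark.streaming.kafka.KafkaUtils

    val numReceivers = 8   // illustrative; bounded by available executor cores
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 10))
    }
    val unified = ssc.union(kafkaStreams)   // one DStream backed by numReceivers receivers

This increases consumer parallelism but does not pin receivers to particular
machines, which is why the sleep-after-context trick above is sometimes needed.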

On Wed, May 13, 2015 at 9:03 PM, hotdog lisend...@163.com wrote:

 I 'm using streaming integrated with streaming-kafka.

 My kafka topic has 80 partitions, while my machines have 40 cores. I found
 that when the job is running, the kafka consumer processes are only deploy
 to 2 machines, the bandwidth of the 2 machines will be very very high.

 I wonder is there any way to control the kafka consumer's dispatch?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception

2015-05-13 Thread ayan guha
Your stack trace says it can't convert date to integer. You sure about
column positions?
On 13 May 2015 21:32, Ishwardeep Singh ishwardeep.si...@impetus.co.in
wrote:

 Hi ,

 I am using Spark SQL 1.3.1.

 I have created a dataFrame using jdbc data source and am using
 saveAsTable()
 method but got the following 2 exceptions:

 java.lang.RuntimeException: Unsupported datatype DecimalType()
 at scala.sys.package$.error(package.scala:27)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
 at scala.Option.getOrElse(Option.scala:120)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
 at
 org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:391)
 at

 org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
 at

 org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
 at
 org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
 at

 org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218)
 at

 org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
 at

 org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
 at
 org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
 at

 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
 at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121)
 at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071)
 at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037)
 at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015)

 java.lang.ClassCastException: java.sql.Date cannot be cast to
 java.lang.Integer
 at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
 at

 parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
 at
 org.apache.spark.sql.parquet.ParquetRelation2.org
 $apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
 at
 

Re: Worker Spark Port

2015-05-13 Thread Cody Koeninger
I believe most ports are configurable at this point, look at

http://spark.apache.org/docs/latest/configuration.html

search for .port
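
To make that concrete, a minimal sketch of pinning a few of the .port settings
from that page on the driver side (the port numbers are arbitrary examples);
the standalone Worker's own listening port is set separately, e.g. via
SPARK_WORKER_PORT in conf/spark-env.sh:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("fixed-ports-example")
      .set("spark.driver.port", "7001")         // driver RPC endpoint
      .set("spark.blockManager.port", "7003")   // block managers on driver and executors
      .set("spark.ui.port", "4040")             // application web UI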

On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote:

 I understand that this port value is randomly selected.

 Is there a way to enforce which spark port a Worker should use?



Re: force the kafka consumer process to different machines

2015-05-13 Thread Dibyendu Bhattacharya
or you can use this Receiver as well :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Where you can specify how many Receivers you need for your topic and it
will divides the partitions among the Receiver and return the joined stream
for you .

Say you specified 20 receivers , in that case each Receiver can handle 4
partitions and you get consumer parallelism of 20 receivers .

Dibyendu

On Wed, May 13, 2015 at 9:28 PM, 李森栋 lisend...@163.com wrote:

 thank you very much


 Sent from my Meizu MX4 Pro

  Original Message 
 From: Cody Koeninger c...@koeninger.org
 Date: Wednesday, May 13, 23:52
 To: hotdog lisend...@163.com
 Cc: user@spark.apache.org
 Subject: Re: force the kafka consumer process to different machines

 I assume you're using the receiver based approach?  Have you tried the
 createDirectStream api?
 
 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
 
 If you're sticking with the receiver based approach I think your only
 option would be to create more consumer streams and union them.  That
 doesn't give you control over where they're run, but should increase the
 consumer parallelism.
 
 On Wed, May 13, 2015 at 10:33 AM, hotdog lisend...@163.com wrote:
 
  I 'm using streaming integrated with streaming-kafka.
 
  My kafka topic has 80 partitions, while my machines have 40 cores. I
 found
  that when the job is running, the kafka consumer processes are only
 deploy
  to 2 machines, the bandwidth of the 2 machines will be very very high.
 
  I wonder is there any way to control the kafka consumer's dispatch?
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Spark SQL: preferred syntax for column reference?

2015-05-13 Thread Dean Wampler
Is the $"foo" or mydf("foo") or both checked at compile time to verify that
the column reference is valid? Thx.

Dean

On Wednesday, May 13, 2015, Michael Armbrust mich...@databricks.com wrote:

 I would not say that either method is preferred (neither is
 old/deprecated).  One advantage to the second is that you are referencing a
 column from a specific dataframe, instead of just providing a string that
 will be resolved much like an identifier in a SQL query.

 This means given:
 df1 = [id: int, name: string ]
 df2 = [id: int, zip: int]

 I can do something like:

 df1.join(df2, df1("id") === df2("id"))

 Whereas I would need aliases if I was only using strings:

 df1.as("a").join(df2.as("b"), $"a.id" === $"b.id")

 On Wed, May 13, 2015 at 9:55 AM, Diana Carroll dcarr...@cloudera.com
 javascript:_e(%7B%7D,'cvml','dcarr...@cloudera.com'); wrote:

 I'm just getting started with Spark SQL and DataFrames in 1.3.0.

 I notice that the Spark API shows a different syntax for referencing
 columns in a dataframe than the Spark SQL Programming Guide.

 For instance, the API docs for the select method show this:
 df.select($"colA", $"colB")


 Whereas the programming guide shows this:
 df.filter(df("name") > 21).show()

 I tested and both the $"column" and df("column") syntax work, but I'm
 wondering which is *preferred*.  Is one the original and one a new
 feature we should be using?

 Thanks,
 Diana
 (Spark Curriculum Developer for Cloudera)




-- 
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Spark recovery takes long

2015-05-13 Thread NB
Hello Spark gurus,

We have a spark streaming application that is consuming from a Flume stream
and has some window operations. The input batch sizes are 1 minute and
intermediate Window operations have window sizes of 1 minute, 1 hour and 6
hours. I enabled checkpointing and Write ahead log so that we can recover
from any failures. I added explicit checkpoint Duration directives for each
of the intermediate Window streams also and tried with 2 minute duration.
However, I am running into a couple of issues at the moment:

1. The recovery time is long. It's close to 15 minutes after running the
application for only a couple of hours and then restarting it to test recovery.
I tried running for a shorter time, i.e. about half an hour, and restarting, and the
recovery process still took 15 minutes (I had started with a fresh
checkpoint folder).

2. After recovery is completed, the input stream does seem to recover just
fine and continue where it had left off at the crash, however, as the
computations continue, I am getting incorrect results for every batch after
recovery e.g. some double values are NaNs etc. This does not happen if I let
the application just continue. Seems to me that some of the intermediate
streams were not recovered properly and caused some of our computations to
produce incorrect values.

Any help/insights into how to go about tackling this will be appreciated.

Thanks
NB.
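
For reference, a minimal sketch of the kind of setup described above, assuming
the push-based Flume receiver and Spark 1.2+ streaming APIs; the paths, host,
port, durations and operations are illustrative, not the actual application code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("flume-windows")
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // WAL for receivers
      val ssc = new StreamingContext(conf, Minutes(1))                  // 1-minute batches
      ssc.checkpoint("hdfs:///checkpoints/flume-windows")

      val events = FlumeUtils.createStream(ssc, "receiver-host", 41414)
      val hourly = events.window(Minutes(60), Minutes(1))
      hourly.checkpoint(Minutes(2))   // explicit checkpoint interval on an intermediate stream
      hourly.count().print()
      ssc
    }

    // On restart this rebuilds the streaming graph from the checkpoint directory.
    val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/flume-windows", createContext _)
    ssc.start()
    ssc.awaitTermination()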




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-recovery-takes-long-tp22876.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark-streaming whit flume error

2015-05-13 Thread ??
Hi all,
   I want to use spark-streaming with flume. Now I am in trouble: I don't
know how to configure flume. I configured flume like this:
a1.sources = r1
a1.channels = c1 c2
a1.sources.r1.type = avro
a1.sources.r1.channels = c1 c2
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4
a1.sources.r1.selector.type=replicating
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1
a1.channels.c1.transactionCapacity = 1
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 80
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1
a1.channels.c2.transactionCapacity = 1
a1.channels.c2.byteCapacityBufferPercentage = 20
a1.channels.c2.byteCapacity = 80
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /user/hxf/flume
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=0
a1.channels = c1
but when I send data to that port, I get an error like this:
org.apache.avro.AvroRuntimeException: Excessively large list allocation request 
detected: 154218761 items! Connection closed.


Can anybody help me? Thanks!
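
For reference, with the push-based spark-streaming-flume integration the Flume
agent uses an Avro sink pointed at the Spark receiver, and the Spark side looks
roughly like the sketch below (sc is assumed, and the host, port and batch
interval are placeholders that must match the Avro sink configuration):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val ssc = new StreamingContext(sc, Seconds(10))
    // Listens for events pushed by a Flume Avro sink configured with this host/port.
    val flumeStream = FlumeUtils.createStream(ssc, "spark-receiver-host", 41414)
    flumeStream.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()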

Re: JavaPairRDD

2015-05-13 Thread Tristan Blakers
You could use a map() operation, but the easiest way is probably to just
call the values() method on the JavaPairRDD<A, B> to get a JavaRDD<B>.

See this link:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

Tristan
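
In Scala terms, the reshaping Yasemin describes looks like the sketch below,
using the example record from her message (sc is assumed); with the Java API
the same thing is the values() call above followed by a map back to a pair RDD:

    // (url, (bitVector, id))  ==>  (id, bitVector)
    val pairs = sc.parallelize(Seq(
      ("http://www.koctas.com.tr/reyon/el-aletleri/7", ("0,1,0,0,0,0,0,0", "46551"))))
    val result = pairs.map { case (_, (bits, id)) => (id, bits) }
    result.collect().foreach(println)   // (46551,0,1,0,0,0,0,0,0)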





On 13 May 2015 at 23:12, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I want to get a JavaPairRDD<String, String> from the tuple part of a
 JavaPairRDD<String, Tuple2<String, String>>.

 As an example: (
 http://www.koctas.com.tr/reyon/el-aletleri/7,(0,1,0,0,0,0,0,0,46551)) in
 my JavaPairRDD<String, Tuple2<String, String>> and I want to get
 ( (46551), (0,1,0,0,0,0,0,0) )

 I tried to split tuple._2() and create a new JavaPairRDD but I couldn't.
 How can I get that?

 Have a nice day
 yasemin
 --
 hiç ender hiç



Re: --jars works in yarn-client but not yarn-cluster mode, why?

2015-05-13 Thread Fengyun RAO
I look into the Environment in both modes.

yarn-client:
spark.jars
local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar,file:/home/xxx/my-app.jar
yarn-cluster:
spark.yarn.secondary.jars
local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
I wonder why htrace exists in spark.yarn.secondary.jars but is still not
found by the URLClassLoader.

I tried both local and file mode for the jar, still the same error.


2015-05-14 11:37 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

 Hadoop version: CDH 5.4.

 We need to connect to HBase, thus need extra
 /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
 dependency.

 It works in yarn-client mode:
 spark-submit --class xxx.xxx.MyApp --master yarn-client --num-executors
 10 --executor-memory 10g --jars
 /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
 my-app.jar /input /output

 However, if we change yarn-client to yarn-cluster', it throws an
 ClassNotFoundException (actually the class exists in
 htrace-core-3.1.0-incubating.jar):

 Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
   at 
 org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:218)
   at org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481)
   at 
 org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
   at 
 org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86)
   at 
 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:850)
   at 
 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.init(ConnectionManager.java:635)
   ... 21 more
 Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace
   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)


 Why --jars doesn't work in yarn-cluster mode? How to add extra dependency in 
 yarn-cluster mode?





--jars works in yarn-client but not yarn-cluster mode, why?

2015-05-13 Thread Fengyun RAO
Hadoop version: CDH 5.4.

We need to connect to HBase, thus need extra
/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
dependency.

It works in yarn-client mode:
spark-submit --class xxx.xxx.MyApp --master yarn-client --num-executors 10
--executor-memory 10g --jars
/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
my-app.jar /input /output

However, if we change yarn-client to yarn-cluster, it throws a
ClassNotFoundException (actually the class exists in
htrace-core-3.1.0-incubating.jar):

Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
at 
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:218)
at org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481)
at 
org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
at 
org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86)
at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:850)
at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.init(ConnectionManager.java:635)
... 21 more
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)


Why doesn't --jars work in yarn-cluster mode? How do I add an extra
dependency in yarn-cluster mode?


Re: JavaPairRDD

2015-05-13 Thread Yasemin Kaya
Thank you Tristan. It is totally what I am looking for :)


2015-05-14 5:05 GMT+03:00 Tristan Blakers tris...@blackfrog.org:

 You could use a map() operation, but the easiest way is probably to just
  call the values() method on the JavaPairRDD<A, B> to get a JavaRDD<B>.

 See this link:

 https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

 Tristan





 On 13 May 2015 at 23:12, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

  I want to get a JavaPairRDD<String, String> from the tuple part of a
  JavaPairRDD<String, Tuple2<String, String>>.

  As an example: (
  http://www.koctas.com.tr/reyon/el-aletleri/7,(0,1,0,0,0,0,0,0,46551)) in
  my JavaPairRDD<String, Tuple2<String, String>> and I want to get
  ( (46551), (0,1,0,0,0,0,0,0) )

 I try to split tuple._2() and create new JavaPairRDD but I can't.
 How can I get that ?

 Have a nice day
 yasemin
 --
 hiç ender hiç





-- 
hiç ender hiç


spark sql hive-shims

2015-05-13 Thread Lior Chaga
Hi,

Using spark sql with HiveContext. Spark version is 1.3.1
When running Spark locally everything works fine. When running on a Spark
cluster I get a ClassNotFoundError for org.apache.hadoop.hive.shims.Hadoop23Shims.
This class belongs to hive-shims-0.23, and is a runtime dependency for
spark-hive:

[INFO] org.apache.spark:spark-hive_2.10:jar:1.3.1
[INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile
[INFO] |  |  +-
org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile
[INFO] |  |  +-
org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime
[INFO] |  |  +-
org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile
[INFO] |  |  +-
org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime
[INFO] |  |  \-
org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime



My spark distribution is:
make-distribution.sh --tgz  -Phive -Phive-thriftserver -DskipTests


If I try to add this dependency to my driver project, the exception
disappears, but then the task gets stuck when registering an RDD as a table
(I get a timeout after 30 seconds). I should emphasize that the first RDD I
register as a table is a very small one (about 60K rows), and as I said, it
runs swiftly in local mode.
I suspect maybe other dependencies are missing, but they fail silently.

Would be grateful if anyone knows how to solve it.

Lior


How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-13 Thread MEETHU MATHEW
Hi all,
Quote: "Inside a given Spark application (SparkContext instance), multiple
parallel jobs can run simultaneously if they were submitted from separate
threads."
How can I run multiple jobs in one SparkContext using separate threads in pyspark?
I found some examples in Scala and Java, but couldn't find Python code. Can
anyone help me with a pyspark example?
Thanks & Regards,
Meethu M
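
For what it's worth, the pattern those docs describe looks like the sketch below
in Scala; the same idea carries over to PySpark with the standard threading
module (sc is assumed, and the thread count and job bodies are illustrative):

    import java.util.concurrent.Executors
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    // One SparkContext; several independent jobs submitted from separate threads.
    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

    val jobs = (1 to 4).map { i =>
      Future { sc.parallelize(1 to 1000000).map(_ * i).count() }   // each count() is its own job
    }
    jobs.foreach(f => println(Await.result(f, Duration.Inf)))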

Spark SQL: preferred syntax for column reference?

2015-05-13 Thread Diana Carroll
I'm just getting started with Spark SQL and DataFrames in 1.3.0.

I notice that the Spark API shows a different syntax for referencing
columns in a dataframe than the Spark SQL Programming Guide.

For instance, the API docs for the select method show this:
df.select($"colA", $"colB")


Whereas the programming guide shows this:
df.filter(df("name") > 21).show()

I tested and both the $"column" and df("column") syntax work, but I'm
wondering which is *preferred*.  Is one the original and one a new feature
we should be using?

Thanks,
Diana
(Spark Curriculum Developer for Cloudera)


Word2Vec with billion-word corpora

2015-05-13 Thread Shilad Sen
Hi all,

I'm experimenting with Spark's Word2Vec implementation for a relatively
large (5B-word, vocabulary size 4M) corpus. Has anybody had success
running it at this scale?

-Shilad

-- 
Shilad W. Sen
Associate Professor
Mathematics, Statistics, and Computer Science Dept.
Macalester College
s...@macalester.edu
http://www.shilad.com
https://www.linkedin.com/in/shilad
651-696-6273


Re: Kryo serialization of classes in additional jars

2015-05-13 Thread Akshat Aranya
I cherry-picked this commit into my local 1.2 branch.  It fixed the problem
with setting spark.serializer, but I get a similar problem with
spark.closure.serializer

org.apache.spark.SparkException: Failed to register classes with Kryo
  at
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:100)
  at
org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:152)
  at
org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:114)
  at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:73)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at
org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: com.foo.Foo
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93)
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93)
  at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:93)
  ... 21 more


On Mon, May 4, 2015 at 5:43 PM, Akshat Aranya aara...@gmail.com wrote:

 Actually, after some digging, I did find a JIRA for it: SPARK-5470.
 The fix for this has gone into master, but it isn't in 1.2.

 On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote:
  Oh, this seems like a real pain.  You should file a jira, I didn't see an
  open issue -- if nothing else just to document the issue.
 
  As you've noted, the problem is that the serializer is created
 immediately
  in the executors, right when the SparkEnv is created, but the other jars
  aren't downloaded later.  I think you could workaround with some
 combination
  of pushing the jars to the cluster manually, and then using
  spark.executor.extraClassPath
 
  On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com
 wrote:
 
  Hi,
 
  Is it possible to register kryo serialization for classes contained in
  jars that are added with spark.jars?  In my experiment it doesn't
 seem to
  work, likely because the class registration happens before the jar is
  shipped to the executor and added to the classloader.  Here's the
 general
  idea of what I want to do:
 
 val sparkConf = new SparkConf(true)
   .set("spark.jars", "foo.jar")
   .setAppName("foo")
   .set("spark.serializer",
  "org.apache.spark.serializer.KryoSerializer")

  // register classes contained in foo.jar
  sparkConf.registerKryoClasses(Array(
classOf[com.foo.Foo],
classOf[com.foo.Bar]))
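
A minimal sketch of the workaround Imran mentions above, i.e. pushing the jar to
every node yourself and pointing the executor classpath at it so the classes are
already visible when the serializers are created (the path is a placeholder):

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf(true)
      .setAppName("foo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // foo.jar must already exist at this path on every worker node,
      // so it is on the classpath before any jars from spark.jars are fetched.
      .set("spark.executor.extraClassPath", "/opt/myapp/lib/foo.jar")

    // then register the jar's classes as in the snippet above, e.g.
    // sparkConf.registerKryoClasses(Array(classOf[com.foo.Foo], classOf[com.foo.Bar]))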
 
 



Re: Backward compatibility with org.apache.spark.sql.api.java.Row class

2015-05-13 Thread Michael Armbrust
Sorry for missing that in the upgrade guide.  As part of unifying the Java
and Scala interfaces we got rid of the Java-specific Row.  You are correct
in assuming that you want to use the Row in org.apache.spark.sql from both
Scala and Java now.
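
For example, the unified API amounts to something like this minimal sketch
(the field values and positions are illustrative):

    import org.apache.spark.sql.Row

    val row: Row = Row("alice", 42)   // replaces org.apache.spark.sql.api.java.Row
    val name = row.getString(0)
    val age  = row.getInt(1)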

On Wed, May 13, 2015 at 2:48 AM, Emerson Castañeda eme...@gmail.com wrote:

 Hello everyone

 I'm adopting the latest version of Apache Spark in my project, moving from
 1.2.x to 1.3.x, and the only significant incompatibility for now is
 related to the Row class.

 Any idea what happened to the org.apache.spark.sql.api.java.Row
 class in Apache Spark 1.3?


 Migration guide on Spark SQL and DataFrames - Spark 1.3.0 Documentation
 does not mention anything about it.
 https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13


 Looking around, there is a new Row interface in the org.apache.spark.sql
 package, but I'm not 100% sure whether this is related to my question, or
 how to proceed with the upgrade.

 Note that this new Row interface was not available in the previous
 Spark versions 1.0.0, 1.1.0 and 1.2.0, and even in 1.3.0

 Thanks in ahead

 Emerson



Re: how to use rdd.countApprox

2015-05-13 Thread Du Li
 Hi TD,
Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app 
is standing a much better chance to complete processing each batch within the 
batch interval.
Du


 On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com 
wrote:
   

 From the code it seems that as soon as the  rdd.countApprox(5000) returns, 
you can call pResult.initialValue() to get the approximate count at that 
point of time (that is after timeout). Calling pResult.getFinalValue() will 
further block until the job is over, and give the final correct values that you 
would have received by rdd.count()
On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

HI,
I tested the following in my streaming app and hoped to get an approximate 
count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed 
to always return after it finishes completely, just like rdd.count(), which 
often exceeded 5 seconds. The values for low, mean, and high were the same.
val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way of usage?
Thanks,Du


 



 On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID 
wrote:
   

 I have to count RDD's in a spark streaming app. When data goes large, count() 
becomes expensive. Did anybody have experience using countApprox()? How 
accurate/reliable is it? 
The documentation is pretty modest. Suppose the timeout parameter is in 
milliseconds. Can I retrieve the count value by calling getFinalValue()? Does 
it block and return only after the timeout? Or do I need to define 
onComplete/onFail handlers to extract count value from the partial results?
Thanks,Du

   



  

data schema and serialization format suggestions

2015-05-13 Thread Ankur Chauhan
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA1

Hi all,

I want to get a general idea about best practices around data format
serialization. Currently, I am using avro as the data serialization
format but the emitted types aren't very scala friendly. So, I was
wondering how others deal with this problem.

At the high level, the requirements are fairly simple:

1. Simple and easy to understand and extend.
2. Usable in places other than spark. ( I would want to use them in
other applications and tools ).
3. Ability to play nice with parquet and Kafka (nice to have).

- -- Ankur Chauhan
-BEGIN PGP SIGNATURE-

iQEcBAEBAgAGBQJVU4spAAoJEOSJAMhvLp3LYYQH/A3AvjBhclydO4pqxoLzUSiK
e844eteYEHqLSiwrFKxG8I8uLDLRYVvln0XnmKnEqoaGwNbqC5IbqovNKE8FwFzk
F7XU30O7CEgExBrSXsv7nSFq/BBSELnCGpuszf92lF2XRaFtOz0kJZ7YOf+IIZEn
mV4K4IodJhkCzX3oeAO/3PwzMlUXTV+qDDA9pa6pwrQ2TKRMYs8HTzAf526cL5/F
0RdFHse1JjOWDiiaCCI1aHNbtRM/TJvsyGcLlqNDjeOVFcTFcKU3QQLxydfqNRK6
JYn9jY/NRqseePaW4L9BqEBSQ7aTNRk1P88r5+8FFes2qImkgQ5+VIw+DlwEePM=
=LwBs
-END PGP SIGNATURE-

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on Mesos

2015-05-13 Thread Stephen Carman
Sander,

I eventually solved this problem via the --[no-]switch_user flag, which is set
to true by default. I set this to false, so that the user that owns the process
runs the job; otherwise it was my username (scarman) running the job, which would
fail because obviously my username didn't exist there. When run as root, it ran
totally fine with no problems whatsoever.

Hopefully this works for you too,

Steve
 On May 13, 2015, at 11:45 AM, Sander van Dijk sgvand...@gmail.com wrote:

 Hey all,

 I seem to be experiencing the same thing as Stephen. I run Spark 1.2.1 with 
 Mesos 0.22.1, with Spark coming from the spark-1.2.1-bin-hadoop2.4.tgz 
 prebuilt package, and Mesos installed from the Mesosphere repositories. I 
 have been running with Spark standalone successfully for a while and now 
 trying to setup Mesos. Mesos is up and running, the UI at port 5050 reports 
 all slaves alive. I then run Spark shell with: `spark-shell --master 
 mesos://1.1.1.1:5050` (with 1.1.1.1 the master's ip address), which starts up 
 fine, with output:

 I0513 15:02:45.340287 28804 sched.cpp:448] Framework registered with 
 20150512-150459-2618695596-5050-3956-0009 15/05/13 15:02:45 INFO 
 mesos.MesosSchedulerBackend: Registered as framework ID 
 20150512-150459-2618695596-5050-3956-0009

 and the framework shows up in the Mesos UI. Then when trying to run something 
 (e.g. 'val rdd = sc.txtFile(path); rdd.count') fails with lost executors. 
 In /var/log/mesos-slave.ERROR on the slave instances there are entries like:

 E0513 14:57:01.198995 13077 slave.cpp:3112] Container 
 'eaf33d36-dde5-498a-9ef1-70138810a38c' for executor 
 '20150512-145720-2618695596-5050-3082-S10' of framework 
 '20150512-150459-2618695596-5050-3956-0009' failed to start: Failed to 
 execute mesos-fetcher: Failed to chown work directory

 From what I can find, the work directory is in /tmp/mesos, where indeed I see 
 a directory structure with executor and framework IDs, with at the leaves 
 stdout and stderr files of size 0. Everything there is owned by root, but I 
 assume the processes are also run by root, so any chowning in there should be 
 possible.

 I was thinking maybe it fails to fetch the Spark package executor? I uploaded 
 spark-1.2.1-bin-hadoop2.4.tgz to hdfs, SPARK_EXECUTOR_URI is set in 
 spark-env.sh, and in the Environment section of the web UI I see this picked 
 up in the spark.executor.uri parameter. I checked and the URI is reachable by 
 the slaves: an `hdfs dfs -stat $SPARK_EXECUTOR_URI` is successful.

 Any pointers?

 Many thanks,
 Sander

 On Fri, May 1, 2015 at 8:35 AM Tim Chen t...@mesosphere.io wrote:
 Hi Stephen,

 It looks like Mesos slave was most likely not able to launch some mesos 
 helper processes (fetcher probably?).

 How did you install Mesos? Did you build from source yourself?

 Please install Mesos through a package or actually from source run make 
 install and run from the installed binary.

 Tim

 On Mon, Apr 27, 2015 at 11:11 AM, Stephen Carman scar...@coldlight.com 
 wrote:
 So I installed spark on each of the slaves 1.3.1 built with hadoop2.6 I just 
 basically got the pre-built from the spark website…

 I placed those compiled spark installs on each slave at /opt/spark

 My spark properties seem to be getting picked up on my side fine…

 Screen Shot 2015-04-27 at 10.30.01 AM.png
 The framework is registered in Mesos, it shows up just fine, it doesn’t 
 matter if I turn off the executor uri or not, but I always get the same error…

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in 
 stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 
 (TID 23, 10.253.1.117): ExecutorLostFailure (executor 
 20150424-104711-1375862026-5050-20113-S1 lost)
 Driver stacktrace:
 at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at 
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at 
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

 These boxes 

Re: Spark on Mesos

2015-05-13 Thread Tim Chen
Hi Stephen,

You probably didn't run the Spark driver/shell as root; the Mesos scheduler
will pick up your local user, try to impersonate that same user, and
chown the directory before executing any task.

If you try to run the Spark driver as root it should resolve the problem.
Disabling switch_user can also work, as it won't try to switch user for you.

Tim

On Wed, May 13, 2015 at 10:50 AM, Stephen Carman scar...@coldlight.com
wrote:

 Sander,

 I eventually solved this problem via the --[no-]switch_user flag, which is
 set to true by default. I set this to false, which would have the user that
 owns the process run the job, otherwise it was my username (scarman)
 running the job, which would fail because obviously my username didn’t
 exist there. When ran as root, it ran totally fine with no problems what so
 ever.

 Hopefully this works for you too,

 Steve
  On May 13, 2015, at 11:45 AM, Sander van Dijk sgvand...@gmail.com
 wrote:
 
  Hey all,
 
  I seem to be experiencing the same thing as Stephen. I run Spark 1.2.1
 with Mesos 0.22.1, with Spark coming from the spark-1.2.1-bin-hadoop2.4.tgz
 prebuilt package, and Mesos installed from the Mesosphere repositories. I
 have been running with Spark standalone successfully for a while and now
 trying to setup Mesos. Mesos is up and running, the UI at port 5050 reports
 all slaves alive. I then run Spark shell with: `spark-shell --master
 mesos://1.1.1.1:5050` (with 1.1.1.1 the master's ip address), which
 starts up fine, with output:
 
  I0513 15:02:45.340287 28804 sched.cpp:448] Framework registered with
 20150512-150459-2618695596-5050-3956-0009 15/05/13 15:02:45 INFO
 mesos.MesosSchedulerBackend: Registered as framework ID
 20150512-150459-2618695596-5050-3956-0009
 
  and the framework shows up in the Mesos UI. Then when trying to run
 something (e.g. 'val rdd = sc.txtFile(path); rdd.count') fails with lost
 executors. In /var/log/mesos-slave.ERROR on the slave instances there are
 entries like:
 
  E0513 14:57:01.198995 13077 slave.cpp:3112] Container
 'eaf33d36-dde5-498a-9ef1-70138810a38c' for executor
 '20150512-145720-2618695596-5050-3082-S10' of framework
 '20150512-150459-2618695596-5050-3956-0009' failed to start: Failed to
 execute mesos-fetcher: Failed to chown work directory
 
  From what I can find, the work directory is in /tmp/mesos, where indeed
 I see a directory structure with executor and framework IDs, with at the
 leaves stdout and stderr files of size 0. Everything there is owned by
 root, but I assume the processes are also run by root, so any chowning in
 there should be possible.
 
  I was thinking maybe it fails to fetch the Spark package executor? I
 uploaded spark-1.2.1-bin-hadoop2.4.tgz to hdfs, SPARK_EXECUTOR_URI is set
 in spark-env.sh, and in the Environment section of the web UI I see this
 picked up in the spark.executor.uriparameter. I checked and the URI is
 reachable by the slaves: an `hdfs dfs -stat $SPARK_EXECUTOR_URI` is
 successful.
 
  Any pointers?
 
  Many thanks,
  Sander
 
  On Fri, May 1, 2015 at 8:35 AM Tim Chen t...@mesosphere.io wrote:
  Hi Stephen,
 
  It looks like Mesos slave was most likely not able to launch some mesos
 helper processes (fetcher probably?).
 
  How did you install Mesos? Did you build from source yourself?
 
  Please install Mesos through a package or actually from source run make
 install and run from the installed binary.
 
  Tim
 
  On Mon, Apr 27, 2015 at 11:11 AM, Stephen Carman scar...@coldlight.com
 wrote:
  So I installed spark on each of the slaves 1.3.1 built with hadoop2.6 I
 just basically got the pre-built from the spark website…
 
  I placed those compiled spark installs on each slave at /opt/spark
 
  My spark properties seem to be getting picked up on my side fine…
 
  Screen Shot 2015-04-27 at 10.30.01 AM.png
  The framework is registered in Mesos, it shows up just fine, it doesn’t
 matter if I turn off the executor uri or not, but I always get the same
 error…
 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task
 6 in stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage
 0.0 (TID 23, 10.253.1.117): ExecutorLostFailure (executor
 20150424-104711-1375862026-5050-20113-S1 lost)
  Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
  at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
  at
 

Re: Spark on Mesos

2015-05-13 Thread Stephen Carman
Yup, exactly as Tim mentioned too. I went back and tried what you just
suggested and that also worked perfectly fine.

Steve

On May 13, 2015, at 1:58 PM, Tim Chen t...@mesosphere.io wrote:

Hi Stephen,

You probably didn't run the Spark driver/shell as root, as Mesos scheduler will 
pick up your local user and tries to impersonate as the same user and chown the 
directory before executing any task.

If you try to run Spark driver as root it should resolve the problem. No switch 
user can also work as it won't try to switch user for you.

Tim

On Wed, May 13, 2015 at 10:50 AM, Stephen Carman scar...@coldlight.com wrote:
Sander,

I eventually solved this problem via the --[no-]switch_user flag, which is set 
to true by default. I set this to false, which would have the user that owns 
the process run the job, otherwise it was my username (scarman)
running the job, which would fail because obviously my username didn’t exist 
there. When ran as root, it ran totally fine with no problems what so ever.

Hopefully this works for you too,

Steve
 On May 13, 2015, at 11:45 AM, Sander van Dijk sgvand...@gmail.com wrote:

 Hey all,

 I seem to be experiencing the same thing as Stephen. I run Spark 1.2.1 with 
 Mesos 0.22.1, with Spark coming from the spark-1.2.1-bin-hadoop2.4.tgz 
 prebuilt package, and Mesos installed from the Mesosphere repositories. I 
 have been running with Spark standalone successfully for a while and now 
 trying to setup Mesos. Mesos is up and running, the UI at port 5050 reports 
 all slaves alive. I then run Spark shell with: `spark-shell --master 
 mesos://1.1.1.1:5050` (with 1.1.1.1 the master's ip address), which starts up 
 fine, with output:

 I0513 15:02:45.340287 28804 sched.cpp:448] Framework registered with 
 20150512-150459-2618695596-5050-3956-0009 15/05/13 15:02:45 INFO 
 mesos.MesosSchedulerBackend: Registered as framework ID 
 20150512-150459-2618695596-5050-3956-0009

 and the framework shows up in the Mesos UI. Then when trying to run something 
 (e.g. 'val rdd = sc.txtFile(path); rdd.count') fails with lost executors. 
 In /var/log/mesos-slave.ERROR on the slave instances there are entries like:

 E0513 14:57:01.198995 13077 slave.cpp:3112] Container 
 'eaf33d36-dde5-498a-9ef1-70138810a38c' for executor 
 '20150512-145720-2618695596-5050-3082-S10' of framework 
 '20150512-150459-2618695596-5050-3956-0009' failed to start: Failed to 
 execute mesos-fetcher: Failed to chown work directory

 From what I can find, the work directory is in /tmp/mesos, where indeed I see 
 a directory structure with executor and framework IDs, with at the leaves 
 stdout and stderr files of size 0. Everything there is owned by root, but I 
 assume the processes are also run by root, so any chowning in there should be 
 possible.

 I was thinking maybe it fails to fetch the Spark package executor? I uploaded 
 spark-1.2.1-bin-hadoop2.4.tgz to hdfs, SPARK_EXECUTOR_URI is set in 
 spark-env.sh, and in the Environment section of the web UI I see this picked 
 up in the spark.executor.uriparameter. I checked and the URI is reachable by 
 the slaves: an `hdfs dfs -stat $SPARK_EXECUTOR_URI` is successful.

 Any pointers?

 Many thanks,
 Sander

 On Fri, May 1, 2015 at 8:35 AM Tim Chen t...@mesosphere.io wrote:
 Hi Stephen,

 It looks like Mesos slave was most likely not able to launch some mesos 
 helper processes (fetcher probably?).

 How did you install Mesos? Did you build from source yourself?

 Please install Mesos through a package or actually from source run make 
 install and run from the installed binary.

 Tim

 On Mon, Apr 27, 2015 at 11:11 AM, Stephen Carman scar...@coldlight.com wrote:
 So I installed spark on each of the slaves 1.3.1 built with hadoop2.6 I just 
 basically got the pre-built from the spark website…

 I placed those compiled spark installs on each slave at /opt/spark

 My spark properties seem to be getting picked up on my side fine…

 Screen Shot 2015-04-27 at 10.30.01 AM.png
 The framework is registered in Mesos, it shows up just fine, it doesn’t 
 matter if I turn off the executor uri or not, but I always get the same error…

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in 
 stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 
 (TID 23, 10.253.1.117): ExecutorLostFailure (executor 
 20150424-104711-1375862026-5050-20113-S1 lost)
 Driver stacktrace:
 at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at 
 

Word2Vec with billion-word corpora

2015-05-13 Thread Shilad Sen
Hi all,

I'm experimenting with Spark's Word2Vec implementation for a relatively
large (5B-word, vocabulary size 4M, 400-dimensional vectors) corpus. Has
anybody had success running it at this scale?

Thanks in advance for your guidance!

-Shilad

-- 
Shilad W. Sen
Associate Professor
Mathematics, Statistics, and Computer Science Dept.
Macalester College
s...@macalester.edu
http://www.shilad.com
https://www.linkedin.com/in/shilad
651-696-6273


Re: Worker Spark Port

2015-05-13 Thread James King
Indeed, many thanks.

On Wednesday, 13 May 2015, Cody Koeninger c...@koeninger.org wrote:

 I believe most ports are configurable at this point, look at

 http://spark.apache.org/docs/latest/configuration.html

 search for .port

 On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com
 javascript:_e(%7B%7D,'cvml','jakwebin...@gmail.com'); wrote:

 I understand that this port value is randomly selected.

 Is there a way to enforce which spark port a Worker should use?





Re: how to use rdd.countApprox

2015-05-13 Thread Du Li
Hi TD,
Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? 
Otherwise it keeps running until completion, producing results not used but 
consuming resources.
Thanks,Du 


 On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID 
wrote:
   

  Hi TD,
Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app 
is standing a much better chance to complete processing each batch within the 
batch interval.
Du


 On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com 
wrote:
   

 From the code it seems that as soon as the  rdd.countApprox(5000) returns, 
you can call pResult.initialValue() to get the approximate count at that 
point of time (that is after timeout). Calling pResult.getFinalValue() will 
further block until the job is over, and give the final correct values that you 
would have received by rdd.count()
On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

HI,
I tested the following in my streaming app and hoped to get an approximate 
count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed 
to always return after it finishes completely, just like rdd.count(), which 
often exceeded 5 seconds. The values for low, mean, and high were the same.
val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way of usage?
Thanks,Du


 



 On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID 
wrote:
   

 I have to count RDD's in a spark streaming app. When data goes large, count() 
becomes expensive. Did anybody have experience using countApprox()? How 
accurate/reliable is it? 
The documentation is pretty modest. Suppose the timeout parameter is in 
milliseconds. Can I retrieve the count value by calling getFinalValue()? Does 
it block and return only after the timeout? Or do I need to define 
onComplete/onFail handlers to extract count value from the partial results?
Thanks,Du

   



   

  

Re: how to set random seed

2015-05-13 Thread ayan guha
Easiest way is to broadcast it.
On 13 May 2015 10:40, Charles Hayden charles.hay...@atigeo.com wrote:

  In pySpark, I am writing a map with a lambda that calls random.shuffle.
 For testing, I want to be able to give it a seed, so that successive runs
 will produce the same shuffle.
  I am looking for a way to set this same random seed once on each worker.
 Is there any simple way to do it?
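
A minimal sketch of the broadcast-the-seed approach suggested above, in Scala
(the same idea applies in PySpark with sc.broadcast and random.Random); sc is
assumed, and the seed and data are illustrative:

    val seed = sc.broadcast(42L)
    val data = sc.parallelize(1 to 100, numSlices = 4)

    // Derive a deterministic per-partition RNG from the broadcast seed so that
    // successive runs shuffle each partition the same way.
    val shuffled = data.mapPartitionsWithIndex { (partIdx, iter) =>
      val rnd = new scala.util.Random(seed.value + partIdx)
      rnd.shuffle(iter.toVector).iterator
    }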





Re: RE: Re: sparksql running slow while joining_2_tables.

2015-05-13 Thread luohui20001
Hi Hao,
  I tried broadcast join with the following steps, and found that my query is still running slow. I am not very sure if I'm doing the broadcast join right:
1. added spark.sql.autoBroadcastJoinThreshold 104857600 (100MB) in conf/spark-default.conf; 100MB is larger than either of my 2 tables.
2. started bin/spark-sql and confirmed this setting worked, both in the environment page of my spark cluster web UI and in the sparksql console;
3. ran "ANALYZE TABLE db1 COMPUTE STATISTICS noscan" and "ANALYZE TABLE sample3 COMPUTE STATISTICS noscan" and cached both these tables;
4. used explain (extended) on my query and confirmed BroadcastHashJoin is used in the physical plan;
5. ran my query: select a.chrname,a.startpoint,a.endpoint, a.piece from db1 a join sample3 b on (a.chrname = b.name) where (b.startpoint  a.startpoint + 25) and b.endpoint = a.endpoint;

So, if there are mistakes in my operation, please point them out. Thanks.






 

Thanks & Best regards!
San.Luo

- Original Message -
From: Cheng, Hao hao.ch...@intel.com
To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com 
luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user 
user@spark.apache.org
Subject: RE: Re: sparksql running slow while joining_2_tables.
Date: 2015-05-05 08:38





Or, have you ever tried broadcast join?
 


From: Cheng, Hao [mailto:hao.ch...@intel.com]


Sent: Tuesday, May 5, 2015 8:33 AM

To: luohui20...@sina.com; Olivier Girardot; user

Subject: RE: Re: sparksql running slow while joining 2 tables.


 
Can you print out the physical plan?
 
EXPLAIN SELECT xxx…
 
From:
luohui20...@sina.com [mailto:luohui20...@sina.com]


Sent: Monday, May 4, 2015 9:08 PM

To: Olivier Girardot; user

Subject: Re: Re: sparksql running slow while joining 2 tables.
 
hi Olivier,
Spark 1.3.1, with Java 1.8.0_45,
and I attached 2 pics.
It seems like a GC issue. I also tried different parameters like the memory
size of driver and executor, memory fraction, java opts...
but this issue still happens.
 





 

Thanks & Best regards!

罗辉 San.Luo

 


- Original Message -

From: Olivier Girardot ssab...@gmail.com

To: luohui20...@sina.com, user user@spark.apache.org

Subject: Re: sparksql running slow while joining 2 tables.

Date: 2015-05-04 20:46

 

Hi, 

What is your Spark version?

 


Regards, 


 


Olivier.


 

On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote:

hi guys,
when I am running a sql like "select
a.name,a.startpoint,a.endpoint, a.piece from db a join sample b on (a.name =
b.name) where (b.startpoint  a.startpoint + 25)", I find sparksql runs
slowly, taking minutes, which may be caused by very long GC and shuffle time.

   table db is created from a txt file sized at 56MB, while table sample is
sized at 26MB; both are small.
   my spark cluster is a standalone pseudo-distributed spark cluster with an
8g executor and a 4g driver.
   any advice? thank you guys.

 





 

Thanks & Best regards!

罗辉 San.Luo



-

To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org

For additional commands, e-mail: 
user-h...@spark.apache.org