Re: [ERROR] Insufficient Space
Awesome. -Vadim ᐧ On Fri, Jun 19, 2015 at 8:30 PM, Kelly, Jonathan jonat...@amazon.com wrote: Yep, I'm on the EMR team at Amazon, and I was at the Spark Summit. ;-) So of course I'm biased toward EMR, even over EC2. I'm not sure if there's a way to resize an EC2 Spark cluster, or at least if there is, it probably still requires manual work on the new nodes. This would be the advantage of EMR over EC2, as we take care of all of that configuration. ~ Jonathan From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, June 19, 2015 at 5:21 PM To: Jonathan Kelly jonat...@amazon.com Cc: user user@spark.apache.org Subject: Re: [ERROR] Insufficient Space Thanks Jonathan. I should totally move to EMR. Spark on EMR was announced at Spark Summit! There's no easy way to resize the cluster on EC2. You basically have to destroy it and launch a new one. Right? -Vadim ᐧ On Fri, Jun 19, 2015 at 3:41 PM, Kelly, Jonathan jonat...@amazon.com wrote: Would you be able to use Spark on EMR rather than on EC2? EMR clusters allow easy resizing of the cluster, and EMR also now supports Spark 1.3.1 as of EMR AMI 3.8.0. See http://aws.amazon.com/emr/spark ~ Jonathan From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, June 19, 2015 at 7:15 AM To: user user@spark.apache.org Subject: [ERROR] Insufficient Space Hello Spark Experts, I've been running a standalone Spark cluster on EC2 for a few months now, and today I get this error: IOError: [Errno 28] No space left on device Spark assembly has been built with Hive, including Datanucleus jars on classpath OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file I guess I need to resize the cluster. What's the best way to do that? Thanks, Vadim ᐧ
Re: [ERROR] Insufficient Space
Thanks Jonathan. I should totally move to EMR. Spark on EMR was announced at Spark Summit! There's no easy way to resize the cluster on EC2. You basically have to destroy it and launch a new one. Right? -Vadim ᐧ On Fri, Jun 19, 2015 at 3:41 PM, Kelly, Jonathan jonat...@amazon.com wrote: Would you be able to use Spark on EMR rather than on EC2? EMR clusters allow easy resizing of the cluster, and EMR also now supports Spark 1.3.1 as of EMR AMI 3.8.0. See http://aws.amazon.com/emr/spark ~ Jonathan From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, June 19, 2015 at 7:15 AM To: user user@spark.apache.org Subject: [ERROR] Insufficient Space Hello Spark Experts, I've been running a standalone Spark cluster on EC2 for a few months now, and today I get this error: IOError: [Errno 28] No space left on device Spark assembly has been built with Hive, including Datanucleus jars on classpath OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file I guess I need to resize the cluster. What's the best way to do that? Thanks, Vadim ᐧ
[ERROR] Insufficient Space
Hello Spark Experts,

I've been running a standalone Spark cluster on EC2 for a few months now, and today I get this error:

IOError: [Errno 28] No space left on device
Spark assembly has been built with Hive, including Datanucleus jars on classpath
OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file

I guess I need to resize the cluster. What's the best way to do that?

Thanks,
Vadim
Re: Is anyone using Amazon EC2?
Yes, we're running Spark on EC2. Will transition to EMR soon. -Vadim ᐧ On Sat, May 23, 2015 at 2:22 PM, Johan Beisser j...@caustic.org wrote: Yes. We're looking at bootstrapping in EMR... On Sat, May 23, 2015 at 07:21 Joe Wass jw...@crossref.org wrote: I used Spark on EC2 a while ago
Re: textFileStream Question
This is cool. Thanks Akhil. On Sun, May 17, 2015 at 11:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It is based on the file timestamp; you can see the logic for finding new files here: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172 Thanks Best Regards On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: How does textFileStream work behind the scenes? How does Spark Streaming know which files are new and need to be processed? Is it based on timestamp or file name? Thanks, Vadim
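A minimal sketch of the textFileStream pattern discussed above, assuming Spark 1.3-era PySpark; the bucket path, app name, and batch interval are placeholders rather than anything from this thread. New files are picked up by modification timestamp, so objects should appear atomically (for example, uploaded under a temporary name and then renamed) in the monitored location.

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Placeholder configuration
conf = SparkConf().setAppName('FileStreamExample').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)  # 30-second batches

# Monitor a prefix; only files that appear after the stream starts are processed
lines = ssc.textFileStream('s3n://mybucket/incoming/')
lines.pprint()  # print a few records from each batch

ssc.start()
ssc.awaitTermination()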
Re: DStream Union vs. StreamingContext Union
@TD How do I file a JIRA? ᐧ On Tue, May 12, 2015 at 2:06 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I wonder that may be a bug in the Python API. Please file it as a JIRA along with sample code to reproduce it and sample output you get. On Tue, May 12, 2015 at 10:00 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: @TD I kept getting an empty RDD (i.e. rdd.take(1) was False). ᐧ On Tue, May 12, 2015 at 12:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: @Vadim What happened when you tried unioning using DStream.union in python? TD On Tue, May 12, 2015 at 9:53 AM, Evo Eftimov evo.efti...@isecc.com wrote: I can confirm it does work in Java *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:53 PM *To:* Evo Eftimov *Cc:* Saisai Shao; user@spark.apache.org *Subject:* Re: DStream Union vs. StreamingContext Union Thanks Evo. I tried chaining Dstream unions like what you have and it didn't work for me. But passing multiple arguments to StreamingContext.union worked fine. Any idea why? I am using Python, BTW. ᐧ On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote: You can also union multiple DstreamRDDs in this way DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3) etc etc Ps: the API is not “redundant” it offers several ways for achivieving the same thing as a convenience depending on the situation *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:37 PM *To:* Saisai Shao *Cc:* user@spark.apache.org *Subject:* Re: DStream Union vs. StreamingContext Union Thanks Saisai. That makes sense. Just seems redundant to have both. ᐧ On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStream, one is itself. While StreamingContext.union can union an array of DStreams, internally DStream.union is a special case of StreamingContext.union: def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) So there's no difference, if you want to union more than two DStreams, just use the one in StreamingContext, otherwise, both two APIs are fine. 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com : Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs the other? Thanks, Vadim ᐧ
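As a reference for the workaround described above (passing several streams to StreamingContext.union, which worked from Python), a small sketch; the input paths are placeholders, and the streams could be any DStreams of the same element type.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='UnionExample')
ssc = StreamingContext(sc, 10)

# Placeholder sources
s1 = ssc.textFileStream('s3n://mybucket/a/')
s2 = ssc.textFileStream('s3n://mybucket/b/')
s3 = ssc.textFileStream('s3n://mybucket/c/')

pairwise = s1.union(s2)            # DStream.union joins exactly two streams
combined = ssc.union(s1, s2, s3)   # StreamingContext.union takes several at once
combined.pprint()

ssc.start()
ssc.awaitTermination()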
textFileStream Question
How does textFileStream work behind the scenes? How does Spark Streaming know which files are new and need to be processed? Is it based on timestamp or file name? Thanks, Vadim
Re: DStream Union vs. StreamingContext Union
@TD I kept getting an empty RDD (i.e. rdd.take(1) was False). ᐧ On Tue, May 12, 2015 at 12:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: @Vadim What happened when you tried unioning using DStream.union in python? TD On Tue, May 12, 2015 at 9:53 AM, Evo Eftimov evo.efti...@isecc.com wrote: I can confirm it does work in Java *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:53 PM *To:* Evo Eftimov *Cc:* Saisai Shao; user@spark.apache.org *Subject:* Re: DStream Union vs. StreamingContext Union Thanks Evo. I tried chaining Dstream unions like what you have and it didn't work for me. But passing multiple arguments to StreamingContext.union worked fine. Any idea why? I am using Python, BTW. ᐧ On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote: You can also union multiple DstreamRDDs in this way DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3) etc etc Ps: the API is not “redundant” it offers several ways for achivieving the same thing as a convenience depending on the situation *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:37 PM *To:* Saisai Shao *Cc:* user@spark.apache.org *Subject:* Re: DStream Union vs. StreamingContext Union Thanks Saisai. That makes sense. Just seems redundant to have both. ᐧ On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStream, one is itself. While StreamingContext.union can union an array of DStreams, internally DStream.union is a special case of StreamingContext.union: def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) So there's no difference, if you want to union more than two DStreams, just use the one in StreamingContext, otherwise, both two APIs are fine. 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs the other? Thanks, Vadim ᐧ
Re: DStream Union vs. StreamingContext Union
Thanks Saisai. That makes sense. Just seems redundant to have both. On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStreams, one of which is itself, while StreamingContext.union can union an array of DStreams. Internally, DStream.union is a special case of StreamingContext.union: def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) So there's no real difference: if you want to union more than two DStreams, use the one in StreamingContext; otherwise either API is fine. 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs. the other? Thanks, Vadim
Re: DStream Union vs. StreamingContext Union
Thanks Evo. I tried chaining DStream unions like what you have and it didn't work for me. But passing multiple arguments to StreamingContext.union worked fine. Any idea why? I am using Python, BTW. On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote: You can also union multiple DStream RDDs in this way: DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3), etc. PS: the API is not “redundant”; it offers several ways of achieving the same thing as a convenience, depending on the situation. From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Tuesday, May 12, 2015 5:37 PM To: Saisai Shao Cc: user@spark.apache.org Subject: Re: DStream Union vs. StreamingContext Union Thanks Saisai. That makes sense. Just seems redundant to have both. On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStreams, one of which is itself, while StreamingContext.union can union an array of DStreams. Internally, DStream.union is a special case of StreamingContext.union: def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) So there's no real difference: if you want to union more than two DStreams, use the one in StreamingContext; otherwise either API is fine. 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs. the other? Thanks, Vadim
DStream Union vs. StreamingContext Union
Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs. the other? Thanks, Vadim
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
I was wondering about the same thing. Vadim ᐧ On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote: Looks to me that the same thing also applies to the SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD to figure out the file information where the data in RDD is from -- bit1...@163.com *From:* Saisai Shao sai.sai.s...@gmail.com *Date:* 2015-04-29 10:10 *To:* lokeshkumar lok...@dataken.net *CC:* spark users user@spark.apache.org *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this, may be you could file a JIRA with wishes you want the community to support, so anyone who is interested can take a crack on this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using spark streaming and listening to the files in HDFS using textFileStream/fileStream methods, how do we get the fileNames which are read by these methods? I used textFileStream which has file contents in JavaDStream and I got no success with fileStream as it is throwing me a compilation error with spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
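One caveat to the point above about SparkContext.textFile: for batch (non-streaming) jobs, SparkContext.wholeTextFiles does expose the source path, since it returns (filename, content) pairs; it does not help with the streaming case discussed in this thread. A small sketch with a placeholder path:

from pyspark import SparkContext

sc = SparkContext(appName='WholeTextFilesExample')

# RDD of (path, content) pairs, one per file under the placeholder prefix
files = sc.wholeTextFiles('s3n://mybucket/json/')
for path, content in files.take(5):
    print path, len(content)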
Re: Weird error/exception
I was having this issue when my batch interval was very big -- like 5 minutes. When my batch interval is smaller, I don't get this exception. Can someone explain to me why this might be happening? Vadim ᐧ On Tue, Apr 28, 2015 at 4:26 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming to monitor an S3 bucket. Everything appears to be fine. But every batch interval I get the following: *15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Attempting to release HttpMethod in finalize() as its response data stream has gone out of scope. This attempt will not always succeed and cannot be relied upon! Please ensure response data streams are always fully consumed or closed to avoid HTTP connection starvation.* *15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Successfully released HttpMethod in finalize(). You were lucky this time... Please ensure response data streams are always fully consumed or closed.* *Traceback (most recent call last):* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 162, in manager* *code = worker(sock)* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 60, in worker* *worker_main(infile, outfile)* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/worker.py, line 126, in main* *if read_int(infile) == SpecialLengths.END_OF_STREAM:* * File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/serializers.py, line 528, in read_int* *raise EOFError* *EOFError* Does anyone know the cause of this and how to fix it? Thanks, Vadim ᐧ
Weird error/exception
I am using Spark Streaming to monitor an S3 bucket. Everything appears to be fine. But every batch interval I get the following:

15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Attempting to release HttpMethod in finalize() as its response data stream has gone out of scope. This attempt will not always succeed and cannot be relied upon! Please ensure response data streams are always fully consumed or closed to avoid HTTP connection starvation.
15/04/28 16:12:36 WARN HttpMethodReleaseInputStream: Successfully released HttpMethod in finalize(). You were lucky this time... Please ensure response data streams are always fully consumed or closed.
Traceback (most recent call last):
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 162, in manager
    code = worker(sock)
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/daemon.py, line 60, in worker
    worker_main(infile, outfile)
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/worker.py, line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File /Users/vb/spark-1.3.0-bin-hadoop2.4/python/pyspark/serializers.py, line 528, in read_int
    raise EOFError
EOFError

Does anyone know the cause of this and how to fix it?

Thanks,
Vadim
Re: Map Question
Here it is. How do I access a broadcastVar in a function that's in another module (process_stuff.py below)? Thanks, Vadim

main.py
---
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)
distFile = ssc.textFileStream(s3n://...)
distFile.foreachRDD(process)

mylist = get_metadata()
print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'
## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):
    sqlContext = getSqlContextInstance(rdd.context)
    if rdd.take(1):
        jsondf = sqlContext.jsonRDD(rdd)
        #jsondf.printSchema()
        jsondf.registerTempTable('mytable')
        stuff = sqlContext.sql(SELECT ...)
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?
        ...

process_stuff.py
--
def myfunc(x):
    metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?
    ...

metadata.py
def get_metadata():
    ...
    return mylist

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote: Can you give full code? especially the myfunc? On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' is not defined. The myfunc function is in a different module. How do I make it aware of broadcastVar? On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python.
For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: Map Question
Thanks Ilya. I am having trouble doing that. Can you give me an example? ᐧ On Thu, Apr 23, 2015 at 12:06 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You need to expose that variable the same way you'd expose any other variable in Python that you wanted to see across modules. As long as you share a spark context all will work as expected. http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable Sent with Good (www.good.com) -Original Message- *From: *Vadim Bichutskiy [vadim.bichuts...@gmail.com] *Sent: *Thursday, April 23, 2015 12:00 PM Eastern Standard Time *To: *Tathagata Das *Cc: *user@spark.apache.org *Subject: *Re: Map Question Here it is. How do I access a broadcastVar in a function that's in another module (process_stuff.py below): Thanks, Vadim main.py --- from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext from pyspark.sql import SQLContext from process_stuff import myfunc from metadata import get_metadata conf = SparkConf().setAppName('My App').setMaster('local[4]') sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 30) sqlContext = SQLContext(sc) distFile = ssc.textFileStream(s3n://...) distFile.foreachRDD(process) mylist = get_metadata() print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' ## mylist and broadcastVar, broadcastVar.value print fine def getSqlContextInstance(sparkContext): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext) return globals()['sqlContextSingletonInstance'] def process(rdd): sqlContext = getSqlContextInstance(rdd.context) if rdd.take(1): jsondf = sqlContext.jsonRDD(rdd) #jsondf.printSchema() jsondf.registerTempTable('mytable') stuff = sqlContext.sql(SELECT ...) stuff_mapped = stuff.map(myfunc) ## I want myfunc to see mylist from above? ... process_stuff.py -- def myfunc(x): metadata = broadcastVar.value # NameError: broadcastVar not found -- HOW TO FIX? ... metadata.py def get_metadata(): ... return mylist ᐧ On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote: Can you give full code? especially the myfunc? On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get *NameError: global name 'broadcastVar' is not defined* The myfunc function is in a different module. How do I make it aware of broadcastVar? ᐧ On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! ᐧ On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? ᐧ On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variable were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. 
The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim ᐧ On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim -- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted
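Not from the thread, but one common way around the NameError is to stop relying on a module-level global and instead pass the broadcast handle into the mapped function explicitly, for example with functools.partial. A sketch reusing the thread's names (myfunc, mylist, stuff, sc), which are assumed to be defined as in the code above:

# process_stuff.py -- take the broadcast variable as an argument
def myfunc(x, bcast):
    metadata = bcast.value   # resolved on the executors; no global lookup needed
    # ... use metadata to enrich x ...
    return x

# main.py
from functools import partial
from process_stuff import myfunc

broadcastVar = sc.broadcast(mylist)
stuff_mapped = stuff.map(partial(myfunc, bcast=broadcastVar))

An equivalent form is stuff.map(lambda x: myfunc(x, broadcastVar)); either way the broadcast object is captured in the closure that Spark ships to the executors, so the other module never needs to see a global.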
Re: Map Question
Can I use broadcast vars in local mode? ᐧ On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variable were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim ᐧ On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: Map Question
Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get *NameError: global name 'broadcastVar' is not defined* The myfunc function is in a different module. How do I make it aware of broadcastVar? ᐧ On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! ᐧ On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? ᐧ On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variable were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim ᐧ On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
saveAsTextFile
I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Re: saveAsTextFile
Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
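Not something covered in this thread, but since Redshift's COPY command accepts an S3 prefix rather than a single file, each batch directory can typically be loaded with one statement regardless of the part-file names. A rough sketch using psycopg2; the host, table, credentials, path, and format clause are all placeholders (the format clause depends on what the job actually writes):

import psycopg2

conn = psycopg2.connect(host='my-cluster.example.redshift.amazonaws.com',
                        port=5439, dbname='mydb', user='me', password='secret')
cur = conn.cursor()
cur.execute("""
    COPY my_table
    FROM 's3://mybucket/output/batch-1429200000000/'
    CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
    JSON 'auto'
""")
conn.commit()
conn.close()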
Re: saveAsTextFile
Copy should be doable but I'm not sure how to specify a prefix for the directory while keeping the filename (ie part-0) fixed in copy command. On Apr 16, 2015, at 1:51 PM, Sean Owen so...@cloudera.com wrote: Just copy the files? it shouldn't matter that much where they are as you can find them easily. Or consider somehow sending the batches of data straight into Redshift? no idea how that is done but I imagine it's doable. On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
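As a sketch of the "just copy the files" suggestion: S3 supports server-side copies, so each part file can be copied under a common prefix with the batch id folded into the new key name. The bucket and prefixes below are placeholders (boto 2 style, which was current at the time):

import boto

conn = boto.connect_s3()
bucket = conn.get_bucket('mybucket')

batch_prefix = 'output/batch-1429200000000/'   # one saveAsTextFile output directory
batch_id = batch_prefix.rstrip('/').split('/')[-1]

for key in bucket.list(prefix=batch_prefix):
    name = key.name.split('/')[-1]
    if name == '_SUCCESS':
        continue
    # e.g. combined/batch-1429200000000-part-00000
    bucket.copy_key('combined/' + batch_id + '-' + name, 'mybucket', key.name)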
Re: saveAsTextFile
Thanks Evo for your detailed explanation. On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote: The reason for this is as follows:

1. You are saving data on HDFS.
2. HDFS, as a cluster/server-side service, has a single-writer / multiple-reader multithreading model.
3. Hence each thread of execution in Spark has to write to a separate file in HDFS.
4. Moreover, the RDDs are partitioned across cluster nodes and operated upon by multiple threads there, and on top of that, in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream.

If you want fine-grained management of the writing to HDFS, you can implement your own HDFS adapter and invoke it in foreachRDD and foreach. Regards, Evo Eftimov

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
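A sketch of the foreachRDD approach Evo describes, writing every micro-batch under one common parent prefix keyed by batch time; here distFile stands for a textFileStream DStream as in the earlier messages, and the output path is a placeholder:

def save_batch(time, rdd):
    # 'time' is the batch time (a datetime in PySpark); skip empty batches
    if rdd.take(1):
        rdd.saveAsTextFile('s3n://mybucket/combined/' + time.strftime('%Y%m%d-%H%M%S'))

distFile.foreachRDD(save_batch)

The built-in DStream.saveAsTextFiles(prefix) does something similar, writing one prefix-TIME directory per batch, so all batches at least share a common parent path.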
Re: sbt-assembly spark-streaming-kinesis-asl error
Thanks guys. This might explain why I might be having problems. Vadim ᐧ On Tue, Apr 14, 2015 at 5:27 PM, Mike Trienis mike.trie...@orcsol.com wrote: Richard, You response was very helpful and actually resolved my issue. In case others run into a similar issue, I followed the procedure: - Upgraded to spark 1.3.0 - Add all spark related libraries are provided - Include spark transitive library dependencies where my build.sbt file libraryDependencies ++= { Seq( org.apache.spark %% spark-core % 1.3.0 % provided, org.apache.spark %% spark-streaming % 1.3.0 % provided, org.apache.spark %% spark-streaming-kinesis-asl % 1.3.0 % provided, joda-time % joda-time % 2.2, org.joda % joda-convert % 1.2, com.amazonaws % aws-java-sdk % 1.8.3, com.amazonaws % amazon-kinesis-client % 1.2.0) and submitting a spark job can done via sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar Thanks again Richard! Cheers Mike. On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher rmarsc...@localytics.com wrote: Hi, I've gotten an application working with sbt-assembly and spark, thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through and even if you resolve them it can be easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you either are missing Joda Time in your runtime classpath or have conflicting versions. It looks like something related to AWS wants to use it. Check your uber jar to see if its including the org/joda/time as well as the classpath of your spark cluster. For example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark 1.2 I found a conflict between httpclient versions that my uber jar pulled in for AWS libraries and the one bundled in the spark uber jar. I hand patched the spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing provided from org.apache.spark %% spark-streaming-kinesis-asl I ended up with huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding provided back to spark-streaming-kinesis-asl and I submit the spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... 
Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml
Re: sbt-assembly spark-streaming-kinesis-asl error
I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines' [warn] Merging 'rootdoc.txt' with strategy 'concat' [warn] Strategy 'concat' was applied to a file [warn] Strategy 'discard' was applied to 17 files [warn] Strategy 'filterDistinctLines' was applied to a file [warn] Strategy 'rename' was applied to 4 files When submitting the spark application through the command sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar I end up the the following error, Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119) at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105) at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78) at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307) 
at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280) at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160) at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:202) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:175) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:155) at com.quickstatsengine.aws.AwsProvider$.init(AwsProvider.scala:20) at com.quickstatsengine.aws.AwsProvider$.clinit(AwsProvider.scala) The snippet from my build.sbt file is: org.apache.spark %% spark-core % 1.2.0 % provided, org.apache.spark %% spark-streaming % 1.2.0 % provided, com.datastax.spark %% spark-cassandra-connector % 1.2.0-alpha1 % provided, org.apache.spark %% spark-streaming-kinesis-asl % 1.2.0 % provided, And the error is originating from: val kinesisClient = new AmazonKinesisClient(new
Re: sbt-assembly spark-streaming-kinesis-asl error
Thanks Mike. I was having trouble on EC2. On Apr 13, 2015, at 10:25 PM, Mike Trienis mike.trie...@orcsol.com wrote: Thanks Vadim, I can certainly consume data from a Kinesis stream when running locally. I'm currently in the processes of extending my work to a proper cluster (i.e. using a spark-submit job via uber jar). Feel free to add me to gmail chat and maybe we can help each other. On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines' [warn] Merging 'rootdoc.txt' with strategy 'concat' [warn] Strategy 'concat' was applied to a file [warn] Strategy 'discard' was applied to 17 files [warn] Strategy 'filterDistinctLines' was applied to a file [warn] Strategy 'rename' was applied to 4 files When submitting the spark application through the command sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar I end up the the following error, Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119) at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105) at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78) at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307) at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280) at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160) at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:202) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:175) at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:155) at com.quickstatsengine.aws.AwsProvider$.init(AwsProvider.scala:20
Spark Streaming and SQL
Hi all, I am using Spark Streaming to monitor an S3 bucket for objects that contain JSON. I want to import that JSON into a Spark SQL DataFrame. Here's my current code:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import json
from pyspark.sql import SQLContext

conf = SparkConf().setAppName('MyApp').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream(s3n://mybucket/)
json_data = sqlContext.jsonRDD(distFile)
json_data.printSchema()

ssc.start()
ssc.awaitTermination()

I am not creating the DataFrame correctly, as I get an error: 'TransformedDStream' object has no attribute '_jrdd'. Can someone help me out? Thanks, Vadim
Empty RDD?
When I call transform or foreachRDD on a DStream, I keep getting an error that I have an empty RDD, which makes sense since my batch interval may be shorter than the interval at which new data arrive. How do I guard against it? Thanks, Vadim
Re: Spark Streaming and SQL
Hi all, I figured it out! The DataFrames and SQL example in Spark Streaming docs were useful. Best, Vadim ᐧ On Wed, Apr 8, 2015 at 2:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, I am using Spark Streaming to monitor an S3 bucket for objects that contain JSON. I want to import that JSON into Spark SQL DataFrame. Here's my current code: *from pyspark import SparkContext, SparkConf* *from pyspark.streaming import StreamingContext* *import json* *from pyspark.sql import SQLContext* *conf = SparkConf().setAppName('MyApp').setMaster('local[4]')* *sc = SparkContext(conf=conf)* *ssc = StreamingContext(sc, 30)* *sqlContext = SQLContext(sc)* *distFile = ssc.textFileStream(s3n://mybucket/)* *json_data = sqlContext.jsonRDD(distFile)* *json_data.printSchema()* *ssc.start()* *ssc.awaitTermination()* I am not creating DataFrame correctly as I get an error: *'TransformedDStream' object has no attribute '_jrdd'* Can someone help me out? Thanks, Vadim ᐧ
Re: Empty RDD?
Thanks TD! On Apr 8, 2015, at 9:36 PM, Tathagata Das t...@databricks.com wrote: Aah yes. The jsonRDD method needs to walk through the whole RDD to understand the schema, and does not work if there is no data in it. Making sure there is data in it using take(1) should work. TD
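For completeness, a sketch of the pattern from the DataFrames and SQL example mentioned above, which addresses both the 'TransformedDStream' object has no attribute '_jrdd' error and the empty-RDD issue TD explains here: jsonRDD is applied to each batch inside foreachRDD rather than to the DStream itself, and take(1) guards against empty batches. Paths and names are placeholders:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName('MyApp').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream('s3n://mybucket/')   # placeholder path

def process(rdd):
    if rdd.take(1):                    # jsonRDD needs a non-empty RDD to infer the schema
        df = sqlContext.jsonRDD(rdd)
        df.printSchema()

distFile.foreachRDD(process)
ssc.start()
ssc.awaitTermination()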
Re: Spark + Kinesis
Hey y'all, While I haven't been able to get Spark + Kinesis integration working, I pivoted to plan B: I now push data to S3 where I set up a DStream to monitor an S3 bucket with textFileStream, and that works great. I 3 Spark! Best, Vadim ᐧ On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis? Best, Vadim On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python where I followed the example http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help. Thanks, Vadim --- Time: 142825409 ms --- 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s) 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms *** 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms) On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, More good news! I was able to utilize mergeStrategy to assembly my Kinesis consumer into an uber jar Here's what I added to* build.sbt:* *mergeStrategy in assembly = (mergeStrategy in assembly) { (old) =* * {* * case PathList(com, esotericsoftware, minlog, xs @ _*) = MergeStrategy.first* * case PathList(com, google, common, base, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, commons, xs @ _*) = MergeStrategy.last* * case PathList(org, apache, hadoop, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, spark, unused, xs @ _*) = MergeStrategy.first* *case x = old(x)* * }* *}* Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following: *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)* Any idea on what might be going on? 
Thanks, Vadim Here's my consumer code (adapted from the WordCount example): *private object MyConsumer extends Logging { def main(args: Array[String]) {/* Check that all required args were passed in. */ if (args.length 2) { System.err.println( |Usage: KinesisWordCount stream-name endpoint-url |stream-name is the name of the Kinesis stream |endpoint-url is the endpoint of the Kinesis service | (e.g. https://kinesis.us-east-1.amazonaws.com https://kinesis.us-east-1.amazonaws.com).stripMargin) System.exit(1)}/* Populate the appropriate variables from the given args */val Array(streamName, endpointUrl) = args/* Determine the number of shards from the stream */val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) kinesisClient.setEndpoint(endpointUrl)val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() .size()System.out.println(Num shards: + numShards)/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */val numStreams = numShards/* Setup the and SparkConfig and StreamingContext *//* Spark Streaming
Re: Spark + Kinesis
Hi all, I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis? Best, Vadim ᐧ On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: ᐧ Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python where I followed the example http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help. Thanks, Vadim --- Time: 142825409 ms --- 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s) 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms *** 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms) On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, More good news! I was able to utilize mergeStrategy to assembly my Kinesis consumer into an uber jar Here's what I added to* build.sbt:* *mergeStrategy in assembly = (mergeStrategy in assembly) { (old) =* * {* * case PathList(com, esotericsoftware, minlog, xs @ _*) = MergeStrategy.first* * case PathList(com, google, common, base, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, commons, xs @ _*) = MergeStrategy.last* * case PathList(org, apache, hadoop, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, spark, unused, xs @ _*) = MergeStrategy.first* *case x = old(x)* * }* *}* Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following: *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)* Any idea on what might be going on? Thanks, Vadim Here's my consumer code (adapted from the WordCount example): *private object MyConsumer extends Logging { def main(args: Array[String]) {/* Check that all required args were passed in. */ if (args.length 2) { System.err.println( |Usage: KinesisWordCount stream-name endpoint-url |stream-name is the name of the Kinesis stream |endpoint-url is the endpoint of the Kinesis service | (e.g. 
https://kinesis.us-east-1.amazonaws.com https://kinesis.us-east-1.amazonaws.com).stripMargin) System.exit(1)}/* Populate the appropriate variables from the given args */val Array(streamName, endpointUrl) = args/* Determine the number of shards from the stream */val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) kinesisClient.setEndpoint(endpointUrl)val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() .size()System.out.println(Num shards: + numShards)/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */val numStreams = numShards/* Setup the and SparkConfig and StreamingContext *//* Spark Streaming batch interval */val batchInterval = Milliseconds(2000)val sparkConfig = new SparkConf().setAppName(MyConsumer)val ssc = new StreamingContext(sparkConfig, batchInterval)/* Kinesis checkpoint interval. Same as batchInterval for this example. */val kinesisCheckpointInterval = batchInterval/* Create the same
Re: Spark + Kinesis
Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, following the example at http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python I also wrote a Python consumer, again using the example at the above link, and it works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help. Thanks, Vadim

--- Time: 142825409 ms ---
15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms
15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s)
15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms
15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms)

On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Hi all, More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an uber jar. Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on? Thanks, Vadim

Here's my consumer code (adapted from the WordCount example):

private object MyConsumer extends Logging {
  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Setup the SparkConfig and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }
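The consumer listing quoted above is cut off in the archive; the complete version appears in the next message in this thread. When print() only shows empty batches like the ones in the log above, one quick way to check whether any records are reaching the receiver at all is to log a per-batch count. The snippet below is not from the thread: it is a minimal diagnostic sketch that assumes the unionStreams DStream built in the consumer code and would sit just before ssc.start().

    // Hypothetical diagnostic, not part of the original consumer: print how many
    // records each batch contains, so an empty stream can be told apart from a
    // display problem with print().
    unionStreams.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contained ${rdd.count()} records")
    }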
Re: Spark + Kinesis
Hi all, More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an uber jar. Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on? Thanks, Vadim

Here's my consumer code (adapted from the WordCount example):

private object MyConsumer extends Logging {
  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Setup the SparkConfig and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das t...@databricks.com wrote:

Just remove "provided" for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com wrote:

spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly

From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org Subject: Re: Spark + Kinesis

Hi all, Good news! I was able to create a Kinesis consumer and assemble it into an uber jar, following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala. However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
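Jonathan's point above is that a "provided" dependency is compiled against but left out of the assembled jar. Since spark-streaming-kinesis-asl, and transitively the KCL and AWS SDK classes such as com.amazonaws.auth.AWSCredentialsProvider, is not part of the Spark build on the cluster either, marking it "provided" is exactly what produces the NoClassDefFoundError at runtime. A minimal build.sbt sketch of the distinction (the version numbers are just the ones used in this thread):

    // Sketch only: spark-core and spark-streaming ship with the cluster, so "provided" is fine for them.
    libraryDependencies += "org.apache.spark" %% "spark-core"      % "1.3.0" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
    // The Kinesis connector does not ship with the cluster, so it must go into the uber jar:
    // no "provided" here, and sbt-assembly will then pull in its KCL/AWS SDK dependencies.
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"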
Re: Spark + Kinesis
Removed "provided" and got the following error:

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /Users/vb/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:com/esotericsoftware/minlog/Log$Logger.class
[error] /Users/vb/.ivy2/cache/com.esotericsoftware.minlog/minlog/jars/minlog-1.2.jar:com/esotericsoftware/minlog/Log$Logger.class

On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das t...@databricks.com wrote:

Just remove "provided" for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com wrote:

spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly

From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org Subject: Re: Spark + Kinesis

Hi all, Good news! I was able to create a Kinesis consumer and assemble it into an uber jar, following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala. However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Any help appreciated. Thanks, Vadim

On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs (%%) before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion.

For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly

From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com Date: Thursday, April 2, 2015 at 9:53 AM To: user@spark.apache.org Subject: Spark + Kinesis

Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt?

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book. Thanks, Vadim
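Following Jonathan's explanation, here is a minimal sketch of what %% does; the exact 2.10 patch version below is illustrative, not from the thread:

    // With this scalaVersion, %% appends the Scala binary version (_2.10) to the artifact name,
    // so the two dependency lines below resolve to the same published artifact.
    scalaVersion := "2.10.4"
    libraryDependencies += "org.apache.spark" %% "spark-core"      % "1.3.0"
    libraryDependencies += "org.apache.spark" %  "spark-core_2.10" % "1.3.0"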
Re: Spark + Kinesis
Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com wrote:

spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly

From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org Subject: Re: Spark + Kinesis

Hi all, Good news! I was able to create a Kinesis consumer and assemble it into an uber jar, following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala. However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Any help appreciated. Thanks, Vadim

On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs (%%) before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion.

For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly

From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com Date: Thursday, April 2, 2015 at 9:53 AM To: user@spark.apache.org Subject: Spark + Kinesis

Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt?

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book. Thanks, Vadim
Re: How to learn Spark ?
You can start with the official docs: http://spark.apache.org/docs/1.3.0/index.html Also get the Learning Spark book (http://amzn.to/1NDFI5x). It's great. Enjoy! Vadim

On Thu, Apr 2, 2015 at 4:19 AM, Star Guo st...@ceph.me wrote:

Hi, all. I am new here. Could you give me some suggestions on how to learn Spark? Thanks. Best Regards, Star Guo
Spark + Kinesis
Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt?

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book. Thanks, Vadim
Re: How to learn Spark ?
Thanks Dean. This is great.

On Thu, Apr 2, 2015 at 9:01 AM, Dean Wampler deanwamp...@gmail.com wrote:

I have a self-study workshop here: https://github.com/deanwampler/spark-workshop

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly) http://shop.oreilly.com/product/0636920033073.do
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 8:33 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

You can start with the official docs: http://spark.apache.org/docs/1.3.0/index.html Also get the Learning Spark book (http://amzn.to/1NDFI5x). It's great. Enjoy! Vadim

On Thu, Apr 2, 2015 at 4:19 AM, Star Guo st...@ceph.me wrote:

Hi, all. I am new here. Could you give me some suggestions on how to learn Spark? Thanks. Best Regards, Star Guo
Re: Spark + Kinesis
Thanks Jonathan. Helpful. VB

On Apr 2, 2015, at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs (%%) before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion.

For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly

From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com Date: Thursday, April 2, 2015 at 9:53 AM To: user@spark.apache.org Subject: Spark + Kinesis

Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt?

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings

jarName in assembly := "consumer-assembly.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book. Thanks, Vadim
Spark on EC2
Hi all, I just tried launching a Spark cluster on EC2 as described in http://spark.apache.org/docs/1.3.0/ec2-scripts.html and got the following response:

<Response><Errors><Error><Code>PendingVerification</Code><Message>Your account is currently being verified. Verification normally takes less than 2 hours. Until your account is verified, you may not be able to launch additional instances or create additional volumes. If you are still receiving this message after more than 2 hours, please let us know by writing to aws-verificat...@amazon.com. We appreciate your patience...

However, I can see the EC2 instances in the AWS console as running. Any thoughts on what's going on? Thanks, Vadim
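For reference, the ec2-scripts page linked above drives launches through the spark-ec2 script in the ec2/ directory of the Spark distribution; a typical invocation (all argument values are placeholders) looks something like ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>. The PendingVerification response itself comes from the EC2 API rather than from Spark: as the message says, a new AWS account is limited until verification completes, which may be why some instances show as running even though the launch reported an error.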