RE: Map Question
You need to expose that variable the same way you'd expose any other variable in Python that you wanted to see across modules. As long as you share a Spark context, all will work as expected.

http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable

-----Original Message-----
From: Vadim Bichutskiy [vadim.bichuts...@gmail.com]
Sent: Thursday, April 23, 2015 12:00 PM Eastern Standard Time
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Map Question

Here it is. How do I access a broadcastVar in a function that's in another module (process_stuff.py below)?

Thanks,
Vadim

main.py
-------
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.sql import SQLContext
    from process_stuff import myfunc
    from metadata import get_metadata

    conf = SparkConf().setAppName('My App').setMaster('local[4]')
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 30)
    sqlContext = SQLContext(sc)

    distFile = ssc.textFileStream(s3n://...)
    distFile.foreachRDD(process)

    mylist = get_metadata()

    print 'BROADCASTING...'
    broadcastVar = sc.broadcast(mylist)
    print broadcastVar
    print broadcastVar.value
    print 'FINISHED BROADCASTING...'
    ## mylist and broadcastVar, broadcastVar.value print fine

    def getSqlContextInstance(sparkContext):
        if ('sqlContextSingletonInstance' not in globals()):
            globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
        return globals()['sqlContextSingletonInstance']

    def process(rdd):
        sqlContext = getSqlContextInstance(rdd.context)
        if rdd.take(1):
            jsondf = sqlContext.jsonRDD(rdd)
            #jsondf.printSchema()
            jsondf.registerTempTable('mytable')
            stuff = sqlContext.sql(SELECT ...)
            stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?
            ...

process_stuff.py
----------------
    def myfunc(x):
        metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?
        ...

metadata.py
-----------
    def get_metadata():
        ...
        return mylist
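The cross-module pattern from the linked Stack Overflow question can be sketched in plain Python as follows. This is an illustration under assumptions, not something tested against Spark: shared is a hypothetical module name, FakeBroadcast is a stand-in for the handle returned by sc.broadcast(), and both modules are built in-line with types.ModuleType only so the sketch fits in one file. It shows the lookup working inside a single Python process. Spark executors run in separate Python processes, so a value assigned at runtime on the driver may not be visible there.

```python
import sys
import types


class FakeBroadcast(object):
    """Hypothetical stand-in for the handle returned by sc.broadcast()."""

    def __init__(self, value):
        self.value = value


# shared.py (hypothetical): a module whose only job is to hold the variable.
shared = types.ModuleType('shared')
shared.broadcastVar = None
sys.modules['shared'] = shared

# process_stuff.py (sketch): read the attribute at call time, not import time.
process_stuff = types.ModuleType('process_stuff')
exec('''
import shared

def myfunc(x):
    metadata = shared.broadcastVar.value
    return (x, x in metadata)
''', process_stuff.__dict__)

# main.py (sketch): assign the handle on the shared module before mapping.
mylist = ['a', 'b']                          # stands in for get_metadata()
shared.broadcastVar = FakeBroadcast(mylist)  # with Spark: sc.broadcast(mylist)

result = process_stuff.myfunc('a')
```

The important detail is that myfunc reads shared.broadcastVar when it is called. Writing "from shared import broadcastVar" at the top of process_stuff.py would copy the initial None at import time instead.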
Re: Map Question
Here it is. How do I access a broadcastVar in a function that's in another module (process_stuff.py below)?

Thanks,
Vadim

main.py
-------
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.sql import SQLContext
    from process_stuff import myfunc
    from metadata import get_metadata

    conf = SparkConf().setAppName('My App').setMaster('local[4]')
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 30)
    sqlContext = SQLContext(sc)

    distFile = ssc.textFileStream(s3n://...)
    distFile.foreachRDD(process)

    mylist = get_metadata()

    print 'BROADCASTING...'
    broadcastVar = sc.broadcast(mylist)
    print broadcastVar
    print broadcastVar.value
    print 'FINISHED BROADCASTING...'
    ## mylist and broadcastVar, broadcastVar.value print fine

    def getSqlContextInstance(sparkContext):
        if ('sqlContextSingletonInstance' not in globals()):
            globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
        return globals()['sqlContextSingletonInstance']

    def process(rdd):
        sqlContext = getSqlContextInstance(rdd.context)
        if rdd.take(1):
            jsondf = sqlContext.jsonRDD(rdd)
            #jsondf.printSchema()
            jsondf.registerTempTable('mytable')
            stuff = sqlContext.sql(SELECT ...)
            stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?
            ...

process_stuff.py
----------------
    def myfunc(x):
        metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?
        ...

metadata.py
-----------
    def get_metadata():
        ...
        return mylist

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote:

Can you give full code? Especially the myfunc?
Re: Map Question
Thanks Ilya. I am having trouble doing that. Can you give me an example?

On Thu, Apr 23, 2015 at 12:06 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

You need to expose that variable the same way you'd expose any other variable in Python that you wanted to see across modules. As long as you share a Spark context, all will work as expected.

http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable
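One way to make the handle visible to myfunc without relying on a global name is to pass it in explicitly. The sketch below is a minimal illustration under assumptions: the extra broadcast_var parameter is not in the original myfunc (which takes only x), FakeBroadcast is a hypothetical stand-in for the handle returned by sc.broadcast(), and the built-in map() stands in for stuff.map(...) on an RDD.

```python
class FakeBroadcast(object):
    """Hypothetical stand-in for the handle returned by sc.broadcast()."""

    def __init__(self, value):
        self.value = value


# process_stuff.py (sketch): accept the handle as a parameter.
def myfunc(x, broadcast_var):
    metadata = broadcast_var.value
    return (x, x in metadata)


# main.py (sketch)
mylist = ['a', 'b']                    # stands in for get_metadata()
broadcastVar = FakeBroadcast(mylist)   # with Spark: sc.broadcast(mylist)

# The lambda closes over broadcastVar, so myfunc no longer needs a
# global of that name in its own module.
stuff = ['a', 'z']
stuff_mapped = list(map(lambda x: myfunc(x, broadcastVar), stuff))
```

With this shape, process_stuff.py has no dependency on how or where the broadcast was created. It only needs an object with a .value attribute.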
Re: Map Question
Is mylist present on every executor? If not, then you have to pass it on, and broadcasts are the best way to do that. But note that once broadcast it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again.

TD

On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions?

Thanks,
Vadim
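The point about re-broadcasting after a driver-side update can be sketched like this. It is an illustration under assumptions only: MetadataHolder is a hypothetical helper, and FakeBroadcast is a stand-in that copies the list to model the read-only snapshot executors would see. With Spark, broadcast_fn would be sc.broadcast.

```python
class FakeBroadcast(object):
    """Hypothetical stand-in for sc.broadcast(); keeps a snapshot of the value."""

    def __init__(self, value):
        self.value = list(value)   # copy, to model the read-only snapshot


class MetadataHolder(object):
    """Hypothetical driver-side helper that tracks the latest broadcast handle."""

    def __init__(self, broadcast_fn, initial):
        self._broadcast_fn = broadcast_fn   # with Spark: sc.broadcast
        self.current = broadcast_fn(initial)

    def update(self, new_value):
        # Existing handles keep the old value, so publish a fresh one.
        self.current = self._broadcast_fn(new_value)
        return self.current


mylist = ['a']
holder = MetadataHolder(FakeBroadcast, mylist)
first = holder.current

mylist.append('b')                   # driver-side change only
stale = list(first.value)            # still ['a']

holder.update(mylist)                # re-broadcast
fresh = list(holder.current.value)   # ['a', 'b']
```

Any function handed to map() after the update would need to use holder.current rather than a handle captured earlier.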
Re: Map Question
Can I use broadcast vars in local mode?

On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote:

Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark.
Re: Map Question
Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Can I use broadcast vars in local mode?
Re: Map Question
Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark.

On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2.

The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes.

Vadim
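A rough back-of-the-envelope model of the efficiency concern, offered as a sketch rather than a measurement: it assumes a list captured in a lambda travels with every task, while a broadcast value is cached once per executor. The list contents, task count, and executor count are all made-up numbers, and real overheads depend on the Spark version and serializer.

```python
import pickle

# Hypothetical metadata list; the real one comes from get_metadata().
mylist = ['meta-%d' % i for i in range(1000)]
payload_bytes = len(pickle.dumps(mylist))

num_tasks = 200      # assumed number of tasks across the job
num_executors = 4    # assumed number of executors

# Captured in the lambda: one copy shipped along with each task.
shipped_with_tasks = payload_bytes * num_tasks

# Broadcast: one copy cached per executor and reused by its tasks.
shipped_as_broadcast = payload_bytes * num_executors

ratio = shipped_with_tasks // shipped_as_broadcast
```

Under these assumptions the ratio is simply num_tasks / num_executors, so the gap grows as a job is split into more tasks.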
Re: Map Question
Can you give full code? Especially the myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Here's what I did:

    print 'BROADCASTING...'
    broadcastVar = sc.broadcast(mylist)
    print broadcastVar
    print broadcastVar.value
    print 'FINISHED BROADCASTING...'

The above works fine, but when I call myrdd.map(myfunc) I get

    NameError: global name 'broadcastVar' is not defined

The myfunc function is in a different module. How do I make it aware of broadcastVar?
Re: Map Question
Here's what I did:

    print 'BROADCASTING...'
    broadcastVar = sc.broadcast(mylist)
    print broadcastVar
    print broadcastVar.value
    print 'FINISHED BROADCASTING...'

The above works fine, but when I call myrdd.map(myfunc) I get

    NameError: global name 'broadcastVar' is not defined

The myfunc function is in a different module. How do I make it aware of broadcastVar?

On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Great. Will try to modify the code. Always room to optimize!
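The NameError itself is ordinary Python scoping and can be reproduced without Spark. In the sketch below, the process_stuff module is built in-line with types.ModuleType purely so the example fits in one file, and object() is a placeholder for the real broadcast handle. A function looks up global names in the module where it was defined, not in the module that calls it.

```python
import types

# process_stuff.py (sketch): myfunc refers to a global it never defines.
process_stuff = types.ModuleType('process_stuff')
exec('''
def myfunc(x):
    return broadcastVar.value
''', process_stuff.__dict__)

# main.py (sketch): the name exists here, but only in this namespace.
broadcastVar = object()   # placeholder for sc.broadcast(mylist)

try:
    process_stuff.myfunc(1)
    error = None
except NameError as exc:
    error = str(exc)
```

Defining broadcastVar in main.py therefore does nothing for myfunc. The name has to reach process_stuff either as a function argument or as an attribute that process_stuff itself looks up.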