[ https://issues.apache.org/jira/browse/SPARK-31795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117480#comment-17117480 ]

Hyukjin Kwon commented on SPARK-31795:
--------------------------------------

Please take a look at https://spark.apache.org/community.html. Questions like this are better asked on the mailing lists than filed as JIRA issues.

> Stream Data with API to ServiceNow
> ----------------------------------
>
>                 Key: SPARK-31795
>                 URL: https://issues.apache.org/jira/browse/SPARK-31795
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 2.4.5
>            Reporter: Dominic Wetenkamp
>            Priority: Major
>
> 1) Create the class
> 2) Instantiate the class
> 3) Set up the stream
> 4) Write the stream. Here I get a pickling error, and I really don't know
> how to get it to work without the error.
>  
>  
> class CMDB:
>     # Public properties
>     @property
>     def streamDF(self):
>         return spark.readStream.table(self.__source_table)
>
>     # Constructor
>     def __init__(self, destination_table, source_table):
>         self.__destination_table = destination_table
>         self.__source_table = source_table
>
>     # Private methods
>     def __processRow(self, row):
>         # API connection info
>         url = 'https://foo.service-now.com/api/now/table/' + self.__destination_table + '?sysparm_display_value=true'
>         user = 'username'
>         password = 'psw'
>
>         headers = {"Content-Type": "application/json", "Accept": "application/json"}
>         response = requests.post(url, auth=(user, password), headers=headers, data=json.dumps(row.asDict()))
>
>         return response
>
>     # Public methods
>     def uploadStreamDF(self, df):
>         return df.writeStream.foreach(self.__processRow).trigger(once=True).start()
>  
> ################################################################################
>  
> cmdb = CMDB('destination_table_name', 'source_table_name')
> streamDF = (cmdb.streamDF
>             .withColumn('object_id', col('source_column_id'))
>             .withColumn('name', col('source_column_name'))
>             ).select('object_id', 'name')
> # Setting up the stream works; I am able to display the data
> cmdb.uploadStreamDF(streamDF)
> # cmdb.uploadStreamDF(streamDF) fails with: PicklingError: Could not
> # serialize object: Exception: It appears that you are attempting to
> # reference SparkContext from a broadcast variable, action, or
> # transformation. SparkContext can only be used on the driver, not in code
> # that it run on workers. For more information, see SPARK-5063. See
> # exception below:
> '''
> '''
> Exception                                 Traceback (most recent call last)
> /databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
>     704         try:
> --> 705             return cloudpickle.dumps(obj, 2)
>     706         except pickle.PickleError:
> /databricks/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
>     862     cp = CloudPickler(file, protocol)
> --> 863     cp.dump(obj)
>     864     return file.getvalue()
> /databricks/spark/python/pyspark/cloudpickle.py in dump(self, obj)
>     259         try:
> --> 260             return Pickler.dump(self, obj)
>     261         except RuntimeError as e:
> /databricks/python/lib/python3.7/pickle.py in dump(self, obj)
>     436         self.framer.start_framing()
> --> 437         self.save(obj)
>     438         self.write(STOP)
> '''
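As a side note on the error itself (not an answer from this thread): `foreach(self.__processRow)` forces Spark to pickle the bound method, and with it the whole `CMDB` class, whose `streamDF` property references the driver-side `spark` session. One common workaround is to hand `foreach` a callback that holds no reference to the class at all, only plain picklable values. The sketch below is a hypothetical rewrite along those lines; `post_row` and `make_row_writer` are made-up names, and the instance URL is the `foo.service-now.com` placeholder from the question:

```python
import functools
import json

def post_row(row, url, user, password, headers):
    # Imported inside the function so the import happens on the worker,
    # not at pickling time on the driver.
    import requests
    return requests.post(url, auth=(user, password), headers=headers,
                         data=json.dumps(row.asDict()))

def make_row_writer(destination_table, user, password):
    # Build a callback that captures only plain strings and dicts.
    # functools.partial over a module-level function is picklable, so
    # Spark can ship it to executors without touching SparkContext.
    url = ('https://foo.service-now.com/api/now/table/'
           + destination_table + '?sysparm_display_value=true')
    headers = {"Content-Type": "application/json",
               "Accept": "application/json"}
    return functools.partial(post_row, url=url, user=user,
                             password=password, headers=headers)
```

Used in place of the bound method, the stream setup would look like `streamDF.writeStream.foreach(make_row_writer('destination_table_name', 'username', 'psw')).trigger(once=True).start()`, keeping the `CMDB` class (and its Spark references) entirely on the driver.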



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
