Dominic Wetenkamp created SPARK-31795:
-----------------------------------------

             Summary: Stream Data with API to ServiceNow
                 Key: SPARK-31795
                 URL: https://issues.apache.org/jira/browse/SPARK-31795
             Project: Spark
          Issue Type: Question
          Components: DStreams
    Affects Versions: 2.4.5
            Reporter: Dominic Wetenkamp


1) Create class

2) Instantiate class 

3) Setup stream

4) Write stream. Here do I get a pickeling error, I really don't know how to 
get it work without error.

 

 

class CMDB:
 #Public Properties
 @property
 def streamDF(self):
 return spark.readStream.table(self.__source_table)
 
 #Constructor
 def __init__(self, destination_table, source_table):
 self.__destination_table = destination_table
 self.__source_table = source_table

#Private Methodes 
 def __processRow(self, row):
 #API connection info
 url = 'https://foo.service-now.com/api/now/table/' + self.__destination_table 
+ '?sysparm_display_value=true'
 user = 'username'
 password = 'psw'
 
 headers = \{"Content-Type":"application/json","Accept":"application/json"}
 response = requests.post(url, auth=(user, password), headers=headers, data = 
json.dumps(row.asDict()))
 
 return response

#Public Methodes
 def uploadStreamDF(self, df):
 return df.writeStream.foreach(self.__processRow).trigger(once=True).start()

 

################################################################################

 

cmdb = CMDB('destination_table_name','source_table_name')

streamDF = (cmdb.streamDF
 .withColumn('object_id',col('source_column_id'))
 .withColumn('name',col('source_column_name'))
 ).select('object_id','name')
#set stream works, able to display data

cmdb.uploadStreamDF(streamDF)
#cmdb.uploadStreamDF(streamDF) fails with error: PicklingError: Could not 
serialize object: Exception: It appears that you are attempting to reference 
SparkContext from a broadcast variable, action, or transformation. SparkContext 
can only be used on the driver, not in code that it run on workers. For more 
information, see SPARK-5063. See exception below:

'''
Exception Traceback (most recent call last)
/databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
 704 try:
--> 705 return cloudpickle.dumps(obj, 2)
 706 except pickle.PickleError:

/databricks/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
 862 cp = CloudPickler(file,protocol)
--> 863 cp.dump(obj)
 864 return file.getvalue()

/databricks/spark/python/pyspark/cloudpickle.py in dump(self, obj)
 259 try:
--> 260 return Pickler.dump(self, obj)
 261 except RuntimeError as e:

/databricks/python/lib/python3.7/pickle.py in dump(self, obj)
 436 self.framer.start_framing()
--> 437 self.save(obj)
 438 self.write(STOP)
'''



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to