KanikaAdik opened a new issue, #39448:
URL: https://github.com/apache/airflow/issues/39448

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-databricks==6.3.0
   
   ### Apache Airflow version
   
   2.7.3
   
   ### Operating System
   
```
$ cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
```
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   I am trying to execute a few queries using DatabricksSqlOperator in an Airflow DAG. It appears the result returned by the operator cannot be serialized by XCom, so the task fails with the following error:
```
[2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744
[2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)
```
   
   ### What you think should happen instead
   
   XCom should be able to JSON-serialize the operator's result, or the Databricks SQL provider should standardize on a response type that the default XCom backend can handle in the generic case.
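
   For illustration only, a minimal sketch of the kind of conversion that could make the result serializable. It assumes the provider's `Row` is namedtuple-like (as the module path in the traceback suggests); the helper name is hypothetical, not part of the provider:

```python
# Hypothetical helper, not part of the provider: coerce namedtuple-like
# Row objects into plain dicts so the default JSON XCom backend can
# serialize them.
def rows_to_serializable(rows):
    return [row._asdict() for row in rows]
```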
   
   ### How to reproduce
   
   1. Create a DAG with a DatabricksSqlOperator task.
   2. Set `do_xcom_push=True` on that task.
   3. Add a separate task that pulls the SQL query result from XCom and uses it.
   
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

with DAG(
    'ct_sql_xcom',
    start_date=datetime(2024, 1, 30),
    schedule_interval=None,
) as dag:

    create_file = DatabricksSqlOperator(
        databricks_conn_id='databricks',
        sql_endpoint_name='Serverless',
        task_id='create_and_populate_from_file',
        sql=(
            "select table_name from system.information_schema.tables "
            "where table_catalog = 'rsg_prod'"
        ),
        do_xcom_push=True,
    )

    def downstream_task(**kwargs):
        # Pull the query result that the DatabricksSqlOperator pushed to XCom.
        result = kwargs['task_instance'].xcom_pull(task_ids='create_and_populate_from_file')
        print("Received result from XCom:", result)

    select_data = PythonOperator(
        task_id='select_data',
        python_callable=downstream_task,
    )

    create_file >> select_data
```
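
   A possible workaround until this is fixed (a sketch, assuming `DatabricksSqlHook.get_records` behaves like the standard DB-API hook method it inherits): run the query from a PythonOperator callable and return plain lists, which the JSON XCom backend can serialize.

```python
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Workaround sketch, not the provider's documented approach: run the
# query through the hook and return plain lists instead of Row objects,
# so the value pushed to XCom is JSON-serializable.
def select_tables_serializable():
    hook = DatabricksSqlHook(
        databricks_conn_id='databricks',
        sql_endpoint_name='Serverless',
    )
    rows = hook.get_records(
        "select table_name from system.information_schema.tables "
        "where table_catalog = 'rsg_prod'"
    )
    return [list(row) for row in rows]
```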
   
   
   ### Anything else
   
   The same serialization error also fails the `create_file` task itself:

```
[2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file, execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357
[2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)
```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

