KanikaAdik opened a new issue, #39448: URL: https://github.com/apache/airflow/issues/39448
### Apache Airflow Provider(s)

databricks

### Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.3.0

### Apache Airflow version

2.7.3

### Operating System

    (airflow)cat /etc/os-release
    PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
    NAME="Debian GNU/Linux"
    VERSION_ID="11"
    VERSION="11 (bullseye)"
    VERSION_CODENAME=bullseye
    ID=debian
    HOME_URL="https://www.debian.org/"
    SUPPORT_URL="https://www.debian.org/support"
    BUG_REPORT_URL="https://bugs.debian.org/"
    (airflow)

### Deployment

Other Docker-based deployment

### Deployment details

_No response_

### What happened

I am trying to execute a few queries using DatabricksSqlOperator in an Airflow DAG. It seems that the output returned by the operator cannot be handled by XCom, so the task fails with the following error log:

    [2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
    [2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
        return serialize(o)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
        return encode(classname, version, serialize(data, depth + 1))
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
        raise TypeError(f"cannot serialize object of type {cls}")
    TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
        XCom.set(
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
        value = cls.serialize_value(
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
        return json.dumps(value, cls=XComEncoder).encode("UTF-8")
      File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
        return cls(
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
        o = self.default(o)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
        return super().default(o)
      File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type tuple is not JSON serializable
    [2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744
    [2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)

### What you think should happen instead

XCom should be able to JSON-serialize the operator's result, or the Databricks SQL provider should define a standard, serializable response type that covers the generic case.

### How to reproduce

1. Create an Airflow DAG that uses DatabricksSqlOperator.
2. Set do_xcom_push=True.
3. Set up a separate task to parse and use the values from the SQL query result.

DAG definition:

    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python import PythonOperator
    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    env = Variable.get('AIRFLOW_VAR_ENV_NAME')


    def downstream_task(**kwargs):
        # Pull the query result that the upstream task pushed to XCom.
        result = kwargs['task_instance'].xcom_pull(task_ids='create_file')
        print("Received result from XCom:", result)


    with DAG('ct_sql_xcom', start_date=datetime(2024, 1, 30), schedule_interval=None) as dag:
        create_file = DatabricksSqlOperator(
            databricks_conn_id='databricks',
            sql_endpoint_name='Serverless',
            # task_id must match the task_ids value used in xcom_pull above.
            task_id="create_file",
            sql="select table_name from system.information_schema.tables where table_catalog = 'rsg_prod'",
            do_xcom_push=True,
        )

        # Wrap the callable in a PythonOperator so it can be chained as a task.
        print_result = PythonOperator(
            task_id="downstream_task",
            python_callable=downstream_task,
        )

        create_file >> print_result

### Anything else

    [2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
    [2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
        return serialize(o)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
        return encode(classname, version, serialize(data, depth + 1))
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
        return [serialize(d, depth + 1) for d in o]
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
        raise TypeError(f"cannot serialize object of type {cls}")
    TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
        XCom.set(
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
        value = cls.serialize_value(
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
        return json.dumps(value, cls=XComEncoder).encode("UTF-8")
      File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
        return cls(
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
        o = self.default(o)
      File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
        return super().default(o)
      File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type tuple is not JSON serializable
    [2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file , execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357
    [2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

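Note for readers hitting the same error: the traceback in the report above shows serialization stopping at a `Row` object nested inside the query result. One possible workaround, sketched here under assumptions rather than taken from the provider, is to turn row objects into plain dicts or lists before they are handed to XCom. The `Row` namedtuple and the helper `rows_to_jsonable` below are hypothetical stand-ins for illustration; the sketch assumes the rows behave like namedtuples (that is, expose `_asdict()`), which may not hold for every provider version.

```python
import json
from collections import namedtuple
from typing import Any, Iterable, List

# Hypothetical stand-in for the Row type named in the traceback;
# the real provider class is deliberately not imported here.
Row = namedtuple("Row", ["table_name"])


def rows_to_jsonable(rows: Iterable[Any]) -> List[Any]:
    """Convert row-like objects into plain dicts or lists."""
    converted = []
    for row in rows:
        if hasattr(row, "_asdict"):
            # namedtuple-style rows: keep the column names as dict keys.
            converted.append(dict(row._asdict()))
        else:
            # Ordinary tuples or other sequences: fall back to a plain list.
            converted.append(list(row))
    return converted


sample = [Row("orders"), Row("customers")]
payload = json.dumps(rows_to_jsonable(sample))
print(payload)
```

Where such a conversion would be applied (for example in a custom operator subclass, or in a Python task that runs the query itself) depends on the deployment and is not shown here.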