Stormhand opened a new issue, #31080:
URL: https://github.com/apache/airflow/issues/31080

   ### Apache Airflow version
   
   2.6.0
   
   ### What happened
   
   I am using DatabricksSqlOperator, which writes the result to a file. When the 
task finishes, it writes all the data correctly to the file and then throws the 
following exception:
   > [2023-05-05, 07:56:22 UTC] {taskinstance.py:1847} ERROR - Task failed with exception
   > Traceback (most recent call last):
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
   >     return func(*args, **kwargs)
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2377, in xcom_push
   >     XCom.set(
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
   >     return func(*args, **kwargs)
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 237, in set
   >     value = cls.serialize_value(
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 632, in serialize_value
   >     return json.dumps(value, cls=XComEncoder).encode("UTF-8")
   >   File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
   >     return cls(
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/json.py", line 102, in encode
   >     o = self.default(o)
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/json.py", line 91, in default
   >     return serialize(o)
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 144, in serialize
   >     return encode(classname, version, serialize(data, depth + 1))
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in serialize
   >     return [serialize(d, depth + 1) for d in o]
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
   >     return [serialize(d, depth + 1) for d in o]
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in serialize
   >     return [serialize(d, depth + 1) for d in o]
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
   >     return [serialize(d, depth + 1) for d in o]
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 132, in serialize
   >     qn = qualname(o)
   >   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/module_loading.py", line 47, in qualname
   >     return f"{o.__module__}.{o.__name__}"
   >   File "/home/airflow/.local/lib/python3.9/site-packages/databricks/sql/types.py", line 161, in __getattr__
   >     raise AttributeError(item)
   > AttributeError: __name__
   
   I found that **SQLExecuteQueryOperator** always returns the result (and 
therefore pushes it to XCom) from its execute() method, except when the 
parameter **do_xcom_push** is set to **False**. But if do_xcom_push is False, 
the method _process_output() is not executed at all, and DatabricksSqlOperator 
won't write the results to a file.
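
   For context, this is roughly what I understand _process_output() to do in the 
Databricks provider when output_path is configured. This is a heavily simplified 
sketch from reading the provider, not the actual code, and the function and 
parameter names here are my own:
   ```python
   import csv
   from typing import Any, Optional, Sequence


   def process_output_sketch(
       results: list[Any],
       descriptions: list[Optional[Sequence]],
       output_path: Optional[str],
       output_format: str = "csv",
   ) -> list[Any]:
       # Roughly: when output_path is set, the rows of the last query are written
       # to that file (CSV shown here); either way the (description, result) pairs
       # are returned, and that return value is what execute() pushes to XCom.
       if output_path and output_format.lower() == "csv":
           field_names = [field[0] for field in descriptions[-1]]
           with open(output_path, "w", newline="") as f:
               writer = csv.DictWriter(f, fieldnames=field_names)
               writer.writeheader()
               for row in results[-1]:
                   writer.writerow(row.asDict())  # databricks Row -> dict
       return list(zip(descriptions, results))
   ```
   So skipping _process_output() means the file is never written, which is why I 
can't simply set do_xcom_push=False.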
   
   ### What you think should happen instead
   
   I am not sure whether the problem should be fixed in DatabricksSqlOperator or 
in SQLExecuteQueryOperator. In any case, setting do_xcom_push to False shouldn't 
automatically prevent the execution of _process_output():
   ```python
           if not self.do_xcom_push:
               return None
           if return_single_query_results(self.sql, self.return_last, self.split_statements):
               # For simplicity, we pass always list as input to _process_output, regardless if
               # single query results are going to be returned, and we return the first element
               # of the list in this case from the (always) list returned by _process_output
               return self._process_output([output], hook.descriptions)[-1]
           return self._process_output(output, hook.descriptions)
   ```
   What happens now is that I end up with the big result both in a file AND in 
XCom at the same time.
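
   For illustration, here is a rough sketch of what I mean (not a tested patch; 
names are taken from the snippet above). The idea is that _process_output() 
always runs, and do_xcom_push only controls whether its result is returned (and 
therefore pushed to XCom):
   ```python
           if return_single_query_results(self.sql, self.return_last, self.split_statements):
               # _process_output() still runs (and e.g. writes the output file) ...
               processed = self._process_output([output], hook.descriptions)[-1]
           else:
               processed = self._process_output(output, hook.descriptions)
           # ... and do_xcom_push only decides whether the result is pushed to XCom
           if not self.do_xcom_push:
               return None
           return processed
   ```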
   
   ### How to reproduce
   
   I suspect that the actual exception is related to serializing the XCom for 
the meta database, so it might not fail in other scenarios. A minimal DAG along 
the lines of the sketch below should reproduce it.
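
   This is a sketch, not a verified reproducer; the connection id, SQL and 
output path are placeholders, and the Databricks connection/warehouse details 
from my real setup are omitted:
   ```python
   import pendulum

   from airflow import DAG
   from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

   with DAG(
       dag_id="databricks_sql_to_file_repro",
       start_date=pendulum.datetime(2023, 5, 1, tz="UTC"),
       schedule=None,
       catchup=False,
   ):
       # With the default do_xcom_push=True the query result is written to the
       # file AND pushed to XCom; pushing the databricks Row objects is where
       # the serialization error above is raised.
       DatabricksSqlOperator(
           task_id="export_to_csv",
           databricks_conn_id="databricks_default",  # placeholder connection id
           sql="SELECT * FROM some_large_table",     # placeholder query returning many rows
           output_path="/tmp/result.csv",
           output_format="csv",
       )
   ```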
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye) docker image
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==8.0.0
   apache-airflow-providers-apache-spark==4.0.1
   apache-airflow-providers-celery==3.1.0
   apache-airflow-providers-cncf-kubernetes==6.1.0
   apache-airflow-providers-common-sql==1.4.0
   apache-airflow-providers-databricks==4.1.0
   apache-airflow-providers-docker==3.6.0
   apache-airflow-providers-elasticsearch==4.4.0
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-google==10.0.0
   apache-airflow-providers-grpc==3.1.0
   apache-airflow-providers-hashicorp==3.3.1
   apache-airflow-providers-http==4.3.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-microsoft-azure==6.0.0
   apache-airflow-providers-microsoft-mssql==3.3.2
   apache-airflow-providers-mysql==5.0.0
   apache-airflow-providers-odbc==3.2.1
   apache-airflow-providers-oracle==3.6.0
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-redis==3.1.0
   apache-airflow-providers-samba==4.1.0
   apache-airflow-providers-sendgrid==3.1.0
   apache-airflow-providers-sftp==4.2.4
   apache-airflow-providers-slack==7.2.0
   apache-airflow-providers-snowflake==4.0.5
   apache-airflow-providers-sqlite==3.3.2
   apache-airflow-providers-ssh==3.6.0
   apache-airflow-providers-telegram==4.0.0
   
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Using an extended Airflow image, LocalExecutor, and a Postgres 13 meta DB as 
a container in the same stack.
   docker-compose version 1.29.2, build 5becea4c
   Docker version 23.0.5, build bc4487a
   
   
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

