Stormhand opened a new issue, #31753:
URL: https://github.com/apache/airflow/issues/31753

   ### Apache Airflow version
   
   2.6.1
   
   ### What happened
   
   When I use _do_xcom_push=True_ in **DatabricksSqlOperator**, an exception with the following stack trace is thrown:
   
   ```
   [2023-06-06, 08:52:24 UTC] {sql.py:375} INFO - Running statement: SELECT cast(max(id) as STRING) FROM prod.unified.sessions, parameters: None
   [2023-06-06, 08:52:25 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2354, in xcom_push
       XCom.set(
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 237, in set
       value = cls.serialize_value(
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 632, in serialize_value
       return json.dumps(value, cls=XComEncoder).encode("UTF-8")
     File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
       **kw).encode(obj)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
       o = self.default(o)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
       return serialize(o)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 144, in serialize
       return encode(classname, version, serialize(data, depth + 1))
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
       return [serialize(d, depth + 1) for d in o]
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
       return [serialize(d, depth + 1) for d in o]
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 132, in serialize
       qn = qualname(o)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 47, in qualname
       return f"{o.__module__}.{o.__name__}"
     File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/types.py", line 161, in __getattr__
       raise AttributeError(item)
   AttributeError: __name__. Did you mean: '__ne__'?
   ```
   
   ### What you think should happen instead
   
   In `_process_output()`, if `self._output_path` is not set, a list of tuples is returned:
   ```
   def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]:
       if not self._output_path:
           return list(zip(descriptions, results))
   ```
   I suspect this breaks the serialization somehow; it might also be related to my own metadata database (Postgres).
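
   A minimal sketch of what seems to go wrong, runnable outside Airflow (assuming only databricks-sql-connector is installed; the `max_id` field name is just for illustration). Per the traceback, Airflow's serializer calls `qualname()` on each `Row` in the zipped output, and `Row.__getattr__` intercepts the `__name__` lookup:
   ```
   from databricks.sql.types import Row

   # Row mimics a namedtuple but resolves unknown attributes via
   # __getattr__, which raises AttributeError for dunder names.
   row = Row(max_id="42")

   # Same expression as airflow.utils.module_loading.qualname()
   # in the traceback above:
   print(f"{row.__module__}.{row.__name__}")  # AttributeError: __name__
   ```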
   
   Replacing the **DatabricksSqlOperator** with a simple **PythonOperator** and **DatabricksSqlHook** works just fine:
   ```
   def get_max_id(ti):
       hook = DatabricksSqlHook(databricks_conn_id=databricks_sql_conn_id, sql_endpoint_name='sql_endpoint')
       sql = "SELECT cast(max(id) as STRING) FROM prod.unified.sessions"
       # get_first() returns a Row; casting to str keeps the XCom value JSON-serializable
       return str(hook.get_first(sql)[0])
   ```
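
   For completeness, a sketch of how this workaround might be wired into the DAG (standard Airflow import; by default the callable's return value is pushed as the `return_value` XCom):
   ```
   from airflow.operators.python import PythonOperator

   get_max_id_task = PythonOperator(
       task_id="get_max_id",
       python_callable=get_max_id,  # the str return value is pushed to XCom
   )
   ```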
   
   ### How to reproduce
   
   ```
       get_max_id_task = DatabricksSqlOperator(
           databricks_conn_id=databricks_sql_conn_id,
           sql_endpoint_name='sql_endpoint',
           task_id='get_max_id',
           sql="SELECT cast(max(id) as STRING) FROM prod.unified.sessions",
           do_xcom_push=True
       )
   ```
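
   The snippets above assume the usual provider imports (a sketch; these module paths are from apache-airflow-providers-databricks 4.x):
   ```
   from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
   from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator
   ```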
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye) Docker image, Python 3.10
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-sql==1.5.1
   databricks-sql-connector==2.5.2
   apache-airflow-providers-databricks==4.2.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Using an extended Airflow image, LocalExecutor, and a Postgres 13 metadata DB as a container in the same stack.
   docker-compose version 1.29.2, build 5becea4c
   Docker version 23.0.5, build bc4487a
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

