aru-trackunit opened a new issue, #31499:
URL: https://github.com/apache/airflow/issues/31499

   ### Apache Airflow version
   
   2.6.1
   
   ### What happened
   
   After upgrading from Airflow 2.5.3 to 2.6.1 - the dag started to fail and 
it's related to XCom serialization.
   
   I noticed that something has changed in regards to serializing XCom:```
   key | Value Version 2.5.3 | Value Version 2.6.1 | result
   -- | -- | -- | --
   return_value | [[['Result', 'string', None, None, None, None, None]], []] | 
(['(num_affected_rows,bigint,None,None,None,None,None)', 
'(num_inserted_rows,bigint,None,None,None,None,None)'],[]) | ✅ 
   return_value | [[['Result', 'string', None, None, None, None, None]], []] | 
(['(Result,string,None,None,None,None,None)'],[]) | ✅ 
   return_value | [[['num_affected_rows', 'bigint', None, None, None, None, 
None], ['num_updated_rows', 'bigint', None, None, None, None, None], 
['num_deleted_rows', 'bigint', None, None, None, None, None], 
['num_inserted_rows', 'bigint', None, None, None, None, None]], [[1442, 605, 0, 
837]]] | `AttributeError: __name__. Did you mean: '__ne__'?` | ❌ 
   
   Query syntax that procuded an error: MERGE INTO 
https://docs.databricks.com/sql/language-manual/delta-merge-into.html
   
   Stacktrace included below:
   
   ```
   [2023-05-24, 01:12:43 UTC] {taskinstance.py:1824} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 73, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py",
 line 2354, in xcom_push
       XCom.set(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 73, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", 
line 237, in set
       value = cls.serialize_value(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", 
line 632, in serialize_value
       return json.dumps(value, cls=XComEncoder).encode("UTF-8")
     File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
       **kw).encode(obj)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 
102, in encode
       o = self.default(o)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 
91, in default
       return serialize(o)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py",
 line 144, in serialize
       return encode(classname, version, serialize(data, depth + 1))
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py",
 line 123, in serialize
       return [serialize(d, depth + 1) for d in o]
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py",
 line 123, in <listcomp>
       return [serialize(d, depth + 1) for d in o]
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py",
 line 123, in serialize
       return [serialize(d, depth + 1) for d in o]
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py",
 line 123, in <listcomp>
       return [serialize(d, depth + 1) for d in o]
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py",
 line 132, in serialize
       qn = qualname(o)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/module_loading.py",
 line 47, in qualname
       return f"{o.__module__}.{o.__name__}"
     File 
"/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/types.py", 
line 161, in __getattr__
       raise AttributeError(item)
   AttributeError: __name__. Did you mean: '__ne__'?
   ```
   
   ### What you think should happen instead
   
   Serialization should finish without an exception raised.
   
   ### How to reproduce
   
   1. Dag file with declared operator:
   ```
       task = DatabricksSqlOperator(
           task_id="task",
           databricks_conn_id="databricks_conn_id",
           sql_endpoint_name="name",
           sql="file.sql"
       )
   ```
   
   file.sql
   ```
   MERGE INTO table_name
   ON condition
   WHEN MATCHED THEN UPDATE
   WHEN NOT MATCHED THEN INSERT
   ```
   https://docs.databricks.com/sql/language-manual/delta-merge-into.html
   Query output is a table
   
   num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows
   -- | -- | -- | --
   0 | 0 | 0 | 0
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==8.0.0
   apache-airflow-providers-cncf-kubernetes==6.1.0
   apache-airflow-providers-common-sql==1.4.0
   apache-airflow-providers-databricks==4.1.0
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-google==10.0.0
   apache-airflow-providers-hashicorp==3.3.1
   apache-airflow-providers-http==4.3.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-mysql==5.0.0
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-sftp==4.2.4
   apache-airflow-providers-sqlite==3.3.2
   apache-airflow-providers-ssh==3.6.0
   ```
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to