MialLewis opened a new issue, #40056:
URL: https://github.com/apache/airflow/issues/40056

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.9.0
   
   ### What happened?
   
   When using an Airflow connection to write a pandas DataFrame to a table, I receive the following error:
   ```
   Error:
   [2024-06-04, 23:28:12 UTC] {taskinstance.py:2890} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 265, in execute
       return_value = super().execute(context)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 235, in execute
       return_value = self.execute_callable()
                      ^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 252, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/dags/populations/target_population.py", line 25, in target_population_task
       output_df_to_target_tbl(output_df, client_profile.target_table + "_py_tp_sql", client_profile.conn_id)
     File "/opt/airflow/dags/populations/target_population.py", line 30, in output_df_to_target_tbl
       output_df.to_sql(name=target_table, con=engine, schema='examples', if_exists='replace', chunksize=5000, index_label='id')
     File "/home/airflow/.local/lib/python3.11/site-packages/pandas/util/_decorators.py", line 333, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/pandas/core/generic.py", line 3008, in to_sql
       return sql.to_sql(
              ^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 787, in to_sql
       with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql:
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 851, in pandasSQL_builder
       return SQLDatabase(con, schema, need_transaction)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 1576, in __init__
       con = self.exit_stack.enter_context(con.connect())
                                           ^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3325, in connect
       return self._connection_cls(self, close_with_result=close_with_result)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
       else engine.raw_connection()
            ^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3404, in raw_connection
       return self._wrap_pool_connect(self.pool.connect, _connection)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
       return fn()
              ^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 327, in connect
       return _ConnectionFairy._checkout(self)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
       fairy = _ConnectionRecord.checkout(pool)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
       rec = pool._do_get()
             ^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
       with util.safe_reraise():
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 143, in _do_get
       return self._create_connection()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
       return _ConnectionRecord(self)
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
       self.__connect()
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
       with util.safe_reraise():
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
       self.dbapi_connection = connection = pool._invoke_creator(self)
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 574, in connect
       return dialect.connect(*cargs, **cparams)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 598, in connect
       return self.dbapi.connect(*cargs, **cparams)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/__init__.py", line 121, in Connect
       return Connection(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 195, in __init__
       super().__init__(*args, **kwargs2)
   TypeError: '__extra__' is an invalid keyword argument for connect()
   ```
   
   I can work around this issue by recreating the database URL and engine with the `__extra__` parameter removed:
   ```python
   def output_df_to_target_tbl(output_df, target_table, conn_id):
       db_hook = hook(mysql_conn_id=conn_id)
       engine = fix_engine_if_invalid_params(db_hook.get_sqlalchemy_engine())

       output_df.to_sql(name=target_table, con=engine, schema='examples', if_exists='replace', chunksize=5000, index_label='id')

   def fix_engine_if_invalid_params(engine):
       invalid_param = '__extra__'
       query_items = engine.url.query.items()
       if invalid_param in [k for (k, v) in query_items]:
           from sqlalchemy.engine.url import URL
           from sqlalchemy.engine import create_engine
           import logging

           modified_query_items = {k: v for k, v in query_items if k != invalid_param}
           modified_url = URL.create(
               drivername=engine.url.drivername,
               username=engine.url.username,
               password=engine.url.password,
               host=engine.url.host,
               port=engine.url.port,
               database=engine.url.database,
               query=modified_query_items
           )
           logging.info(f'Note: {invalid_param} removed from {query_items} in engine url')
           engine = create_engine(modified_url)
       return engine
   ```
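
   The same idea can be sketched at the plain-URL level with only the standard library; the DSN below is made up for illustration, and this is just a sketch of the parameter-stripping step, not Airflow's own code:

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

def strip_extra_param(url: str, param: str = "__extra__") -> str:
    """Return the URL with the given query parameter removed."""
    parts = urlsplit(url)
    # Keep every query pair except the offending parameter.
    query = [(k, v) for k, v in parse_qsl(parts.query) if k != param]
    return urlunsplit(parts._replace(query=urlencode(query)))

# Hypothetical DSN, for illustration only:
dsn = "mysql://user:pw@db-host:3306/examples?charset=utf8&__extra__=%7B%7D"
print(strip_extra_param(dsn))
# mysql://user:pw@db-host:3306/examples?charset=utf8
```

   A note on the workaround above: SQLAlchemy 1.4+ also offers `URL.difference_update_query([...])`, which may be a more compact way to drop a single query parameter than rebuilding the URL field by field.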
   
   Looking at previous versions of Airflow, the default value of the connection's `extra` field appears to have changed from `None` to `{}`. Could this now be causing the `__extra__` parameter to be supplied even when the field has been left empty?
   
   ### What you think should happen instead?
   
   No error
   
   ### How to reproduce
   
   The code used is:
   ```python
   db_hook = hook(mysql_conn_id=conn_id)
   engine = db_hook.get_sqlalchemy_engine()
   output_df.to_sql(name=target_table, con=engine, schema='examples', if_exists='replace')
   ```
   
   ### Operating System
   
   Ubuntu 22.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

