MialLewis opened a new issue, #40056:
URL: https://github.com/apache/airflow/issues/40056
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.9.0
### What happened?
When using an Airflow connection to write a pandas DataFrame to a MySQL table, I
receive the following error:
```
Error:
[2024-06-04, 23:28:12 UTC] {taskinstance.py:2890} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py",
line 400, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py",
line 265, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py",
line 400, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py",
line 235, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py",
line 252, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/dags/populations/target_population.py", line 25, in
target_population_task
output_df_to_target_tbl(output_df, client_profile.target_table +
"_py_tp_sql", client_profile.conn_id)
File "/opt/airflow/dags/populations/target_population.py", line 30, in
output_df_to_target_tbl
output_df.to_sql(name=target_table, con=engine, schema='examples',
if_exists='replace', chunksize=5000, index_label='id')
File
"/home/airflow/.local/lib/python3.11/site-packages/pandas/util/_decorators.py",
line 333, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/pandas/core/generic.py",
line 3008, in to_sql
return sql.to_sql(
^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py",
line 787, in to_sql
with pandasSQL_builder(con, schema=schema, need_transaction=True) as
pandas_sql:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py",
line 851, in pandasSQL_builder
return SQLDatabase(con, schema, need_transaction)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py",
line 1576, in __init__
con = self.exit_stack.enter_context(con.connect())
^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py",
line 3325, in connect
return self._connection_cls(self, close_with_result=close_with_result)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py",
line 96, in __init__
else engine.raw_connection()
^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py",
line 3404, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py",
line 3371, in _wrap_pool_connect
return fn()
^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 327, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 894, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 493, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py",
line 145, in _do_get
with util.safe_reraise():
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py",
line 143, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 273, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 388, in __init__
self.__connect()
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 690, in __connect
with util.safe_reraise():
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py",
line 686, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/create.py",
line 574, in connect
return dialect.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
line 598, in connect
return self.dbapi.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/__init__.py", line
121, in Connect
return Connection(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py",
line 195, in __init__
super().__init__(*args, **kwargs2)
TypeError: '__extra__' is an invalid keyword argument for connect()
```
I can work around this issue by rebuilding the database URL and engine without
the `__extra__` query parameter:
```
def output_df_to_target_tbl(output_df, target_table, conn_id):
    """Write ``output_df`` to ``examples.<target_table>``, replacing the table.

    :param output_df: pandas DataFrame to write.
    :param target_table: Name of the destination table in schema ``examples``.
    :param conn_id: Airflow connection id, passed to ``hook`` as
        ``mysql_conn_id``.
    """
    # NOTE(review): `hook` is presumably the MySQL provider's hook class;
    # its import is not shown in this snippet -- confirm.
    db_hook = hook(mysql_conn_id=conn_id)
    # Strip the `__extra__` URL query parameter that MySQLdb's connect()
    # rejects before pandas opens a connection with this engine.
    engine = fix_engine_if_invalid_params(db_hook.get_sqlalchemy_engine())
    output_df.to_sql(name=target_table, con=engine, schema='examples',
                     if_exists='replace', chunksize=5000, index_label='id')
def fix_engine_if_invalid_params(engine):
    """Return an engine whose URL has no ``__extra__`` query parameter.

    MySQLdb's ``connect()`` rejects ``__extra__`` as a keyword argument, so
    the key is dropped from the engine URL's query string before any
    connection is opened.

    :param engine: SQLAlchemy engine, e.g. from
        ``db_hook.get_sqlalchemy_engine()``.
    :return: ``engine`` itself when its URL has no ``__extra__`` parameter,
        otherwise a new engine created from the cleaned URL.
    """
    invalid_param = '__extra__'
    # URL.query is a mapping, so test membership directly instead of
    # materialising a list of its keys.
    if invalid_param not in engine.url.query:
        return engine

    import logging
    from sqlalchemy.engine import create_engine

    # difference_update_query (SQLAlchemy 1.4+) returns a copy of the URL
    # with only the named query keys removed, keeping driver, credentials,
    # host, port, database and the remaining query parameters intact.
    modified_url = engine.url.difference_update_query([invalid_param])
    # Log only the key name, never the query values: the __extra__ value
    # presumably carries the connection's serialized extras, which can
    # contain secrets.
    logging.getLogger(__name__).info(
        'Note: %s removed from engine url query', invalid_param
    )
    # NOTE(review): like the original workaround, this new engine does not
    # inherit non-URL options the original engine was created with (pool
    # settings, connect_args, ...) -- confirm none are needed.
    return create_engine(modified_url)
```
Looking at previous versions of Airflow, the default value of the `extras`
field appears to have changed from `None` to `{}`. Could this now be causing the
`__extra__` parameter to be added to the engine URL when the field is left empty?
### What you think should happen instead?
The DataFrame is written to the table without error.
### How to reproduce
The following code reproduces the error:
```
# Minimal reproduction. NOTE(review): `hook` is presumably the MySQL
# provider's hook class (the traceback ends in MySQLdb) -- confirm.
db_hook = hook(mysql_conn_id=conn_id)
# Engine built from the Airflow connection `conn_id`, used unmodified here.
engine = db_hook.get_sqlalchemy_engine()
# Fails when pandas opens a connection: MySQLdb's connect() rejects the
# `__extra__` keyword argument, as shown in the traceback above.
output_df.to_sql(name=target_table, con=engine, schema='examples',
if_exists='replace')
```
### Operating System
Ubuntu 22.04
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]