leehuwuj opened a new issue, #27554:
URL: https://github.com/apache/airflow/issues/27554
### Apache Airflow version
2.4.2
### What happened
I tried to reduce the output of a dynamically mapped operator with a PythonVirtualenvOperator, but it raised an error while pickling the kwargs input:
```
Traceback (most recent call last):
  File "***operators/python.py", line 263, in execute
    return super().execute(context=serializable_context)
  File "***operators/python.py", line 164, in execute
    return_value = self.execute_callable()
  File "***operators/python.py", line 460, in execute_callable
    return self._execute_python_callable_in_subprocess(python_path, tmp_path)
  File "***operators/python.py", line 304, in _execute_python_callable_in_subprocess
    self._write_args(input_path)
  File "***operators/python.py", line 274, in _write_args
    file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))
  File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 263, in dumps
    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
  ...
  File "/usr/local/lib/python3.9/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.9/pickle.py", line 886, in save_tuple
    save(element)
  File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 388, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/usr/local/lib/python3.9/pickle.py", line 578, in save
    rv = reduce(self.proto)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
```
I think the issue is that the SQLAlchemy session object held by `_LazyXComAccess` cannot be shared across threads, so it cannot be pickled for the subprocess. As a temporary workaround I patched the operator at the step where args and kwargs are pickled, after which the task runs OK:
https://github.com/apache/airflow/blob/4dc9b1c592497686dada05e45147b1364ec338ea/airflow/operators/python.py#L364
```python
#==== Old ====#
def _write_args(self, file: Path):
    if self.op_args or self.op_kwargs:
        file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))


#==== Proposed fix ====#
def _write_args(self, file: Path):
    # Materialize lazy XCom values into plain lists so they can be pickled.
    lazy_xcom = {
        k: list(value)
        for k, value in self.op_kwargs.items()
        if isinstance(value, _LazyXComAccess)
    }
    new_op_kwargs = self.op_kwargs | lazy_xcom
    if self.op_args or self.op_kwargs:
        file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': new_op_kwargs}))
```
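The failure can be reproduced outside Airflow. In this hedged sketch, a thread lock stands in for the live SQLAlchemy session carried by `_LazyXComAccess` (the class and names here are illustrative, not Airflow's actual implementation); any object holding such a resource defeats pickle (and dill) until it is materialized:

```python
import pickle
import threading


class LazySequence:
    """Illustrative stand-in for _LazyXComAccess: a lazy iterable
    holding an unpicklable resource (here a lock, in Airflow a DB session)."""

    def __init__(self, values):
        self._values = values
        self._lock = threading.Lock()  # unpicklable, like a live session

    def __iter__(self):
        return iter(self._values)


lazy = LazySequence([1, 2, 3])

# Pickling the payload the way _write_args does fails on the hidden resource.
try:
    pickle.dumps({"args": (), "kwargs": {"output": lazy}})
except TypeError as exc:
    print(f"pickling fails: {exc}")  # cannot pickle '_thread.lock' object

# Materializing the lazy value first makes the payload picklable.
payload = pickle.dumps({"args": (), "kwargs": {"output": list(lazy)}})
print(pickle.loads(payload)["kwargs"]["output"])  # [1, 2, 3]
```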
### What you think should happen instead
_No response_
### How to reproduce
Dag configs:
```python
with DAG(
    dag_id="abc",
    default_args=default_args,
    schedule_interval="0 * * * *",
    start_date=datetime(2022, 11, 8),
    render_template_as_native_obj=True,
    catchup=False,
) as dag:
    ...
    test_python_reduce = PythonVirtualenvOperator(
        task_id="test_python_reduce",
        python_callable=handle,
        op_kwargs={"output": "{{ ti.xcom_pull(task_ids='an_upstream_task') }}"},
        requirements=[],
        use_dill=True,
        system_site_packages=True,
        dag=dag,
    )
```
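Until the operator handles this itself, the materialization step of the patch above can also be applied as a standalone helper before handing kwargs to the operator. This is a hedged sketch; `LazyProxy` and `materialize_lazy_kwargs` are hypothetical names standing in for `_LazyXComAccess` and the kwargs rewrite:

```python
from typing import Any


class LazyProxy:
    """Hypothetical stand-in for _LazyXComAccess."""

    def __init__(self, values):
        self._values = values

    def __iter__(self):
        return iter(self._values)


def materialize_lazy_kwargs(op_kwargs: dict[str, Any]) -> dict[str, Any]:
    # Replace every lazy proxy with a plain, picklable list; leave other
    # values untouched. Mirrors the dict-merge in the proposed patch.
    lazy = {k: list(v) for k, v in op_kwargs.items() if isinstance(v, LazyProxy)}
    return op_kwargs | lazy


kwargs = {"output": LazyProxy([1, 2, 3]), "name": "abc"}
print(materialize_lazy_kwargs(kwargs))  # {'output': [1, 2, 3], 'name': 'abc'}
```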
### Operating System
Debian
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other 3rd-party Helm chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)