leehuwuj opened a new issue, #27554:
URL: https://github.com/apache/airflow/issues/27554

   ### Apache Airflow version
   
   2.4.2
   
   ### What happened
   
   I tried to reduce the output of a dynamically mapped operator with a `PythonVirtualenvOperator`, but it raised an error while serializing the kwargs input:
   ```
   Traceback (most recent call last):
     File "***operators/python.py", line 263, in execute
       return super().execute(context=serializable_context)
     File "***operators/python.py", line 164, in execute
       return_value = self.execute_callable()
     File "***operators/python.py", line 460, in execute_callable
       return self._execute_python_callable_in_subprocess(python_path, tmp_path)
     File "***operators/python.py", line 304, in _execute_python_callable_in_subprocess
       self._write_args(input_path)
     File "***operators/python.py", line 274, in _write_args
       file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))
     File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 263, in dumps
       dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
   ....
     File "/usr/local/lib/python3.9/pickle.py", line 560, in save
       f(self, obj)  # Call unbound method with explicit self
     File "/usr/local/lib/python3.9/pickle.py", line 886, in save_tuple
       save(element)
     File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 388, in save
       StockPickler.save(self, obj, save_persistent_id)
     File "/usr/local/lib/python3.9/pickle.py", line 578, in save
       rv = reduce(self.proto)
   TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
   ```
   I think the issue is caused by the SQLAlchemy session object held inside `_LazyXComAccess`, which cannot be pickled and shared with the subprocess. As a temporary workaround I patched the Python operator to convert lazy XCom values into plain lists before pickling `args` and `kwargs`, and then it runs OK:
   
https://github.com/apache/airflow/blob/4dc9b1c592497686dada05e45147b1364ec338ea/airflow/operators/python.py#L364
   ```python
   #====Old====#
   def _write_args(self, file: Path):
       if self.op_args or self.op_kwargs:
           file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))

   #===Fixable===#
   # Assumes `_LazyXComAccess` is importable in this module
   # (in 2.4.x it is defined in airflow.models.taskinstance).
   def _write_args(self, file: Path):
       # Materialize lazily loaded XCom values into plain lists so they can be pickled
       lazy_xcom = {k: list(value)
                    for k, value in self.op_kwargs.items() if isinstance(value, _LazyXComAccess)}
       new_op_kwargs = self.op_kwargs | lazy_xcom  # dict union operator needs Python 3.9+
       if self.op_args or self.op_kwargs:
           file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': new_op_kwargs}))
   ```
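The failure mode and the workaround above can be reproduced without Airflow. The sketch below is a minimal, hypothetical illustration: `LazyRows` stands in for `_LazyXComAccess`, a `threading.Lock` stands in for the unpicklable SQLAlchemy object, and the standard-library `pickle` is used instead of dill (dill can serialize some objects, locks included, that `pickle` cannot). The `materialize` helper is also hypothetical; it applies the same idea as the patch, replacing lazy collections with plain lists, and extends it to positional args as well.

```python
import pickle
import threading
from collections.abc import Collection, Iterator


class LazyRows(Collection):
    """Hypothetical stand-in for Airflow's _LazyXComAccess.

    It behaves like a read-only collection but also keeps a live resource
    (a threading.Lock here, standing in for the SQLAlchemy object in the
    traceback) that the standard-library pickle cannot serialize.
    """

    def __init__(self, rows):
        self._rows = list(rows)
        self._resource = threading.Lock()  # unpicklable attribute

    def __iter__(self) -> Iterator:
        return iter(self._rows)

    def __len__(self) -> int:
        return len(self._rows)

    def __contains__(self, item) -> bool:
        return item in self._rows


def materialize(args, kwargs, lazy_type=LazyRows):
    """Return copies of args/kwargs with every lazy collection replaced by a list."""
    new_args = [list(a) if isinstance(a, lazy_type) else a for a in args]
    new_kwargs = {k: list(v) if isinstance(v, lazy_type) else v for k, v in kwargs.items()}
    return new_args, new_kwargs


if __name__ == "__main__":
    op_kwargs = {"output": LazyRows([1, 2, 3])}
    try:
        # Fails: pickle tries to serialize the lock stored on the lazy object
        pickle.dumps({"args": [], "kwargs": op_kwargs})
    except TypeError as exc:
        print("pickling failed:", exc)

    op_args, op_kwargs = materialize([], op_kwargs)
    payload = pickle.loads(pickle.dumps({"args": op_args, "kwargs": op_kwargs}))
    print(payload)  # {'args': [], 'kwargs': {'output': [1, 2, 3]}}
```

Converting to a list pulls every value into memory at once, so this trades the laziness of the original object for picklability. That seems acceptable here because the values are about to be written to a file for the subprocess anyway.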
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   DAG config:
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator

   with DAG(
       dag_id="abc",
       default_args=default_args,
       schedule_interval='0 * * * *',
       start_date=datetime(2022, 11, 8),
       render_template_as_native_obj=True,
       catchup=False,
   ) as dag:
       ...
       test_python_reduce = PythonVirtualenvOperator(
           task_id="test_python_reduce",
           python_callable=handle,
           # 'an_upstream_task' is the dynamically mapped upstream task whose output is reduced
           op_kwargs={"output": "{{ ti.xcom_pull(task_ids='an_upstream_task') }}"},
           requirements=[],
           use_dill=True,
           system_site_packages=True,
           dag=dag,
       )
   ```
   
   
   ### Operating System
   
   Debian
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

