mik-laj opened a new pull request #16248:
URL: https://github.com/apache/airflow/pull/16248


   We don't use this object, so we don't need to send it. I tested it in my E2E 
environment: https://github.com/mik-laj/airlfow-testing-example/pull/2
   
   Sending the TI via multiprocessing sometimes fails, because a TaskInstance is not 
always properly serializable:
   ```
   /home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py:1720: in run
       job.run()
   /home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py:237: in run
       self._execute()
   /home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py:70: in wrapper
       return func(*args, session=session, **kwargs)
   /home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py:806: in _execute
       session=session,
   /home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py:67: in wrapper
       return func(*args, **kwargs)
   /home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py:727: in _execute_for_run_dates
       session=session,
   /home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py:67: in wrapper
       return func(*args, **kwargs)
   /home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py:602: in _process_backfill_task_instances
       executor.heartbeat()
   /home/airflow/.local/lib/python3.6/site-packages/airflow/executors/base_executor.py:158: in heartbeat
       self.trigger_tasks(open_slots)
   /home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py:263: in trigger_tasks
       self._process_tasks(task_tuples_to_send)
   /home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py:272: in _process_tasks
       key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
   /home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py:333: in _send_tasks_to_celery
       send_task_to_executor, task_tuples_to_send, chunksize=chunksize
   /usr/local/lib/python3.6/multiprocessing/pool.py:266: in map
       return self._map_async(func, iterable, mapstar, chunksize).get()
   /usr/local/lib/python3.6/multiprocessing/pool.py:644: in get
       raise self._value
   /usr/local/lib/python3.6/multiprocessing/pool.py:424: in _handle_tasks
       put(task)
   /usr/local/lib/python3.6/multiprocessing/connection.py:206: in send
       self._send_bytes(_ForkingPickler.dumps(obj))
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   cls = <class 'multiprocessing.reduction.ForkingPickler'>
   obj = (0, 0, <function mapstar at 0x7f3d2c0c97b8>, ((<function send_task_to_executor at 0x7f3d0b8faf28>, ((TaskInstanceKey(d...sk: airflow.executors.celery_executor.execute_command of airflow.executors.celery_executor at 0x7f3d0b68dcf8>))),), {})
   protocol = None
   
       @classmethod
       def dumps(cls, obj, protocol=None):
           buf = io.BytesIO()
   >       cls(buf, protocol).dump(obj)
   E       TypeError: cannot serialize 'EncodedFile' object
   
   /usr/local/lib/python3.6/multiprocessing/reduction.py:51: TypeError
   
   ```
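   For context, here is a minimal sketch of the failure mode (not Airflow code; `FakeTaskInstance`, `log_stream`, and `send_task` are made-up names): pickling a payload that drags an object holding an open stream into a multiprocessing pool raises `TypeError`, while sending only the plain key/command data does not.
   ```python
   import io
   import pickle
   from multiprocessing import Pool


   class FakeTaskInstance:
       """Stand-in for a TaskInstance that holds a non-picklable stream."""

       def __init__(self):
           # Under pytest, stdout/stderr are wrapped in EncodedFile-like
           # objects; an open stream attribute like this cannot be pickled.
           self.log_stream = io.TextIOWrapper(io.BytesIO())


   def send_task(args):
       # Worker-side callable: it only needs the key and the command string.
       key, command = args
       return key, f"queued {command}"


   if __name__ == "__main__":
       ti = FakeTaskInstance()
       key = ("dag_id", "task_id", "2021-06-03")

       # Pickling a payload that includes the TaskInstance fails, analogous
       # to the ForkingPickler call in the traceback above.
       try:
           pickle.dumps((key, "airflow tasks run ...", ti))
       except TypeError as exc:
           print(f"cannot send the TaskInstance itself: {exc}")

       # Sending only the plain, picklable data the worker uses works fine.
       with Pool(2) as pool:
           print(pool.map(send_task, [(key, "airflow tasks run ...")]))
   ```
   That is the idea behind this change: if the object is not used on the other side of the pool, it does not need to be part of the payload at all.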
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   

