potiuk commented on issue #8908:
URL: https://github.com/apache/airflow/issues/8908#issuecomment-630704926
The problem is that it is transient — sometimes it works, sometimes it does not. I also think it is an issue in the Airflow code, likely connected to the optimizations recently made by @mik-laj or @ashb:

* I looked at the history of requirements.txt, and the celery version has not changed since the 20th of March: `celery==4.4.2`
* The error only started to appear a few days ago.

I believe it is likely caused by this commit:

> commit fc862a3edd010e65b9b3fe586855fe81807ee4e8
> Author: Kamil Breguła <[email protected]>
> Date: Thu May 14 06:01:13 2020 +0200
>
> Do not create a separate process for one task in CeleryExecutor (#8855)

or maybe this one:

> commit fe4219112a63d6debf84bcfd644e1681854db85c
> Author: Ash Berlin-Taylor <[email protected]>
> Date: Thu May 14 16:49:12 2020 +0100
>
> Don't use ProcessorAgent to test ProcessorManager (#8871)
>
> Some of our tests (when I was looking at another change) were using the ProcessorAgent to run and test the behaviour of our ProcessorManager in certain cases. Having that extra process in the middle is not critical for the tests, and makes it harder to debug the problem if something breaks.
>
> To make this possible I have made a small refactor to the loop of DagFileProcessorManager (to give us a method we can call in tests that doesn't do `os.setsid`).

or this one:

> commit 82de6f74aee199d916d113a65a40821ec8ca0c43
> Author: Ash Berlin-Taylor <[email protected]>
> Date: Fri May 15 22:17:55 2020 +0100
>
> Spend less time waiting for DagFileProcessor processes to complete (#8814)
>
> In debugging another test I noticed that the scheduler was spending a long time waiting for a "simple" dag to be parsed. But upon closer inspection, the parsing process itself was done in a few milliseconds; we just weren't harvesting the results in a timely fashion.
>
> This change uses the `sentinel` attribute of multiprocessing.Connection (added in Python 3.3) to be able to wait for all the processes, so that as soon as one has finished we get woken up and can immediately harvest and pass on the parsed dags.
>
> This makes test_scheduler_job.py about twice as quick, and also reduces the time the scheduler spends between tasks. In real workloads, or where there are lots of dags, this likely won't equate to such a huge speed-up, but for our (synthetic) elastic performance test dag, these were the timings for all the tasks in a single dag run to complete, with PERF_SCHEDULE_INTERVAL='1d' and PERF_DAGS_COUNT=1:
>
> * PERF_SHAPE=linear PERF_TASKS_COUNT=12: **Before**: 45.4166s **After**: 16.9499s
> * PERF_SHAPE=linear PERF_TASKS_COUNT=24: **Before**: 82.6426s **After**: 34.0672s
> * PERF_SHAPE=binary_tree PERF_TASKS_COUNT=24: **Before**: 20.3802s **After**: 9.1400s
> * PERF_SHAPE=grid PERF_TASKS_COUNT=24: **Before**: 27.4735s **After**: 11.5607s
>
> If you have many more dag **files**, this likely won't be your bottleneck.
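For anyone unfamiliar with the sentinel mechanism that #8814 relies on, here is a minimal standalone sketch (my own illustration, not Airflow's actual code) of waiting on several worker processes at once via `multiprocessing.connection.wait`:

```python
import multiprocessing
import time
from multiprocessing.connection import wait


def parse_dag(duration):
    # Stand-in for a DagFileProcessor doing real parsing work.
    time.sleep(duration)


if __name__ == "__main__":
    procs = [
        multiprocessing.Process(target=parse_dag, args=(d,))
        for d in (0.1, 0.3, 0.2)
    ]
    for p in procs:
        p.start()

    # Each Process exposes a `sentinel`: a handle that becomes "ready"
    # the moment the process exits. wait() blocks until at least one
    # sentinel is ready, so finished processes are harvested immediately
    # instead of being polled on a fixed interval.
    pending = {p.sentinel: p for p in procs}
    while pending:
        for sentinel in wait(list(pending)):
            proc = pending.pop(sentinel)
            proc.join()  # reap the process; results would be collected here
            print(f"harvested pid={proc.pid} exitcode={proc.exitcode}")
```

The design point is the same as in the commit message: the event-driven `wait()` wakes the caller as soon as any child finishes, which is where the difference between "parsing takes milliseconds" and "harvesting takes seconds" goes away.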
