It is actually all Executors doing this (at least Local, Celery). And yes 
(although you description is a bit cryptic) I think you are right. 

What you call “shelling out” does not really cover what happens on a process 
level though. We execute “bash -c” with “shell=True” which probably makes the 
issue worse. Basically what happens is “<run_user_shell> -> “bash -c” -> 
“python (airflow)”. That’s three processes and then twice. 

We can just execute “python” just fine. Because it will run in a separate 
interpreter no issues will come from sys.modules as that is not inherited. Will 
still parse DAGs in a separate process then. Forking (@ash) probably does not 
work as that does share sys.modules. 

Same goes for jobs running through sudo and with cgroups. No shell is required 
at all. 

The worker of the executor we can relatively easily extend to take over what 
LocalTaskJob does. If necessary we can keep it a bit dumber and either report 
back by API or MQ instead of DB.

The way we handle SIGTERM is pretty messy anyways and not really standard (we 
need to kill all descendent processes most of them are our own, e.g. airflow 
core). It also can be handled within the executor/worker. A cleanup will 
probably increase reliability. 

I’m writing AIP-2 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-2+Simplify+process+launching
 to work this out.

B.

Verstuurd vanaf mijn iPad

> Op 4 aug. 2018 om 19:40 heeft Ash Berlin-Taylor 
> <ash_airflowl...@firemirror.com> het volgende geschreven:
> 
> Comments inline.
> 
>> On 4 Aug 2018, at 18:28, Maxime Beauchemin <maximebeauche...@gmail.com> 
>> wrote:
>> 
>> Let me confirm I'm understanding this right, we're talking specifically
>> about the CeleryExecutor not starting and `airflow run` (not --raw)
>> command, and fire up a LocalTaskJob instead? Then we'd still have the
>> worker fire up the `airflow run --raw` command?
>> 
>> Seems reasonable. One thing to keep in mind is the fact that shelling out
>> guarantees no `sys.module` caching, which is a real issue for slowly
>> changing DAG definitions. That's the reason why we'd have to reboot the
>> scheduler periodically before it used sub-processes to evaluate DAGs. Any
>> code that needs to evaluate a DAG should probably be done in a subprocess.
> 
>> 
>> Shelling out also allows for doing things like unix impersonation and
>> applying CGROUPS. This currently happens between `airflow run` and `airflow
>> run --raw`. The parent process also does heartbeat and listen for external
>> kill signal (kill pills).
>> 
>> I think what we want is smarter executors and only one level of bash
>> command: the `airflow run --raw`, and ideally the system that fires this up
>> is not Airflow itself, and cannot be DAG-aware (or it will need to get
>> restarted to flush the cache).
> 
> Rather than shelling out to `airflow run` could we instead fork and run the 
> CLI code directly? This involves parsing the config twice, loading all of the 
> airflow and SQLAlchemy deps twice etc. This I think would account for a 
> not-insignificant speed difference for the unit tests. In the case of 
> impersonation we'd probably have no option but to exec `airflow`, but most(?) 
> people don't use that?
> 
> Avoiding the extra parsing pentalty and process when we don't need it might 
> be worth it for test speed up alone. And we've already got impersonation 
> covered in the tests so we'll know that it still works.
> 
>> 
>> To me that really brings up the whole question of what should be handled by
>> the Executor, and what belongs in core Airflow. The Executor needs to do
>> more, and Airflow core less.
> 
> I agree with the sentiment that Core should do less and Executors more -- 
> many parts of the core are reimplementing what Celery itself could do.
> 
> 
>> 
>> When you think about how this should all work on Kubernetes, it looks
>> something like this:
>> * the scheduler, through KubeExecutor, calls the k8s API, tells it to fire
>> up and Airflow task
>> * container boots up and starts an `airflow run --raw` command
>> * k8s handles heartbeats, monitors tasks, knows how to kill a running task
>> * the scheduler process (call it supervisor), talks with k8s through
>> KubeExecutor
>> and handles zombie cleanup and sending kill pills
>> 
>> Now because Celery doesn't offer as many guarantees it gets a bit more
>> tricky. Is there even a way to send a kill pill through Celery? Are there
>> other ways than using a parent process to accomplish this?
> 
> It does 
> http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks
>  (at least it does now)
> 
>> 
>> At a higher level, it seems like we need to move more logic from core
>> Airflow into the executors. For instance, the heartbeat construct should
>> probably be 100% handled by the executor, and not an assumption in the core
>> code base.
>> 
>> I think I drifted a bit, hopefully that's still helpful.
>> 
>> Max

Reply via email to