Hey Jason,

Let me try to answer them for you. I hope I get everything 100% right,
because I'm also pretty new to airflow.
Hopefully someone on the list corrects me if it's horribly wrong.

On Wed, Nov 2, 2016 at 9:24 PM, Jason Chen <[email protected]>
wrote:

> Hi Airflow team,
>
> We are using Airflow with LocalExecutor and it works great.
> We are moving toward using the CeleryExecutor and have a couple of
> questions. I searched the list archives but could not find answers.
>
> We have 3 airflow worker nodes and use Redis as the broker.
>
> (1) How does an airflow worker determine which task should be dispatched
> to it? Is it based on the average load of the nodes? On how long the task
> previously ran?
>

This is more of a Celery question than an airflow one. You should probably
make sure to tune "parallelism" in your airflow.cfg to match the number of
cores on your workers.

Beyond that, there's the pooling ability, where you can define a max number
of tasks per pool. If your tasks access shared resources, this gives you a
way to limit how many of them do so at the same time.

With both LocalExecutor and CeleryExecutor, you're basically running tasks
off a task queue. Airflow will limit the number of queued tasks according to
how pools are configured. The "non-pooled" tasks (those without a pool)
default to 128 slots now (I'm looking at master). So without a specific pool
setting, airflow will send at most 128 tasks to the queue for execution.
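As a rough illustration (plain Python, not Airflow's actual code; the
function and pool names below are made up), the slot accounting works along
these lines:

```python
# Toy sketch of how pool slots cap what gets sent to the executor queue.
# Tasks without a pool fall under a shared default cap (128 on master).

NON_POOLED_TASK_SLOTS = 128

def open_slots(pool_sizes, running_per_pool):
    """Return how many more tasks each pool may queue right now."""
    slots = {}
    for pool, size in pool_sizes.items():
        slots[pool] = max(0, size - running_per_pool.get(pool, 0))
    return slots

# One explicit pool plus the implicit "non-pooled" bucket (None here).
pools = {"etl_pool": 10, None: NON_POOLED_TASK_SLOTS}
running = {"etl_pool": 7, None: 100}
print(open_slots(pools, running))  # {'etl_pool': 3, None: 28}
```

So the scheduler only pushes as many tasks as there are open slots, pool by
pool.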

From the code, the LocalExecutor then respects the PARALLELISM setting and
only starts that many workers. The CeleryExecutor doesn't look at it,
because it doesn't control how many workers are started at the other end.
This means there's no real throttling on how many tasks are sent through
the task queue.

That's OK, because in either case the queue is just a place where workers
pick up their next task and then communicate back the result. It is still
useful to have the setting approximately right, because the base executor
frequently reports how many slots are open by comparing the queue against
what's configured.

So this isn't actively monitored by airflow; it's mostly a Celery thing,
and it mostly depends on a proper configuration for your environment and
the nature of what you're doing. Maybe someone else can chime in here with
tools to monitor and configure that environment better.

> (2) In my setting of 3 nodes, I have the webserver, scheduler and worker
> (w/ Celery) running on each node. Is that a recommended setup (i.e., each
> service runs on every node)?
>

You'd only run one webserver and one scheduler. They use the executor to
distribute the tasks.

The reason this is sometimes confusing is that airflow isn't subpackaged
into separate daemons: you take the thing as a whole and just run different
parts of it on different boxes or environments.

It's all started through the same CLI, but depending on the arguments it
just runs a different internal part of the software. What you need to do in
all cases is make sure that each node running airflow has the same software
on it (in general), or, if you use queues to separate specific pieces of
work, that the right dependencies are installed on that box.

The scheduler/webserver typically have all the dependencies, because they
still need to parse and verify the DAGs regularly.
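For example, a deployment could look like this (just a sketch with made-up
hostnames; any layout where exactly one scheduler and one webserver run
will do):

```shell
# on airflow-master (the only scheduler and webserver):
airflow webserver -p 8080
airflow scheduler

# on airflow-worker-1 .. airflow-worker-3 (same airflow install + DAGs):
airflow worker
```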



> (3) When a task runs and fails, I have an on_failure_callback. Will the
> on_failure_callback run on the same node as the task? Or does the executor
> work at the task level (it seems so)?
>

A worker is run through the CLI; it picks up the DAG, parses it and then
starts running the task instance.

In that sense a DAG isn't a piece of software by itself, but more a
description of a workflow. The scheduler/webserver look at the design of
that workflow and compose a graph for you to see what it does. The worker
picks up the DAG, extracts the specific task instance it should run,
instantiates that, runs it, and then returns. So yes: the
on_failure_callback is executed on the same node as the task itself, inside
the worker process that ran it.


> (4) There is a setting for the queue (default_queue = default) in
> airflow.cfg.
> Say I want to use different queues to control which worker should use
> which queue. I think I just change default_queue = other_queue, right?
> Should I do any additional Celery-related setup?
>

The default queue is where task instances get routed to in celery if the
queue setting for an operator is not modified. (In celery there's always A
queue, and this one is called 'default'.)

What you can do is set an operator (task instance) of a DAG to use a
different queue. For example, if you do data-science-y stuff you may need a
machine with much more memory. You could then route the tasks that do that
sciency stuff to a specific worker and route the more generic tasks of the
same DAG to your generic workers.

Just go through the models in the API ref:
https://airflow.incubator.apache.org/code.html#models  , look for 'queue'.

And see the CLI reference for the 'worker' command and how you can add the
-q parameter:

https://pythonhosted.org/airflow/cli.html

As such, you can tie task instances to go to specific workers.
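Conceptually (a toy simulation in plain Python, no Airflow required; all
names here are made up), the routing works like this: each task carries a
queue name, and a worker started with `airflow worker -q <queue>` only
consumes from the queues it subscribes to.

```python
from collections import defaultdict

def route(tasks, worker_queues):
    """Assign each task to the first worker subscribed to its queue."""
    assignment = defaultdict(list)
    for name, queue in tasks:
        for worker, queues in worker_queues.items():
            if queue in queues:
                assignment[worker].append(name)
                break
    return dict(assignment)

# A memory-hungry task goes to the big box, the rest to a generic worker.
tasks = [("train_model", "highmem"), ("load_csv", "default")]
workers = {"bigbox": {"highmem"}, "worker1": {"default"}}
print(route(tasks, workers))
# {'bigbox': ['train_model'], 'worker1': ['load_csv']}
```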


> (5) If all workers are down and the scheduler is still running, will the
> tasks be queued?
>

Only one scheduler runs. The tasks are always queued at one end. The max
size of the queue (assuming zero workers) depends on how your pools are
configured. So if you have 3 pools with 10, 10, 10 slots, that's 30, plus
128 (the default) for non-pooled tasks, for a max queue size of 158. Only
when workers become active will the queue start to decrease.

When tasks are picked up and eventually set to a completed state
(success/failed), the scheduler will notice, see that slots in pools have
become available, and queue more tasks.
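The arithmetic above, spelled out:

```python
# Max queue size with zero active workers: the sum of all pool slots
# plus the default cap for non-pooled tasks.

pool_slots = [10, 10, 10]    # three pools with 10 slots each
non_pooled_default = 128     # default cap for tasks without a pool

max_queue_size = sum(pool_slots) + non_pooled_default
print(max_queue_size)  # 158
```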

If the server fails or restarts, task instances that were already queued
remain in the QUEUED state. There's a failover mechanism with timeouts that
eventually clears these instances and sets them back to SCHEDULED.

> (6) If we turn off tasks while there are still tasks in the queue from
> before, will that kill the now-unneeded tasks in the queue?
>
>
When you clear or kill tasks, their database status is set to SHUTDOWN (or
maybe something else). When a task is sent to a worker, just before it
really starts executing, it does a final check on its status. If that's not
a runnable state, the worker returns immediately and does nothing. This
effectively means the task instance execution is "voided".


> Thanks.
> Jason
>

Pleasure!

Hope you enjoy working with airflow!
