Hey Jason,

Let me try to answer your questions. I can't promise I'll get everything 100% right, because I'm also pretty new to airflow; hopefully someone on the list corrects me if anything is horribly wrong.
On Wed, Nov 2, 2016 at 9:24 PM, Jason Chen <[email protected]> wrote:

> Hi Airflow team,
>
> We are using Airflow with LocalExecutor and it works great.
> We are moving toward using CeleryExecutor and have a couple of questions.
> I searched the posts and cannot find some answers.
>
> We have 3 airflow worker nodes and use Redis as broker.
>
> (1) How does airflow determine which worker the next task should be
> dispatched to? Is it based on the average load of the nodes? How long the
> task ran previously?

This is more of a Celery question than an airflow one. You should probably make sure to tune "parallelism" in your airflow.cfg to match the number of cores on your workers. Beyond that, there's the pooling ability, where you can define a max number of tasks per pool: if your tasks access shared resources, that's the way to limit how many of them hit that resource at the same time (there's a rough sketch further down, after my answer to (2)).

With both LocalExecutor and CeleryExecutor you're basically running tasks off a task queue. Airflow limits the number of queued tasks to what is configured in the pools. The "non-pooled" tasks (those without a pool) default to 128 slots now (I'm looking at master), so without a specific pool setting airflow will only send max. 128 tasks to the queue for execution.

From the code, LocalExecutor then respects the parallelism setting and only starts that many workers. CeleryExecutor doesn't look at it, because it doesn't control how many workers are started at the other end. This means there's no real throttling on how many tasks are sent through the task queue. That's ok, because in either case the queue is just a place where workers pick off the next task and then communicate the result back. It's still useful to have parallelism set approximately right, because the base executor regularly reports how many slots are open by looking at the queue and at what's configured.

So no, airflow doesn't actively load-balance this; it's mostly a Celery thing, and it mostly depends on a proper configuration for your environment and the nature of what you're doing. Maybe someone else can chime in here with tools to monitor and configure that environment better.

> (2) In my setting of 3 nodes, I have webserver, scheduler and worker (w/
> Celery) running on each node. Is that a recommended setup (i.e., each
> service runs on each node)?

You'd only run one webserver and one scheduler; they use the executor to distribute the tasks. The reason this is sometimes confusing is that airflow isn't subpackaged into separate daemons: you take the thing as a whole and just run different parts of it on different boxes or environments. It's all started through the same CLI, but depending on the arguments it runs a different internal part of the software (see the example commands below).

What you need to do in all cases is make sure that each node running airflow has the same software on it (in general), or, if you use queues to separate specific pieces of work, that the right dependencies are installed on that box. The scheduler and webserver typically need all the dependencies, because they still have to parse the DAGs and verify them regularly.
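To make the pool part of (1) a bit more concrete, here's a rough sketch. The pool name, DAG id and numbers are made up, and the exact config keys can differ between versions; the pool itself and its slot count are created in the UI under Admin -> Pools, then referenced per task:

    # airflow.cfg ([core] section), roughly:
    #   parallelism = 8                    # match the cores on a worker box
    #   non_pooled_task_slot_count = 128   # the 128 default mentioned above
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG('pool_example',
              start_date=datetime(2016, 11, 1),
              schedule_interval='@daily')

    # at most as many of these run concurrently as the 'db_maintenance'
    # pool has slots, no matter how many workers are available
    cleanup = BashOperator(
        task_id='cleanup_shared_db',
        bash_command='echo "touching the shared resource"',
        pool='db_maintenance',
        dag=dag)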
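And to make the "same package, different arguments" point in (2) concrete, a typical split over your 3 nodes could look roughly like this (same airflow install, same airflow.cfg and DAG folder everywhere; the port and layout are just examples):

    # node 1: the "control" box
    airflow webserver -p 8080
    airflow scheduler

    # nodes 2 and 3: celery workers pulling work from the Redis broker
    airflow worker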
> (3) When a task runs and fails, I have an on_failure_callback. Will the
> on_failure_callback run on the same node as the task? Or is the executor
> based on the task level (seems so)?

A worker is run through the CLI: it picks up the DAG, parses it and then starts running the task instance. In that sense a DAG isn't a piece of software by itself, but more a description of a workflow; the scheduler/webserver look at the design of that workflow and compose a graph for you to see what it does. The worker picks up the DAG, extracts the specific task instance it should run, instantiates it, runs it and then returns. So everything tied to that task instance, including the on_failure_callback, runs on the worker that executed the task.

> (4) There is a setting for the queue (default_queue = default) in
> airflow.cfg. Say I want to use different queues to control which worker
> should use which queue. I think I just change default_queue = other_queue,
> right? Should I do any additional Celery-related setting?

The default queue is where task instances get routed in Celery if the queue setting of an operator is not modified (there's always A queue in Celery, and this one is called 'default'). What you can do is set an operator (task instance) of a DAG to use a different queue. For example, if you do data-sciency stuff you may need a machine with much more memory; you could route the tasks that do that sciency stuff to a specific worker, and the more generic stuff of the same DAG to your more generic workers.

Just go through the models in the API reference, https://airflow.incubator.apache.org/code.html#models , and look for 'queue'. Also see the CLI reference for the 'worker' command and how you can add the -q parameter: https://pythonhosted.org/airflow/cli.html . That way you can tie task instances to specific workers (see the P.S. at the bottom for a small sketch).

> (5) If all workers are down and the scheduler is still running, will the
> tasks be queued?

One scheduler is running, and the tasks are always queued at one end. The max size of the queue (assuming zero workers) depends on the configuration of your pools. So if you have 3 pools with 10, 10 and 10 slots, that's 30 + 128 (the default for non-pooled tasks) = 158 max queue size. Only when workers become active will the queue start to drain. When tasks are picked up and eventually set to a completed state (success/fail), the scheduler notices that something happened, sees that slots in the pools have become available, and queues more tasks. If the server fails or restarts, the task instances that were already queued stay in the QUEUED state; there's a failover mechanism with timeouts that eventually clears those instances and sets them back to SCHEDULED.

> (6) If we turn off the tasks while there are still tasks in the queue
> from before, will it kill the un-needed tasks in the queue?

When you clear or kill tasks, their database status is set to SHUTDOWN (or maybe something else). When a task is sent to a worker, right when it really starts executing, it does a final check on its status. If that's not a runnable state, the worker returns immediately and does nothing, which effectively means the task instance execution is "voided".

> Thanks.
> Jason

Pleasure! Hope you enjoy working with airflow!
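P.S. A quick sketch of the queue routing from (4), since it's easier to see in code. The queue name 'highmem' and the task are made up, and this reuses the dag object from the pool sketch above; default_queue in airflow.cfg stays 'default' and only the special tasks get routed elsewhere:

    # in the DAG: send the memory-hungry task to its own queue,
    # everything else keeps going to the default queue
    heavy = BashOperator(
        task_id='sciency_stuff',
        bash_command='python run_model.py',
        queue='highmem',
        dag=dag)

    # on the big-memory box, start the worker listening on that queue only:
    #   airflow worker -q highmem
    # the generic boxes just run "airflow worker" and serve the default queue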
