Mathieu Cinquin created AIRFLOW-6015:
----------------------------------------
Summary: Celery needed even if LocalExecutor is configured
Key: AIRFLOW-6015
URL: https://issues.apache.org/jira/browse/AIRFLOW-6015
Project: Apache Airflow
Issue Type: Bug
Components: configuration
Affects Versions: 1.10.6
Environment: ubuntu 16.04
Python 3.5 with virtualenv
Reporter: Mathieu Cinquin
Hello,
Celery is required to start airflow worker even if the executor configured is
LocalExecutor.
If i install apache-airflow[celery], the worker daemon well starts and uses
Celery as executor.
log
{code:java}
Nov 19 19:47:27 airflow-1 systemd[1]: Started Airflow worker daemon.
Nov 19 19:47:28 airflow-1 bash[10663]: [2019-11-19 19:47:28,226]
{settings.py:252} INFO - settings.configure_orm(): Using pool settings.
pool_size=128, max_overflow=10, pool_recycle=600, pid=10668
Nov 19 19:47:29 airflow-1 bash[10663]: Traceback (most recent call last):
Nov 19 19:47:29 airflow-1 bash[10663]: File
"/home/airflow/airflow/venv/bin/airflow", line 37, in <module>
Nov 19 19:47:29 airflow-1 bash[10663]: args.func(args)
Nov 19 19:47:29 airflow-1 bash[10663]: File
"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py",
line 74, in wrapper
Nov 19 19:47:29 airflow-1 bash[10663]: return f(*args, **kwargs)
Nov 19 19:47:29 airflow-1 bash[10663]: File
"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py",
line 1075, in worker
Nov 19 19:47:29 airflow-1 bash[10663]: from
airflow.executors.celery_executor import app as celery_app
Nov 19 19:47:29 airflow-1 bash[10663]: File
"/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py",
line 27, in <module>
Nov 19 19:47:29 airflow-1 bash[10663]: from celery import Celery
Nov 19 19:47:29 airflow-1 bash[10663]: ImportError: No module named 'celery'
Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Main process
exited, code=exited, status=1/FAILURE
Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Unit entered
failed state.
Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Failed with
result 'exit-code'.
Nov 19 19:47:40 airflow-1 systemd[1]: airflow-worker.service: Service hold-off
time over, scheduling restart.
Nov 19 19:47:40 airflow-1 systemd[1]: Stopped Airflow worker daemon.
{code}
airflow.cfg
{code:java}
[core]
# The home folder for airflow, default is ~/airflow
# airflow_home = /home/airflow/airflow
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /home/airflow/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /var/log/airflow# Airflow can store logs remotely in AWS S3
or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder = None
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://airflow:xxx@localhost:3306/airflow
# The SqlAlchemy pool size is the maximum number of database connections
# in the pool.
sql_alchemy_pool_size = 128
# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite.
sql_alchemy_pool_recycle = 600
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 4
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 4
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128
# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 4
# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
load_examples = False
# Where your Airflow plugins are stored
plugins_folder = /home/airflow/airflow/plugins
# Secret key to save connection passwords in the db
fernet_key = xxx
# Whether to disable pickling dags
donot_pickle = False
# How long before timing out a python file import while filling the DagBag
dagbag_import_timeout = 30
# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner
# If set, tasks without a `run_as_user` argument will be run with this user
# Can be used to de-elevate a sudo user running Airflow when executing tasks
default_impersonation =
# What security module to use (for example kerberos):
security =
# Turn unit test mode on (overwrites many configuration options with test
# values at runtime)
unit_test_mode = False
[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
# webserver
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[api]
# How to authenticate users of the API
auth_backend = airflow.api.auth.backend.deny_all
[operators]
# The default owner assigned to each new operator, unless
# provided explicitly or passed via `default_args`
default_owner = Airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://airflow-1.example.com:8080
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080
# Paths to the SSL certificate and key for the web server. When both are
# provided SSL will be enabled. This does not change the web server port.
#web_server_ssl_cert = /etc/ssl/certs/example.com.crt
#web_server_ssl_key = /etc/ssl//etc/ssl/private/example.com.key
# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 600
web_server_master_timeout = 600
# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 0
# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30
# Secret key used to run your flask app
secret_key = xxx
# Number of workers to run the Gunicorn web server
workers = 4
# The worker class gunicorn should use. Choices include
# sync (default), eventlet, gevent
worker_class = sync
# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile = -
rbac = True
# Expose the configuration file in the web server
expose_config = False
# Set to true to turn on authentication:
# http://pythonhosted.org/airflow/security.html
#web-authentication
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
# Filter the list of dags by owner name (requires authentication to be enabled)
filter_by_owner = False
# Filtering mode. Choices include user (default) and ldapgroup.
# Ldap group filtering requires using the ldap backend
#
# Note that the ldap server needs the "memberOf" overlay to be set up
# in order to user the ldapgroup mode.
owner_mode = user
# Default DAG orientation. Valid values are:
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR
# Puts the webserver in demonstration mode; blurs the names of Operators for
# privacy.
demo_mode = False
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5
# By default, the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False
[email]
email_backend = airflow.utils.email.send_email_smtp[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = airflow
smtp_password = xxx
smtp_port = 587
smtp_mail_from = [email protected]
[schedduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
# after how much time should the scheduler terminate in seconds
# -1 indicates to run continuously (see also num_runs)
run_duration = -1
# after how much time a new DAGs should be picked up from the filesystem
min_file_process_interval = 0
dag_dir_list_interval = 300
# How often should stats be printed to the logs
print_stats_interval = 30
child_process_log_directory = /var/log/airflow/scheduler
# Local task jobs periodically heartbeat to the DB. If the job has
# not heartbeat in this many seconds, the scheduler will mark the
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300
# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2
authenticate = False
[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)