t oo created AIRFLOW-5354:
-----------------------------

             Summary: Airflow - constant CPU usage of 25% with nothing running 
and scheduling loop running too frequently
                 Key: AIRFLOW-5354
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5354
             Project: Apache Airflow
          Issue Type: Improvement
          Components: scheduler
    Affects Versions: 1.10.4
            Reporter: t oo


I've put logging level to debug, cpu utilisation is constant 25% but no dags 
are running (they have none schedule since i only externally trigger). Why is 
the scheduling loop running every 2 seconds? can I make it every 30 seconds?

 

here is some of the scheduler log

 

[2019-08-30 09:28:57,538] \{scheduler_job.py:1363} DEBUG - Starting Loop...
[2019-08-30 09:28:57,539] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
parsing results
[2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:57,540] \{scheduler_job.py:1376} DEBUG - Harvested 0 
SimpleDAGs
[2019-08-30 09:28:57,540] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
executor
[2019-08-30 09:28:57,540] \{base_executor.py:124} DEBUG - 0 running task 
instances
[2019-08-30 09:28:57,540] \{base_executor.py:125} DEBUG - 0 in queue
[2019-08-30 09:28:57,540] \{base_executor.py:126} DEBUG - 3 open slots
[2019-08-30 09:28:57,540] \{base_executor.py:135} DEBUG - Calling the <class 
'airflow.executors.local_executor.LocalExecutor'> sync method
[2019-08-30 09:28:57,541] \{scheduler_job.py:1432} DEBUG - Ran scheduling loop 
in 0.00 seconds
[2019-08-30 09:28:57,541] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
seconds
[2019-08-30 09:28:58,543] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
seconds to prevent excessive logging
[2019-08-30 09:28:59,541] \{scheduler_job.py:1363} DEBUG - Starting Loop...
[2019-08-30 09:28:59,541] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
parsing results
[2019-08-30 09:28:59,541] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:28:59,542] \{scheduler_job.py:1376} DEBUG - Harvested 0 
SimpleDAGs
[2019-08-30 09:28:59,542] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
executor
[2019-08-30 09:28:59,542] \{base_executor.py:124} DEBUG - 0 running task 
instances
[2019-08-30 09:28:59,543] \{base_executor.py:125} DEBUG - 0 in queue
[2019-08-30 09:28:59,543] \{base_executor.py:126} DEBUG - 3 open slots
[2019-08-30 09:28:59,543] \{base_executor.py:135} DEBUG - Calling the <class 
'airflow.executors.local_executor.LocalExecutor'> sync method
[2019-08-30 09:28:59,544] \{scheduler_job.py:1432} DEBUG - Ran scheduling loop 
in 0.00 seconds
[2019-08-30 09:28:59,544] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
seconds
[2019-08-30 09:29:00,545] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
seconds to prevent excessive logging
[2019-08-30 09:29:01,544] \{scheduler_job.py:1363} DEBUG - Starting Loop...
[2019-08-30 09:29:01,545] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
parsing results
[2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:01,546] \{scheduler_job.py:1376} DEBUG - Harvested 0 
SimpleDAGs
[2019-08-30 09:29:01,546] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
executor
[2019-08-30 09:29:01,546] \{base_executor.py:124} DEBUG - 0 running task 
instances
[2019-08-30 09:29:01,546] \{base_executor.py:125} DEBUG - 0 in queue
[2019-08-30 09:29:01,547] \{base_executor.py:126} DEBUG - 3 open slots
[2019-08-30 09:29:01,547] \{base_executor.py:135} DEBUG - Calling the <class 
'airflow.executors.local_executor.LocalExecutor'> sync method
[2019-08-30 09:29:01,548] \{scheduler_job.py:1432} DEBUG - Ran scheduling loop 
in 0.00 seconds
[2019-08-30 09:29:01,548] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
seconds
[2019-08-30 09:29:02,549] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
seconds to prevent excessive logging
[2019-08-30 09:29:03,548] \{scheduler_job.py:1363} DEBUG - Starting Loop...
[2019-08-30 09:29:03,548] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
parsing results
[2019-08-30 09:29:03,548] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:03,549] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:03,549] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:03,549] \{dag_processing.py:635} DEBUG - Received message of 
type DagParsingStat
[2019-08-30 09:29:03,549] \{scheduler_job.py:1376} DEBUG - Harvested 0 
SimpleDAGs
[2019-08-30 09:29:03,550] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
executor
[2019-08-30 09:29:03,550] \{base_executor.py:124} DEBUG - 0 running task 
instances
[2019-08-30 09:29:03,550] \{base_executor.py:125} DEBUG - 0 in queue
[2019-08-30 09:29:03,550] \{base_executor.py:126} DEBUG - 3 open slots
[2019-08-30 09:29:03,550] \{base_executor.py:135} DEBUG - Calling the <class 
'airflow.executors.local_executor.LocalExecutor'> sync method
[2019-08-30 09:29:03,551] \{scheduler_job.py:1423} DEBUG - Heartbeating the 
scheduler
[2019-08-30 09:29:03,564] \{base_job.py:197} DEBUG - [heartbeat]
[2019-08-30 09:29:03,565] \{scheduler_job.py:1432} DEBUG - Ran scheduling loop 
in 0.02 seconds
[2019-08-30 09:29:03,565] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
seconds
[2019-08-30 09:29:04,567] \{scheduler_job.py:1447} DEBUG - Sleeping for 0.98 
seconds to prevent excessive logging

 

 

ps -ef | grep -i airf
ec2-user 5648 1 0 09:05 pts/1 00:00:05 /home/ec2-user/venv/bin/python2.7 
/home/ec2-user/venv/bin/airflow scheduler
ec2-user 5727 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
/home/ec2-user/venv/bin/airflow scheduler
ec2-user 5734 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
/home/ec2-user/venv/bin/airflow scheduler
ec2-user 5738 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
/home/ec2-user/venv/bin/airflow scheduler
ec2-user 5739 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
/home/ec2-user/venv/bin/airflow scheduler
ec2-user 5746 5648 0 09:05 pts/1 00:00:02 airflow scheduler -- 
DagFileProcessorManager
ec2-user 6161 1 0 09:05 pts/1 00:00:04 /home/ec2-user/venv/bin/python2.7 
/home/ec2-user/venv/bin/airflow webserver -p 8080 --access_logfile 
/home/ec2-user/airflow/logs/airflow-webserver-access.log
ec2-user 6243 6161 0 09:05 pts/1 00:00:01 gunicorn: master [airflow-webserver] 
ec2-user 6303 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker 
[airflow-webserver] 
ec2-user 6304 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker 
[airflow-webserver] 
ec2-user 12539 6243 0 09:16 pts/1 00:00:06 [ready] gunicorn: worker 
[airflow-webserver] 
ec2-user 18988 6243 0 09:26 pts/1 00:00:04 [ready] gunicorn: worker 
[airflow-webserver]

 

airflow.cfg

[core]
dags_folder = /home/ec2-user/airflow/dags
base_log_folder = /home/ec2-user/airflow/logs
remote_logging = False
remote_log_conn_id =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = DEBUG
fab_logging_level = DEBUG
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] 
\{%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} 
%%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = 
airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] \{%%(filename)s:%%(lineno)d} %%(levelname)s - 
%%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
log_filename_template = \{{ ti.dag_id }}/\{{ ti.task_id }}/\{{ ts }}/\{{ 
try_number }}.log
log_processor_filename_template = \{{ filename }}.log
dag_processor_manager_log_location = 
/home/ec2-user/airflow/logs/dag_processor_manager/dag_processor_manager.log
hostname_callable = socket:getfqdn
default_timezone = utc
executor = LocalExecutor
sql_alchemy_conn_cmd = /home/ec2-user/airflow/scripts/get_db_conn_str.sh
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_reconnect_timeout = 300
sql_alchemy_schema =
parallelism = 3
dag_concurrency = 3
dags_are_paused_at_creation = False
max_active_runs_per_dag = 350
load_examples = False
plugins_folder = /home/ec2-user/airflow/plugins
fernet_key = REDACT
donot_pickle = False
dagbag_import_timeout = 300
task_runner = StandardTaskRunner
default_impersonation =
security =
secure_mode = True
unit_test_mode = False
task_log_reader = task
enable_xcom_pickling = True
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = False
worker_precheck = False
dag_discovery_safe_mode = False
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[api]
auth_backend = airflow.api.auth.backend.deny_all
[lineage]
backend =
[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =
[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[hive]
default_hive_mapred_queue =
[webserver]
base_url = https://REDACT:8080
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert = /home/ec2-user/certs/REDACT
web_server_ssl_key = /home/ec2-user/certs/REDACT
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 600
secret_key = temporary_key
workers = 4
worker_class = gevent
access_logfile = -
error_logfile = -
expose_config = False
authenticate = True
auth_backend = airflow.contrib.auth.backends.ldap_auth
filter_by_owner = False
owner_mode = user
dag_default_view = tree
dag_orientation = LR
demo_mode = False
log_fetch_timeout_sec = 5
hide_paused_dags_by_default = False
page_size = 100
rbac = False
navbar_color = #007A87
default_dag_run_display_number = 25
enable_proxy_fix = False
cookie_secure = False
cookie_samesite =
default_wrap = False
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = [email protected]
[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_log_server_port = 8793
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
default_queue = default
sync_parallelism = 0
celery_config_options = 
airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
[celery_broker_transport_options]
[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 25
run_duration = -1
min_file_process_interval = 90
dag_dir_list_interval = 300
print_stats_interval = 30
scheduler_health_check_threshold = 30
child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 2000
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
max_threads = 1
authenticate = False
use_job_schedule = False
[ldap]
REDACT ldap stuff
search_scope = SUBTREE
ignore_malformed_schema = False
[mesos]
master = localhost:5050
framework_name = Airflow
task_cpu = 1
task_memory = 256
checkpoint = False
authenticate = False
[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
[github_enterprise]
api_rev = v3
[admin]
hide_sensitive_variable_fields = True
[elasticsearch]
host =
log_id_template = \{dag_id}-\{task_id}-\{execution_date}-\{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
[kubernetes]
worker_container_repository =
worker_container_tag =
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = True
worker_pods_creation_batch_size = 1
namespace = default
airflow_configmap =
dags_in_image = False
dags_volume_subpath =
dags_volume_claim =
logs_volume_subpath =
logs_volume_claim =
dags_volume_host =
logs_volume_host =
env_from_configmap_ref =
env_from_secret_ref =
git_repo =
git_branch =
git_subpath =
git_user =
git_password =
git_sync_root = /git
git_sync_dest = repo
git_dags_folder_mount_point =
git_ssh_key_secret_name =
git_ssh_known_hosts_configmap_name =
git_sync_container_repository = k8s.gcr.io/git-sync
git_sync_container_tag = v3.1.1
git_sync_init_container_name = git-sync-clone
worker_service_account_name =
image_pull_secrets =
gcp_service_account_keys =
in_cluster = True
affinity =
tolerations =
kube_client_request_args =
run_as_user =
fs_group =
[kubernetes_node_selectors]
[kubernetes_annotations]
[kubernetes_environment_variables]
[kubernetes_secrets]
[kubernetes_labels]

 

airflow 1.10.4, localexecutor, python2, t3.large EC2



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to